diff options
-rw-r--r-- | MANIFEST | 1 | ||||
-rw-r--r-- | lib/PublicInbox/IMAP.pm | 178 | ||||
-rw-r--r-- | t/imap.t | 43 | ||||
-rw-r--r-- | xt/mem-imapd-tls.t | 243 |
4 files changed, 420 insertions, 45 deletions
@@ -355,6 +355,7 @@ xt/git-http-backend.t xt/git_async_cmp.t xt/imapd-mbsync-oimap.t xt/imapd-validate.t +xt/mem-imapd-tls.t xt/mem-msgview.t xt/msgtime_cmp.t xt/nntpd-validate.t diff --git a/lib/PublicInbox/IMAP.pm b/lib/PublicInbox/IMAP.pm index ff01d0b5..bc890517 100644 --- a/lib/PublicInbox/IMAP.pm +++ b/lib/PublicInbox/IMAP.pm @@ -16,18 +16,17 @@ # are for the limitations of git clients, while slices are # for the limitations of IMAP clients. # -# * sequence numbers are estimated based on slice. If they -# wrong, they're higher than than the corresponding UID -# because UIDs have gaps due to spam removals. -# We only support an ephemeral mapping non-UID "FETCH" -# because mutt header caching relies on it; mutt uses -# UID requests everywhere else. +# * We also take advantage of slices being only 50K to store +# "UID offset" to message sequence number (MSN) mapping +# as a 50K uint16_t array (via pack("S*", ...)). "UID offset" +# is the offset from {uid_base} which determines the start of +# the mailbox slice. package PublicInbox::IMAP; use strict; use base qw(PublicInbox::DS); use fields qw(imapd ibx long_cb -login_tag - uid_base -idle_tag -idle_max); + uid_base -idle_tag uo2m); use PublicInbox::Eml; use PublicInbox::EmlContentFoo qw(parse_content_disposition); use PublicInbox::DS qw(now); @@ -50,7 +49,9 @@ die "neither Email::Address::XS nor Mail::Address loaded: $@" if !$Address; sub LINE_MAX () { 8000 } # RFC 2683 3.2.1.5 -# changing this will cause grief for clients which cache +# Changing UID_SLICE will cause grief for clients which cache. 
+# This also needs to be <64K: we pack it into a uint16_t +# for long_response UID (offset) => MSN mappings sub UID_SLICE () { 50_000 } # these values area also used for sorting @@ -196,23 +197,118 @@ sub cmd_capability ($$) { sub cmd_noop ($$) { "$_[1] OK Noop done\r\n" } +# uo2m: UID Offset to MSN, this is an arrayref by default, +# but uo2m_hibernate can compact and deduplicate it +sub uo2m_ary_new ($) { + my ($self) = @_; + my $base = $self->{uid_base}; + my $uids = $self->{ibx}->over->uid_range($base + 1, $base + UID_SLICE); + + # convert UIDs to offsets from {base} + my @tmp; # [$UID_OFFSET] => $MSN + my $msn = 0; + ++$base; + $tmp[$_ - $base] = ++$msn for @$uids; + \@tmp; +} + +# changes UID-offset-to-MSN mapping into a deduplicated scalar: +# uint16_t uo2m[UID_SLICE]. +# May be swapped out for idle clients if THP is disabled. +sub uo2m_hibernate ($) { + my ($self) = @_; + ref(my $uo2m = $self->{uo2m}) or return; + my %dedupe = ( uo2m_pack($uo2m) => undef ); + $self->{uo2m} = (keys(%dedupe))[0]; + undef; +} + +sub uo2m_last_uid ($) { + my ($self) = @_; + my $uo2m = $self->{uo2m} or die 'BUG: uo2m_last_uid w/o {uo2m}'; + (ref($uo2m) ? @$uo2m : (length($uo2m) >> 1)) + $self->{uid_base}; +} + +sub uo2m_pack ($) { + # $_[0] is an arrayref of MSNs, it may have undef gaps if there + # are gaps in the corresponding UIDs: [ msn1, msn2, undef, msn3 ] + no warnings 'uninitialized'; + pack('S*', @{$_[0]}); +} + +# extend {uo2m} to account for new messages which arrived since +# {uo2m} was created. 
+sub uo2m_extend ($$) { + my ($self, $new_uid_max) = @_; + defined(my $uo2m = $self->{uo2m}) or + return($self->{uo2m} = uo2m_ary_new($self)); + my $beg = uo2m_last_uid($self); # last UID we've learned + return $uo2m if $beg >= $new_uid_max; # fast path + + # need to extend the current range: + my $base = $self->{uid_base}; + ++$beg; + my $uids = $self->{ibx}->over->uid_range($beg, $base + UID_SLICE); + my @tmp; # [$UID_OFFSET] => $MSN + if (ref($uo2m)) { + my $msn = $uo2m->[-1]; + $tmp[$_ - $beg] = ++$msn for @$uids; + push @$uo2m, @tmp; + $uo2m; + } else { + my $msn = unpack('S', substr($uo2m, -2, 2)); + $tmp[$_ - $beg] = ++$msn for @$uids; + $uo2m .= uo2m_pack(\@tmp); + my %dedupe = ($uo2m => undef); + $self->{uo2m} = (keys %dedupe)[0]; + } +} + +# the flexible version which works on scalars and array refs. +# Must call uo2m_extend before this +sub uid2msn ($$) { + my ($self, $uid) = @_; + my $uo2m = $self->{uo2m}; + my $off = $uid - $self->{uid_base} - 1; + ref($uo2m) ? $uo2m->[$off] : unpack('S', substr($uo2m, $off << 1, 2)); +} + +# returns an arrayref of UIDs, so MSNs can be translated to UIDs via: +# $msn2uid->[$MSN-1] => $UID. The result of this is always ephemeral +# and does not live beyond the event loop. 
+sub msn2uid ($) { + my ($self) = @_; + my $base = $self->{uid_base}; + my $uo2m = uo2m_extend($self, $base + UID_SLICE); + $uo2m = [ unpack('S*', $uo2m) ] if !ref($uo2m); + + my $uo = 0; + my @msn2uid; + for my $msn (@$uo2m) { + ++$uo; + $msn2uid[$msn - 1] = $uo + $base if $msn; + } + \@msn2uid; +} + +# converts a set of message sequence numbers in requests to UIDs: +sub msn_to_uid_range ($$) { + my $msn2uid = $_[0]; + $_[1] =~ s!([0-9]+)!$msn2uid->[$1 - 1] // ($msn2uid->[-1] + 1)!sge; +} + # called by PublicInbox::InboxIdle sub on_inbox_unlock { my ($self, $ibx) = @_; - my $new = $ibx->over->max; - my $uid_base = $self->{uid_base} // 0; - my $uid_end = $uid_base + UID_SLICE; - defined(my $old = $self->{-idle_max}) or die 'BUG: -idle_max unset'; - $new = $uid_end if $new > $uid_end; + my $old = uo2m_last_uid($self); + my $uid_end = $self->{uid_base} + UID_SLICE; + uo2m_extend($self, $uid_end); + my $new = uo2m_last_uid($self); if ($new > $old) { - $self->{-idle_max} = $new; - $new -= $uid_base; - $old -= $uid_base; - $self->msg_more("* $_ EXISTS\r\n") for (($old + 1)..($new - 1)); - $self->write(\"* $new EXISTS\r\n"); + my $msn = uid2msn($self, $new); + $self->write(\"* $msn EXISTS\r\n"); } elsif ($new == $uid_end) { # max exceeded $uid_end # continue idling w/o inotify - delete $self->{-idle_max}; my $sock = $self->{sock} or return; $ibx->unsubscribe_unlock(fileno($sock)); } @@ -245,9 +341,9 @@ sub cmd_idle ($$) { my $fd = fileno($sock); # only do inotify on most recent slice if ($max < $uid_end) { + uo2m_extend($self, $uid_end); $ibx->subscribe_unlock($fd, $self); $self->{imapd}->idler_start; - $self->{-idle_max} = $max; } $idle_timer //= PublicInbox::DS::later(\&idle_tick_all); $IDLERS->{$fd} = $self; @@ -497,12 +593,10 @@ sub requeue_once ($) { $self->requeue if $new_size == 1; } -# my ($msn, $UID) = @_; -sub fetch_msn_uid ($$) { '* '.(${$_[0]}++).' 
FETCH (UID '.$_[1] } - sub fetch_run_ops { - my ($self, $msn, $smsg, $bref, $ops, $partial) = @_; - $self->msg_more(fetch_msn_uid($msn, $smsg->{num})); + my ($self, $smsg, $bref, $ops, $partial) = @_; + my $uid = $smsg->{num}; + $self->msg_more('* '.uid2msn($self, $uid)." FETCH (UID $uid"); my ($eml, $k); for (my $i = 0; $i < @$ops;) { $k = $ops->[$i++]; @@ -523,7 +617,7 @@ sub fetch_blob_cb { # called by git->cat_async via git_async_cat } else { $smsg->{blob} eq $oid or die "BUG: $smsg->{blob} != $oid"; } - fetch_run_ops($self, $range_info->[3], $smsg, $bref, $ops, $partial); + fetch_run_ops($self, $smsg, $bref, $ops, $partial); requeue_once($self); } @@ -627,8 +721,7 @@ sub range_step ($$) { } else { return 'BAD fetch range'; } - my $msn = $beg - $uid_base; - [ $beg, $end, $$range_csv, \$msn ]; + [ $beg, $end, $$range_csv ]; } sub refill_range ($$$) { @@ -653,6 +746,7 @@ sub fetch_blob { # long_response return; } } + uo2m_extend($self, $msgs->[-1]->{num}); git_async_cat($self->{ibx}->git, $msgs->[0]->{blob}, \&fetch_blob_cb, \@_); } @@ -665,7 +759,8 @@ sub fetch_smsg { # long_response return; } } - fetch_run_ops($self, $range_info->[3], $_, undef, $ops) for @$msgs; + uo2m_extend($self, $msgs->[-1]->{num}); + fetch_run_ops($self, $_, undef, $ops) for @$msgs; @$msgs = (); 1; # more } @@ -696,10 +791,12 @@ sub fetch_uid { # long_response $self->write("$tag $err\r\n"); return; } + my $adj = $self->{uid_base} + 1; + my $uo2m = uo2m_extend($self, $uids->[-1]); + $uo2m = [ unpack('S*', $uo2m) ] if !ref($uo2m); my ($i, $k); - my $msn = $range_info->[3]; for (@$uids) { - $self->msg_more(fetch_msn_uid($msn, $_)); + $self->msg_more("* $uo2m->[$_ - $adj] FETCH (UID $_"); for ($i = 0; $i < @$ops;) { $k = $ops->[$i++]; $ops->[$i++]->($self, $k); @@ -956,25 +1053,14 @@ sub cmd_uid_fetch ($$$$;@) { my ($cb, $ops, $partial) = fetch_compile(\@want); return "$tag $cb\r\n" unless $ops; + # cb is one of fetch_blob, fetch_smsg, fetch_uid $range_csv = 'bad' if $range_csv !~ 
$valid_range; my $range_info = range_step($self, \$range_csv); return "$tag $range_info\r\n" if !ref($range_info); + uo2m_hibernate($self) if $cb == \&fetch_blob; # slow, save RAM long_response($self, $cb, $tag, [], $range_info, $ops, $partial); } -# returns an arrayref of UIDs, so MSNs can be translated via: -# $msn2uid->[$MSN-1] => $UID -sub msn2uid ($) { - my ($self) = @_; - my $x = $self->{uid_base}; - $self->{ibx}->over->uid_range($x + 1, $x + UID_SLICE); -} - -sub msn_to_uid_range ($$) { - my $msn2uid = $_[0]; - $_[1] =~ s!([0-9]+)!$msn2uid->[$1 - 1] // ($msn2uid->[-1] + 1)!sge; -} - sub cmd_fetch ($$$$;@) { my ($self, $tag, $range_csv, @want) = @_; my $ibx = $self->{ibx} or return "$tag BAD No mailbox selected\r\n"; @@ -986,6 +1072,7 @@ sub cmd_fetch ($$$$;@) { msn_to_uid_range(msn2uid($self), $range_csv); my $range_info = range_step($self, \$range_csv); return "$tag $range_info\r\n" if !ref($range_info); + uo2m_hibernate($self) if $cb == \&fetch_blob; # slow, save RAM long_response($self, $cb, $tag, [], $range_info, $ops, $partial); } @@ -1318,7 +1405,8 @@ sub event_step { $self->write(\"\* BAD request too long\r\n"); return $self->close; } - $self->do_read($rbuf, LINE_MAX, length($$rbuf)) or return; + $self->do_read($rbuf, LINE_MAX, length($$rbuf)) or + return uo2m_hibernate($self); $line = index($$rbuf, "\n"); } $line = substr($$rbuf, 0, $line + 1, ''); @@ -130,4 +130,47 @@ EOF ], 'placed op_eml_new before emit_body'); } +# UID <=> MSN mapping + +sub uo2m_str_new ($) { + no warnings 'uninitialized'; # uo2m_ary_new may have undef + pack('S*', @{$_[0]->uo2m_ary_new}); # 2 bytes per-MSN +} + +{ + my $ibx = bless { uid_range => [ 1, 2, 4 ] }, 'Uo2mTestInbox'; + my $imap = bless { uid_base => 0, ibx => $ibx }, 'PublicInbox::IMAP'; + my $uo2m = $imap->uo2m_ary_new; + is_deeply($uo2m, [ 1, 2, undef, 3 ], 'uo2m ary'); + $uo2m = uo2m_str_new($imap); + is_deeply([ unpack('S*', $uo2m) ], [ 1, 2, 0, 3 ], 'uo2m str'); + + $ibx->{uid_range} = [ 1, 2, 4, 5, 6 
]; + for ([ 1, 2, undef, 3 ], $uo2m) { + $imap->{uo2m} = $_; + is($imap->uid2msn(1), 1, 'uid2msn'); + is($imap->uid2msn(4), 3, 'uid2msn'); + is($imap->uo2m_last_uid, 4, 'uo2m_last_uid'); + $imap->uo2m_extend(6); + is($imap->uid2msn(5), 4, 'uid2msn 5 => 4'); + is($imap->uid2msn(6), 5, 'uid2msn 6 => 5'); + is($imap->uo2m_last_uid, 6, 'uo2m_last_uid'); + + my $msn2uid = $imap->msn2uid; + my $range = '1,4:5'; + $imap->can('msn_to_uid_range')->($msn2uid, $range); + is($range, '1,5:6', 'range converted'); + } +} + done_testing; + +package Uo2mTestInbox; +use strict; +require PublicInbox::DummyInbox; +our @ISA = qw(PublicInbox::DummyInbox); +sub over { shift } +sub uid_range { + my ($self, $beg, $end, undef) = @_; + [ grep { $_ >= $beg && $_ <= $end } @{$self->{uid_range}} ]; +} diff --git a/xt/mem-imapd-tls.t b/xt/mem-imapd-tls.t new file mode 100644 index 00000000..accf7564 --- /dev/null +++ b/xt/mem-imapd-tls.t @@ -0,0 +1,243 @@ +#!perl -w +# Copyright (C) 2020 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +# Idle client memory usage test, particularly after EXAMINE when +# Message Sequence Numbers are loaded +use strict; +use Test::More; +use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET); +use PublicInbox::TestCommon; +use PublicInbox::Syscall qw(:epoll); +use PublicInbox::DS; +require_mods(qw(DBD::SQLite)); +my $inboxdir = $ENV{GIANT_INBOX_DIR}; +my $TEST_TLS; +SKIP: { + require_mods('IO::Socket::SSL', 1); + $TEST_TLS = $ENV{TEST_TLS} // 1; +}; +plan skip_all => "GIANT_INBOX_DIR not defined for $0" unless $inboxdir; +diag 'TEST_COMPRESS='.($ENV{TEST_COMPRESS} // 1) . 
" TEST_TLS=$TEST_TLS"; + +my ($cert, $key) = qw(certs/server-cert.pem certs/server-key.pem); +if ($TEST_TLS) { + if (!-r $key || !-r $cert) { + plan skip_all => + "certs/ missing for $0, run ./certs/create-certs.perl"; + } + use_ok 'PublicInbox::TLS'; +} +my ($tmpdir, $for_destroy) = tmpdir(); +my ($out, $err) = ("$tmpdir/stdout.log", "$tmpdir/stderr.log"); +my $pi_config = "$tmpdir/pi_config"; +my $group = 'inbox.test'; +local $SIG{PIPE} = 'IGNORE'; # for IMAPC (below) +my $imaps = tcp_server(); +{ + open my $fh, '>', $pi_config or die "open: $!\n"; + print $fh <<EOF or die; +[publicinbox "imapd-tls"] + inboxdir = $inboxdir + address = $group\@example.com + newsgroup = $group + indexlevel = basic +EOF + close $fh or die "close: $!\n"; +} +my $imaps_addr = $imaps->sockhost . ':' . $imaps->sockport; +my $env = { PI_CONFIG => $pi_config }; +my $arg = $TEST_TLS ? [ "-limaps://$imaps_addr/?cert=$cert,key=$key" ] : []; +my $cmd = [ '-imapd', '-W0', @$arg, "--stdout=$out", "--stderr=$err" ]; +my $td = start_script($cmd, $env, { 3 => $imaps }); +my %ssl_opt; +if ($TEST_TLS) { + %ssl_opt = ( + SSL_hostname => 'server.local', + SSL_verifycn_name => 'server.local', + SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_PEER(), + SSL_ca_file => 'certs/test-ca.pem', + ); + my $ctx = IO::Socket::SSL::SSL_Context->new(%ssl_opt); + + # cf. 
https://rt.cpan.org/Ticket/Display.html?id=129463 + my $mode = eval { Net::SSLeay::MODE_RELEASE_BUFFERS() }; + if ($mode && $ctx->{context}) { + eval { Net::SSLeay::CTX_set_mode($ctx->{context}, $mode) }; + warn "W: $@ (setting SSL_MODE_RELEASE_BUFFERS)\n" if $@; + } + + $ssl_opt{SSL_reuse_ctx} = $ctx; + $ssl_opt{SSL_startHandshake} = 0; +} +chomp(my $nfd = `/bin/sh -c 'ulimit -n'`); +$nfd -= 10; +ok($nfd > 0, 'positive FD count'); +my $MAX_FD = 10000; +$nfd = $MAX_FD if $nfd >= $MAX_FD; +our $DONE = 0; +sub once { 0 }; # stops event loop + +# setup the event loop so that it exits at every step +# while we're still doing connect(2) +PublicInbox::DS->SetLoopTimeout(0); +PublicInbox::DS->SetPostLoopCallback(\&once); +my $pid = $td->{pid}; +if ($^O eq 'linux' && open(my $f, '<', "/proc/$pid/status")) { + diag(grep(/RssAnon/, <$f>)); +} + +foreach my $n (1..$nfd) { + my $io = tcp_connect($imaps, Blocking => 0); + $io = IO::Socket::SSL->start_SSL($io, %ssl_opt) if $TEST_TLS; + IMAPC->new($io); + + # one step through the event loop + # do a little work as we connect: + PublicInbox::DS->EventLoop; + + # try not to overflow the listen() backlog: + if (!($n % 128) && $DONE != $n) { + diag("nr: ($n) $DONE/$nfd"); + PublicInbox::DS->SetLoopTimeout(-1); + PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $n }); + + # clear the backlog: + PublicInbox::DS->EventLoop; + + # resume looping + PublicInbox::DS->SetLoopTimeout(0); + PublicInbox::DS->SetPostLoopCallback(\&once); + } +} + +# run the event loop normally, now: +diag "done?: @".time." 
$DONE/$nfd"; +if ($DONE != $nfd) { + PublicInbox::DS->SetLoopTimeout(-1); + PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $nfd }); + PublicInbox::DS->EventLoop; +} +is($nfd, $DONE, "$nfd/$DONE done"); +if ($^O eq 'linux' && open(my $f, '<', "/proc/$pid/status")) { + diag(grep(/RssAnon/, <$f>)); + diag " SELF lsof | wc -l ".`lsof -p $$ |wc -l`; + diag "SERVER lsof | wc -l ".`lsof -p $pid |wc -l`; +} +PublicInbox::DS->Reset; +$td->kill; +$td->join; +is($?, 0, 'no error in exited process'); +done_testing; + +package IMAPC; +use strict; +use base qw(PublicInbox::DS); +use fields qw(step zin); +use PublicInbox::Syscall qw(EPOLLIN EPOLLOUT EPOLLONESHOT); +use Errno qw(EAGAIN); +# determines where we start event_step +use constant FIRST_STEP => ($ENV{TEST_COMPRESS} // 1) ? -2 : 0; + +# return true if complete, false if incomplete (or failure) +sub connect_tls_step { + my ($self) = @_; + my $sock = $self->{sock} or return; + return 1 if $sock->connect_SSL; + return $self->drop("$!") if $! != EAGAIN; + if (my $ev = PublicInbox::TLS::epollbit()) { + unshift @{$self->{wbuf}}, \&connect_tls_step; + PublicInbox::DS::epwait($sock, $ev | EPOLLONESHOT); + 0; + } else { + $self->drop('BUG? 
EAGAIN but '.PublicInbox::TLS::err()); + } +} + +sub event_step { + my ($self) = @_; + + # TLS negotiation happens in flush_write via {wbuf} + return unless $self->flush_write && $self->{sock}; + + if ($self->{step} == -2) { + $self->do_read(\(my $buf = ''), 128) or return; + $buf =~ /\A\* OK / or die 'no greeting'; + $self->{step} = -1; + $self->write(\"1 COMPRESS DEFLATE\r\n"); + } + if ($self->{step} == -1) { + $self->do_read(\(my $buf = ''), 128) or return; + $buf =~ /\A1 OK / or die "no compression $buf"; + IMAPCdeflate->enable($self); + $self->{step} = 1; + $self->write(\"2 EXAMINE inbox.test.0\r\n"); + } + if ($self->{step} == 0) { + $self->do_read(\(my $buf = ''), 128) or return; + $buf =~ /\A\* OK / or die 'no greeting'; + $self->{step} = 1; + $self->write(\"2 EXAMINE inbox.test.0\r\n"); + } + if ($self->{step} == 1) { + my $buf = ''; + until ($buf =~ /^2 OK \[READ-ONLY/ms) { + $self->do_read(\$buf, 4096, length($buf)) or return; + } + $self->{step} = 2; + $self->write(\"3 UID FETCH 1 (UID FLAGS)\r\n"); + } + if ($self->{step} == 2) { + my $buf = ''; + until ($buf =~ /^3 OK /ms) { + $self->do_read(\$buf, 4096, length($buf)) or return; + } + $self->{step} = 3; + $self->write(\"4 IDLE\r\n"); + } + if ($self->{step} == 3) { + $self->do_read(\(my $buf = ''), 128) or return; + no warnings 'once'; + $::DONE++; + $self->{step} = 5; # all done + } else { + warn "$self->{step} Should never get here $self"; + } +} + +sub new { + my ($class, $io) = @_; + my $self = fields::new($class); + + # wait for connect(), and maybe SSL_connect() + $self->SUPER::new($io, EPOLLOUT|EPOLLONESHOT); + if ($io->can('connect_SSL')) { + $self->{wbuf} = [ \&connect_tls_step ]; + } + $self->{step} = FIRST_STEP; + $self; +} + +1; +package IMAPCdeflate; +use strict; +use base qw(IMAPC); # parent doesn't work for fields +use Hash::Util qw(unlock_hash); # dependency of fields for perl 5.10+, anyways +use Compress::Raw::Zlib; +use PublicInbox::IMAPdeflate; +my %ZIN_OPT; +BEGIN { + %ZIN_OPT = ( 
-WindowBits => -15, -AppendOutput => 1 ); + *write = \&PublicInbox::IMAPdeflate::write; + *do_read = \&PublicInbox::IMAPdeflate::do_read; +}; + +sub enable { + my ($class, $self) = @_; + my ($in, $err) = Compress::Raw::Zlib::Inflate->new(%ZIN_OPT); + die "Inflate->new failed: $err" if $err != Z_OK; + unlock_hash(%$self); + bless $self, $class; + $self->{zin} = $in; +} + +1; |