diff options
author | Eric Wong <e@80x24.org> | 2023-11-07 13:01:47 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2023-11-07 13:23:36 +0000 |
commit | e97a30e7624dfb8645aa33a94844bcf28c7e1379 (patch) | |
tree | 9f28b7273c795b0e0579d42728fdfad1e5dbfeac /lib/PublicInbox | |
parent | c1f27d21213f05bd4656e24ec5a5a076ebaa6afd (diff) | |
download | public-inbox-e97a30e7624dfb8645aa33a94844bcf28c7e1379.tar.gz |
When dealing with large search results, we need to deal with EPIPE not just from the pager, but also EPIPE or ECONNRESET between lei_xsearch and lei2mail processes. Without this fix, lei_xsearch processes could linger and get stuck writing to dead lei2mail processes if a user aborts the pager early during a large result set. To ensure lei_xsearch processes don't linger around after lei2mail workers all die, we must close $l2m->{-wq_s2} before spawning lei_xsearch processes, since $l2m->{-wq_s2} is only used in lei2mail workers. For `git cat-file' processes, we also need to trigger PublicInbox::Git->close to handle unpredictable destructor ordering to avoid using uninitialized IO refs. This combines with the `git_to_mail' change to deal with process cleanup handling from premature shutdowns. To test all this, we can't just rely on a single message being large, but also need to rely on the result set being large enough to saturate the lei_xsearch -> lei2mail socket so we rely on GIANT_INBOX_DIR once again.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/Git.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiOverview.pm | 3 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 8 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 3 |
4 files changed, 13 insertions, 3 deletions
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 11712db2..292c359a 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -276,6 +276,7 @@ sub cat_async_step ($$) { sub cat_async_wait ($) { my ($self) = @_; + $self->close if !$self->{sock}; my $inflight = $self->{inflight} or return; while (scalar(@$inflight)) { cat_async_step($self, $inflight); @@ -331,6 +332,7 @@ sub check_async_wait ($) { my ($self) = @_; return cat_async_wait($self) if $self->{-bc}; my $ck = $self->{ck} or return; + $ck->close if !$ck->{sock}; my $inflight = $ck->{inflight} or return; check_async_step($ck, $inflight) while (scalar(@$inflight)); } diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 066c40bd..129dabf8 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -212,7 +212,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually sub { my ($smsg, $mitem, $eml) = @_; $smsg->{pct} = get_pct($mitem) if $mitem; - $l2m->wq_io_do('write_mail', [], $smsg, $eml); + eval { $l2m->wq_io_do('write_mail', [], $smsg, $eml) }; + $lei->fail($@) if $@ && !$!{ECONNRESET} && !$!{EPIPE}; } } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index e80163e2..b73af68a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -8,11 +8,13 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::IO; +use PublicInbox::Git; use PublicInbox::Spawn qw(spawn); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use PublicInbox::Syscall qw(rename_noreplace); use autodie qw(open seek close); +use Carp qw(croak); my %kw2char = ( # Maildir characters draft => 'D', @@ -132,8 +134,12 @@ sub eml2mboxcl2 { sub git_to_mail { # git->cat_async callback my ($bref, $oid, $type, $size, $smsg) = @_; - my $self = delete $smsg->{l2m} // die "BUG: no l2m"; $type // return; # called by PublicInbox::Git::close + my $self = delete $smsg->{l2m}; + if (!defined($self)) { + return if $PublicInbox::Git::in_cleanup; + croak "BUG: no l2m (type=$type)"; + } eval { if ($type eq 'missing' && ($bref = $self->{-lms_rw}->local_blob($oid, 1))) { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 5443188d..6c8dfe10 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -20,7 +20,7 @@ use PublicInbox::LEI; use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR); use PublicInbox::ContentHash qw(git_sha); use POSIX qw(strftime); -use autodie qw(open read seek truncate); +use autodie qw(close open read seek truncate); use PublicInbox::Syscall qw($F_SETPIPE_SZ); sub new { @@ -543,6 +543,7 @@ sub do_query { pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; delete $l2m->{au_peers}; + close(delete $l2m->{-wq_s2}); # share wq_s1 with lei_xsearch } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }, |