about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-11-07 13:01:47 +0000
committerEric Wong <e@80x24.org>2023-11-07 13:23:36 +0000
commite97a30e7624dfb8645aa33a94844bcf28c7e1379 (patch)
tree9f28b7273c795b0e0579d42728fdfad1e5dbfeac /lib/PublicInbox
parentc1f27d21213f05bd4656e24ec5a5a076ebaa6afd (diff)
downloadpublic-inbox-e97a30e7624dfb8645aa33a94844bcf28c7e1379.tar.gz
When dealing with large search results, we need to deal with
EPIPE not just from the pager, but also EPIPE or ECONNRESET
between lei_xsearch and lei2mail processes.

Without this fix, lei_xsearch processes could linger and get
stuck writing to dead lei2mail processes if a user aborts the
pager early during a large result set.

To ensure lei_xsearch processes don't linger around after
lei2mail workers all die, we must close $l2m->{-wq_s2} before
spawning lei_xsearch processes, since $l2m->{-wq_s2} is only
used in lei2mail workers.

For `git cat-file' processes, we also need to trigger
PublicInbox::Git->close to handle unpredictable destructor
ordering to avoid using uninitialized IO refs.  This combines
with the `git_to_mail' change to deal with process cleanup
handling from premature shutdowns.

To test all this, we can't just rely on a single message being
large, but also need to rely on the result set being large
enough to saturate the lei_xsearch -> lei2mail socket so we
rely on GIANT_INBOX_DIR once again.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/Git.pm2
-rw-r--r--lib/PublicInbox/LeiOverview.pm3
-rw-r--r--lib/PublicInbox/LeiToMail.pm8
-rw-r--r--lib/PublicInbox/LeiXSearch.pm3
4 files changed, 13 insertions, 3 deletions
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 11712db2..292c359a 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -276,6 +276,7 @@ sub cat_async_step ($$) {
 
 sub cat_async_wait ($) {
         my ($self) = @_;
+        $self->close if !$self->{sock};
         my $inflight = $self->{inflight} or return;
         while (scalar(@$inflight)) {
                 cat_async_step($self, $inflight);
@@ -331,6 +332,7 @@ sub check_async_wait ($) {
         my ($self) = @_;
         return cat_async_wait($self) if $self->{-bc};
         my $ck = $self->{ck} or return;
+        $ck->close if !$ck->{sock};
         my $inflight = $ck->{inflight} or return;
         check_async_step($ck, $inflight) while (scalar(@$inflight));
 }
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 066c40bd..129dabf8 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -212,7 +212,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 sub {
                         my ($smsg, $mitem, $eml) = @_;
                         $smsg->{pct} = get_pct($mitem) if $mitem;
-                        $l2m->wq_io_do('write_mail', [], $smsg, $eml);
+                        eval { $l2m->wq_io_do('write_mail', [], $smsg, $eml) };
+                        $lei->fail($@) if $@ && !$!{ECONNRESET} && !$!{EPIPE};
                 }
         } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
                 my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index e80163e2..b73af68a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -8,11 +8,13 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::IO;
+use PublicInbox::Git;
 use PublicInbox::Spawn qw(spawn);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use PublicInbox::Syscall qw(rename_noreplace);
 use autodie qw(open seek close);
+use Carp qw(croak);
 
 my %kw2char = ( # Maildir characters
         draft => 'D',
@@ -132,8 +134,12 @@ sub eml2mboxcl2 {
 
 sub git_to_mail { # git->cat_async callback
         my ($bref, $oid, $type, $size, $smsg) = @_;
-        my $self = delete $smsg->{l2m} // die "BUG: no l2m";
         $type // return; # called by PublicInbox::Git::close
+        my $self = delete $smsg->{l2m};
+        if (!defined($self)) {
+                return if $PublicInbox::Git::in_cleanup;
+                croak "BUG: no l2m (type=$type)";
+        }
         eval {
                 if ($type eq 'missing' &&
                           ($bref = $self->{-lms_rw}->local_blob($oid, 1))) {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 5443188d..6c8dfe10 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -20,7 +20,7 @@ use PublicInbox::LEI;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 use PublicInbox::ContentHash qw(git_sha);
 use POSIX qw(strftime);
-use autodie qw(open read seek truncate);
+use autodie qw(close open read seek truncate);
 use PublicInbox::Syscall qw($F_SETPIPE_SZ);
 
 sub new {
@@ -543,6 +543,7 @@ sub do_query {
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                 fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                 delete $l2m->{au_peers};
+                close(delete $l2m->{-wq_s2}); # share wq_s1 with lei_xsearch
         }
         $self->wq_workers_start('lei_xsearch', undef,
                                 $lei->oldset, { lei => $lei },