about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-04-03 10:48:27 +0000
committerEric Wong <e@80x24.org>2021-04-03 18:38:47 +0000
commit842e684f0a4154787274843eb3c9be2eef11b160 (patch)
treebece05a1c22a104953ec7007cfa7e23cdf4ac42b /lib
parentb3e2975029ae938bb232aaa0cbc3dabda55d57d6 (diff)
downloadpublic-inbox-842e684f0a4154787274843eb3c9be2eef11b160.tar.gz
Since every command that writes to lei/store calls ->done
to commit its output, we can rely on that to return a
pathname for a readable file with errors in it.

Errors can still get crossed up if multiple lei commands
are writing to the store at once, but reduces the delay
in seeing them and ensures it won't get seen when somebody
is attempting to use shell completion.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/LeiStore.pm69
-rw-r--r--lib/PublicInbox/LeiStoreErr.pm30
2 files changed, 88 insertions, 11 deletions
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 87082638..094e1555 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -3,9 +3,14 @@
 #
 # Local storage (cache/memo) for lei(1), suitable for personal/private
 # mail iff on encrypted device/FS.  Based on v2, but only deduplicates
-# based on git OID.
+# git storage based on git OID (index deduplication is done in ContentHash)
 #
 # for xref3, the following are constant: $eidx_key = '.', $xnum = -1
+#
+# We rely on the synchronous IPC API for this in lei-daemon and
+# multiple lei clients to write to it at once.  This allows the
+# lei/store IPC process to be decoupled from network latency in
+# lei WQ workers.
 package PublicInbox::LeiStore;
 use strict;
 use v5.10.1;
@@ -19,7 +24,10 @@ use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::MID qw(mids);
 use PublicInbox::LeiSearch;
 use PublicInbox::MDA;
+use PublicInbox::Spawn qw(spawn);
 use List::Util qw(max);
+use File::Temp ();
+use POSIX ();
 
 sub new {
         my (undef, $dir, $opt) = @_;
@@ -102,18 +110,27 @@ sub search {
         PublicInbox::LeiSearch->new($_[0]->{priv_eidx}->{topdir});
 }
 
+# follows the stderr file
+sub _tail_err {
+        my ($self) = @_;
+        print { $self->{-err_wr} } readline($self->{-tmp_err});
+}
+
 sub eidx_init {
         my ($self) = @_;
         my $eidx = $self->{priv_eidx};
+        my $tl = wantarray && $self->{-err_wr} ?
+                        PublicInbox::OnDestroy->new($$, \&_tail_err, $self) :
+                        undef;
         $eidx->idx_init({-private => 1});
-        $eidx;
+        wantarray ? ($eidx, $tl) : $eidx;
 }
 
 sub _docids_for ($$) {
         my ($self, $eml) = @_;
         my %docids;
+        my $eidx = $self->{priv_eidx};
         my ($chash, $mids) = PublicInbox::LeiSearch::content_key($eml);
-        my $eidx = eidx_init($self);
         my $oidx = $eidx->{oidx};
         my $im = $self->{im};
         for my $mid (@$mids) {
@@ -137,7 +154,7 @@ sub _docids_for ($$) {
 
 sub set_eml_vmd {
         my ($self, $eml, $vmd, $docids) = @_;
-        my $eidx = eidx_init($self);
+        my ($eidx, $tl) = eidx_init($self);
         $docids //= [ _docids_for($self, $eml) ];
         for my $docid (@$docids) {
                 $eidx->idx_shard($docid)->ipc_do('set_vmd', $docid, $vmd);
@@ -147,7 +164,7 @@ sub set_eml_vmd {
 
 sub add_eml_vmd {
         my ($self, $eml, $vmd) = @_;
-        my $eidx = eidx_init($self);
+        my ($eidx, $tl) = eidx_init($self);
         my @docids = _docids_for($self, $eml);
         for my $docid (@docids) {
                 $eidx->idx_shard($docid)->ipc_do('add_vmd', $docid, $vmd);
@@ -157,7 +174,7 @@ sub add_eml_vmd {
 
 sub remove_eml_vmd {
         my ($self, $eml, $vmd) = @_;
-        my $eidx = eidx_init($self);
+        my ($eidx, $tl) = eidx_init($self);
         my @docids = _docids_for($self, $eml);
         for my $docid (@docids) {
                 $eidx->idx_shard($docid)->ipc_do('remove_vmd', $docid, $vmd);
@@ -168,7 +185,7 @@ sub remove_eml_vmd {
 sub add_eml {
         my ($self, $eml, $vmd, $xoids) = @_;
         my $im = $self->importer; # may create new epoch
-        my $eidx = eidx_init($self); # writes ALL.git/objects/info/alternates
+        my ($eidx, $tl) = eidx_init($self); # updates/writes alternates file
         my $oidx = $eidx->{oidx}; # PublicInbox::Import::add checks this
         my $smsg = bless { -oidx => $oidx }, 'PublicInbox::Smsg';
         $im->add($eml, undef, $smsg) or return; # duplicate returns undef
@@ -257,7 +274,7 @@ sub _external_only ($$$) {
 
 sub update_xvmd {
         my ($self, $xoids, $eml, $vmd_mod) = @_;
-        my $eidx = eidx_init($self);
+        my ($eidx, $tl) = eidx_init($self);
         my $oidx = $eidx->{oidx};
         my %seen;
         for my $oid (keys %$xoids) {
@@ -294,7 +311,7 @@ sub update_xvmd {
 sub set_xvmd {
         my ($self, $xoids, $eml, $vmd) = @_;
 
-        my $eidx = eidx_init($self);
+        my ($eidx, $tl) = eidx_init($self);
         my $oidx = $eidx->{oidx};
         my %seen;
 
@@ -329,6 +346,21 @@ sub checkpoint {
         $self->{priv_eidx}->checkpoint($wait);
 }
 
+sub xchg_stderr {
+        my ($self) = @_;
+        _tail_err($self) if $self->{-err_wr};
+        my $dir = $self->{priv_eidx}->{topdir};
+        return unless -e $dir;
+        my $old = delete $self->{-tmp_err};
+        my $pfx = POSIX::strftime('%Y%m%d%H%M%S', gmtime(time));
+        my $err = File::Temp->new(TEMPLATE => "$pfx.$$.lei_storeXXXX",
+                                SUFFIX => '.err', DIR => $dir);
+        open STDERR, '>>', $err->filename or die "dup2: $!";
+        STDERR->autoflush(1); # shared with shard subprocesses
+        $self->{-tmp_err} = $err; # separate file description for RO access
+        undef;
+}
+
 sub done {
         my ($self) = @_;
         my $err = '';
@@ -339,7 +371,8 @@ sub done {
                         warn $err;
                 }
         }
-        $self->{priv_eidx}->done;
+        $self->{priv_eidx}->done; # V2Writable::done
+        xchg_stderr($self);
         die $err if $err;
 }
 
@@ -347,6 +380,11 @@ sub ipc_atfork_child {
         my ($self) = @_;
         my $lei = $self->{lei};
         $lei->_lei_atfork_child(1) if $lei;
+        xchg_stderr($self);
+        if (my $err = delete($self->{err_pipe})) {
+                close $err->[0];
+                $self->{-err_wr} = $err->[1];
+        }
         $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
         $self->SUPER::ipc_atfork_child;
 }
@@ -357,11 +395,20 @@ sub write_prepare {
                 my $d = $lei->store_path;
                 $self->ipc_lock_init("$d/ipc.lock");
                 substr($d, -length('/lei/store'), 10, '');
+                my $err_pipe;
+                unless ($lei->{oneshot}) {
+                        pipe(my ($r, $w)) or die "pipe: $!";
+                        $err_pipe = [ $r, $w ];
+                }
                 # Mail we import into lei are private, so headers filtered out
                 # by -mda for public mail are not appropriate
                 local @PublicInbox::MDA::BAD_HEADERS = ();
                 $self->ipc_worker_spawn("lei/store $d", $lei->oldset,
-                                        { lei => $lei });
+                                        { lei => $lei, err_pipe => $err_pipe });
+                if ($err_pipe) {
+                        require PublicInbox::LeiStoreErr;
+                        PublicInbox::LeiStoreErr->new($err_pipe->[0], $lei);
+                }
         }
         $lei->{sto} = $self;
 }
diff --git a/lib/PublicInbox/LeiStoreErr.pm b/lib/PublicInbox/LeiStoreErr.pm
new file mode 100644
index 00000000..68ce96d6
--- /dev/null
+++ b/lib/PublicInbox/LeiStoreErr.pm
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# forwards stderr from lei/store process to any lei clients using
+# the same store
+package PublicInbox::LeiStoreErr;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+
+sub new {
+        my ($cls, $rd, $lei) = @_;
+        my $self = bless { sock => $rd, store_path => $lei->store_path }, $cls;
+        $self->SUPER::new($rd, EPOLLIN | EPOLLONESHOT);
+}
+
+sub event_step {
+        my ($self) = @_;
+        $self->do_read(\(my $rbuf), 4096) or return;
+        my $cb;
+        for my $lei (values %PublicInbox::DS::DescriptorMap) {
+                $cb = $lei->can('store_path') // next;
+                next if $cb->($lei) ne $self->{store_path};
+                my $err = $lei->{2} // next;
+                print $err $rbuf;
+        }
+}
+
+1;