about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/DirIdle.pm1
-rw-r--r--lib/PublicInbox/Import.pm84
-rw-r--r--lib/PublicInbox/Lock.pm12
-rw-r--r--lib/PublicInbox/Spawn.pm2
-rw-r--r--lib/PublicInbox/V2Writable.pm28
-rw-r--r--lib/PublicInbox/WatchMaildir.pm36
-rw-r--r--t/psgi_search.t1
7 files changed, 101 insertions, 63 deletions
diff --git a/lib/PublicInbox/DirIdle.pm b/lib/PublicInbox/DirIdle.pm
index 89cce305..daa2212b 100644
--- a/lib/PublicInbox/DirIdle.pm
+++ b/lib/PublicInbox/DirIdle.pm
@@ -44,6 +44,7 @@ sub new {
 sub event_step {
         my ($self) = @_;
         my $cb = $self->{cb};
+        local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
         eval {
                 my @events = $self->{inot}->read; # Linux::Inotify2->read
                 $cb->($_) for @events;
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index b50c662c..07a49518 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -48,32 +48,35 @@ sub gfi_start {
 
         return ($self->{in}, $self->{out}) if $self->{pid};
 
-        my ($out_r, $out_w);
+        my (@ret, $out_r, $out_w);
         pipe($out_r, $out_w) or die "pipe failed: $!";
-        my $git = $self->{git};
 
         $self->lock_acquire;
-
-        local $/ = "\n";
-        my $ref = $self->{ref};
-        chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
-        if ($self->{path_type} ne '2/38' && $self->{tip}) {
-                local $/ = "\0";
-                my @tree = $git->qx(qw(ls-tree -r -z --name-only), $ref);
-                chomp @tree;
-                $self->{-tree} = { map { $_ => 1 } @tree };
+        eval {
+                my ($git, $ref) = @$self{qw(git ref)};
+                local $/ = "\n";
+                chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref));
+                if ($self->{path_type} ne '2/38' && $self->{tip}) {
+                        local $/ = "\0";
+                        my @t = $git->qx(qw(ls-tree -r -z --name-only), $ref);
+                        chomp @t;
+                        $self->{-tree} = { map { $_ => 1 } @t };
+                }
+                my @cmd = ('git', "--git-dir=$git->{git_dir}",
+                        qw(fast-import --quiet --done --date-format=raw));
+                my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
+                $out_w->autoflush(1);
+                $self->{in} = $in_r;
+                $self->{out} = $out_w;
+                $self->{pid} = $pid;
+                $self->{nchg} = 0;
+                @ret = ($in_r, $out_w);
+        };
+        if ($@) {
+                $self->lock_release;
+                die $@;
         }
-
-        my $git_dir = $git->{git_dir};
-        my @cmd = ('git', "--git-dir=$git_dir", qw(fast-import
-                        --quiet --done --date-format=raw));
-        my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r });
-        $out_w->autoflush(1);
-        $self->{in} = $in_r;
-        $self->{out} = $out_w;
-        $self->{pid} = $pid;
-        $self->{nchg} = 0;
-        ($in_r, $out_w);
+        @ret;
 }
 
 sub wfail () { die "write to fast-import failed: $!" }
@@ -175,13 +178,16 @@ sub _update_git_info ($$) {
                 my $env = { GIT_INDEX_FILE => $index };
                 run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env);
         }
-        run_die([@cmd, 'update-server-info']);
+        eval { run_die([@cmd, 'update-server-info']) };
         my $ibx = $self->{ibx};
-        ($ibx && $self->{path_type} eq '2/38') and eval {
-                require PublicInbox::SearchIdx;
-                my $s = PublicInbox::SearchIdx->new($ibx);
-                $s->index_sync({ ref => $self->{ref} });
-        };
+        if ($ibx && $ibx->version == 1 && -d "$ibx->{inboxdir}/public-inbox" &&
+                                eval { require PublicInbox::SearchIdx }) {
+                eval {
+                        my $s = PublicInbox::SearchIdx->new($ibx);
+                        $s->index_sync({ ref => $self->{ref} });
+                };
+                warn "$ibx->{inboxdir} index failed: $@\n" if $@;
+        }
         eval { run_die([@cmd, qw(gc --auto)]) } if $do_gc;
 }
 
@@ -460,17 +466,23 @@ sub init_bare {
 sub done {
         my ($self) = @_;
         my $w = delete $self->{out} or return;
-        my $r = delete $self->{in} or die 'BUG: missing {in} when done';
-        print $w "done\n" or wfail;
-        my $pid = delete $self->{pid} or die 'BUG: missing {pid} when done';
-        waitpid($pid, 0) == $pid or die 'fast-import did not finish';
-        $? == 0 or die "fast-import failed: $?";
-
+        eval {
+                my $r = delete $self->{in} or die 'BUG: missing {in} when done';
+                print $w "done\n" or wfail;
+                my $pid = delete $self->{pid} or
+                                die 'BUG: missing {pid} when done';
+                waitpid($pid, 0) == $pid or die 'fast-import did not finish';
+                $? == 0 or die "fast-import failed: $?";
+        };
+        my $wait_err = $@;
         my $nchg = delete $self->{nchg};
-        _update_git_info($self, 1) if $nchg;
+        if ($nchg && !$wait_err) {
+                eval { _update_git_info($self, 1) };
+                warn "E: $self->{git}->{git_dir} update info: $@\n" if $@;
+        }
         $self->lock_release(!!$nchg);
-
         $self->{git}->cleanup;
+        die $wait_err if $wait_err;
 }
 
 sub atfork_child {
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index ca43682f..b2c8227f 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -16,20 +16,20 @@ sub lock_acquire {
         croak 'already locked '.($lock_path // '(undef)') if $self->{lockfh};
         return unless defined($lock_path);
         sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
-                die "failed to open lock $lock_path: $!\n";
-        flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
+                croak "failed to open $lock_path: $!\n";
+        flock($lockfh, LOCK_EX) or croak "lock $lock_path failed: $!\n";
         $self->{lockfh} = $lockfh;
 }
 
 sub lock_release {
         my ($self, $wake) = @_;
-        return unless $self->{lock_path};
-        my $lockfh = delete $self->{lockfh} or croak 'not locked';
+        defined(my $lock_path = $self->{lock_path}) or return;
+        my $lockfh = delete $self->{lockfh} or croak "not locked: $lock_path";
 
         syswrite($lockfh, '.') if $wake;
 
-        flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
-        close $lockfh or die "close failed: $!\n";
+        flock($lockfh, LOCK_UN) or croak "unlock $lock_path failed: $!\n";
+        close $lockfh or croak "close $lock_path failed: $!\n";
 }
 
 1;
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 50f31851..508d43fd 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -275,7 +275,7 @@ sub spawn ($;$$) {
         }
         my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
         my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd);
-        die "fork_exec failed: $!\n" unless $pid > 0;
+        die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
         $pid;
 }
 
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index e071bc1e..e1c9a393 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -660,21 +660,35 @@ sub barrier { checkpoint($_[0], 1) };
 # public
 sub done {
         my ($self) = @_;
-        my $im = delete $self->{im};
-        $im->done if $im; # PublicInbox::Import::done
-        checkpoint($self);
-        my $mm = delete $self->{mm};
-        $mm->{dbh}->commit if $mm;
+        my $err = '';
+        if (my $im = delete $self->{im}) {
+                eval { $im->done }; # PublicInbox::Import::done
+                $err .= "import done: $@\n" if $@;
+        }
+        if (!$err) {
+                eval { checkpoint($self) };
+                $err .= "checkpoint: $@\n" if $@;
+        }
+        if (my $mm = delete $self->{mm}) {
+                my $m = $err ? 'rollback' : 'commit';
+                eval { $mm->{dbh}->$m };
+                $err .= "msgmap $m: $@\n" if $@;
+        }
         my $shards = delete $self->{idx_shards};
         if ($shards) {
-                $_->remote_close for @$shards;
+                for (@$shards) {
+                        eval { $_->remote_close };
+                        $err .= "shard close: $@\n" if $@;
+                }
         }
-        $self->{over}->disconnect;
+        eval { $self->{over}->disconnect };
+        $err .= "over disconnect: $@\n" if $@;
         delete $self->{bnote};
         my $nbytes = $self->{total_bytes};
         $self->{total_bytes} = 0;
         $self->lock_release(!!$nbytes) if $shards;
         $self->{ibx}->git->cleanup;
+        die $err if $err;
 }
 
 sub fill_alternates ($$) {
diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm
index 7547f6e4..fad708d8 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/WatchMaildir.pm
@@ -124,8 +124,10 @@ sub new {
 sub _done_for_now {
         my ($self) = @_;
         local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
-        for (values %{$self->{importers}}) {
-                $_->done if $_; # $_ may be undef during cleanup
+        for my $im (values %{$self->{importers}}) {
+                next if !$im; # $im may be undef during cleanup
+                eval { $im->done };
+                warn "$im->{ibx}->{name} ->done: $@\n" if $@;
         }
 }
 
@@ -137,12 +139,15 @@ sub remove_eml_i { # each_inbox callback
                 $im->remove($eml, 'spam');
                 if (my $scrub = $ibx->filter($im)) {
                         my $scrubbed = $scrub->scrub($eml, 1);
-                        $scrubbed or return;
-                        $scrubbed == REJECT() and return;
-                        $im->remove($scrubbed, 'spam');
+                        if ($scrubbed && $scrubbed != REJECT) {
+                                $im->remove($scrubbed, 'spam');
+                        }
                 }
         };
-        warn "error removing spam at: $loc from $ibx->{name}: $@\n" if $@;
+        if ($@) {
+                warn "error removing spam at: $loc from $ibx->{name}: $@\n";
+                _done_for_now($self);
+        }
 }
 
 sub _remove_spam {
@@ -155,7 +160,6 @@ sub _remove_spam {
 
 sub import_eml ($$$) {
         my ($self, $ibx, $eml) = @_;
-        my $im = _importer_for($self, $ibx);
 
         # any header match means it's eligible for the inbox:
         if (my $watch_hdrs = $ibx->{-watchheaders}) {
@@ -167,13 +171,19 @@ sub import_eml ($$$) {
                 }
                 return unless $ok;
         }
-
-        if (my $scrub = $ibx->filter($im)) {
-                my $ret = $scrub->scrub($eml) or return;
-                $ret == REJECT() and return;
-                $eml = $ret;
+        eval {
+                my $im = _importer_for($self, $ibx);
+                if (my $scrub = $ibx->filter($im)) {
+                        my $scrubbed = $scrub->scrub($eml) or return;
+                        $scrubbed == REJECT and return;
+                        $eml = $scrubbed;
+                }
+                $im->add($eml, $self->{spamcheck});
+        };
+        if ($@) {
+                warn "$ibx->{name} add failed: $@\n";
+                _done_for_now($self);
         }
-        $im->add($eml, $self->{spamcheck});
 }
 
 sub _try_path {
diff --git a/t/psgi_search.t b/t/psgi_search.t
index 64f8b1ac..2d12ba6a 100644
--- a/t/psgi_search.t
+++ b/t/psgi_search.t
@@ -14,6 +14,7 @@ my @mods = qw(DBD::SQLite Search::Xapian HTTP::Request::Common Plack::Test
 require_mods(@mods);
 use_ok($_) for (qw(HTTP::Request::Common Plack::Test));
 use_ok 'PublicInbox::WWW';
+use_ok 'PublicInbox::SearchIdx';
 my ($tmpdir, $for_destroy) = tmpdir();
 
 my $ibx = PublicInbox::Inbox->new({