diff options
-rw-r--r-- | lib/PublicInbox/DirIdle.pm | 1 | ||||
-rw-r--r-- | lib/PublicInbox/Import.pm | 84 | ||||
-rw-r--r-- | lib/PublicInbox/Lock.pm | 12 | ||||
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/V2Writable.pm | 28 | ||||
-rw-r--r-- | lib/PublicInbox/WatchMaildir.pm | 36 | ||||
-rw-r--r-- | t/psgi_search.t | 1 |
7 files changed, 101 insertions, 63 deletions
diff --git a/lib/PublicInbox/DirIdle.pm b/lib/PublicInbox/DirIdle.pm index 89cce305..daa2212b 100644 --- a/lib/PublicInbox/DirIdle.pm +++ b/lib/PublicInbox/DirIdle.pm @@ -44,6 +44,7 @@ sub new { sub event_step { my ($self) = @_; my $cb = $self->{cb}; + local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously eval { my @events = $self->{inot}->read; # Linux::Inotify2->read $cb->($_) for @events; diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index b50c662c..07a49518 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -48,32 +48,35 @@ sub gfi_start { return ($self->{in}, $self->{out}) if $self->{pid}; - my ($out_r, $out_w); + my (@ret, $out_r, $out_w); pipe($out_r, $out_w) or die "pipe failed: $!"; - my $git = $self->{git}; $self->lock_acquire; - - local $/ = "\n"; - my $ref = $self->{ref}; - chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref)); - if ($self->{path_type} ne '2/38' && $self->{tip}) { - local $/ = "\0"; - my @tree = $git->qx(qw(ls-tree -r -z --name-only), $ref); - chomp @tree; - $self->{-tree} = { map { $_ => 1 } @tree }; + eval { + my ($git, $ref) = @$self{qw(git ref)}; + local $/ = "\n"; + chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref)); + if ($self->{path_type} ne '2/38' && $self->{tip}) { + local $/ = "\0"; + my @t = $git->qx(qw(ls-tree -r -z --name-only), $ref); + chomp @t; + $self->{-tree} = { map { $_ => 1 } @t }; + } + my @cmd = ('git', "--git-dir=$git->{git_dir}", + qw(fast-import --quiet --done --date-format=raw)); + my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r }); + $out_w->autoflush(1); + $self->{in} = $in_r; + $self->{out} = $out_w; + $self->{pid} = $pid; + $self->{nchg} = 0; + @ret = ($in_r, $out_w); + }; + if ($@) { + $self->lock_release; + die $@; } - - my $git_dir = $git->{git_dir}; - my @cmd = ('git', "--git-dir=$git_dir", qw(fast-import - --quiet --done --date-format=raw)); - my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r }); - $out_w->autoflush(1); - $self->{in} = $in_r; - $self->{out} = $out_w; - $self->{pid} = $pid; - $self->{nchg} = 0; - ($in_r, $out_w); + @ret; } sub wfail () { die "write to fast-import failed: $!" } @@ -175,13 +178,16 @@ sub _update_git_info ($$) { my $env = { GIT_INDEX_FILE => $index }; run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env); } - run_die([@cmd, 'update-server-info']); + eval { run_die([@cmd, 'update-server-info']) }; my $ibx = $self->{ibx}; - ($ibx && $self->{path_type} eq '2/38') and eval { - require PublicInbox::SearchIdx; - my $s = PublicInbox::SearchIdx->new($ibx); - $s->index_sync({ ref => $self->{ref} }); - }; + if ($ibx && $ibx->version == 1 && -d "$ibx->{inboxdir}/public-inbox" && + eval { require PublicInbox::SearchIdx }) { + eval { + my $s = PublicInbox::SearchIdx->new($ibx); + $s->index_sync({ ref => $self->{ref} }); + }; + warn "$ibx->{inboxdir} index failed: $@\n" if $@; + } eval { run_die([@cmd, qw(gc --auto)]) } if $do_gc; } @@ -460,17 +466,23 @@ sub init_bare { sub done { my ($self) = @_; my $w = delete $self->{out} or return; - my $r = delete $self->{in} or die 'BUG: missing {in} when done'; - print $w "done\n" or wfail; - my $pid = delete $self->{pid} or die 'BUG: missing {pid} when done'; - waitpid($pid, 0) == $pid or die 'fast-import did not finish'; - $? == 0 or die "fast-import failed: $?"; - + eval { + my $r = delete $self->{in} or die 'BUG: missing {in} when done'; + print $w "done\n" or wfail; + my $pid = delete $self->{pid} or + die 'BUG: missing {pid} when done'; + waitpid($pid, 0) == $pid or die 'fast-import did not finish'; + $? == 0 or die "fast-import failed: $?"; + }; + my $wait_err = $@; my $nchg = delete $self->{nchg}; - _update_git_info($self, 1) if $nchg; + if ($nchg && !$wait_err) { + eval { _update_git_info($self, 1) }; + warn "E: $self->{git}->{git_dir} update info: $@\n" if $@; + } $self->lock_release(!!$nchg); - $self->{git}->cleanup; + die $wait_err if $wait_err; } sub atfork_child { diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm index ca43682f..b2c8227f 100644 --- a/lib/PublicInbox/Lock.pm +++ b/lib/PublicInbox/Lock.pm @@ -16,20 +16,20 @@ sub lock_acquire { croak 'already locked '.($lock_path // '(undef)') if $self->{lockfh}; return unless defined($lock_path); sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or - die "failed to open lock $lock_path: $!\n"; - flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; + croak "failed to open $lock_path: $!\n"; + flock($lockfh, LOCK_EX) or croak "lock $lock_path failed: $!\n"; $self->{lockfh} = $lockfh; } sub lock_release { my ($self, $wake) = @_; - return unless $self->{lock_path}; - my $lockfh = delete $self->{lockfh} or croak 'not locked'; + defined(my $lock_path = $self->{lock_path}) or return; + my $lockfh = delete $self->{lockfh} or croak "not locked: $lock_path"; syswrite($lockfh, '.') if $wake; - flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; - close $lockfh or die "close failed: $!\n"; + flock($lockfh, LOCK_UN) or croak "unlock $lock_path failed: $!\n"; + close $lockfh or croak "close $lock_path failed: $!\n"; } 1; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 50f31851..508d43fd 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -275,7 +275,7 @@ sub spawn ($;$$) { } my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work? my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd); - die "fork_exec failed: $!\n" unless $pid > 0; + die "fork_exec @$cmd failed: $!\n" unless $pid > 0; $pid; } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index e071bc1e..e1c9a393 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -660,21 +660,35 @@ sub barrier { checkpoint($_[0], 1) }; # public sub done { my ($self) = @_; - my $im = delete $self->{im}; - $im->done if $im; # PublicInbox::Import::done - checkpoint($self); - my $mm = delete $self->{mm}; - $mm->{dbh}->commit if $mm; + my $err = ''; + if (my $im = delete $self->{im}) { + eval { $im->done }; # PublicInbox::Import::done + $err .= "import done: $@\n" if $@; + } + if (!$err) { + eval { checkpoint($self) }; + $err .= "checkpoint: $@\n" if $@; + } + if (my $mm = delete $self->{mm}) { + my $m = $err ? 'rollback' : 'commit'; + eval { $mm->{dbh}->$m }; + $err .= "msgmap $m: $@\n" if $@; + } my $shards = delete $self->{idx_shards}; if ($shards) { - $_->remote_close for @$shards; + for (@$shards) { + eval { $_->remote_close }; + $err .= "shard close: $@\n" if $@; + } } - $self->{over}->disconnect; + eval { $self->{over}->disconnect }; + $err .= "over disconnect: $@\n" if $@; delete $self->{bnote}; my $nbytes = $self->{total_bytes}; $self->{total_bytes} = 0; $self->lock_release(!!$nbytes) if $shards; $self->{ibx}->git->cleanup; + die $err if $err; } sub fill_alternates ($$) { diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 7547f6e4..fad708d8 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -124,8 +124,10 @@ sub new { sub _done_for_now { my ($self) = @_; local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously - for (values %{$self->{importers}}) { - $_->done if $_; # $_ may be undef during cleanup + for my $im (values %{$self->{importers}}) { + next if !$im; # $im may be undef during cleanup + eval { $im->done }; + warn "$im->{ibx}->{name} ->done: $@\n" if $@; } } @@ -137,12 +139,15 @@ sub remove_eml_i { # each_inbox callback $im->remove($eml, 'spam'); if (my $scrub = $ibx->filter($im)) { my $scrubbed = $scrub->scrub($eml, 1); - $scrubbed or return; - $scrubbed == REJECT() and return; - $im->remove($scrubbed, 'spam'); + if ($scrubbed && $scrubbed != REJECT) { + $im->remove($scrubbed, 'spam'); + } } }; - warn "error removing spam at: $loc from $ibx->{name}: $@\n" if $@; + if ($@) { + warn "error removing spam at: $loc from $ibx->{name}: $@\n"; + _done_for_now($self); + } } sub _remove_spam { @@ -155,7 +160,6 @@ sub _remove_spam { sub import_eml ($$$) { my ($self, $ibx, $eml) = @_; - my $im = _importer_for($self, $ibx); # any header match means it's eligible for the inbox: if (my $watch_hdrs = $ibx->{-watchheaders}) { @@ -167,13 +171,19 @@ sub import_eml ($$$) { } return unless $ok; } - - if (my $scrub = $ibx->filter($im)) { - my $ret = $scrub->scrub($eml) or return; - $ret == REJECT() and return; - $eml = $ret; + eval { + my $im = _importer_for($self, $ibx); + if (my $scrub = $ibx->filter($im)) { + my $scrubbed = $scrub->scrub($eml) or return; + $scrubbed == REJECT and return; + $eml = $scrubbed; + } + $im->add($eml, $self->{spamcheck}); + }; + if ($@) { + warn "$ibx->{name} add failed: $@\n"; + _done_for_now($self); } - $im->add($eml, $self->{spamcheck}); } sub _try_path { diff --git a/t/psgi_search.t b/t/psgi_search.t index 64f8b1ac..2d12ba6a 100644 --- a/t/psgi_search.t +++ b/t/psgi_search.t @@ -14,6 +14,7 @@ my @mods = qw(DBD::SQLite Search::Xapian HTTP::Request::Common Plack::Test require_mods(@mods); use_ok($_) for (qw(HTTP::Request::Common Plack::Test)); use_ok 'PublicInbox::WWW'; +use_ok 'PublicInbox::SearchIdx'; my ($tmpdir, $for_destroy) = tmpdir(); my $ibx = PublicInbox::Inbox->new({ |