From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 27B3C1F8DB for ; Fri, 17 Jul 2020 06:31:56 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 4/8] with_umask: pass args to callback Date: Fri, 17 Jul 2020 06:31:51 +0000 Message-Id: <20200717063155.3734-5-e@yhbt.net> In-Reply-To: <20200717063155.3734-1-e@yhbt.net> References: <20200717063155.3734-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: While it makes the code flow slightly less well in some places, it saves us runtime allocations and indentation. --- lib/PublicInbox/InboxWritable.pm | 42 +++++++------ lib/PublicInbox/SearchIdx.pm | 40 ++++++------ lib/PublicInbox/V2Writable.pm | 102 ++++++++++++++----------------- lib/PublicInbox/Xapcmd.pm | 35 ++++++----- 4 files changed, 111 insertions(+), 108 deletions(-) diff --git a/lib/PublicInbox/InboxWritable.pm b/lib/PublicInbox/InboxWritable.pm index 875dcce2..1f3f6672 100644 --- a/lib/PublicInbox/InboxWritable.pm +++ b/lib/PublicInbox/InboxWritable.pm @@ -37,27 +37,33 @@ sub assert_usable_dir { die "no inboxdir defined for $self->{name}\n"; } +sub _init_v1 { + my ($self, $skip_artnum) = @_; + if (defined($self->{indexlevel}) || defined($skip_artnum)) { + require PublicInbox::SearchIdx; + require PublicInbox::Msgmap; + my $sidx = PublicInbox::SearchIdx->new($self, 1); # just create + $sidx->begin_txn_lazy; + if (defined $skip_artnum) { + my $mm = PublicInbox::Msgmap->new($self->{inboxdir}, 1); + $mm->{dbh}->begin_work; + $mm->skip_artnum($skip_artnum); + $mm->{dbh}->commit; + } + $sidx->commit_txn_lazy; + } else { + open my $fh, '>>', "$self->{inboxdir}/ssoma.lock" or + die "$self->{inboxdir}/ssoma.lock: $!\n"; + } +} + sub init_inbox { my ($self, $shards, $skip_epoch, $skip_artnum) = @_; if ($self->version == 1) { my $dir = assert_usable_dir($self); PublicInbox::Import::init_bare($dir); - if (defined($self->{indexlevel}) || defined($skip_artnum)) { - require PublicInbox::SearchIdx; - require PublicInbox::Msgmap; - my $sidx = PublicInbox::SearchIdx->new($self, 1); # just create - $sidx->begin_txn_lazy; - $self->with_umask(sub { - my $mm = PublicInbox::Msgmap->new($dir, 1); - $mm->{dbh}->begin_work; - $mm->skip_artnum($skip_artnum); - $mm->{dbh}->commit; - }) if defined($skip_artnum); - $sidx->commit_txn_lazy; - } else { - open my $fh, '>>', "$dir/ssoma.lock" or - die "$dir/ssoma.lock: $!\n"; - } + $self->umask_prepare; + $self->with_umask(\&_init_v1, $self, $skip_artnum); } else { my $v2w = importer($self); $v2w->init_inbox($shards, $skip_epoch, $skip_artnum); @@ -255,9 +261,9 @@ sub _umask_for { } sub with_umask { - my ($self, $cb) = @_; + my ($self, $cb, @arg) = @_; my $old = umask $self->{umask}; - my $rv = eval { $cb->() }; + my $rv = eval { $cb->(@arg) }; my $err = $@; umask $old; die $err if $err; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 4caa66d3..c93c9034 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -585,7 +585,7 @@ sub unindex_both { # git->cat_async callback sub index_sync { my ($self, $opts) = @_; delete $self->{lock_path} if $opts->{-skip_lock}; - $self->{-inbox}->with_umask(sub { $self->_index_sync($opts) }) + $self->{-inbox}->with_umask(\&_index_sync, $self, $opts); } sub too_big ($$$) { @@ -854,17 +854,18 @@ sub remote_remove { } } -sub begin_txn_lazy { +sub _begin_txn { my ($self) = @_; - return if $self->{txn}; + my $xdb = $self->{xdb} || $self->_xdb_acquire; + $self->{over}->begin_lazy if $self->{over}; + $xdb->begin_transaction if $xdb; + $self->{txn} = 1; + $xdb; +} - $self->{-inbox}->with_umask(sub { - my $xdb = $self->{xdb} || $self->_xdb_acquire; - $self->{over}->begin_lazy if $self->{over}; - $xdb->begin_transaction if $xdb; - $self->{txn} = 1; - $xdb; - }); +sub begin_txn_lazy { + my ($self) = @_; + $self->{-inbox}->with_umask(\&_begin_txn, $self) if !$self->{txn}; } # store 'indexlevel=medium' in v2 shard=0 and v1 (only one shard) @@ -882,16 +883,19 @@ sub set_indexlevel { } } +sub _commit_txn { + my ($self) = @_; + if (my $xdb = $self->{xdb}) { + set_indexlevel($self); + $xdb->commit_transaction; + } + $self->{over}->commit_lazy if $self->{over}; +} + sub commit_txn_lazy { my ($self) = @_; - delete $self->{txn} or return; - $self->{-inbox}->with_umask(sub { - if (my $xdb = $self->{xdb}) { - set_indexlevel($self); - $xdb->commit_transaction; - } - $self->{over}->commit_lazy if $self->{over}; - }); + delete($self->{txn}) and + $self->{-inbox}->with_umask(\&_commit_txn, $self); } sub worker_done { diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 0119ea76..b51c8525 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -147,10 +147,8 @@ sub init_inbox { # returns undef on duplicate or spam # mimics Import::add and wraps it for v2 sub add { - my ($self, $mime, $check_cb) = @_; - $self->{-inbox}->with_umask(sub { - _add($self, $mime, $check_cb) - }); + my ($self, $eml, $check_cb) = @_; + $self->{-inbox}->with_umask(\&_add, $self, $eml, $check_cb); } # indexes a message, returns true if checkpointing is needed @@ -276,6 +274,28 @@ sub idx_shard { $self->{idx_shards}->[$shard_i]; } +sub _idx_init { # with_umask callback + my ($self, $opt) = @_; + $self->lock_acquire unless $opt && $opt->{-skip_lock}; + $self->{over}->create; + + # xcpdb can change shard count while -watch is idle + my $nshards = count_shards($self); + $self->{shards} = $nshards if $nshards && $nshards != $self->{shards}; + + # need to create all shards before initializing msgmap FD + # idx_shards must be visible to all forked processes + my $max = $self->{shards} - 1; + my $idx = $self->{idx_shards} = []; + push @$idx, PublicInbox::SearchIdxShard->new($self, $_) for (0..$max); + + # Now that all subprocesses are up, we can open the FDs + # for SQLite: + my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( + "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1); + $mm->{dbh}->begin_work; +} + # idempotent sub idx_init { my ($self, $opt) = @_; @@ -285,13 +305,10 @@ sub idx_init { # do not leak read-only FDs to child processes, we only have these # FDs for duplicate detection so they should not be # frequently activated. + # delete @$ibx{qw(git mm search)}; delete $ibx->{$_} foreach (qw(git mm search)); - my $indexlevel = $ibx->{indexlevel}; - if ($indexlevel && $indexlevel eq 'basic') { - $self->{parallel} = 0; - } - + $self->{parallel} = 0 if ($ibx->{indexlevel}//'') eq 'basic'; if ($self->{parallel}) { pipe(my ($r, $w)) or die "pipe failed: $!"; # pipe for barrier notifications doesn't need to be big, @@ -301,33 +318,8 @@ sub idx_init { $w->autoflush(1); } - my $over = $self->{over}; $ibx->umask_prepare; - $ibx->with_umask(sub { - $self->lock_acquire unless ($opt && $opt->{-skip_lock}); - $over->create; - - # xcpdb can change shard count while -watch is idle - my $nshards = count_shards($self); - if ($nshards && $nshards != $self->{shards}) { - $self->{shards} = $nshards; - } - - # need to create all shards before initializing msgmap FD - my $max = $self->{shards} - 1; - - # idx_shards must be visible to all forked processes - my $idx = $self->{idx_shards} = []; - for my $i (0..$max) { - push @$idx, PublicInbox::SearchIdxShard->new($self, $i); - } - - # Now that all subprocesses are up, we can open the FDs - # for SQLite: - my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( - "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1); - $mm->{dbh}->begin_work; - }); + $ibx->with_umask(\&_idx_init, $self, $opt); } # returns an array mapping [ epoch => latest_commit ] @@ -379,24 +371,24 @@ sub content_matches ($$) { # used for removing or replacing (purging) sub rewrite_internal ($$;$$$) { - my ($self, $old_mime, $cmt_msg, $new_mime, $sref) = @_; + my ($self, $old_eml, $cmt_msg, $new_eml, $sref) = @_; $self->idx_init; my ($im, $need_reindex, $replace_map); if ($sref) { $replace_map = {}; # oid => sref - $need_reindex = [] if $new_mime; + $need_reindex = [] if $new_eml; } else { $im = $self->importer; } my $over = $self->{over}; - my $chashes = content_hashes($old_mime); - my @removed; - my $mids = mids($old_mime->header_obj); + my $chashes = content_hashes($old_eml); + my $removed = []; + my $mids = mids($old_eml->header_obj); # We avoid introducing new blobs into git since the raw content # can be slightly different, so we do not need the user-supplied # message now that we have the mids and content_hash - $old_mime = undef; + $old_eml = undef; my $mark; foreach my $mid (@$mids) { @@ -422,15 +414,15 @@ sub rewrite_internal ($$;$$$) { } foreach my $num (keys %gone) { my ($smsg, $mime, $orig) = @{$gone{$num}}; - # @removed should only be set once assuming + # $removed should only be set once assuming # no bugs in our deduplication code: - @removed = (undef, $mime, $smsg); + $removed = [ undef, $mime, $smsg ]; my $oid = $smsg->{blob}; if ($replace_map) { $replace_map->{$oid} = $sref; } else { ($mark, undef) = $im->remove($orig, $cmt_msg); - $removed[0] = $mark; + $removed->[0] = $mark; } $orig = undef; if ($need_reindex) { # ->replace @@ -447,28 +439,26 @@ sub rewrite_internal ($$;$$$) { $self->{last_commit}->[$self->{epoch_max}] = $cmt; } if ($replace_map && scalar keys %$replace_map) { - my $rewrites = _replace_oids($self, $new_mime, $replace_map); + my $rewrites = _replace_oids($self, $new_eml, $replace_map); return { rewrites => $rewrites, need_reindex => $need_reindex }; } - defined($mark) ? @removed : undef; + defined($mark) ? $removed : undef; } # public (see PublicInbox::Import->remove), but note the 3rd element # (retval[2]) is not part of the stable API shared with Import->remove sub remove { - my ($self, $mime, $cmt_msg) = @_; - my @ret; - $self->{-inbox}->with_umask(sub { - @ret = rewrite_internal($self, $mime, $cmt_msg); - }); - defined($ret[0]) ? @ret : undef; + my ($self, $eml, $cmt_msg) = @_; + my $r = $self->{-inbox}->with_umask(\&rewrite_internal, + $self, $eml, $cmt_msg); + defined($r) && defined($r->[0]) ? @$r: undef; } sub _replace ($$;$$) { - my ($self, $old_mime, $new_mime, $sref) = @_; - my $rewritten = $self->{-inbox}->with_umask(sub { - rewrite_internal($self, $old_mime, undef, $new_mime, $sref); - }) or return; + my ($self, $old_eml, $new_eml, $sref) = @_; + my $arg = [ $self, $old_eml, undef, $new_eml, $sref ]; + my $rewritten = $self->{-inbox}->with_umask(\&rewrite_internal, + $self, $old_eml, undef, $new_eml, $sref) or return; my $rewrites = $rewritten->{rewrites}; # ->done is called if there are rewrites since we gc+prune from git diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index a57fa559..c04f935c 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -229,6 +229,24 @@ sub prepare_run { sub check_compact () { runnable_or_die($XAPIAN_COMPACT) } +sub _run { + my ($ibx, $cb, $opt, $reindex) = @_; + my $im = $ibx->importer(0); + $im->lock_acquire; + my ($tmp, $queue) = prepare_run($ibx, $opt); + + # fine-grained locking if we prepare for reindex + if (!$opt->{-coarse_lock}) { + prepare_reindex($ibx, $im, $reindex); + $im->lock_release; + } + + $ibx->cleanup; + process_queue($queue, $cb, $opt); + $im->lock_acquire if !$opt->{-coarse_lock}; + commit_changes($ibx, $im, $tmp, $opt); +} + sub run { my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' my $cb = \&${\"PublicInbox::Xapcmd::$task"}; @@ -248,22 +266,7 @@ sub run { local %SIG = %SIG; setup_signals(); $ibx->umask_prepare; - $ibx->with_umask(sub { - my $im = $ibx->importer(0); - $im->lock_acquire; - my ($tmp, $queue) = prepare_run($ibx, $opt); - - # fine-grained locking if we prepare for reindex - if (!$opt->{-coarse_lock}) { - prepare_reindex($ibx, $im, $reindex); - $im->lock_release; - } - - $ibx->cleanup; - process_queue($queue, $cb, $opt); - $im->lock_acquire if !$opt->{-coarse_lock}; - commit_changes($ibx, $im, $tmp, $opt); - }); + $ibx->with_umask(\&_run, $ibx, $cb, $opt, $reindex); } sub cpdb_retryable ($$) {