about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@yhbt.net>2020-07-17 06:31:51 +0000
committerEric Wong <e@yhbt.net>2020-07-17 20:56:51 +0000
commit1e63e0409004f1edb352f53729e2d4aed4825a22 (patch)
tree1c3db8856122e13122be01e15538220f6f04f18d
parentc6d0a81e6ca7a5da9f9ff193f0992705aa9f9682 (diff)
downloadpublic-inbox-1e63e0409004f1edb352f53729e2d4aed4825a22.tar.gz
While it makes the code flow slightly less well in some places,
it saves us runtime allocations and indentation.
-rw-r--r--lib/PublicInbox/InboxWritable.pm42
-rw-r--r--lib/PublicInbox/SearchIdx.pm40
-rw-r--r--lib/PublicInbox/V2Writable.pm102
-rw-r--r--lib/PublicInbox/Xapcmd.pm35
4 files changed, 111 insertions, 108 deletions
diff --git a/lib/PublicInbox/InboxWritable.pm b/lib/PublicInbox/InboxWritable.pm
index 875dcce2..1f3f6672 100644
--- a/lib/PublicInbox/InboxWritable.pm
+++ b/lib/PublicInbox/InboxWritable.pm
@@ -37,27 +37,33 @@ sub assert_usable_dir {
         die "no inboxdir defined for $self->{name}\n";
 }
 
+sub _init_v1 {
+        my ($self, $skip_artnum) = @_;
+        if (defined($self->{indexlevel}) || defined($skip_artnum)) {
+                require PublicInbox::SearchIdx;
+                require PublicInbox::Msgmap;
+                my $sidx = PublicInbox::SearchIdx->new($self, 1); # just create
+                $sidx->begin_txn_lazy;
+                if (defined $skip_artnum) {
+                        my $mm = PublicInbox::Msgmap->new($self->{inboxdir}, 1);
+                        $mm->{dbh}->begin_work;
+                        $mm->skip_artnum($skip_artnum);
+                        $mm->{dbh}->commit;
+                }
+                $sidx->commit_txn_lazy;
+        } else {
+                open my $fh, '>>', "$self->{inboxdir}/ssoma.lock" or
+                        die "$self->{inboxdir}/ssoma.lock: $!\n";
+        }
+}
+
 sub init_inbox {
         my ($self, $shards, $skip_epoch, $skip_artnum) = @_;
         if ($self->version == 1) {
                 my $dir = assert_usable_dir($self);
                 PublicInbox::Import::init_bare($dir);
-                if (defined($self->{indexlevel}) || defined($skip_artnum)) {
-                        require PublicInbox::SearchIdx;
-                        require PublicInbox::Msgmap;
-                        my $sidx = PublicInbox::SearchIdx->new($self, 1); # just create
-                        $sidx->begin_txn_lazy;
-                        $self->with_umask(sub {
-                                my $mm = PublicInbox::Msgmap->new($dir, 1);
-                                $mm->{dbh}->begin_work;
-                                $mm->skip_artnum($skip_artnum);
-                                $mm->{dbh}->commit;
-                        }) if defined($skip_artnum);
-                        $sidx->commit_txn_lazy;
-                } else {
-                        open my $fh, '>>', "$dir/ssoma.lock" or
-                                die "$dir/ssoma.lock: $!\n";
-                }
+                $self->umask_prepare;
+                $self->with_umask(\&_init_v1, $self, $skip_artnum);
         } else {
                 my $v2w = importer($self);
                 $v2w->init_inbox($shards, $skip_epoch, $skip_artnum);
@@ -255,9 +261,9 @@ sub _umask_for {
 }
 
 sub with_umask {
-        my ($self, $cb) = @_;
+        my ($self, $cb, @arg) = @_;
         my $old = umask $self->{umask};
-        my $rv = eval { $cb->() };
+        my $rv = eval { $cb->(@arg) };
         my $err = $@;
         umask $old;
         die $err if $err;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 4caa66d3..c93c9034 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -585,7 +585,7 @@ sub unindex_both { # git->cat_async callback
 sub index_sync {
         my ($self, $opts) = @_;
         delete $self->{lock_path} if $opts->{-skip_lock};
-        $self->{-inbox}->with_umask(sub { $self->_index_sync($opts) })
+        $self->{-inbox}->with_umask(\&_index_sync, $self, $opts);
 }
 
 sub too_big ($$$) {
@@ -854,17 +854,18 @@ sub remote_remove {
         }
 }
 
-sub begin_txn_lazy {
+sub _begin_txn {
         my ($self) = @_;
-        return if $self->{txn};
+        my $xdb = $self->{xdb} || $self->_xdb_acquire;
+        $self->{over}->begin_lazy if $self->{over};
+        $xdb->begin_transaction if $xdb;
+        $self->{txn} = 1;
+        $xdb;
+}
 
-        $self->{-inbox}->with_umask(sub {
-                my $xdb = $self->{xdb} || $self->_xdb_acquire;
-                $self->{over}->begin_lazy if $self->{over};
-                $xdb->begin_transaction if $xdb;
-                $self->{txn} = 1;
-                $xdb;
-        });
+sub begin_txn_lazy {
+        my ($self) = @_;
+        $self->{-inbox}->with_umask(\&_begin_txn, $self) if !$self->{txn};
 }
 
 # store 'indexlevel=medium' in v2 shard=0 and v1 (only one shard)
@@ -882,16 +883,19 @@ sub set_indexlevel {
         }
 }
 
+sub _commit_txn {
+        my ($self) = @_;
+        if (my $xdb = $self->{xdb}) {
+                set_indexlevel($self);
+                $xdb->commit_transaction;
+        }
+        $self->{over}->commit_lazy if $self->{over};
+}
+
 sub commit_txn_lazy {
         my ($self) = @_;
-        delete $self->{txn} or return;
-        $self->{-inbox}->with_umask(sub {
-                if (my $xdb = $self->{xdb}) {
-                        set_indexlevel($self);
-                        $xdb->commit_transaction;
-                }
-                $self->{over}->commit_lazy if $self->{over};
-        });
+        delete($self->{txn}) and
+                $self->{-inbox}->with_umask(\&_commit_txn, $self);
 }
 
 sub worker_done {
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 0119ea76..b51c8525 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -147,10 +147,8 @@ sub init_inbox {
 # returns undef on duplicate or spam
 # mimics Import::add and wraps it for v2
 sub add {
-        my ($self, $mime, $check_cb) = @_;
-        $self->{-inbox}->with_umask(sub {
-                _add($self, $mime, $check_cb)
-        });
+        my ($self, $eml, $check_cb) = @_;
+        $self->{-inbox}->with_umask(\&_add, $self, $eml, $check_cb);
 }
 
 # indexes a message, returns true if checkpointing is needed
@@ -276,6 +274,28 @@ sub idx_shard {
         $self->{idx_shards}->[$shard_i];
 }
 
+sub _idx_init { # with_umask callback
+        my ($self, $opt) = @_;
+        $self->lock_acquire unless $opt && $opt->{-skip_lock};
+        $self->{over}->create;
+
+        # xcpdb can change shard count while -watch is idle
+        my $nshards = count_shards($self);
+        $self->{shards} = $nshards if $nshards && $nshards != $self->{shards};
+
+        # need to create all shards before initializing msgmap FD
+        # idx_shards must be visible to all forked processes
+        my $max = $self->{shards} - 1;
+        my $idx = $self->{idx_shards} = [];
+        push @$idx, PublicInbox::SearchIdxShard->new($self, $_) for (0..$max);
+
+        # Now that all subprocesses are up, we can open the FDs
+        # for SQLite:
+        my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
+                "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1);
+        $mm->{dbh}->begin_work;
+}
+
 # idempotent
 sub idx_init {
         my ($self, $opt) = @_;
@@ -285,13 +305,10 @@ sub idx_init {
         # do not leak read-only FDs to child processes, we only have these
         # FDs for duplicate detection so they should not be
         # frequently activated.
+        # delete @$ibx{qw(git mm search)};
         delete $ibx->{$_} foreach (qw(git mm search));
 
-        my $indexlevel = $ibx->{indexlevel};
-        if ($indexlevel && $indexlevel eq 'basic') {
-                $self->{parallel} = 0;
-        }
-
+        $self->{parallel} = 0 if ($ibx->{indexlevel}//'') eq 'basic';
         if ($self->{parallel}) {
                 pipe(my ($r, $w)) or die "pipe failed: $!";
                 # pipe for barrier notifications doesn't need to be big,
@@ -301,33 +318,8 @@ sub idx_init {
                 $w->autoflush(1);
         }
 
-        my $over = $self->{over};
         $ibx->umask_prepare;
-        $ibx->with_umask(sub {
-                $self->lock_acquire unless ($opt && $opt->{-skip_lock});
-                $over->create;
-
-                # xcpdb can change shard count while -watch is idle
-                my $nshards = count_shards($self);
-                if ($nshards && $nshards != $self->{shards}) {
-                        $self->{shards} = $nshards;
-                }
-
-                # need to create all shards before initializing msgmap FD
-                my $max = $self->{shards} - 1;
-
-                # idx_shards must be visible to all forked processes
-                my $idx = $self->{idx_shards} = [];
-                for my $i (0..$max) {
-                        push @$idx, PublicInbox::SearchIdxShard->new($self, $i);
-                }
-
-                # Now that all subprocesses are up, we can open the FDs
-                # for SQLite:
-                my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
-                        "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1);
-                $mm->{dbh}->begin_work;
-        });
+        $ibx->with_umask(\&_idx_init, $self, $opt);
 }
 
 # returns an array mapping [ epoch => latest_commit ]
@@ -379,24 +371,24 @@ sub content_matches ($$) {
 
 # used for removing or replacing (purging)
 sub rewrite_internal ($$;$$$) {
-        my ($self, $old_mime, $cmt_msg, $new_mime, $sref) = @_;
+        my ($self, $old_eml, $cmt_msg, $new_eml, $sref) = @_;
         $self->idx_init;
         my ($im, $need_reindex, $replace_map);
         if ($sref) {
                 $replace_map = {}; # oid => sref
-                $need_reindex = [] if $new_mime;
+                $need_reindex = [] if $new_eml;
         } else {
                 $im = $self->importer;
         }
         my $over = $self->{over};
-        my $chashes = content_hashes($old_mime);
-        my @removed;
-        my $mids = mids($old_mime->header_obj);
+        my $chashes = content_hashes($old_eml);
+        my $removed = [];
+        my $mids = mids($old_eml->header_obj);
 
         # We avoid introducing new blobs into git since the raw content
         # can be slightly different, so we do not need the user-supplied
         # message now that we have the mids and content_hash
-        $old_mime = undef;
+        $old_eml = undef;
         my $mark;
 
         foreach my $mid (@$mids) {
@@ -422,15 +414,15 @@ sub rewrite_internal ($$;$$$) {
                 }
                 foreach my $num (keys %gone) {
                         my ($smsg, $mime, $orig) = @{$gone{$num}};
-                        # @removed should only be set once assuming
+                        # $removed should only be set once assuming
                         # no bugs in our deduplication code:
-                        @removed = (undef, $mime, $smsg);
+                        $removed = [ undef, $mime, $smsg ];
                         my $oid = $smsg->{blob};
                         if ($replace_map) {
                                 $replace_map->{$oid} = $sref;
                         } else {
                                 ($mark, undef) = $im->remove($orig, $cmt_msg);
-                                $removed[0] = $mark;
+                                $removed->[0] = $mark;
                         }
                         $orig = undef;
                         if ($need_reindex) { # ->replace
@@ -447,28 +439,26 @@ sub rewrite_internal ($$;$$$) {
                 $self->{last_commit}->[$self->{epoch_max}] = $cmt;
         }
         if ($replace_map && scalar keys %$replace_map) {
-                my $rewrites = _replace_oids($self, $new_mime, $replace_map);
+                my $rewrites = _replace_oids($self, $new_eml, $replace_map);
                 return { rewrites => $rewrites, need_reindex => $need_reindex };
         }
-        defined($mark) ? @removed : undef;
+        defined($mark) ? $removed : undef;
 }
 
 # public (see PublicInbox::Import->remove), but note the 3rd element
 # (retval[2]) is not part of the stable API shared with Import->remove
 sub remove {
-        my ($self, $mime, $cmt_msg) = @_;
-        my @ret;
-        $self->{-inbox}->with_umask(sub {
-                @ret = rewrite_internal($self, $mime, $cmt_msg);
-        });
-        defined($ret[0]) ? @ret : undef;
+        my ($self, $eml, $cmt_msg) = @_;
+        my $r = $self->{-inbox}->with_umask(\&rewrite_internal,
+                                                $self, $eml, $cmt_msg);
+        defined($r) && defined($r->[0]) ? @$r: undef;
 }
 
 sub _replace ($$;$$) {
-        my ($self, $old_mime, $new_mime, $sref) = @_;
-        my $rewritten = $self->{-inbox}->with_umask(sub {
-                rewrite_internal($self, $old_mime, undef, $new_mime, $sref);
-        }) or return;
+        my ($self, $old_eml, $new_eml, $sref) = @_;
+        my $arg = [ $self, $old_eml, undef, $new_eml, $sref ];
+        my $rewritten = $self->{-inbox}->with_umask(\&rewrite_internal,
+                        $self, $old_eml, undef, $new_eml, $sref) or return;
 
         my $rewrites = $rewritten->{rewrites};
         # ->done is called if there are rewrites since we gc+prune from git
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index a57fa559..c04f935c 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -229,6 +229,24 @@ sub prepare_run {
 
 sub check_compact () { runnable_or_die($XAPIAN_COMPACT) }
 
+sub _run {
+        my ($ibx, $cb, $opt, $reindex) = @_;
+        my $im = $ibx->importer(0);
+        $im->lock_acquire;
+        my ($tmp, $queue) = prepare_run($ibx, $opt);
+
+        # fine-grained locking if we prepare for reindex
+        if (!$opt->{-coarse_lock}) {
+                prepare_reindex($ibx, $im, $reindex);
+                $im->lock_release;
+        }
+
+        $ibx->cleanup;
+        process_queue($queue, $cb, $opt);
+        $im->lock_acquire if !$opt->{-coarse_lock};
+        commit_changes($ibx, $im, $tmp, $opt);
+}
+
 sub run {
         my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
         my $cb = \&${\"PublicInbox::Xapcmd::$task"};
@@ -248,22 +266,7 @@ sub run {
         local %SIG = %SIG;
         setup_signals();
         $ibx->umask_prepare;
-        $ibx->with_umask(sub {
-                my $im = $ibx->importer(0);
-                $im->lock_acquire;
-                my ($tmp, $queue) = prepare_run($ibx, $opt);
-
-                # fine-grained locking if we prepare for reindex
-                if (!$opt->{-coarse_lock}) {
-                        prepare_reindex($ibx, $im, $reindex);
-                        $im->lock_release;
-                }
-
-                $ibx->cleanup;
-                process_queue($queue, $cb, $opt);
-                $im->lock_acquire if !$opt->{-coarse_lock};
-                commit_changes($ibx, $im, $tmp, $opt);
-        });
+        $ibx->with_umask(\&_run, $ibx, $cb, $opt, $reindex);
 }
 
 sub cpdb_retryable ($$) {