about summary refs log tree commit homepage
path: root/lib/PublicInbox/LEI.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LEI.pm')
-rw-r--r--lib/PublicInbox/LEI.pm820
1 files changed, 454 insertions, 366 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index df0bfab6..e9a0de6c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Backend for `lei' (local email interface).  Unlike the C10K-oriented
@@ -6,47 +6,49 @@
 # local clients with read/write access to the FS and use as many
 # system resources as the local user has access to.
 package PublicInbox::LEI;
-use strict;
-use v5.10.1;
+use v5.12;
 use parent qw(PublicInbox::DS PublicInbox::LeiExternal
         PublicInbox::LeiQuery);
+use autodie qw(bind chdir open pipe socket socketpair syswrite unlink);
 use Getopt::Long ();
-use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
-use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
+use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
+use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET);
 use Cwd qw(getcwd);
 use POSIX qw(strftime);
 use IO::Handle ();
 use Fcntl qw(SEEK_SET);
 use PublicInbox::Config;
-use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
-use PublicInbox::Sigfd;
-use PublicInbox::DS qw(now dwaitpid);
-use PublicInbox::Spawn qw(spawn popen_rd);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::Spawn qw(run_wait popen_rd run_qx);
 use PublicInbox::Lock;
 use PublicInbox::Eml;
 use PublicInbox::Import;
 use PublicInbox::ContentHash qw(git_sha);
+use PublicInbox::OnDestroy;
+use PublicInbox::IPC;
 use Time::HiRes qw(stat); # ctime comparisons for config cache
-use File::Path qw(mkpath);
+use File::Path ();
 use File::Spec;
+use Carp qw(carp);
 use Sys::Syslog qw(openlog syslog closelog);
 our $quit = \&CORE::exit;
-our ($current_lei, $errors_log, $listener, $oldset, $dir_idle,
-        $recv_cmd, $send_cmd);
+our ($current_lei, $errors_log, $listener, $oldset, $dir_idle);
 my $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
 my $GLP_PASS = Getopt::Long::Parser->new;
 $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
-our %PATH2CFG; # persistent for socket daemon
-our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
+our (%PATH2CFG, # persistent for socket daemon
+$MDIR2CFGPATH, # location => { /path/to/config => [ ino watches ] }
+$OPT, # shared between optparse and opt_dash callback (for Getopt::Long)
+$daemon_pid
+);
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
 sub pass_through { $GLP_PASS }
 
-my $OPT;
-sub opt_dash ($$) {
+sub opt_dash ($$) { # callback runs inside optparse
         my ($spec, $re_str) = @_; # 'limit|n=i', '([0-9]+)'
         my ($key) = ($spec =~ m/\A([a-z]+)/g);
         my $cb = sub { # Getopt::Long "<>" catch-all handler
@@ -78,19 +80,16 @@ sub rel2abs {
                 return $p;
         }
         my $pwd = $self->{env}->{PWD};
-        my $cwd;
         if (defined $pwd) {
-                my $xcwd = $self->{3} //
-                        ($cwd = getcwd() // die "getcwd(PWD=$pwd): $!");
                 if (my @st_pwd = stat($pwd)) {
-                        my @st_cwd = stat($xcwd) or die "stat($xcwd): $!";
+                        my @st_cwd = stat($self->{3}) or die "stat({3}): $!";
                         "@st_pwd[1,0]" eq "@st_cwd[1,0]" or
                                 $self->{env}->{PWD} = $pwd = undef;
                 } else { # PWD was invalid
                         $self->{env}->{PWD} = $pwd = undef;
                 }
         }
-        $pwd //= $self->{env}->{PWD} = $cwd // getcwd() // die "getcwd: $!";
+        $pwd //= $self->{env}->{PWD} = getcwd() // die "getcwd: $!";
         File::Spec->rel2abs($p, $pwd);
 }
 
@@ -134,9 +133,10 @@ sub url_folder_cache {
 
 sub ale {
         my ($self) = @_;
-        $self->{ale} //= do {
+        $self->{ale} // do {
                 require PublicInbox::LeiALE;
-                $self->_lei_cfg(1)->{ale} //= PublicInbox::LeiALE->new($self);
+                my $cfg = $self->_lei_cfg(1);
+                $self->{ale} = $cfg->{ale} //= PublicInbox::LeiALE->new($self);
         };
 }
 
@@ -149,7 +149,7 @@ sub index_opt {
 
 my @c_opt = qw(c=s@ C=s@ quiet|q);
 my @net_opt = (qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt());
-my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only=s@
+my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only|O=s@
         import-remote!);
 
 # we don't support -C as an alias for --find-copies since it's already
@@ -162,7 +162,7 @@ our @diff_opt = qw(unified|U=i output-indicator-new=s output-indicator-old=s
         rename-empty! check ws-error-highlight=s full-index binary
         abbrev:i break-rewrites|B:s find-renames|M:s find-copies:s
         find-copies-harder irreversible-delete|D l=i diff-filter=s
-        S=s G=s find-object=s pickaxe-all pickaxe-regex O=s R
+        S=s G=s find-object=s pickaxe-all pickaxe-regex R
         relative:s text|a ignore-cr-at-eol ignore-space-at-eol
         ignore-space-change|b ignore-all-space|w ignore-blank-lines
         inter-hunk-context=i function-context|W exit-code ext-diff
@@ -177,13 +177,14 @@ our %CMD = ( # sorted in order of importance/use:
         'stdin|', # /|\z/ must be first for lone dash
         @lxs_opt, @net_opt,
         qw(save! output|mfolder|o=s format|f=s dedupe|d=s threads|t+
+        thread-id|T=s
         sort|s=s reverse|r offset=i pretty jobs|j=s globoff|g augment|a
         import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+
         shared color! mail-sync!), @c_opt, opt_dash('limit|n=i', '[0-9]+') ],
 
 'up' => [ 'OUTPUT...|--all', 'update saved search',
-        qw(jobs|j=s lock=s@ alert=s@ mua=s verbose|v+
-        remote-fudge-time=s all:s), @c_opt ],
+        qw(jobs|j=s lock=s@ alert=s@ mua=s verbose|v+ exclude=s@
+        remote-fudge-time=s all:s remote! local! external!), @net_opt, @c_opt ],
 
 'lcat' => [ '--stdin|MSGID_OR_URL...', 'display local copy of message(s)',
         'stdin|', # /|\z/ must be first for lone dash
@@ -201,8 +202,13 @@ our %CMD = ( # sorted in order of importance/use:
 'rediff' => [ '--stdin|LOCATION...',
                 'regenerate a diff with different options',
         'stdin|', # /|\z/ must be first for lone dash
-        qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1),
-        @diff_opt, @lxs_opt, @net_opt, @c_opt ],
+        qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1
+        order-file=s), @diff_opt, @lxs_opt, @net_opt, @c_opt ],
+
+'mail-diff' => [ '--stdin|LOCATION...', 'diff the contents of emails',
+        'stdin|', # /|\z/ must be first for lone dash
+        qw(verbose|v+ in-format|F=s color:s no-color raw-header),
+        @diff_opt, @net_opt, @c_opt ],
 
 'add-external' => [ 'LOCATION',
         'add/set priority of a publicinbox|extindex for extra matches',
@@ -214,34 +220,35 @@ our %CMD = ( # sorted in order of importance/use:
 'ls-mail-sync' => [ '[FILTER]', 'list mail sync folders',
                 qw(z|0 globoff|g invert-match|v local remote), @c_opt ],
 'ls-mail-source' => [ 'URL', 'list IMAP or NNTP mail source folders',
-                qw(z|0 ascii l pretty url), @c_opt ],
+                qw(z|0 ascii l pretty url), @net_opt, @c_opt ],
 'forget-external' => [ 'LOCATION...|--prune',
         'exclude further results from a publicinbox|extindex',
         qw(prune), @c_opt ],
 
 'ls-search' => [ '[PREFIX]', 'list saved search queries',
                 qw(format|f=s pretty l ascii z|0), @c_opt ],
-'forget-search' => [ 'OUTPUT', 'forget a saved search',
-                qw(verbose|v+), @c_opt ],
+'forget-search' => [ 'OUTPUT...|--prune', 'forget a saved search',
+                qw(verbose|v+ prune:s), @c_opt ],
 'edit-search' => [ 'OUTPUT', "edit saved search via `git config --edit'",
                         @c_opt ],
 'rm' => [ '--stdin|LOCATION...',
         'remove a message from the index and prevent reindexing',
         'stdin|', # /|\z/ must be first for lone dash
-        qw(in-format|F=s lock=s@), @net_opt, @c_opt ],
+        qw(in-format|F=s lock=s@ commit-delay=i), @net_opt, @c_opt ],
 'plonk' => [ '--threads|--from=IDENT',
         'exclude mail matching From: or threads from non-Message-ID searches',
         qw(stdin| threads|t from|f=s mid=s oid=s), @c_opt ],
-'tag' => [ 'KEYWORDS...',
+tag => [ 'KEYWORDS... LOCATION...|--stdin',
         'set/unset keywords and/or labels on message(s)',
-        qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@),
+        qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@ commit-delay=i),
         @net_opt, @c_opt, pass_through('-kw:foo for delete') ],
 
 'purge-mailsource' => [ 'LOCATION|--all',
         'remove imported messages from IMAP, Maildirs, and MH',
         qw(exact! all jobs:i indexed), @c_opt ],
 
-'add-watch' => [ 'LOCATION...', 'watch for new messages and flag changes',
+'add-watch' => [ 'LOCATION... [LABELS...]',
+        'watch for new messages and flag changes',
         qw(poll-interval=s state=s recursive|r), @c_opt ],
 'rm-watch' => [ 'LOCATION...', 'remove specified watch(es)',
         qw(recursive|r), @c_opt ],
@@ -252,35 +259,41 @@ our %CMD = ( # sorted in order of importance/use:
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
         qw(prune), @c_opt ],
 
-'index' => [ 'LOCATION...', 'one-time index from URL or filesystem',
+'reindex' => [ '', 'reindex all locally-indexed messages', @c_opt ],
+
+'index' => [ 'LOCATION... [LABELS...]', 'one-time index from URL or filesystem',
         qw(in-format|F=s kw! offset=i recursive|r exclude=s include|I=s
         verbose|v+ incremental!), @net_opt, # mainly for --proxy=
          @c_opt ],
-'import' => [ 'LOCATION...|--stdin',
+import => [ 'LOCATION...|--stdin [LABELS...]',
         'one-time import/update from URL or filesystem',
         qw(stdin| offset=i recursive|r exclude=s include|I=s new-only
-        lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
+        lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!
+        commit-delay=i sort|s:s@),
         @net_opt, @c_opt ],
 'forget-mail-sync' => [ 'LOCATION...',
         'forget sync information for a mail folder', @c_opt ],
 'refresh-mail-sync' => [ 'LOCATION...|--all',
-        'prune dangling sync data for a mail folder', 'all:s', @c_opt ],
+        'prune dangling sync data for a mail folder', 'all:s',
+                @net_opt, @c_opt ],
 'export-kw' => [ 'LOCATION...|--all',
         'one-time export of keywords of sync sources',
         qw(all:s mode=s), @net_opt, @c_opt ],
 'convert' => [ 'LOCATION...|--stdin',
         'one-time conversion from URL or filesystem to another format',
-        qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw!),
+        qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw!
+                rsyncable sort|s:s@),
         @net_opt, @c_opt ],
-'p2q' => [ 'FILE|COMMIT_OID|--stdin',
+'p2q' => [ 'LOCATION_OR_COMMIT...|--stdin',
         "use a patch to generate a query for `lei q --stdin'",
-        qw(stdin| want|w=s@ uri debug), @c_opt ],
+        qw(stdin| in-format|F=s want|w=s@ uri debug), @net_opt, @c_opt ],
 'config' => [ '[...]', sub {
-                'git-config(1) wrapper for '._config_path($_[0]);
+                'git-config(1) wrapper for '._config_path($_[0]). "\n" .
+        '-l/--list and other common git-config uses are supported'
         }, qw(config-file|system|global|file|f=s), # for conflict detection
          qw(edit|e c=s@ C=s@), pass_through('git config') ],
 'inspect' => [ 'ITEMS...|--stdin', 'inspect lei/store and/or local external',
-        qw(stdin| pretty ascii dir=s), @c_opt ],
+        qw(stdin| pretty ascii dir|d=s), @c_opt ],
 
 'init' => [ '[DIRNAME]', sub {
         "initialize storage, default: ".store_path($_[0]);
@@ -310,6 +323,9 @@ our %CMD = ( # sorted in order of importance/use:
 my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo',
                         'specify message input format' ];
 my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ];
+my $sort_out = [ 'VAL|received|relevance|docid',
+                "order of results is `--output'-dependent"];
+my $sort_in = [ 'sequence|mtime|size', 'sort input (format-dependent)' ];
 
 # we use \x{a0} (non-breaking SP) to avoid wrapping in PublicInbox::LeiHelp
 my %OPTDESC = (
@@ -333,7 +349,8 @@ my %OPTDESC = (
 'path-a|a=s' => 'pre-image pathname associated with OID',
 'path-b|b=s' => 'post-image pathname associated with OID',
 'git-dir=s@' => 'additional git repository to scan',
-'dir=s        inspect' => 'specify a inboxdir, extindex topdir or Xapian shard',
+'dir|d=s        inspect' =>
+        'specify a inboxdir, extindex topdir or Xapian shard',
 'proxy=s' => [ 'PROTO://HOST[:PORT]', # shared with curl(1)
         "proxy for (e.g. `socks5h://0:9050')" ],
 'torsocks=s' => ['VAL|auto|no|yes',
@@ -341,6 +358,7 @@ my %OPTDESC = (
 'no-torsocks' => 'alias for --torsocks=no',
 'save!' =>  "do not save a search for `lei up'",
 'import-remote!' => 'do not memoize remote messages into local store',
+'import-before!' => 'do not import before writing to output (DANGEROUS)',
 
 'type=s' => [ 'any|mid|git', 'disambiguate type' ],
 
@@ -351,6 +369,7 @@ my %OPTDESC = (
 
 'want|w=s@' => [ 'PREFIX|dfpost|dfn', # common ones in help...
                 'search prefixes to extract (default: dfpost7)' ],
+'uri        p2q' => [ 'URI escape output' ],
 
 'alert=s@' => ['CMD,:WINCH,:bell,<any command>',
         'run command(s) or perform ops when done writing to output ' .
@@ -391,10 +410,12 @@ my %OPTDESC = (
                 'exclude specified external(s) from search' ],
 'include|I=s@        q' => [ 'LOCATION',
                 'include specified external(s) in search' ],
-'only=s@        q' => [ 'LOCATION',
+'only|O=s@        q' => [ 'LOCATION',
                 'only use specified external(s) for search' ],
-'jobs=s        q' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
-                'control number of search and writer jobs' ],
+'jobs|j=s' => [ 'JOBSPEC',
+                'control number of query and writer jobs' .
+                "integers delimited by `,', either of which may be omitted"
+                ],
 'jobs|j=i        add-external' => 'set parallelism when indexing after --mirror',
 
 'in-format|F=s' => $stdin_formats,
@@ -406,11 +427,16 @@ my %OPTDESC = (
 'url        ls-mail-source' => 'show full URL of newsgroup or IMAP folder',
 'format|f=s        ls-external' => $ls_format,
 
+'prune:s        forget-search' =>
+        ['TYPE|local|remote', 'prune all, remote or local folders' ],
+
 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ],
 'offset=i' => ['OFF', 'search result offset (default: 0)'],
 
-'sort|s=s' => [ 'VAL|received|relevance|docid',
-                "order of results is `--output'-dependent"],
+'sort|s=s        q' => $sort_out,
+'sort|s=s        lcat' => $sort_out,
+'sort|s:s@        convert' => $sort_in,
+'sort|s:s@        import' => $sort_in,
 'reverse|r' => 'reverse search results', # like sort(1)
 
 'boost=i' => 'increase/decrease priority of results (default: 0)',
@@ -444,22 +470,19 @@ my %OPTDESC = (
 'z|0' => 'use NUL \\0 instead of newline (CR) to delimit lines',
 
 'signal|s=s' => [ 'SIG', 'signal to send lei-daemon (default: TERM)' ],
+'edit|e        config' => 'open an editor to modify the lei config file',
 ); # %OPTDESC
 
 my %CONFIG_KEYS = (
         'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers
 
 sub _drop_wq {
         my ($self) = @_;
         for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
-                if ($wq->wq_kill) {
-                        $wq->wq_close(0, undef, $self);
-                } elsif ($wq->wq_kill_old) {
-                        $wq->wq_wait_old(undef, $self);
-                }
+                $wq->wq_kill(-POSIX::SIGTERM());
                 $wq->DESTROY;
         }
 }
@@ -472,11 +495,13 @@ sub x_it ($$) {
         stop_pager($self);
         if ($self->{pkt_op_p}) { # worker => lei-daemon
                 $self->{pkt_op_p}->pkt_do('x_it', $code);
+                exit($code >> 8) if $$ != $daemon_pid;
         } elsif ($self->{sock}) { # lei->daemon => lei(1) client
-                send($self->{sock}, "x_it $code", MSG_EOR);
+                send($self->{sock}, "x_it $code", 0);
         } elsif ($quit == \&CORE::exit) { # an admin (one-shot) command
                 exit($code >> 8);
         } # else ignore if client disconnected
+        $self->dclose if $$ == $daemon_pid;
 }
 
 sub err ($;@) {
@@ -485,7 +510,7 @@ sub err ($;@) {
         my @eor = (substr($_[-1]//'', -1, 1) eq "\n" ? () : ("\n"));
         print $err @_, @eor and return;
         my $old_err = delete $self->{2};
-        close($old_err) if $! == EPIPE && $old_err;
+        $old_err->close if $! == EPIPE && $old_err;
         $err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB};
         print $err @_, @eor or print STDERR @_, @eor;
 }
@@ -500,7 +525,7 @@ sub qfin { # show message on finalization (LeiFinmsg)
 
 sub fail_handler ($;$$) {
         my ($lei, $code, $io) = @_;
-        close($io) if $io; # needed to avoid warnings on SIGPIPE
+        $io->close if $io; # needed to avoid warnings on SIGPIPE
         _drop_wq($lei);
         x_it($lei, $code // (1 << 8));
 }
@@ -509,18 +534,17 @@ sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers
         fail_handler($_[0], 13, delete $_[0]->{1});
 }
 
-# PublicInbox::OnDestroy callback for SIGINT to take out the entire pgid
-sub sigint_reap {
-        my ($pgid) = @_;
-        dwaitpid($pgid) if kill('-INT', $pgid);
-}
-
-sub fail ($$;$) {
-        my ($self, $buf, $exit_code) = @_;
-        $self->{failed}++;
-        err($self, $buf) if defined $buf;
-        $self->{pkt_op_p}->pkt_do('fail_handler') if $self->{pkt_op_p};
-        x_it($self, ($exit_code // 1) << 8);
+sub fail ($;@) {
+        my ($lei, @msg) = @_;
+        my $exit_code = ($msg[0]//'') =~ /\A-?[0-9]+\z/ ? shift(@msg) : undef;
+        local $current_lei = $lei;
+        $lei->{failed}++;
+        if (@msg) {
+                push @msg, "\n" if substr($msg[-1], -1, 1);
+                warn @msg;
+        }
+        $lei->{pkt_op_p}->pkt_do('fail_handler') if $lei->{pkt_op_p};
+        x_it($lei, $exit_code // (1 << 8));
         undef;
 }
 
@@ -537,20 +561,20 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) }
 
 sub child_error { # passes non-fatal curl exit codes to user
         my ($self, $child_error, $msg) = @_; # child_error is $?
+        local $current_lei = $self;
         $child_error ||= 1 << 8;
-        $self->err($msg) if $msg;
+        warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg;
+        $self->{child_error} ||= $child_error;
         if ($self->{pkt_op_p}) { # to top lei-daemon
                 $self->{pkt_op_p}->pkt_do('child_error', $child_error);
         } elsif ($self->{sock}) { # to lei(1) client
-                send($self->{sock}, "child_error $child_error", MSG_EOR);
-        } else { # non-lei admin command
-                $self->{child_error} ||= $child_error;
+                send($self->{sock}, "child_error $child_error", 0);
         } # else noop if client disconnected
 }
 
 sub note_sigpipe { # triggers sigpipe_handler
         my ($self, $fd) = @_;
-        close(delete($self->{$fd})); # explicit close silences Perl warning
+        delete($self->{$fd})->close; # explicit close silences Perl warning
         $self->{pkt_op_p}->pkt_do('sigpipe_handler') if $self->{pkt_op_p};
         x_it($self, 13);
 }
@@ -558,17 +582,21 @@ sub note_sigpipe { # triggers sigpipe_handler
 sub _lei_atfork_child {
         my ($self, $persist) = @_;
         # we need to explicitly close things which are on stack
+        my $cfg = $self->{cfg};
+        delete @$cfg{qw(-watches -lei_note_event)};
         if ($persist) {
-                chdir '/' or die "chdir(/): $!";
+                open $self->{3}, '<', '/';
+                fchdir($self);
                 close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)}));
-                if (my $cfg = $self->{cfg}) {
-                        delete @$cfg{qw(-lei_store -watches -lei_note_event)};
-                }
+                delete $cfg->{-lei_store};
         } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly
-                open STDERR, '+>&='.fileno($self->{2}) or warn "open $!";
+                open STDERR, '+>&='.fileno($self->{2}); # idempotent w/ fileno
                 STDERR->autoflush(1);
+                $self->{2} = \*STDERR;
+                POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!";
         }
-        close($_) for (grep(defined, delete @$self{qw(3 old_1 au_done)}));
+        close($_) for (grep(defined, delete @$self{qw(old_1 au_done)}));
+        close($_) for (@{delete($self->{-socks}) // []});
         if (my $op_c = delete $self->{pkt_op_c}) {
                 close(delete $op_c->{sock});
         }
@@ -580,12 +608,17 @@ sub _lei_atfork_child {
         $dir_idle->force_close if $dir_idle;
         undef $dir_idle;
         %PATH2CFG = ();
-        $MDIR2CFGPATH = {};
+        $MDIR2CFGPATH = undef;
         eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
         undef $errors_log;
         $quit = \&CORE::exit;
-        $self->{-eml_noisy} or # only "lei import" sets this atm
-                $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
+        if (!$self->{-eml_noisy}) { # only "lei import" sets this atm
+                my $cb = $SIG{__WARN__} // \&CORE::warn;
+                $SIG{__WARN__} = sub {
+                        $cb->(@_) unless PublicInbox::Eml::warn_ignore(@_)
+                };
+        }
+        $SIG{TERM} = sub { exit(128 + 15) };
         $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
@@ -600,16 +633,16 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
 
 sub pkt_op_pair {
         my ($self) = @_;
-        require PublicInbox::OnDestroy;
         require PublicInbox::PktOp;
-        my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
+        my $end = on_destroy \&_delete_pkt_op, $self;
         @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair;
         $end;
 }
 
 sub incr {
-        my ($self, $field, $nr) = @_;
-        $self->{counters}->{$field} += $nr;
+        my $lei = shift;
+        $lei->{incr_pid} = $$ if @_;
+        while (my ($f, $n) = splice(@_, 0, 2)) { $lei->{$f} += $n }
 }
 
 sub pkt_ops {
@@ -624,12 +657,16 @@ sub pkt_ops {
 
 sub workers_start {
         my ($lei, $wq, $jobs, $ops, $flds) = @_;
-        $ops = pkt_ops($lei, { ($ops ? %$ops : ()) });
+        $ops //= {};
+        ($wq->can('net_merge_all_done') && $lei->{auth}) and
+                $lei->{auth}->op_merge($ops, $wq, $lei);
+        pkt_ops($lei, $ops);
         $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
         my $end = $lei->pkt_op_pair;
         my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
         $flds->{lei} = $lei;
-        $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
+        $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds,
+                $wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
         delete $lei->{pkt_op_p};
         my $op_c = delete $lei->{pkt_op_c};
         @$end = ();
@@ -640,12 +677,23 @@ sub workers_start {
 # call this when we're ready to wait on events and yield to other clients
 sub wait_wq_events {
         my ($lei, $op_c, $ops) = @_;
+        my $wq1 = $lei->{wq1};
+        ($wq1 && $wq1->can('net_merge_all_done') && !$lei->{auth}) and
+                $wq1->net_merge_all_done;
         for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
-                $wq->wq_close(1);
+                $wq->wq_close;
         }
+        $wq1->{lei_sock} = $lei->{sock} if $wq1;
         $op_c->{ops} = $ops;
 }
 
+sub wq1_start {
+        my ($lei, $wq, $jobs) = @_;
+        my ($op_c, $ops) = workers_start($lei, $wq, $jobs // 1);
+        $lei->{wq1} = $wq;
+        wait_wq_events($lei, $op_c, $ops); # net_merge_all_done if !{auth}
+}
+
 sub _help {
         require PublicInbox::LeiHelp;
         PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
@@ -656,7 +704,7 @@ sub optparse ($$$) {
         # allow _complete --help to complete, not show help
         return 1 if substr($cmd, 0, 1) eq '_';
         $self->{cmd} = $cmd;
-        $OPT = $self->{opt} //= {};
+        local $OPT = $self->{opt} //= {};
         my $info = $CMD{$cmd} // [ '[...]' ];
         my ($proto, undef, @spec) = @$info;
         my $glp = ref($spec[-1]) eq ref($GLP) ? pop(@spec) : $GLP;
@@ -676,6 +724,12 @@ sub optparse ($$$) {
         # "-" aliases "stdin" or "clear"
         $OPT->{$lone_dash} = ${$OPT->{$lone_dash}} if defined $lone_dash;
 
+        if ($proto =~ s/\s*\[?(?:KEYWORDS|LABELS)\.\.\.\]?\s*//g) {
+                require PublicInbox::LeiInput;
+                my @err = PublicInbox::LeiInput::vmd_mod_extract($self, $argv);
+                return $self->fail(join("\n", @err)) if @err;
+        }
+
         my $i = 0;
         my $POS_ARG = '[A-Z][A-Z0-9_]+';
         my ($err, $inf);
@@ -703,11 +757,10 @@ sub optparse ($$$) {
                                         # w/o args means stdin
                                         if ($sw eq 'stdin' && !@$argv &&
                                                         (-p $self->{0} ||
-                                                         -f _) && -r _) {
+                                                         -f _)) {
                                                 $OPT->{stdin} //= 1;
                                         }
-                                        $ok = defined($OPT->{$sw});
-                                        last if $ok;
+                                        $ok = defined($OPT->{$sw}) and last;
                                 } elsif (defined($argv->[$i])) {
                                         $ok = 1;
                                         $i++;
@@ -728,38 +781,7 @@ sub optparse ($$$) {
         $err ? fail($self, "usage: lei $cmd $proto\nE: $err") : 1;
 }
 
-sub _tmp_cfg { # for lei -c <name>=<value> ...
-        my ($self) = @_;
-        my $cfg = _lei_cfg($self, 1);
-        require File::Temp;
-        my $ft = File::Temp->new(TEMPLATE => 'lei_cfg-XXXX', TMPDIR => 1);
-        my $tmp = { '-f' => $ft->filename, -tmp => $ft };
-        $ft->autoflush(1);
-        print $ft <<EOM or return fail($self, "$tmp->{-f}: $!");
-[include]
-        path = $cfg->{-f}
-EOM
-        $tmp = $self->{cfg} = bless { %$cfg, %$tmp }, ref($cfg);
-        for (@{$self->{opt}->{c}}) {
-                /\A([^=\.]+\.[^=]+)(?:=(.*))?\z/ or return fail($self, <<EOM);
-`-c $_' is not of the form -c <name>=<value>'
-EOM
-                my $name = $1;
-                my $value = $2 // 1;
-                _config($self, '--add', $name, $value);
-                if (defined(my $v = $tmp->{$name})) {
-                        if (ref($v) eq 'ARRAY') {
-                                push @$v, $value;
-                        } else {
-                                $tmp->{$name} = [ $v, $value ];
-                        }
-                } else {
-                        $tmp->{$name} = $value;
-                }
-        }
-}
-
-sub lazy_cb ($$$) {
+sub lazy_cb ($$$) { # $pfx is _complete_ or lei_
         my ($self, $cmd, $pfx) = @_;
         my $ucmd = $cmd;
         $ucmd =~ tr/-/_/;
@@ -772,11 +794,19 @@ sub lazy_cb ($$$) {
                 $pkg->can($pfx.$ucmd) : undef;
 }
 
+sub do_env {
+        my $lei = shift;
+        fchdir($lei);
+        my $cb = shift // return ($lei, %{$lei->{env}}) ;
+        local ($current_lei, %ENV) = ($lei, %{$lei->{env}});
+        $cb = $lei->can($cb) if !ref($cb); # $cb may be a scalar sub name
+        eval { $cb->($lei, @_) };
+        $lei->fail($@) if $@;
+}
+
 sub dispatch {
         my ($self, $cmd, @argv) = @_;
-        fchdir($self) or return;
-        local %ENV = %{$self->{env}};
-        local $current_lei = $self; # for __WARN__
+        local ($current_lei, %ENV) = do_env($self);
         $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
         return _help($self, 'no command given') unless defined($cmd);
         # do not support Getopt bundling for this
@@ -788,13 +818,12 @@ sub dispatch {
         }
         if (my $cb = lazy_cb(__PACKAGE__, $cmd, 'lei_')) {
                 optparse($self, $cmd, \@argv) or return;
-                $self->{opt}->{c} and (_tmp_cfg($self) // return);
                 if (my $chdir = $self->{opt}->{C}) {
                         for my $d (@$chdir) {
                                 next if $d eq ''; # same as git(1)
-                                chdir $d or return fail($self, "cd $d: $!");
+                                chdir $d;
                         }
-                        open $self->{3}, '.' or return fail($self, "open . $!");
+                        open($self->{3}, '<', '.');
                 }
                 $cb->($self, @argv);
         } elsif (grep(/\A-/, $cmd, @argv)) { # --help or -h only
@@ -812,28 +841,30 @@ sub _lei_cfg ($;$) {
         my $f = _config_path($self);
         my @st = stat($f);
         my $cur_st = @st ? pack('dd', $st[10], $st[7]) : ''; # 10:ctime, 7:size
-        my ($sto, $sto_dir, $watches, $lne);
-        if (my $cfg = $PATH2CFG{$f}) { # reuse existing object in common case
-                return ($self->{cfg} = $cfg) if $cur_st eq $cfg->{-st};
+        my ($sto, $sto_dir, $watches, $lne, $cfg);
+        if ($cfg = $PATH2CFG{$f}) { # reuse existing object in common case
+                ($cur_st eq $cfg->{-st} && !$self->{opt}->{c}) and
+                        return ($self->{cfg} = $cfg);
+                # reuse some fields below if they match:
                 ($sto, $sto_dir, $watches, $lne) =
                                 @$cfg{qw(-lei_store leistore.dir -watches
                                         -lei_note_event)};
         }
         if (!@st) {
-                unless ($creat) {
-                        delete $self->{cfg};
-                        return bless {}, 'PublicInbox::Config';
+                unless ($creat) { # any commands which write to cfg must creat
+                        $cfg = PublicInbox::Config->git_config_dump(
+                                                        '/dev/null', $self);
+                        return ($self->{cfg} = $cfg);
                 }
                 my ($cfg_dir) = ($f =~ m!(.*?/)[^/]+\z!);
-                -d $cfg_dir or mkpath($cfg_dir) or die "mkpath($cfg_dir): $!\n";
-                open my $fh, '>>', $f or die "open($f): $!\n";
+                File::Path::mkpath($cfg_dir);
+                open my $fh, '>>', $f;
                 @st = stat($fh) or die "fstat($f): $!\n";
                 $cur_st = pack('dd', $st[10], $st[7]);
                 qerr($self, "# $f created") if $self->{cmd} ne 'config';
         }
-        my $cfg = PublicInbox::Config->git_config_dump($f, $self->{2});
+        $cfg = PublicInbox::Config->git_config_dump($f, $self);
         $cfg->{-st} = $cur_st;
-        $cfg->{'-f'} = $f;
         if ($sto && canonpath_harder($sto_dir // store_path($self))
                         eq canonpath_harder($cfg->{'leistore.dir'} //
                                                 store_path($self))) {
@@ -843,11 +874,9 @@ sub _lei_cfg ($;$) {
         }
         if (scalar(keys %PATH2CFG) > 5) {
                 # FIXME: use inotify/EVFILT_VNODE to detect unlinked configs
-                for my $k (keys %PATH2CFG) {
-                        delete($PATH2CFG{$k}) unless -f $k
-                }
+                delete(@PATH2CFG{grep(!-f, keys %PATH2CFG)});
         }
-        $self->{cfg} = $PATH2CFG{$f} = $cfg;
+        $self->{cfg} = $self->{opt}->{c} ? $cfg : ($PATH2CFG{$f} = $cfg);
         refresh_watches($self);
         $cfg;
 }
@@ -863,16 +892,49 @@ sub _lei_store ($;$) {
         };
 }
 
+# returns true on success, undef
+# argv[0] eq `+e' means errors do not ->fail # (like `sh +e')
 sub _config {
         my ($self, @argv) = @_;
-        my %env = (%{$self->{env}}, GIT_CONFIG => undef);
+        my $err_ok = ($argv[0] // '') eq '+e' ? shift(@argv) : undef;
+        my %env;
+        my %opt = map { $_ => $self->{$_} } (0..2);
         my $cfg = _lei_cfg($self, 1);
-        my $cmd = [ qw(git config -f), $cfg->{'-f'}, @argv ];
-        my %rdr = map { $_ => $self->{$_} } (0..2);
-        waitpid(spawn($cmd, \%env, \%rdr), 0);
+        my $opt_c = delete local $cfg->{-opt_c};
+        my @file_arg;
+        if ($opt_c) {
+                my ($set, $get, $nondash);
+                for (@argv) { # order matters for git-config
+                        if (!$nondash) {
+                                if (/\A--(?:add|rename-section|remove-section|
+                                                replace-all|
+                                                unset-all|unset)\z/x) {
+                                        ++$set;
+                                } elsif ($_ eq '-l' || $_ eq '--list' ||
+                                                /\A--get/) {
+                                        ++$get;
+                                } elsif (/\A-/) { # -z and such
+                                } else {
+                                        ++$nondash;
+                                }
+                        } else {
+                                ++$nondash;
+                        }
+                }
+                if ($set || ($nondash//0) > 1 && !$get) {
+                        @file_arg = ('-f', $cfg->{-f});
+                        $env{GIT_CONFIG} = $file_arg[1];
+                } else { # OK, we can use `-c n=v' for read-only
+                        $cfg->{-opt_c} = $opt_c;
+                        $env{GIT_CONFIG} = undef;
+                }
+        }
+        my $cmd = $cfg->config_cmd(\%env, \%opt);
+        push @$cmd, @file_arg, @argv;
+        run_wait($cmd, \%env, \%opt) ? ($err_ok ? undef : fail($self, $?)) : 1;
 }
 
-sub lei_daemon_pid { puts shift, $$ }
+sub lei_daemon_pid { puts shift, $daemon_pid }
 
 sub lei_daemon_kill {
         my ($self) = @_;
@@ -977,9 +1039,10 @@ sub start_mua {
                 $io->[0] = $self->{1} if $self->{opt}->{stdin} && -t $self->{1};
                 send_exec_cmd($self, $io, \@cmd, {});
         }
-        if ($self->{lxs} && $self->{au_done}) { # kick wait_startq
-                syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0));
-        }
+
+        # kick wait_startq:
+        syswrite($self->{au_done}, 'q') if $self->{lxs} && $self->{au_done};
+
         return unless -t $self->{2}; # XXX how to determine non-TUI MUAs?
         $self->{opt}->{quiet} = 1;
         delete $self->{-progress};
@@ -988,9 +1051,11 @@ sub start_mua {
 
 sub send_exec_cmd { # tell script/lei to execute a command
         my ($self, $io, $cmd, $env) = @_;
-        my $sock = $self->{sock} // die 'lei client gone';
-        my $fds = [ map { fileno($_) } @$io ];
-        $send_cmd->($sock, $fds, exec_buf($cmd, $env), MSG_EOR);
+        $PublicInbox::IPC::send_cmd->(
+                        $self->{sock} // die('lei client gone'),
+                        [ map { fileno($_) } @$io ],
+                        exec_buf($cmd, $env), 0) //
+                Carp::croak("sendmsg: $!");
 }
 
 sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
@@ -1000,7 +1065,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
         while (my $op = shift(@$alerts)) {
                 if ($op eq ':WINCH') {
                         # hit the process group that started the MUA
-                        send($sock, '-WINCH', MSG_EOR) if $sock;
+                        send($sock, '-WINCH', 0) if $sock;
                 } elsif ($op eq ':bell') {
                         out($self, "\a");
                 } elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
@@ -1009,9 +1074,9 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
                         my $cmd = $1; # run an arbitrary command
                         require Text::ParseWords;
                         $cmd = [ Text::ParseWords::shellwords($cmd) ];
-                        send($sock, exec_buf($cmd, {}), MSG_EOR) if $sock;
+                        send($sock, exec_buf($cmd, {}), 0) if $sock;
                 } else {
-                        err($self, "W: unsupported --alert=$op"); # non-fatal
+                        warn("W: unsupported --alert=$op\n"); # non-fatal
                 }
         }
 }
@@ -1033,16 +1098,15 @@ sub path_to_fd {
 # caller needs to "-t $self->{1}" to check if tty
 sub start_pager {
         my ($self, $new_env) = @_;
-        my $fh = popen_rd([qw(git var GIT_PAGER)]);
-        chomp(my $pager = <$fh> // '');
-        close($fh) or warn "`git var PAGER' error: \$?=$?";
+        chomp(my $pager = run_qx([qw(git var GIT_PAGER)]));
+        warn "`git var PAGER' error: \$?=$?" if $?;
         return if $pager eq 'cat' || $pager eq '';
         $new_env //= {};
         $new_env->{LESS} //= 'FRX';
         $new_env->{LV} //= '-c';
         $new_env->{MORE} = $new_env->{LESS} if $^O eq 'freebsd';
-        pipe(my ($r, $wpager)) or return warn "pipe: $!";
-        my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
+        my $rdr = { 1 => $self->{1}, 2 => $self->{2} };
+        CORE::pipe($rdr->{0}, my $wpager) or return warn "pipe: $!";
         my $pgr = [ undef, @$rdr{1, 2} ];
         my $env = $self->{env};
         if ($self->{sock}) { # lei(1) process runs it
@@ -1060,19 +1124,19 @@ sub start_pager {
 # display a message for user before spawning full-screen $VISUAL
 sub pgr_err {
         my ($self, @msg) = @_;
-        return $self->err(@msg) unless $self->{sock} && -t $self->{2};
+        return warn(@msg) unless $self->{sock} && -t $self->{2};
         start_pager($self, { LESS => 'RX' }); # no 'F' so we prompt
-        print { $self->{2} } @msg;
+        say { $self->{2} } @msg, '# -quit pager to continue-';
         $self->{2}->autoflush(1);
         stop_pager($self);
-        send($self->{sock}, 'wait', MSG_EOR); # wait for user to quit pager
+        send($self->{sock}, 'wait', 0); # wait for user to quit pager
 }
 
 sub stop_pager {
         my ($self) = @_;
         my $pgr = delete($self->{pgr}) or return;
         $self->{2} = $pgr->[2];
-        close(delete($self->{1})) if $self->{1};
+        delete($self->{1})->close if $self->{1};
         $self->{1} = $pgr->[1];
 }
 
@@ -1082,34 +1146,33 @@ sub accept_dispatch { # Listener {post_accept} callback
         my $self = bless { sock => $sock }, __PACKAGE__;
         vec(my $rvec = '', fileno($sock), 1) = 1;
         select($rvec, undef, undef, 60) or
-                return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
+                return send($sock, 'timed out waiting to recv FDs', 0);
         # (4096 * 33) >MAX_ARG_STRLEN
-        my @fds = $recv_cmd->($sock, my $buf, 4096 * 33) or return; # EOF
+        my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
+                return; # EOF
         if (!defined($fds[0])) {
                 warn(my $msg = "recv_cmd failed: $!");
-                return send($sock, $msg, MSG_EOR);
+                return send($sock, $msg, 0);
         } else {
                 my $i = 0;
-                for my $fd (@fds) {
-                        open($self->{$i++}, '+<&=', $fd) and next;
-                        send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR);
-                }
-                $i == 4 or return send($sock, 'not enough FDs='.($i-1), MSG_EOR)
+                open($self->{$i++}, '+<&=', $_) for @fds;
+                $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0)
         }
         # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
         # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
         substr($buf, -2, 2, '') eq "\0\0" or  # s/\0\0\z//
-                return send($sock, 'request command truncated', MSG_EOR);
+                return send($sock, 'request command truncated', 0);
         my ($argc, @argv) = split(/\0/, $buf, -1);
         undef $buf;
         my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
         $self->{env} = \%env;
         eval { dispatch($self, @argv) };
-        send($sock, $@, MSG_EOR) if $@;
+        $self->fail($@) if $@;
 }
 
 sub dclose {
         my ($self) = @_;
+        local $current_lei = $self;
         delete $self->{-progress};
         _drop_wq($self) if $self->{failed};
         $self->close if $self->{-event_init_done}; # PublicInbox::DS::close
@@ -1121,26 +1184,29 @@ sub event_step {
         local %ENV = %{$self->{env}};
         local $current_lei = $self;
         eval {
-                my $buf;
-                while (my @fds = $recv_cmd->($self->{sock}, $buf, 4096)) {
-                        if (scalar(@fds) == 1 && !defined($fds[0])) {
-                                return if $! == EAGAIN;
-                                next if $! == EINTR;
-                                last if $! == ECONNRESET;
-                                die "recvmsg: $!";
-                        }
-                        for (@fds) { open my $rfh, '+<&=', $_ }
+                my @fds = $PublicInbox::IPC::recv_cmd->(
+                        $self->{sock} // return, my $buf, 4096);
+                if (scalar(@fds) == 1 && !defined($fds[0])) {
+                        return if $! == EAGAIN;
+                        die "recvmsg: $!" if $! != ECONNRESET;
+                        @fds = (); # for open loop below:
                 }
+                for (@fds) { open my $rfh, '+<&=', $_ }
                 if ($buf eq '') {
                         _drop_wq($self); # EOF, client disconnected
                         dclose($self);
-                } elsif ($buf =~ /\A(STOP|CONT)\z/) {
+                        $buf = 'TERM';
+                }
+                if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
+                        my $sig = "-$buf";
                         for my $wq (grep(defined, @$self{@WQ_KEYS})) {
-                                $wq->wq_kill($buf) or $wq->wq_kill_old($buf);
+                                $wq->wq_kill($sig);
                         }
                 } else {
                         die "unrecognized client signal: $buf";
                 }
+                my $s = $self->{-socks} // []; # lei up --all
+                @$s = grep { send($_, $buf, 0) } @$s;
         };
         if (my $err = $@) {
                 eval { $self->fail($err) };
@@ -1151,15 +1217,13 @@ sub event_step {
 sub event_step_init {
         my ($self) = @_;
         my $sock = $self->{sock} or return;
-        $self->{-event_init_done} //= do { # persist til $ops done
+        $self->{-event_init_done} // do { # persist til $ops done
                 $sock->blocking(0);
-                $self->SUPER::new($sock, EPOLLIN|EPOLLET);
-                $sock;
+                $self->SUPER::new($sock, EPOLLIN);
+                $self->{-event_init_done} = $sock;
         };
 }
 
-sub noop {}
-
 sub oldset { $oldset }
 
 sub dump_and_clear_log {
@@ -1176,48 +1240,87 @@ sub dump_and_clear_log {
 sub cfg2lei ($) {
         my ($cfg) = @_;
         my $lei = bless { env => { %{$cfg->{-env}} } }, __PACKAGE__;
-        open($lei->{0}, '<&', \*STDIN) or die "dup 0: $!";
-        open($lei->{1}, '>>&', \*STDOUT) or die "dup 1: $!";
-        open($lei->{2}, '>>&', \*STDERR) or die "dup 2: $!";
-        open($lei->{3}, '/') or die "open /: $!";
-        my ($x, $y);
-        socketpair($x, $y, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+        open($lei->{0}, '<&', \*STDIN);
+        open($lei->{1}, '>>&', \*STDOUT);
+        open($lei->{2}, '>>&', \*STDERR);
+        open($lei->{3}, '<', '/');
+        socketpair(my $x, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
         $lei->{sock} = $x;
         require PublicInbox::LeiSelfSocket;
         PublicInbox::LeiSelfSocket->new($y); # adds to event loop
         $lei;
 }
 
+sub note_event ($@) { # runs lei_note_event for a given config file
+        my ($cfg_f, @args) = @_;
+        my $cfg = $PATH2CFG{$cfg_f} // return;
+        eval { cfg2lei($cfg)->dispatch('note-event', @args) };
+        carp "E: note-event $cfg_f: $@\n" if $@;
+}
+
 sub dir_idle_handler ($) { # PublicInbox::DirIdle callback
         my ($ev) = @_; # Linux::Inotify2::Event or duck type
         my $fn = $ev->fullname;
         if ($fn =~ m!\A(.+)/(new|cur)/([^/]+)\z!) { # Maildir file
-                my ($mdir, $nc, $bn) = ($1, $2, $3);
-                $nc = '' if $ev->IN_DELETE;
-                for my $f (keys %{$MDIR2CFGPATH->{$mdir} // {}}) {
-                        my $cfg = $PATH2CFG{$f} // next;
-                        eval {
-                                my $lei = cfg2lei($cfg);
-                                $lei->dispatch('note-event',
-                                                "maildir:$mdir", $nc, $bn, $fn);
-                        };
-                        warn "E: note-event $f: $@\n" if $@;
+                my ($loc, $new_cur, $bn) = ("maildir:$1", $2, $3);
+                $new_cur = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM;
+                for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) {
+                        note_event($cfg_f, $loc, $new_cur, $bn, $fn);
                 }
-        }
+        } elsif ($fn =~ m!\A(.+)/([0-9]+)\z!) { # MH mail message file
+                my ($loc, $n, $new_cur) = ("mh:$1", $2, '+');
+                $new_cur = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM;
+                for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) {
+                        note_event($cfg_f, $loc, $new_cur, $n, $fn);
+                }
+        } elsif ($fn =~ m!\A(.+)/\.mh_sequences\z!) { # reread flags
+                my $loc = "mh:$1";
+                for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) {
+                        note_event($cfg_f, $loc, '.mh_sequences')
+                }
+        } # else we don't care
         if ($ev->can('cancel') && ($ev->IN_IGNORE || $ev->IN_UNMOUNT)) {
                 $ev->cancel;
         }
         if ($fn =~ m!\A(.+)/(?:new|cur)\z! && !-e $fn) {
-                delete $MDIR2CFGPATH->{$1};
+                delete $MDIR2CFGPATH->{"maildir:$1"};
         }
-        if (!-e $fn) { # config file or Maildir gone
-                for my $cfgpaths (values %$MDIR2CFGPATH) {
-                        delete $cfgpaths->{$fn};
-                }
+        if (!-e $fn) { # config file, Maildir, or MH dir gone
+                delete $_->{$fn} for values %$MDIR2CFGPATH; # config file
+                delete @$MDIR2CFGPATH{"maildir:$fn", "mh:$fn"};
                 delete $PATH2CFG{$fn};
         }
 }
 
+sub can_stay_alive { # PublicInbox::DS::post_loop_do cb
+        my ($path, $dev_ino_expect) = @_;
+        if (my @st = defined($$path) ? stat($$path) : ()) {
+                if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
+                        warn "$$path dev/ino changed, quitting\n";
+                        $$path = undef;
+                }
+        } elsif (defined($$path)) { # ENOENT is common
+                warn "stat($$path): $!, quitting ...\n" if $! != ENOENT;
+                undef $$path;
+                $quit->();
+        }
+        return 1 if defined($$path);
+        my $n = PublicInbox::DS::close_non_busy() or do {
+                eval 'PublicInbox::LeiNoteEvent::flush_task()';
+                # drop stores only if no clients
+                for my $cfg (values %PATH2CFG) {
+                        my $lne = delete($cfg->{-lei_note_event});
+                        $lne->wq_close if $lne;
+                        my $sto = delete($cfg->{-lei_store}) // next;
+                        eval { $sto->wq_do('done') if $sto->{-wq_s1} };
+                        warn "E: $@ (dropping store for $cfg->{-f})" if $@;
+                        $sto->wq_close;
+                }
+        };
+        # returns true: continue, false: stop
+        $n + scalar(keys(%PublicInbox::DS::AWAIT_PIDS));
+}
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
         my ($path, $errno, $narg) = @_;
@@ -1225,115 +1328,66 @@ sub lazy_start {
         my ($sock_dir) = ($path =~ m!\A(.+?)/[^/]+\z!);
         $errors_log = "$sock_dir/errors.log";
         my $addr = pack_sockaddr_un($path);
-        my $lk = bless { lock_path => $errors_log }, 'PublicInbox::Lock';
+        my $lk = PublicInbox::Lock->new($errors_log);
         umask(077) // die("umask(077): $!");
         $lk->lock_acquire;
-        socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
+        socket($listener, AF_UNIX, SOCK_SEQPACKET, 0);
         if ($errno == ECONNREFUSED || $errno == ENOENT) {
                 return if connect($listener, $addr); # another process won
-                if ($errno == ECONNREFUSED && -S $path) {
-                        unlink($path) or die "unlink($path): $!";
-                }
+                unlink($path) if $errno == ECONNREFUSED && -S $path;
         } else {
                 $! = $errno; # allow interpolation to stringify in die
                 die "connect($path): $!";
         }
-        bind($listener, $addr) or die "bind($path): $!";
+        bind($listener, $addr);
         $lk->lock_release;
         undef $lk;
         my @st = stat($path) or die "stat($path): $!";
         my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
-        local $oldset = PublicInbox::DS::block_signals();
-        if ($narg == 5) {
-                $send_cmd = PublicInbox::Spawn->can('send_cmd4');
-                $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
-                        require PublicInbox::CmdIPC4;
-                        $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4');
-                        PublicInbox::CmdIPC4->can('recv_cmd4');
-                };
-        }
-        $recv_cmd or die <<"";
+        local $oldset = PublicInbox::DS::block_signals(POSIX::SIGALRM);
+        die "incompatible narg=$narg" if $narg != 5;
+        $PublicInbox::IPC::send_cmd or die <<"";
 (Socket::MsgHdr || Inline::C) missing/unconfigured (narg=$narg);
 
         require PublicInbox::Listener;
         require PublicInbox::PktOp;
         (-p STDOUT) or die "E: stdout must be a pipe\n";
-        open(STDIN, '+>>', $errors_log) or die "open($errors_log): $!";
+        open(STDIN, '+>>', $errors_log);
         STDIN->autoflush(1);
         dump_and_clear_log();
         POSIX::setsid() > 0 or die "setsid: $!";
-        my $pid = fork // die "fork: $!";
+        my $pid = PublicInbox::OnDestroy::fork_tmp;
         return if $pid;
         $0 = "lei-daemon $path";
-        local %PATH2CFG;
-        local $MDIR2CFGPATH;
+        local (%PATH2CFG, $MDIR2CFGPATH);
+        local $daemon_pid = $$;
         $listener->blocking(0);
         my $exit_code;
         my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch);
         local $quit = do {
                 my (undef, $eof_p) = PublicInbox::PktOp->pair;
                 sub {
-                        $exit_code //= shift;
+                        $exit_code //= eval("POSIX::SIG$_[0] + 128") if @_;
+                        $dir_idle->close if $dir_idle; # EPOLL_CTL_DEL
+                        $dir_idle = undef; # let RC take care of it
                         eval 'PublicInbox::LeiNoteEvent::flush_task()';
-                        my $lis = $pil or exit($exit_code);
+                        my $lis = $pil or exit($exit_code // 0);
                         # closing eof_p triggers \&noop wakeup
                         $listener = $eof_p = $pil = $path = undef;
                         $lis->close; # DS::close
-                        PublicInbox::DS->SetLoopTimeout(1000);
                 };
         };
-        my $sig = {
-                CHLD => \&PublicInbox::DS::enqueue_reap,
-                QUIT => $quit,
-                INT => $quit,
-                TERM => $quit,
-                HUP => \&noop,
-                USR1 => \&noop,
-                USR2 => \&noop,
-        };
-        my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
-        local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
-        undef $sig;
-        local $SIG{PIPE} = 'IGNORE';
+        my $sig = { CHLD => \&PublicInbox::DS::enqueue_reap };
+        $sig->{$_} = $quit for qw(QUIT INT TERM);
+        $sig->{$_} = \&PublicInbox::Config::noop for qw(HUP USR1 USR2);
         require PublicInbox::DirIdle;
-        local $dir_idle = PublicInbox::DirIdle->new([$sock_dir], sub {
-                # just rely on wakeup to hit PostLoopCallback set below
+        local $dir_idle = PublicInbox::DirIdle->new(sub {
+                # just rely on wakeup to hit post_loop_do
                 dir_idle_handler($_[0]) if $_[0]->fullname ne $path;
-        }, 1);
-        if ($sigfd) {
-                undef $sigfd; # unref, already in DS::DescriptorMap
-        } else {
-                # wake up every second to accept signals if we don't
-                # have signalfd or IO::KQueue:
-                PublicInbox::DS::sig_setmask($oldset);
-                PublicInbox::DS->SetLoopTimeout(1000);
-        }
-        PublicInbox::DS->SetPostLoopCallback(sub {
-                my ($dmap, undef) = @_;
-                if (@st = defined($path) ? stat($path) : ()) {
-                        if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
-                                warn "$path dev/ino changed, quitting\n";
-                                $path = undef;
-                        }
-                } elsif (defined($path)) { # ENOENT is common
-                        warn "stat($path): $!, quitting ...\n" if $! != ENOENT;
-                        undef $path;
-                        $quit->();
-                }
-                return 1 if defined($path);
-                my $now = now();
-                my $n = 0;
-                for my $s (values %$dmap) {
-                        $s->can('busy') or next;
-                        if ($s->busy($now)) {
-                                ++$n;
-                        } else {
-                                $s->close;
-                        }
-                }
-                $n; # true: continue, false: stop
         });
-
+        $dir_idle->add_watches([$sock_dir]);
+        local @PublicInbox::DS::post_loop_do = (\&can_stay_alive,
+                                                \$path, $dev_ino_expect);
         # STDIN was redirected to /dev/null above, closing STDERR and
         # STDOUT will cause the calling `lei' client process to finish
         # reading the <$daemon> pipe.
@@ -1341,13 +1395,13 @@ sub lazy_start {
                 $current_lei ? err($current_lei, @_) : warn(
                   strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(time))," $$ ", @_);
         };
-        open STDERR, '>&STDIN' or die "redirect stderr failed: $!";
-        open STDOUT, '>&STDIN' or die "redirect stdout failed: $!";
+        local $SIG{PIPE} = 'IGNORE';
+        local $SIG{ALRM} = 'IGNORE';
+        open STDERR, '>&STDIN';
+        open STDOUT, '>&STDIN';
         # $daemon pipe to `lei' closed, main loop begins:
-        eval { PublicInbox::DS->EventLoop };
+        eval { PublicInbox::DS::event_loop($sig, $oldset) };
         warn "event loop error: $@\n" if $@;
-        # exit() may trigger waitpid via various DESTROY, ensure interruptible
-        PublicInbox::DS::sig_setmask($oldset);
         dump_and_clear_log();
         exit($exit_code // 0);
 }
@@ -1358,9 +1412,10 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected
 # can immediately reread it
 sub DESTROY {
         my ($self) = @_;
-        if (my $counters = delete $self->{counters}) {
-                for my $k (sort keys %$counters) {
-                        my $nr = $counters->{$k};
+        if (defined($self->{incr_pid}) && $self->{incr_pid} == $$) {
+                for my $k (sort(grep(/\A-nr_/, keys %$self))) {
+                        my $nr = $self->{$k};
+                        substr($k, 0, length('-nr_'), '');
                         $self->child_error(0, "$nr $k messages");
                 }
         }
@@ -1370,25 +1425,26 @@ sub DESTROY {
         # preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # dwaitpid callback
-        my ($arg, $pid) = @_;
-        my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof)
+        my ($pid, $wq, $lei) = @_;
+        local $current_lei = $lei;
         my $err_type = $lei->{-err_type};
         $? and $lei->child_error($?,
-                        $err_type ? "$err_type errors during $lei->{cmd}" : ());
+                $err_type ? "$err_type errors during $lei->{cmd} \$?=$?" : ());
         $lei->dclose;
 }
 
 sub fchdir {
         my ($lei) = @_;
-        my $dh = $lei->{3} // die 'BUG: lei->{3} (CWD) gone';
-        chdir($dh) || $lei->fail("fchdir: $!");
+        chdir($lei->{3} // die 'BUG: lei->{3} (CWD) gone');
 }
 
 sub wq_eof { # EOF callback for main daemon
-        my ($lei) = @_;
-        my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
-        $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei);
+        my ($lei, $wq_fld) = @_;
+        local $current_lei = $lei;
+        my $wq = delete $lei->{$wq_fld // 'wq1'};
+        $lei->sto_barrier_request($wq);
+        $wq // $lei->fail; # already failed
 }
 
 sub watch_state_ok ($) {
@@ -1396,19 +1452,22 @@ sub watch_state_ok ($) {
         $state =~ /\Apause|(?:import|index|tag)-(?:ro|rw)\z/;
 }
 
-sub cancel_maildir_watch ($$) {
-        my ($d, $cfg_f) = @_;
-        my $w = delete $MDIR2CFGPATH->{$d}->{$cfg_f};
-        scalar(keys %{$MDIR2CFGPATH->{$d}}) or
-                delete $MDIR2CFGPATH->{$d};
-        for my $x (@{$w // []}) { $x->cancel }
+sub cancel_dir_watch ($$$) {
+        my ($type, $d, $cfg_f) = @_;
+        my $loc = "$type:".canonpath_harder($d);
+        my $w = delete $MDIR2CFGPATH->{$loc}->{$cfg_f};
+        delete $MDIR2CFGPATH->{$loc} if !(keys %{$MDIR2CFGPATH->{$loc}});
+        $_->cancel for @$w;
 }
 
-sub add_maildir_watch ($$) {
-        my ($d, $cfg_f) = @_;
-        if (!exists($MDIR2CFGPATH->{$d}->{$cfg_f})) {
-                my @w = $dir_idle->add_watches(["$d/cur", "$d/new"], 1);
-                push @{$MDIR2CFGPATH->{$d}->{$cfg_f}}, @w if @w;
+sub add_dir_watch ($$$) {
+        my ($type, $d, $cfg_f) = @_;
+        $d = canonpath_harder($d);
+        my $loc = "$type:$d";
+        my @dirs = $type eq 'mh' ? ($d) : ("$d/cur", "$d/new");
+        if (!exists($MDIR2CFGPATH->{$loc}->{$cfg_f})) {
+                my @w = $dir_idle->add_watches(\@dirs, 1);
+                push @{$MDIR2CFGPATH->{$loc}->{$cfg_f}}, @w if @w;
         }
 }
 
@@ -1421,24 +1480,20 @@ sub refresh_watches {
         my %seen;
         my $cfg_f = $cfg->{'-f'};
         for my $w (grep(/\Awatch\..+\.state\z/, keys %$cfg)) {
-                my $url = substr($w, length('watch.'), -length('.state'));
+                my $loc = substr($w, length('watch.'), -length('.state'));
                 require PublicInbox::LeiWatch;
-                $watches->{$url} //= PublicInbox::LeiWatch->new($url);
-                $seen{$url} = undef;
-                my $state = $cfg->get_1("watch.$url.state");
+                $watches->{$loc} //= PublicInbox::LeiWatch->new($loc);
+                $seen{$loc} = undef;
+                my $state = $cfg->get_1("watch.$loc.state");
                 if (!watch_state_ok($state)) {
-                        $lei->err("watch.$url.state=$state not supported");
-                        next;
-                }
-                if ($url =~ /\Amaildir:(.+)/i) {
-                        my $d = canonpath_harder($1);
-                        if ($state eq 'pause') {
-                                cancel_maildir_watch($d, $cfg_f);
-                        } else {
-                                add_maildir_watch($d, $cfg_f);
-                        }
+                        warn("watch.$loc.state=$state not supported\n");
+                } elsif ($loc =~ /\A(maildir|mh):(.+)\z/i) {
+                        my ($type, $d) = ($1, $2);
+                        $state eq 'pause' ?
+                                cancel_dir_watch($type, $d, $cfg_f) :
+                                add_dir_watch($type, $d, $cfg_f);
                 } else { # TODO: imap/nntp/jmap
-                        $lei->child_error(0, "E: watch $url not supported, yet")
+                        $lei->child_error(0, "E: watch $loc not supported, yet")
                 }
         }
 
@@ -1446,29 +1501,28 @@ sub refresh_watches {
         my $lms = $lei->lms;
         if ($lms) {
                 $lms->lms_write_prepare;
-                for my $d ($lms->folders('maildir:')) {
-                        substr($d, 0, length('maildir:')) = '';
-
+                for my $loc ($lms->folders(qr/\A(?:maildir|mh):/)) {
+                        my $old = $loc;
+                        my ($type, $d) = split /:/, $loc, 2;
                         # fixup old bugs while we're iterating:
-                        my $cd = canonpath_harder($d);
-                        my $f = "maildir:$cd";
-                        $lms->rename_folder("maildir:$d", $f) if $d ne $cd;
-                        next if $watches->{$f}; # may be set to pause
+                        $d = canonpath_harder($d);
+                        $loc = "$type:$d";
+                        $lms->rename_folder($old, $loc) if $old ne $loc;
+                        next if $watches->{$loc}; # may be set to pause
                         require PublicInbox::LeiWatch;
-                        $watches->{$f} = PublicInbox::LeiWatch->new($f);
-                        $seen{$f} = undef;
-                        add_maildir_watch($cd, $cfg_f);
+                        $watches->{$loc} = PublicInbox::LeiWatch->new($loc);
+                        $seen{$loc} = undef;
+                        add_dir_watch($type, $d, $cfg_f);
                 }
         }
         if ($old) { # cull old non-existent entries
-                for my $url (keys %$old) {
-                        next if exists $seen{$url};
-                        delete $old->{$url};
-                        if ($url =~ /\Amaildir:(.+)/i) {
-                                my $d = canonpath_harder($1);
-                                cancel_maildir_watch($d, $cfg_f);
+                for my $loc (keys %$old) {
+                        next if exists $seen{$loc};
+                        delete $old->{$loc};
+                        if ($loc =~ /\A(maildir|mh):(.+)\z/i) {
+                                cancel_dir_watch($1, $2, $cfg_f);
                         } else { # TODO: imap/nntp/jmap
-                                $lei->child_error(0, "E: watch $url TODO");
+                                $lei->child_error(0, "E: watch $loc TODO");
                         }
                 }
         }
@@ -1487,31 +1541,65 @@ sub git_oid {
 }
 
 sub lms {
-        my ($lei, $rw) = @_;
+        my ($lei, $creat) = @_;
         my $sto = $lei->{sto} // _lei_store($lei) // return;
         require PublicInbox::LeiMailSync;
         my $f = "$sto->{priv_eidx}->{topdir}/mail_sync.sqlite3";
-        (-f $f || $rw) ? PublicInbox::LeiMailSync->new($f) : undef;
+        (-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef;
 }
 
-sub sto_done_request {
-        my ($lei, $sock) = @_;
-        eval {
-                if ($sock //= $lei->{sock}) { # issue, async wait
-                        $lei->{sto}->wq_io_do('done', [ $sock ]);
-                } else { # forcibly wait
-                        my $wait = $lei->{sto}->wq_do('done');
-                }
-        };
-        $lei->err($@) if $@;
+sub sto_barrier_request {
+        my ($lei, $wq) = @_;
+        return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
+        local $current_lei = $lei;
+        if (my $n = $lei->{opt}->{'commit-delay'}) {
+                eval { $lei->{sto}->wq_do('schedule_commit', $n) };
+        } else {
+                my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
+                my $errfh = $lei->{2} // *STDERR{GLOB};
+                my @io = $s ? ($errfh, $s) : ($errfh);
+                eval { $lei->{sto}->wq_io_do('barrier', \@io, 1) };
+        }
+        warn($@) if $@;
 }
 
 sub cfg_dump ($$) {
         my ($lei, $f) = @_;
-        my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei->{2}) };
+        my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei) };
         return $ret if !$@;
-        $lei->err($@);
+        warn($@);
         undef;
 }
 
+sub request_umask {
+        my ($lei) = @_;
+        my $s = $lei->{sock} // return;
+        send($s, 'umask', 0) // die "send: $!";
+        vec(my $rvec = '', fileno($s), 1) = 1;
+        select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
+        recv($s, my $v, 5, 0) // die "recv: $!";
+        (my $u, $lei->{client_umask}) = unpack('AV', $v);
+        $u eq 'u' or warn "E: recv $v has no umask";
+}
+
+sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
+        my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
+        $_[1] // return $lei->fail("error reading stdin: $!");
+        $lei->{stdin_buf} .= $_[-1];
+        do_env($lei, $cb) if $_[-1] eq '';
+}
+
+sub slurp_stdin {
+        my ($lei, $cb) = @_;
+        require PublicInbox::InputPipe;
+        my $in = $lei->{0};
+        if (-t $in) { # run cat via script/lei and read from it
+                $in = undef;
+                pipe($in, my $wr);
+                say { $lei->{2} } '# enter query, Ctrl-D when done';
+                send_exec_cmd($lei, [ $lei->{0}, $wr ], ['cat'], {});
+        }
+        PublicInbox::InputPipe::consume($in, \&_stdin_cb, $lei, $cb);
+}
+
 1;