about summary refs log tree commit homepage
path: root/lib/PublicInbox/LEI.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LEI.pm')
-rw-r--r--lib/PublicInbox/LEI.pm638
1 files changed, 346 insertions, 292 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index d81ca296..81f940fe 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -9,8 +9,9 @@ package PublicInbox::LEI;
 use v5.12;
 use parent qw(PublicInbox::DS PublicInbox::LeiExternal
         PublicInbox::LeiQuery);
+use autodie qw(bind chdir fork open pipe socket socketpair syswrite unlink);
 use Getopt::Long ();
-use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
 use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET);
 use Cwd qw(getcwd);
 use POSIX qw(strftime);
@@ -18,33 +19,35 @@ use IO::Handle ();
 use Fcntl qw(SEEK_SET);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::Spawn qw(spawn popen_rd);
+use PublicInbox::Spawn qw(run_wait popen_rd run_qx);
 use PublicInbox::Lock;
 use PublicInbox::Eml;
 use PublicInbox::Import;
 use PublicInbox::ContentHash qw(git_sha);
+use PublicInbox::IPC;
 use Time::HiRes qw(stat); # ctime comparisons for config cache
-use File::Path qw(mkpath);
+use File::Path ();
 use File::Spec;
+use Carp qw(carp);
 use Sys::Syslog qw(openlog syslog closelog);
 our $quit = \&CORE::exit;
-our ($current_lei, $errors_log, $listener, $oldset, $dir_idle,
-        $recv_cmd, $send_cmd);
+our ($current_lei, $errors_log, $listener, $oldset, $dir_idle);
 my $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
 my $GLP_PASS = Getopt::Long::Parser->new;
 $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
-our %PATH2CFG; # persistent for socket daemon
-our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
+our (%PATH2CFG, # persistent for socket daemon
+$MDIR2CFGPATH, # location => { /path/to/config => [ ino watches ] }
+$OPT, # shared between optparse and opt_dash callback (for Getopt::Long)
+$daemon_pid
+);
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
 sub pass_through { $GLP_PASS }
 
-my $OPT;
-sub opt_dash ($$) {
+sub opt_dash ($$) { # callback runs inside optparse
         my ($spec, $re_str) = @_; # 'limit|n=i', '([0-9]+)'
         my ($key) = ($spec =~ m/\A([a-z]+)/g);
         my $cb = sub { # Getopt::Long "<>" catch-all handler
@@ -158,7 +161,7 @@ our @diff_opt = qw(unified|U=i output-indicator-new=s output-indicator-old=s
         rename-empty! check ws-error-highlight=s full-index binary
         abbrev:i break-rewrites|B:s find-renames|M:s find-copies:s
         find-copies-harder irreversible-delete|D l=i diff-filter=s
-        S=s G=s find-object=s pickaxe-all pickaxe-regex O=s R
+        S=s G=s find-object=s pickaxe-all pickaxe-regex R
         relative:s text|a ignore-cr-at-eol ignore-space-at-eol
         ignore-space-change|b ignore-all-space|w ignore-blank-lines
         inter-hunk-context=i function-context|W exit-code ext-diff
@@ -197,8 +200,8 @@ our %CMD = ( # sorted in order of importance/use:
 'rediff' => [ '--stdin|LOCATION...',
                 'regenerate a diff with different options',
         'stdin|', # /|\z/ must be first for lone dash
-        qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1),
-        @diff_opt, @lxs_opt, @net_opt, @c_opt ],
+        qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1
+        order-file=s), @diff_opt, @lxs_opt, @net_opt, @c_opt ],
 
 'mail-diff' => [ '--stdin|LOCATION...', 'diff the contents of emails',
         'stdin|', # /|\z/ must be first for lone dash
@@ -229,20 +232,21 @@ our %CMD = ( # sorted in order of importance/use:
 'rm' => [ '--stdin|LOCATION...',
         'remove a message from the index and prevent reindexing',
         'stdin|', # /|\z/ must be first for lone dash
-        qw(in-format|F=s lock=s@), @net_opt, @c_opt ],
+        qw(in-format|F=s lock=s@ commit-delay=i), @net_opt, @c_opt ],
 'plonk' => [ '--threads|--from=IDENT',
         'exclude mail matching From: or threads from non-Message-ID searches',
         qw(stdin| threads|t from|f=s mid=s oid=s), @c_opt ],
-'tag' => [ 'KEYWORDS...',
+tag => [ 'KEYWORDS... LOCATION...|--stdin',
         'set/unset keywords and/or labels on message(s)',
-        qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@),
+        qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@ commit-delay=i),
         @net_opt, @c_opt, pass_through('-kw:foo for delete') ],
 
 'purge-mailsource' => [ 'LOCATION|--all',
         'remove imported messages from IMAP, Maildirs, and MH',
         qw(exact! all jobs:i indexed), @c_opt ],
 
-'add-watch' => [ 'LOCATION...', 'watch for new messages and flag changes',
+'add-watch' => [ 'LOCATION... [LABELS...]',
+        'watch for new messages and flag changes',
         qw(poll-interval=s state=s recursive|r), @c_opt ],
 'rm-watch' => [ 'LOCATION...', 'remove specified watch(es)',
         qw(recursive|r), @c_opt ],
@@ -253,14 +257,17 @@ our %CMD = ( # sorted in order of importance/use:
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
         qw(prune), @c_opt ],
 
-'index' => [ 'LOCATION...', 'one-time index from URL or filesystem',
+'reindex' => [ '', 'reindex all locally-indexed messages', @c_opt ],
+
+'index' => [ 'LOCATION... [LABELS...]', 'one-time index from URL or filesystem',
         qw(in-format|F=s kw! offset=i recursive|r exclude=s include|I=s
         verbose|v+ incremental!), @net_opt, # mainly for --proxy=
          @c_opt ],
-'import' => [ 'LOCATION...|--stdin',
+import => [ 'LOCATION...|--stdin [LABELS...]',
         'one-time import/update from URL or filesystem',
         qw(stdin| offset=i recursive|r exclude=s include|I=s new-only
-        lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
+        lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!
+        commit-delay=i sort|s:s@),
         @net_opt, @c_opt ],
 'forget-mail-sync' => [ 'LOCATION...',
         'forget sync information for a mail folder', @c_opt ],
@@ -272,13 +279,15 @@ our %CMD = ( # sorted in order of importance/use:
         qw(all:s mode=s), @net_opt, @c_opt ],
 'convert' => [ 'LOCATION...|--stdin',
         'one-time conversion from URL or filesystem to another format',
-        qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw!),
+        qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw!
+                rsyncable sort|s:s@),
         @net_opt, @c_opt ],
 'p2q' => [ 'LOCATION_OR_COMMIT...|--stdin',
         "use a patch to generate a query for `lei q --stdin'",
         qw(stdin| in-format|F=s want|w=s@ uri debug), @net_opt, @c_opt ],
 'config' => [ '[...]', sub {
-                'git-config(1) wrapper for '._config_path($_[0]);
+                'git-config(1) wrapper for '._config_path($_[0]). "\n" .
+        '-l/--list and other common git-config uses are supported'
         }, qw(config-file|system|global|file|f=s), # for conflict detection
          qw(edit|e c=s@ C=s@), pass_through('git config') ],
 'inspect' => [ 'ITEMS...|--stdin', 'inspect lei/store and/or local external',
@@ -312,6 +321,9 @@ our %CMD = ( # sorted in order of importance/use:
 my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo',
                         'specify message input format' ];
 my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ];
+my $sort_out = [ 'VAL|received|relevance|docid',
+                "order of results is `--output'-dependent"];
+my $sort_in = [ 'sequence|mtime|size', 'sort input (format-dependent)' ];
 
 # we use \x{a0} (non-breaking SP) to avoid wrapping in PublicInbox::LeiHelp
 my %OPTDESC = (
@@ -344,6 +356,7 @@ my %OPTDESC = (
 'no-torsocks' => 'alias for --torsocks=no',
 'save!' =>  "do not save a search for `lei up'",
 'import-remote!' => 'do not memoize remote messages into local store',
+'import-before!' => 'do not import before writing to output (DANGEROUS)',
 
 'type=s' => [ 'any|mid|git', 'disambiguate type' ],
 
@@ -397,8 +410,10 @@ my %OPTDESC = (
                 'include specified external(s) in search' ],
 'only|O=s@        q' => [ 'LOCATION',
                 'only use specified external(s) for search' ],
-'jobs=s        q' => [ '[SEARCH_JOBS][,WRITER_JOBS]',
-                'control number of search and writer jobs' ],
+'jobs|j=s' => [ 'JOBSPEC',
+                'control number of query and writer jobs' .
+                "integers delimited by `,', either of which may be omitted"
+                ],
 'jobs|j=i        add-external' => 'set parallelism when indexing after --mirror',
 
 'in-format|F=s' => $stdin_formats,
@@ -416,8 +431,10 @@ my %OPTDESC = (
 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ],
 'offset=i' => ['OFF', 'search result offset (default: 0)'],
 
-'sort|s=s' => [ 'VAL|received|relevance|docid',
-                "order of results is `--output'-dependent"],
+'sort|s=s        q' => $sort_out,
+'sort|s=s        lcat' => $sort_out,
+'sort|s:s@        convert' => $sort_in,
+'sort|s:s@        import' => $sort_in,
 'reverse|r' => 'reverse search results', # like sort(1)
 
 'boost=i' => 'increase/decrease priority of results (default: 0)',
@@ -451,6 +468,7 @@ my %OPTDESC = (
 'z|0' => 'use NUL \\0 instead of newline (CR) to delimit lines',
 
 'signal|s=s' => [ 'SIG', 'signal to send lei-daemon (default: TERM)' ],
+'edit|e        config' => 'open an editor to modify the lei config file',
 ); # %OPTDESC
 
 my %CONFIG_KEYS = (
@@ -462,7 +480,7 @@ my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers
 sub _drop_wq {
         my ($self) = @_;
         for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
-                $wq->wq_kill('-TERM');
+                $wq->wq_kill(-POSIX::SIGTERM());
                 $wq->DESTROY;
         }
 }
@@ -470,17 +488,18 @@ sub _drop_wq {
 # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
 sub x_it ($$) {
         my ($self, $code) = @_;
-        local $current_lei = $self;
         # make sure client sees stdout before exit
         $self->{1}->autoflush(1) if $self->{1};
         stop_pager($self);
         if ($self->{pkt_op_p}) { # worker => lei-daemon
                 $self->{pkt_op_p}->pkt_do('x_it', $code);
+                exit($code >> 8) if $$ != $daemon_pid;
         } elsif ($self->{sock}) { # lei->daemon => lei(1) client
-                send($self->{sock}, "x_it $code", MSG_EOR);
+                send($self->{sock}, "x_it $code", 0);
         } elsif ($quit == \&CORE::exit) { # an admin (one-shot) command
                 exit($code >> 8);
         } # else ignore if client disconnected
+        $self->dclose if $$ == $daemon_pid;
 }
 
 sub err ($;@) {
@@ -489,7 +508,7 @@ sub err ($;@) {
         my @eor = (substr($_[-1]//'', -1, 1) eq "\n" ? () : ("\n"));
         print $err @_, @eor and return;
         my $old_err = delete $self->{2};
-        close($old_err) if $! == EPIPE && $old_err;
+        $old_err->close if $! == EPIPE && $old_err;
         $err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB};
         print $err @_, @eor or print STDERR @_, @eor;
 }
@@ -504,8 +523,7 @@ sub qfin { # show message on finalization (LeiFinmsg)
 
 sub fail_handler ($;$$) {
         my ($lei, $code, $io) = @_;
-        local $current_lei = $lei;
-        close($io) if $io; # needed to avoid warnings on SIGPIPE
+        $io->close if $io; # needed to avoid warnings on SIGPIPE
         _drop_wq($lei);
         x_it($lei, $code // (1 << 8));
 }
@@ -514,13 +532,17 @@ sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers
         fail_handler($_[0], 13, delete $_[0]->{1});
 }
 
-sub fail ($$;$) {
-        my ($self, $msg, $exit_code) = @_;
-        local $current_lei = $self;
-        $self->{failed}++;
-        warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg;
-        $self->{pkt_op_p}->pkt_do('fail_handler') if $self->{pkt_op_p};
-        x_it($self, ($exit_code // 1) << 8);
+sub fail ($;@) {
+        my ($lei, @msg) = @_;
+        my $exit_code = ($msg[0]//'') =~ /\A-?[0-9]+\z/ ? shift(@msg) : undef;
+        local $current_lei = $lei;
+        $lei->{failed}++;
+        if (@msg) {
+                push @msg, "\n" if substr($msg[-1], -1, 1);
+                warn @msg;
+        }
+        $lei->{pkt_op_p}->pkt_do('fail_handler') if $lei->{pkt_op_p};
+        x_it($lei, $exit_code // (1 << 8));
         undef;
 }
 
@@ -540,18 +562,17 @@ sub child_error { # passes non-fatal curl exit codes to user
         local $current_lei = $self;
         $child_error ||= 1 << 8;
         warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg;
+        $self->{child_error} ||= $child_error;
         if ($self->{pkt_op_p}) { # to top lei-daemon
                 $self->{pkt_op_p}->pkt_do('child_error', $child_error);
         } elsif ($self->{sock}) { # to lei(1) client
-                send($self->{sock}, "child_error $child_error", MSG_EOR);
-        } else { # non-lei admin command
-                $self->{child_error} ||= $child_error;
+                send($self->{sock}, "child_error $child_error", 0);
         } # else noop if client disconnected
 }
 
 sub note_sigpipe { # triggers sigpipe_handler
         my ($self, $fd) = @_;
-        close(delete($self->{$fd})); # explicit close silences Perl warning
+        delete($self->{$fd})->close; # explicit close silences Perl warning
         $self->{pkt_op_p}->pkt_do('sigpipe_handler') if $self->{pkt_op_p};
         x_it($self, 13);
 }
@@ -559,20 +580,21 @@ sub note_sigpipe { # triggers sigpipe_handler
 sub _lei_atfork_child {
         my ($self, $persist) = @_;
         # we need to explicitly close things which are on stack
+        my $cfg = $self->{cfg};
+        delete @$cfg{qw(-watches -lei_note_event)};
         if ($persist) {
-                open $self->{3}, '<', '/' or die "open(/) $!";
+                open $self->{3}, '<', '/';
                 fchdir($self);
                 close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)}));
-                if (my $cfg = $self->{cfg}) {
-                        delete @$cfg{qw(-lei_store -watches -lei_note_event)};
-                }
+                delete $cfg->{-lei_store};
         } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly
-                open STDERR, '+>&='.fileno($self->{2}) or warn "open $!";
+                open STDERR, '+>&='.fileno($self->{2}); # idempotent w/ fileno
                 STDERR->autoflush(1);
+                $self->{2} = \*STDERR;
                 POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!";
         }
         close($_) for (grep(defined, delete @$self{qw(old_1 au_done)}));
-        delete $self->{-socks};
+        close($_) for (@{delete($self->{-socks}) // []});
         if (my $op_c = delete $self->{pkt_op_c}) {
                 close(delete $op_c->{sock});
         }
@@ -584,7 +606,7 @@ sub _lei_atfork_child {
         $dir_idle->force_close if $dir_idle;
         undef $dir_idle;
         %PATH2CFG = ();
-        $MDIR2CFGPATH = {};
+        $MDIR2CFGPATH = undef;
         eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
         undef $errors_log;
         $quit = \&CORE::exit;
@@ -617,8 +639,9 @@ sub pkt_op_pair {
 }
 
 sub incr {
-        my ($self, $field, $nr) = @_;
-        $self->{counters}->{$field} += $nr;
+        my $lei = shift;
+        $lei->{incr_pid} = $$ if @_;
+        while (my ($f, $n) = splice(@_, 0, 2)) { $lei->{$f} += $n }
 }
 
 sub pkt_ops {
@@ -641,12 +664,12 @@ sub workers_start {
         my $end = $lei->pkt_op_pair;
         my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
         $flds->{lei} = $lei;
-        $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
+        $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds,
+                $wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
         delete $lei->{pkt_op_p};
         my $op_c = delete $lei->{pkt_op_c};
         @$end = ();
         $lei->event_step_init;
-        $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
         ($op_c, $ops);
 }
 
@@ -680,7 +703,7 @@ sub optparse ($$$) {
         # allow _complete --help to complete, not show help
         return 1 if substr($cmd, 0, 1) eq '_';
         $self->{cmd} = $cmd;
-        $OPT = $self->{opt} //= {};
+        local $OPT = $self->{opt} //= {};
         my $info = $CMD{$cmd} // [ '[...]' ];
         my ($proto, undef, @spec) = @$info;
         my $glp = ref($spec[-1]) eq ref($GLP) ? pop(@spec) : $GLP;
@@ -700,6 +723,14 @@ sub optparse ($$$) {
         # "-" aliases "stdin" or "clear"
         $OPT->{$lone_dash} = ${$OPT->{$lone_dash}} if defined $lone_dash;
 
+        if ($proto =~ s/\s*\[?(?:KEYWORDS|LABELS)\.\.\.\]?\s*//g) {
+                require PublicInbox::LeiInput;
+                my @err = PublicInbox::LeiInput::vmd_mod_extract($self, $argv);
+                return $self->fail(join("\n", @err)) if @err;
+        } else {
+                warn "proto $proto\n" if $cmd =~ /(add-watch|tag|index)/;
+        }
+
         my $i = 0;
         my $POS_ARG = '[A-Z][A-Z0-9_]+';
         my ($err, $inf);
@@ -727,11 +758,10 @@ sub optparse ($$$) {
                                         # w/o args means stdin
                                         if ($sw eq 'stdin' && !@$argv &&
                                                         (-p $self->{0} ||
-                                                         -f _) && -r _) {
+                                                         -f _)) {
                                                 $OPT->{stdin} //= 1;
                                         }
-                                        $ok = defined($OPT->{$sw});
-                                        last if $ok;
+                                        $ok = defined($OPT->{$sw}) and last;
                                 } elsif (defined($argv->[$i])) {
                                         $ok = 1;
                                         $i++;
@@ -752,38 +782,7 @@ sub optparse ($$$) {
         $err ? fail($self, "usage: lei $cmd $proto\nE: $err") : 1;
 }
 
-sub _tmp_cfg { # for lei -c <name>=<value> ...
-        my ($self) = @_;
-        my $cfg = _lei_cfg($self, 1);
-        require File::Temp;
-        my $ft = File::Temp->new(TEMPLATE => 'lei_cfg-XXXX', TMPDIR => 1);
-        my $tmp = { '-f' => $ft->filename, -tmp => $ft };
-        $ft->autoflush(1);
-        print $ft <<EOM or return fail($self, "$tmp->{-f}: $!");
-[include]
-        path = $cfg->{-f}
-EOM
-        $tmp = $self->{cfg} = bless { %$cfg, %$tmp }, ref($cfg);
-        for (@{$self->{opt}->{c}}) {
-                /\A([^=\.]+\.[^=]+)(?:=(.*))?\z/ or return fail($self, <<EOM);
-`-c $_' is not of the form -c <name>=<value>'
-EOM
-                my $name = $1;
-                my $value = $2 // 1;
-                _config($self, '--add', $name, $value);
-                if (defined(my $v = $tmp->{$name})) {
-                        if (ref($v) eq 'ARRAY') {
-                                push @$v, $value;
-                        } else {
-                                $tmp->{$name} = [ $v, $value ];
-                        }
-                } else {
-                        $tmp->{$name} = $value;
-                }
-        }
-}
-
-sub lazy_cb ($$$) {
+sub lazy_cb ($$$) { # $pfx is _complete_ or lei_
         my ($self, $cmd, $pfx) = @_;
         my $ucmd = $cmd;
         $ucmd =~ tr/-/_/;
@@ -796,11 +795,19 @@ sub lazy_cb ($$$) {
                 $pkg->can($pfx.$ucmd) : undef;
 }
 
+sub do_env {
+        my $lei = shift;
+        fchdir($lei);
+        my $cb = shift // return ($lei, %{$lei->{env}}) ;
+        local ($current_lei, %ENV) = ($lei, %{$lei->{env}});
+        $cb = $lei->can($cb) if !ref($cb); # $cb may be a scalar sub name
+        eval { $cb->($lei, @_) };
+        $lei->fail($@) if $@;
+}
+
 sub dispatch {
         my ($self, $cmd, @argv) = @_;
-        fchdir($self);
-        local %ENV = %{$self->{env}};
-        local $current_lei = $self; # for __WARN__
+        local ($current_lei, %ENV) = do_env($self);
         $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
         return _help($self, 'no command given') unless defined($cmd);
         # do not support Getopt bundling for this
@@ -812,14 +819,12 @@ sub dispatch {
         }
         if (my $cb = lazy_cb(__PACKAGE__, $cmd, 'lei_')) {
                 optparse($self, $cmd, \@argv) or return;
-                $self->{opt}->{c} and (_tmp_cfg($self) // return);
                 if (my $chdir = $self->{opt}->{C}) {
                         for my $d (@$chdir) {
                                 next if $d eq ''; # same as git(1)
-                                chdir $d or return fail($self, "cd $d: $!");
+                                chdir $d;
                         }
-                        open $self->{3}, '<', '.' or
-                                return fail($self, "open . $!");
+                        open($self->{3}, '<', '.');
                 }
                 $cb->($self, @argv);
         } elsif (grep(/\A-/, $cmd, @argv)) { # --help or -h only
@@ -837,28 +842,30 @@ sub _lei_cfg ($;$) {
         my $f = _config_path($self);
         my @st = stat($f);
         my $cur_st = @st ? pack('dd', $st[10], $st[7]) : ''; # 10:ctime, 7:size
-        my ($sto, $sto_dir, $watches, $lne);
-        if (my $cfg = $PATH2CFG{$f}) { # reuse existing object in common case
-                return ($self->{cfg} = $cfg) if $cur_st eq $cfg->{-st};
+        my ($sto, $sto_dir, $watches, $lne, $cfg);
+        if ($cfg = $PATH2CFG{$f}) { # reuse existing object in common case
+                ($cur_st eq $cfg->{-st} && !$self->{opt}->{c}) and
+                        return ($self->{cfg} = $cfg);
+                # reuse some fields below if they match:
                 ($sto, $sto_dir, $watches, $lne) =
                                 @$cfg{qw(-lei_store leistore.dir -watches
                                         -lei_note_event)};
         }
         if (!@st) {
-                unless ($creat) {
-                        delete $self->{cfg};
-                        return bless {}, 'PublicInbox::Config';
+                unless ($creat) { # any commands which write to cfg must creat
+                        $cfg = PublicInbox::Config->git_config_dump(
+                                                        '/dev/null', $self);
+                        return ($self->{cfg} = $cfg);
                 }
                 my ($cfg_dir) = ($f =~ m!(.*?/)[^/]+\z!);
-                -d $cfg_dir or mkpath($cfg_dir) or die "mkpath($cfg_dir): $!\n";
-                open my $fh, '>>', $f or die "open($f): $!\n";
+                File::Path::mkpath($cfg_dir);
+                open my $fh, '>>', $f;
                 @st = stat($fh) or die "fstat($f): $!\n";
                 $cur_st = pack('dd', $st[10], $st[7]);
                 qerr($self, "# $f created") if $self->{cmd} ne 'config';
         }
-        my $cfg = PublicInbox::Config->git_config_dump($f, $self->{2});
+        $cfg = PublicInbox::Config->git_config_dump($f, $self);
         $cfg->{-st} = $cur_st;
-        $cfg->{'-f'} = $f;
         if ($sto && canonpath_harder($sto_dir // store_path($self))
                         eq canonpath_harder($cfg->{'leistore.dir'} //
                                                 store_path($self))) {
@@ -870,7 +877,7 @@ sub _lei_cfg ($;$) {
                 # FIXME: use inotify/EVFILT_VNODE to detect unlinked configs
                 delete(@PATH2CFG{grep(!-f, keys %PATH2CFG)});
         }
-        $self->{cfg} = $PATH2CFG{$f} = $cfg;
+        $self->{cfg} = $self->{opt}->{c} ? $cfg : ($PATH2CFG{$f} = $cfg);
         refresh_watches($self);
         $cfg;
 }
@@ -886,16 +893,49 @@ sub _lei_store ($;$) {
         };
 }
 
+# returns true on success, undef
+# argv[0] eq `+e' means errors do not ->fail # (like `sh +e')
 sub _config {
         my ($self, @argv) = @_;
-        my %env = (%{$self->{env}}, GIT_CONFIG => undef);
+        my $err_ok = ($argv[0] // '') eq '+e' ? shift(@argv) : undef;
+        my %env;
+        my %opt = map { $_ => $self->{$_} } (0..2);
         my $cfg = _lei_cfg($self, 1);
-        my $cmd = [ qw(git config -f), $cfg->{'-f'}, @argv ];
-        my %rdr = map { $_ => $self->{$_} } (0..2);
-        waitpid(spawn($cmd, \%env, \%rdr), 0);
+        my $opt_c = delete local $cfg->{-opt_c};
+        my @file_arg;
+        if ($opt_c) {
+                my ($set, $get, $nondash);
+                for (@argv) { # order matters for git-config
+                        if (!$nondash) {
+                                if (/\A--(?:add|rename-section|remove-section|
+                                                replace-all|
+                                                unset-all|unset)\z/x) {
+                                        ++$set;
+                                } elsif ($_ eq '-l' || $_ eq '--list' ||
+                                                /\A--get/) {
+                                        ++$get;
+                                } elsif (/\A-/) { # -z and such
+                                } else {
+                                        ++$nondash;
+                                }
+                        } else {
+                                ++$nondash;
+                        }
+                }
+                if ($set || ($nondash//0) > 1 && !$get) {
+                        @file_arg = ('-f', $cfg->{-f});
+                        $env{GIT_CONFIG} = $file_arg[1];
+                } else { # OK, we can use `-c n=v' for read-only
+                        $cfg->{-opt_c} = $opt_c;
+                        $env{GIT_CONFIG} = undef;
+                }
+        }
+        my $cmd = $cfg->config_cmd(\%env, \%opt);
+        push @$cmd, @file_arg, @argv;
+        run_wait($cmd, \%env, \%opt) ? ($err_ok ? undef : fail($self, $?)) : 1;
 }
 
-sub lei_daemon_pid { puts shift, $$ }
+sub lei_daemon_pid { puts shift, $daemon_pid }
 
 sub lei_daemon_kill {
         my ($self) = @_;
@@ -1000,9 +1040,10 @@ sub start_mua {
                 $io->[0] = $self->{1} if $self->{opt}->{stdin} && -t $self->{1};
                 send_exec_cmd($self, $io, \@cmd, {});
         }
-        if ($self->{lxs} && $self->{au_done}) { # kick wait_startq
-                syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0));
-        }
+
+        # kick wait_startq:
+        syswrite($self->{au_done}, 'q') if $self->{lxs} && $self->{au_done};
+
         return unless -t $self->{2}; # XXX how to determine non-TUI MUAs?
         $self->{opt}->{quiet} = 1;
         delete $self->{-progress};
@@ -1011,9 +1052,11 @@ sub start_mua {
 
 sub send_exec_cmd { # tell script/lei to execute a command
         my ($self, $io, $cmd, $env) = @_;
-        my $sock = $self->{sock} // die 'lei client gone';
-        my $fds = [ map { fileno($_) } @$io ];
-        $send_cmd->($sock, $fds, exec_buf($cmd, $env), MSG_EOR);
+        $PublicInbox::IPC::send_cmd->(
+                        $self->{sock} // die('lei client gone'),
+                        [ map { fileno($_) } @$io ],
+                        exec_buf($cmd, $env), 0) //
+                Carp::croak("sendmsg: $!");
 }
 
 sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
@@ -1023,7 +1066,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
         while (my $op = shift(@$alerts)) {
                 if ($op eq ':WINCH') {
                         # hit the process group that started the MUA
-                        send($sock, '-WINCH', MSG_EOR) if $sock;
+                        send($sock, '-WINCH', 0) if $sock;
                 } elsif ($op eq ':bell') {
                         out($self, "\a");
                 } elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
@@ -1032,7 +1075,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
                         my $cmd = $1; # run an arbitrary command
                         require Text::ParseWords;
                         $cmd = [ Text::ParseWords::shellwords($cmd) ];
-                        send($sock, exec_buf($cmd, {}), MSG_EOR) if $sock;
+                        send($sock, exec_buf($cmd, {}), 0) if $sock;
                 } else {
                         warn("W: unsupported --alert=$op\n"); # non-fatal
                 }
@@ -1056,16 +1099,15 @@ sub path_to_fd {
 # caller needs to "-t $self->{1}" to check if tty
 sub start_pager {
         my ($self, $new_env) = @_;
-        my $fh = popen_rd([qw(git var GIT_PAGER)]);
-        chomp(my $pager = <$fh> // '');
-        close($fh) or warn "`git var PAGER' error: \$?=$?";
+        chomp(my $pager = run_qx([qw(git var GIT_PAGER)]));
+        warn "`git var PAGER' error: \$?=$?" if $?;
         return if $pager eq 'cat' || $pager eq '';
         $new_env //= {};
         $new_env->{LESS} //= 'FRX';
         $new_env->{LV} //= '-c';
         $new_env->{MORE} = $new_env->{LESS} if $^O eq 'freebsd';
-        pipe(my ($r, $wpager)) or return warn "pipe: $!";
-        my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
+        my $rdr = { 1 => $self->{1}, 2 => $self->{2} };
+        CORE::pipe($rdr->{0}, my $wpager) or return warn "pipe: $!";
         my $pgr = [ undef, @$rdr{1, 2} ];
         my $env = $self->{env};
         if ($self->{sock}) { # lei(1) process runs it
@@ -1085,17 +1127,17 @@ sub pgr_err {
         my ($self, @msg) = @_;
         return warn(@msg) unless $self->{sock} && -t $self->{2};
         start_pager($self, { LESS => 'RX' }); # no 'F' so we prompt
-        print { $self->{2} } @msg;
+        say { $self->{2} } @msg, '# -quit pager to continue-';
         $self->{2}->autoflush(1);
         stop_pager($self);
-        send($self->{sock}, 'wait', MSG_EOR); # wait for user to quit pager
+        send($self->{sock}, 'wait', 0); # wait for user to quit pager
 }
 
 sub stop_pager {
         my ($self) = @_;
         my $pgr = delete($self->{pgr}) or return;
         $self->{2} = $pgr->[2];
-        close(delete($self->{1})) if $self->{1};
+        delete($self->{1})->close if $self->{1};
         $self->{1} = $pgr->[1];
 }
 
@@ -1105,24 +1147,22 @@ sub accept_dispatch { # Listener {post_accept} callback
         my $self = bless { sock => $sock }, __PACKAGE__;
         vec(my $rvec = '', fileno($sock), 1) = 1;
         select($rvec, undef, undef, 60) or
-                return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
+                return send($sock, 'timed out waiting to recv FDs', 0);
         # (4096 * 33) >MAX_ARG_STRLEN
-        my @fds = $recv_cmd->($sock, my $buf, 4096 * 33) or return; # EOF
+        my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
+                return; # EOF
         if (!defined($fds[0])) {
                 warn(my $msg = "recv_cmd failed: $!");
-                return send($sock, $msg, MSG_EOR);
+                return send($sock, $msg, 0);
         } else {
                 my $i = 0;
-                for my $fd (@fds) {
-                        open($self->{$i++}, '+<&=', $fd) and next;
-                        send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR);
-                }
-                $i == 4 or return send($sock, 'not enough FDs='.($i-1), MSG_EOR)
+                open($self->{$i++}, '+<&=', $_) for @fds;
+                $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0)
         }
         # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
         # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
         substr($buf, -2, 2, '') eq "\0\0" or  # s/\0\0\z//
-                return send($sock, 'request command truncated', MSG_EOR);
+                return send($sock, 'request command truncated', 0);
         my ($argc, @argv) = split(/\0/, $buf, -1);
         undef $buf;
         my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -1145,11 +1185,11 @@ sub event_step {
         local %ENV = %{$self->{env}};
         local $current_lei = $self;
         eval {
-                my @fds = $recv_cmd->($self->{sock} // return, my $buf, 4096);
+                my @fds = $PublicInbox::IPC::recv_cmd->(
+                        $self->{sock} // return, my $buf, 4096);
                 if (scalar(@fds) == 1 && !defined($fds[0])) {
                         return if $! == EAGAIN;
                         die "recvmsg: $!" if $! != ECONNRESET;
-                        $buf = '';
                         @fds = (); # for open loop below:
                 }
                 for (@fds) { open my $rfh, '+<&=', $_ }
@@ -1167,7 +1207,7 @@ sub event_step {
                         die "unrecognized client signal: $buf";
                 }
                 my $s = $self->{-socks} // []; # lei up --all
-                @$s = grep { send($_, $buf, MSG_EOR) } @$s;
+                @$s = grep { send($_, $buf, 0) } @$s;
         };
         if (my $err = $@) {
                 eval { $self->fail($err) };
@@ -1185,8 +1225,6 @@ sub event_step_init {
         };
 }
 
-sub noop {}
-
 sub oldset { $oldset }
 
 sub dump_and_clear_log {
@@ -1203,48 +1241,87 @@ sub dump_and_clear_log {
 sub cfg2lei ($) {
         my ($cfg) = @_;
         my $lei = bless { env => { %{$cfg->{-env}} } }, __PACKAGE__;
-        open($lei->{0}, '<&', \*STDIN) or die "dup 0: $!";
-        open($lei->{1}, '>>&', \*STDOUT) or die "dup 1: $!";
-        open($lei->{2}, '>>&', \*STDERR) or die "dup 2: $!";
-        open($lei->{3}, '<', '/') or die "open /: $!";
-        my ($x, $y);
-        socketpair($x, $y, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+        open($lei->{0}, '<&', \*STDIN);
+        open($lei->{1}, '>>&', \*STDOUT);
+        open($lei->{2}, '>>&', \*STDERR);
+        open($lei->{3}, '<', '/');
+        socketpair(my $x, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
         $lei->{sock} = $x;
         require PublicInbox::LeiSelfSocket;
         PublicInbox::LeiSelfSocket->new($y); # adds to event loop
         $lei;
 }
 
+sub note_event ($@) { # runs lei_note_event for a given config file
+        my ($cfg_f, @args) = @_;
+        my $cfg = $PATH2CFG{$cfg_f} // return;
+        eval { cfg2lei($cfg)->dispatch('note-event', @args) };
+        carp "E: note-event $cfg_f: $@\n" if $@;
+}
+
 sub dir_idle_handler ($) { # PublicInbox::DirIdle callback
         my ($ev) = @_; # Linux::Inotify2::Event or duck type
         my $fn = $ev->fullname;
         if ($fn =~ m!\A(.+)/(new|cur)/([^/]+)\z!) { # Maildir file
-                my ($mdir, $nc, $bn) = ($1, $2, $3);
-                $nc = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM;
-                for my $f (keys %{$MDIR2CFGPATH->{$mdir} // {}}) {
-                        my $cfg = $PATH2CFG{$f} // next;
-                        eval {
-                                my $lei = cfg2lei($cfg);
-                                $lei->dispatch('note-event',
-                                                "maildir:$mdir", $nc, $bn, $fn);
-                        };
-                        warn "E: note-event $f: $@\n" if $@;
+                my ($loc, $new_cur, $bn) = ("maildir:$1", $2, $3);
+                $new_cur = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM;
+                for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) {
+                        note_event($cfg_f, $loc, $new_cur, $bn, $fn);
                 }
-        }
+        } elsif ($fn =~ m!\A(.+)/([0-9]+)\z!) { # MH mail message file
+                my ($loc, $n, $new_cur) = ("mh:$1", $2, '+');
+                $new_cur = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM;
+                for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) {
+                        note_event($cfg_f, $loc, $new_cur, $n, $fn);
+                }
+        } elsif ($fn =~ m!\A(.+)/\.mh_sequences\z!) { # reread flags
+                my $loc = "mh:$1";
+                for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) {
+                        note_event($cfg_f, $loc, '.mh_sequences')
+                }
+        } # else we don't care
         if ($ev->can('cancel') && ($ev->IN_IGNORE || $ev->IN_UNMOUNT)) {
                 $ev->cancel;
         }
         if ($fn =~ m!\A(.+)/(?:new|cur)\z! && !-e $fn) {
-                delete $MDIR2CFGPATH->{$1};
+                delete $MDIR2CFGPATH->{"maildir:$1"};
         }
-        if (!-e $fn) { # config file or Maildir gone
-                for my $cfgpaths (values %$MDIR2CFGPATH) {
-                        delete $cfgpaths->{$fn};
-                }
+        if (!-e $fn) { # config file, Maildir, or MH dir gone
+                delete $_->{$fn} for values %$MDIR2CFGPATH; # config file
+                delete @$MDIR2CFGPATH{"maildir:$fn", "mh:$fn"};
                 delete $PATH2CFG{$fn};
         }
 }
 
+sub can_stay_alive { # PublicInbox::DS::post_loop_do cb
+        my ($path, $dev_ino_expect) = @_;
+        if (my @st = defined($$path) ? stat($$path) : ()) {
+                if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
+                        warn "$$path dev/ino changed, quitting\n";
+                        $$path = undef;
+                }
+        } elsif (defined($$path)) { # ENOENT is common
+                warn "stat($$path): $!, quitting ...\n" if $! != ENOENT;
+                undef $$path;
+                $quit->();
+        }
+        return 1 if defined($$path);
+        my $n = PublicInbox::DS::close_non_busy() or do {
+                eval 'PublicInbox::LeiNoteEvent::flush_task()';
+                # drop stores only if no clients
+                for my $cfg (values %PATH2CFG) {
+                        my $lne = delete($cfg->{-lei_note_event});
+                        $lne->wq_close if $lne;
+                        my $sto = delete($cfg->{-lei_store}) // next;
+                        eval { $sto->wq_do('done') if $sto->{-wq_s1} };
+                        warn "E: $@ (dropping store for $cfg->{-f})" if $@;
+                        $sto->wq_close;
+                }
+        };
+        # returns true: continue, false: stop
+        $n + scalar(keys(%PublicInbox::DS::AWAIT_PIDS));
+}
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
         my ($path, $errno, $narg) = @_;
@@ -1252,106 +1329,66 @@ sub lazy_start {
         my ($sock_dir) = ($path =~ m!\A(.+?)/[^/]+\z!);
         $errors_log = "$sock_dir/errors.log";
         my $addr = pack_sockaddr_un($path);
-        my $lk = bless { lock_path => $errors_log }, 'PublicInbox::Lock';
+        my $lk = PublicInbox::Lock->new($errors_log);
         umask(077) // die("umask(077): $!");
         $lk->lock_acquire;
-        socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
+        socket($listener, AF_UNIX, SOCK_SEQPACKET, 0);
         if ($errno == ECONNREFUSED || $errno == ENOENT) {
                 return if connect($listener, $addr); # another process won
-                if ($errno == ECONNREFUSED && -S $path) {
-                        unlink($path) or die "unlink($path): $!";
-                }
+                unlink($path) if $errno == ECONNREFUSED && -S $path;
         } else {
                 $! = $errno; # allow interpolation to stringify in die
                 die "connect($path): $!";
         }
-        bind($listener, $addr) or die "bind($path): $!";
+        bind($listener, $addr);
         $lk->lock_release;
         undef $lk;
         my @st = stat($path) or die "stat($path): $!";
         my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
-        local $oldset = PublicInbox::DS::block_signals();
-        if ($narg == 5) {
-                $send_cmd = PublicInbox::Spawn->can('send_cmd4');
-                $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
-                        require PublicInbox::CmdIPC4;
-                        $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4');
-                        PublicInbox::CmdIPC4->can('recv_cmd4');
-                } // do {
-                        $send_cmd = PublicInbox::Syscall->can('send_cmd4');
-                        PublicInbox::Syscall->can('recv_cmd4');
-                };
-        }
-        $recv_cmd or die <<"";
+        local $oldset = PublicInbox::DS::block_signals(POSIX::SIGALRM);
+        die "incompatible narg=$narg" if $narg != 5;
+        $PublicInbox::IPC::send_cmd or die <<"";
 (Socket::MsgHdr || Inline::C) missing/unconfigured (narg=$narg);
 
         require PublicInbox::Listener;
         require PublicInbox::PktOp;
         (-p STDOUT) or die "E: stdout must be a pipe\n";
-        open(STDIN, '+>>', $errors_log) or die "open($errors_log): $!";
+        open(STDIN, '+>>', $errors_log);
         STDIN->autoflush(1);
         dump_and_clear_log();
         POSIX::setsid() > 0 or die "setsid: $!";
-        my $pid = fork // die "fork: $!";
+        my $pid = fork;
         return if $pid;
         $0 = "lei-daemon $path";
-        local %PATH2CFG;
-        local $MDIR2CFGPATH;
+        local (%PATH2CFG, $MDIR2CFGPATH);
+        local $daemon_pid = $$;
         $listener->blocking(0);
         my $exit_code;
         my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch);
         local $quit = do {
                 my (undef, $eof_p) = PublicInbox::PktOp->pair;
                 sub {
-                        $exit_code //= shift;
+                        $exit_code //= eval("POSIX::SIG$_[0] + 128") if @_;
+                        $dir_idle->close if $dir_idle; # EPOLL_CTL_DEL
+                        $dir_idle = undef; # let RC take care of it
                         eval 'PublicInbox::LeiNoteEvent::flush_task()';
-                        my $lis = $pil or exit($exit_code);
+                        my $lis = $pil or exit($exit_code // 0);
                         # closing eof_p triggers \&noop wakeup
                         $listener = $eof_p = $pil = $path = undef;
                         $lis->close; # DS::close
-                        PublicInbox::DS->SetLoopTimeout(1000);
                 };
         };
-        my $sig = {
-                CHLD => \&PublicInbox::DS::enqueue_reap,
-                QUIT => $quit,
-                INT => $quit,
-                TERM => $quit,
-                HUP => \&noop,
-                USR1 => \&noop,
-                USR2 => \&noop,
-        };
+        my $sig = { CHLD => \&PublicInbox::DS::enqueue_reap };
+        $sig->{$_} = $quit for qw(QUIT INT TERM);
+        $sig->{$_} = \&PublicInbox::Config::noop for qw(HUP USR1 USR2);
         require PublicInbox::DirIdle;
         local $dir_idle = PublicInbox::DirIdle->new(sub {
-                # just rely on wakeup to hit PostLoopCallback set below
+                # just rely on wakeup to hit post_loop_do
                 dir_idle_handler($_[0]) if $_[0]->fullname ne $path;
         });
         $dir_idle->add_watches([$sock_dir]);
-        PublicInbox::DS->SetPostLoopCallback(sub {
-                my ($dmap, undef) = @_;
-                if (@st = defined($path) ? stat($path) : ()) {
-                        if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
-                                warn "$path dev/ino changed, quitting\n";
-                                $path = undef;
-                        }
-                } elsif (defined($path)) { # ENOENT is common
-                        warn "stat($path): $!, quitting ...\n" if $! != ENOENT;
-                        undef $path;
-                        $quit->();
-                }
-                return 1 if defined($path);
-                my $n = 0;
-                for my $s (values %$dmap) {
-                        $s->can('busy') or next;
-                        if ($s->busy) {
-                                ++$n;
-                        } else {
-                                $s->close;
-                        }
-                }
-                $n; # true: continue, false: stop
-        });
-
+        local @PublicInbox::DS::post_loop_do = (\&can_stay_alive,
+                                                \$path, $dev_ino_expect);
         # STDIN was redirected to /dev/null above, closing STDERR and
         # STDOUT will cause the calling `lei' client process to finish
         # reading the <$daemon> pipe.
@@ -1359,13 +1396,13 @@ sub lazy_start {
                 $current_lei ? err($current_lei, @_) : warn(
                   strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(time))," $$ ", @_);
         };
-        open STDERR, '>&STDIN' or die "redirect stderr failed: $!";
-        open STDOUT, '>&STDIN' or die "redirect stdout failed: $!";
+        local $SIG{PIPE} = 'IGNORE';
+        local $SIG{ALRM} = 'IGNORE';
+        open STDERR, '>&STDIN';
+        open STDOUT, '>&STDIN';
         # $daemon pipe to `lei' closed, main loop begins:
         eval { PublicInbox::DS::event_loop($sig, $oldset) };
         warn "event loop error: $@\n" if $@;
-        # exit() may trigger waitpid via various DESTROY, ensure interruptible
-        PublicInbox::DS::sig_setmask($oldset);
         dump_and_clear_log();
         exit($exit_code // 0);
 }
@@ -1376,9 +1413,10 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected
 # can immediately reread it
 sub DESTROY {
         my ($self) = @_;
-        if (my $counters = delete $self->{counters}) {
-                for my $k (sort keys %$counters) {
-                        my $nr = $counters->{$k};
+        if (defined($self->{incr_pid}) && $self->{incr_pid} == $$) {
+                for my $k (sort(grep(/\A-nr_/, keys %$self))) {
+                        my $nr = $self->{$k};
+                        substr($k, 0, length('-nr_'), '');
                         $self->child_error(0, "$nr $k messages");
                 }
         }
@@ -1388,9 +1426,8 @@ sub DESTROY {
         # preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # dwaitpid callback
-        my ($arg, $pid) = @_;
-        my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof)
+        my ($pid, $wq, $lei) = @_;
         local $current_lei = $lei;
         my $err_type = $lei->{-err_type};
         $? and $lei->child_error($?,
@@ -1400,8 +1437,7 @@ sub wq_done_wait { # dwaitpid callback
 
 sub fchdir {
         my ($lei) = @_;
-        my $dh = $lei->{3} // die 'BUG: lei->{3} (CWD) gone';
-        chdir($dh) || die "fchdir: $!";
+        chdir($lei->{3} // die 'BUG: lei->{3} (CWD) gone');
 }
 
 sub wq_eof { # EOF callback for main daemon
@@ -1417,19 +1453,22 @@ sub watch_state_ok ($) {
         $state =~ /\Apause|(?:import|index|tag)-(?:ro|rw)\z/;
 }
 
-sub cancel_maildir_watch ($$) {
-        my ($d, $cfg_f) = @_;
-        my $w = delete $MDIR2CFGPATH->{$d}->{$cfg_f};
-        scalar(keys %{$MDIR2CFGPATH->{$d}}) or
-                delete $MDIR2CFGPATH->{$d};
-        for my $x (@{$w // []}) { $x->cancel }
+sub cancel_dir_watch ($$$) {
+        my ($type, $d, $cfg_f) = @_;
+        my $loc = "$type:".canonpath_harder($d);
+        my $w = delete $MDIR2CFGPATH->{$loc}->{$cfg_f};
+        delete $MDIR2CFGPATH->{$loc} if !(keys %{$MDIR2CFGPATH->{$loc}});
+        $_->cancel for @$w;
 }
 
-sub add_maildir_watch ($$) {
-        my ($d, $cfg_f) = @_;
-        if (!exists($MDIR2CFGPATH->{$d}->{$cfg_f})) {
-                my @w = $dir_idle->add_watches(["$d/cur", "$d/new"], 1);
-                push @{$MDIR2CFGPATH->{$d}->{$cfg_f}}, @w if @w;
+sub add_dir_watch ($$$) {
+        my ($type, $d, $cfg_f) = @_;
+        $d = canonpath_harder($d);
+        my $loc = "$type:$d";
+        my @dirs = $type eq 'mh' ? ($d) : ("$d/cur", "$d/new");
+        if (!exists($MDIR2CFGPATH->{$loc}->{$cfg_f})) {
+                my @w = $dir_idle->add_watches(\@dirs, 1);
+                push @{$MDIR2CFGPATH->{$loc}->{$cfg_f}}, @w if @w;
         }
 }
 
@@ -1442,24 +1481,20 @@ sub refresh_watches {
         my %seen;
         my $cfg_f = $cfg->{'-f'};
         for my $w (grep(/\Awatch\..+\.state\z/, keys %$cfg)) {
-                my $url = substr($w, length('watch.'), -length('.state'));
+                my $loc = substr($w, length('watch.'), -length('.state'));
                 require PublicInbox::LeiWatch;
-                $watches->{$url} //= PublicInbox::LeiWatch->new($url);
-                $seen{$url} = undef;
-                my $state = $cfg->get_1("watch.$url.state");
+                $watches->{$loc} //= PublicInbox::LeiWatch->new($loc);
+                $seen{$loc} = undef;
+                my $state = $cfg->get_1("watch.$loc.state");
                 if (!watch_state_ok($state)) {
-                        warn("watch.$url.state=$state not supported\n");
-                        next;
-                }
-                if ($url =~ /\Amaildir:(.+)/i) {
-                        my $d = canonpath_harder($1);
-                        if ($state eq 'pause') {
-                                cancel_maildir_watch($d, $cfg_f);
-                        } else {
-                                add_maildir_watch($d, $cfg_f);
-                        }
+                        warn("watch.$loc.state=$state not supported\n");
+                } elsif ($loc =~ /\A(maildir|mh):(.+)\z/i) {
+                        my ($type, $d) = ($1, $2);
+                        $state eq 'pause' ?
+                                cancel_dir_watch($type, $d, $cfg_f) :
+                                add_dir_watch($type, $d, $cfg_f);
                 } else { # TODO: imap/nntp/jmap
-                        $lei->child_error(0, "E: watch $url not supported, yet")
+                        $lei->child_error(0, "E: watch $loc not supported, yet")
                 }
         }
 
@@ -1467,29 +1502,28 @@ sub refresh_watches {
         my $lms = $lei->lms;
         if ($lms) {
                 $lms->lms_write_prepare;
-                for my $d ($lms->folders('maildir:')) {
-                        substr($d, 0, length('maildir:')) = '';
-
+                for my $loc ($lms->folders(qr/\A(?:maildir|mh):/)) {
+                        my $old = $loc;
+                        my ($type, $d) = split /:/, $loc, 2;
                         # fixup old bugs while we're iterating:
-                        my $cd = canonpath_harder($d);
-                        my $f = "maildir:$cd";
-                        $lms->rename_folder("maildir:$d", $f) if $d ne $cd;
-                        next if $watches->{$f}; # may be set to pause
+                        $d = canonpath_harder($d);
+                        $loc = "$type:$d";
+                        $lms->rename_folder($old, $loc) if $old ne $loc;
+                        next if $watches->{$loc}; # may be set to pause
                         require PublicInbox::LeiWatch;
-                        $watches->{$f} = PublicInbox::LeiWatch->new($f);
-                        $seen{$f} = undef;
-                        add_maildir_watch($cd, $cfg_f);
+                        $watches->{$loc} = PublicInbox::LeiWatch->new($loc);
+                        $seen{$loc} = undef;
+                        add_dir_watch($type, $d, $cfg_f);
                 }
         }
         if ($old) { # cull old non-existent entries
-                for my $url (keys %$old) {
-                        next if exists $seen{$url};
-                        delete $old->{$url};
-                        if ($url =~ /\Amaildir:(.+)/i) {
-                                my $d = canonpath_harder($1);
-                                cancel_maildir_watch($d, $cfg_f);
+                for my $loc (keys %$old) {
+                        next if exists $seen{$loc};
+                        delete $old->{$loc};
+                        if ($loc =~ /\A(maildir|mh):(.+)\z/i) {
+                                cancel_dir_watch($1, $2, $cfg_f);
                         } else { # TODO: imap/nntp/jmap
-                                $lei->child_error(0, "E: watch $url TODO");
+                                $lei->child_error(0, "E: watch $loc TODO");
                         }
                 }
         }
@@ -1517,22 +1551,22 @@ sub lms {
 
 sub sto_done_request {
         my ($lei, $wq) = @_;
-        return unless $lei->{sto};
+        return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
         local $current_lei = $lei;
-        my $sock = $wq ? $wq->{lei_sock} : undef;
-        eval {
-                if ($sock //= $lei->{sock}) { # issue, async wait
-                        $lei->{sto}->wq_io_do('done', [ $sock ]);
-                } else { # forcibly wait
-                        my $wait = $lei->{sto}->wq_do('done');
-                }
-        };
+        if (my $n = $lei->{opt}->{'commit-delay'}) {
+                eval { $lei->{sto}->wq_do('schedule_commit', $n) };
+        } else {
+                my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
+                my $errfh = $lei->{2} // *STDERR{GLOB};
+                my @io = $s ? ($errfh, $s) : ($errfh);
+                eval { $lei->{sto}->wq_io_do('done', \@io) };
+        }
         warn($@) if $@;
 }
 
 sub cfg_dump ($$) {
         my ($lei, $f) = @_;
-        my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei->{2}) };
+        my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei) };
         return $ret if !$@;
         warn($@);
         undef;
@@ -1541,7 +1575,7 @@ sub cfg_dump ($$) {
 sub request_umask {
         my ($lei) = @_;
         my $s = $lei->{sock} // return;
-        send($s, 'umask', MSG_EOR) // die "send: $!";
+        send($s, 'umask', 0) // die "send: $!";
         vec(my $rvec = '', fileno($s), 1) = 1;
         select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
         recv($s, my $v, 5, 0) // die "recv: $!";
@@ -1549,4 +1583,24 @@ sub request_umask {
         $u eq 'u' or warn "E: recv $v has no umask";
 }
 
+sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
+        my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
+        $_[1] // return $lei->fail("error reading stdin: $!");
+        $lei->{stdin_buf} .= $_[-1];
+        do_env($lei, $cb) if $_[-1] eq '';
+}
+
+sub slurp_stdin {
+        my ($lei, $cb) = @_;
+        require PublicInbox::InputPipe;
+        my $in = $lei->{0};
+        if (-t $in) { # run cat via script/lei and read from it
+                $in = undef;
+                pipe($in, my $wr);
+                say { $lei->{2} } '# enter query, Ctrl-D when done';
+                send_exec_cmd($lei, [ $lei->{0}, $wr ], ['cat'], {});
+        }
+        PublicInbox::InputPipe::consume($in, \&_stdin_cb, $lei, $cb);
+}
+
 1;