diff options
Diffstat (limited to 'lib/PublicInbox/LEI.pm')
-rw-r--r-- | lib/PublicInbox/LEI.pm | 645 |
1 files changed, 349 insertions, 296 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index d81ca296..e9a0de6c 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -9,8 +9,9 @@ package PublicInbox::LEI; use v5.12; use parent qw(PublicInbox::DS PublicInbox::LeiExternal PublicInbox::LeiQuery); +use autodie qw(bind chdir open pipe socket socketpair syswrite unlink); use Getopt::Long (); -use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); +use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un); use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET); use Cwd qw(getcwd); use POSIX qw(strftime); @@ -18,33 +19,36 @@ use IO::Handle (); use Fcntl qw(SEEK_SET); use PublicInbox::Config; use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::DS qw(dwaitpid); -use PublicInbox::Spawn qw(spawn popen_rd); +use PublicInbox::Spawn qw(run_wait popen_rd run_qx); use PublicInbox::Lock; use PublicInbox::Eml; use PublicInbox::Import; use PublicInbox::ContentHash qw(git_sha); +use PublicInbox::OnDestroy; +use PublicInbox::IPC; use Time::HiRes qw(stat); # ctime comparisons for config cache -use File::Path qw(mkpath); +use File::Path (); use File::Spec; +use Carp qw(carp); use Sys::Syslog qw(openlog syslog closelog); our $quit = \&CORE::exit; -our ($current_lei, $errors_log, $listener, $oldset, $dir_idle, - $recv_cmd, $send_cmd); +our ($current_lei, $errors_log, $listener, $oldset, $dir_idle); my $GLP = Getopt::Long::Parser->new; $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev)); my $GLP_PASS = Getopt::Long::Parser->new; $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through)); -our %PATH2CFG; # persistent for socket daemon -our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] } +our (%PATH2CFG, # persistent for socket daemon +$MDIR2CFGPATH, # location => { /path/to/config => [ ino watches ] } +$OPT, # shared between optparse and opt_dash callback (for Getopt::Long) +$daemon_pid +); # TBD: this is a documentation mechanism to show a subcommand # (may) pass options through to another command: sub pass_through { $GLP_PASS } -my $OPT; -sub opt_dash ($$) { +sub opt_dash ($$) { # callback runs inside optparse my ($spec, $re_str) = @_; # 'limit|n=i', '([0-9]+)' my ($key) = ($spec =~ m/\A([a-z]+)/g); my $cb = sub { # Getopt::Long "<>" catch-all handler @@ -158,7 +162,7 @@ our @diff_opt = qw(unified|U=i output-indicator-new=s output-indicator-old=s rename-empty! check ws-error-highlight=s full-index binary abbrev:i break-rewrites|B:s find-renames|M:s find-copies:s find-copies-harder irreversible-delete|D l=i diff-filter=s - S=s G=s find-object=s pickaxe-all pickaxe-regex O=s R + S=s G=s find-object=s pickaxe-all pickaxe-regex R relative:s text|a ignore-cr-at-eol ignore-space-at-eol ignore-space-change|b ignore-all-space|w ignore-blank-lines inter-hunk-context=i function-context|W exit-code ext-diff @@ -173,6 +177,7 @@ our %CMD = ( # sorted in order of importance/use: 'stdin|', # /|\z/ must be first for lone dash @lxs_opt, @net_opt, qw(save! output|mfolder|o=s format|f=s dedupe|d=s threads|t+ + thread-id|T=s sort|s=s reverse|r offset=i pretty jobs|j=s globoff|g augment|a import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+ shared color! mail-sync!), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], @@ -197,8 +202,8 @@ our %CMD = ( # sorted in order of importance/use: 'rediff' => [ '--stdin|LOCATION...', 'regenerate a diff with different options', 'stdin|', # /|\z/ must be first for lone dash - qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1), - @diff_opt, @lxs_opt, @net_opt, @c_opt ], + qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1 + order-file=s), @diff_opt, @lxs_opt, @net_opt, @c_opt ], 'mail-diff' => [ '--stdin|LOCATION...', 'diff the contents of emails', 'stdin|', # /|\z/ must be first for lone dash @@ -229,20 +234,21 @@ our %CMD = ( # sorted in order of importance/use: 'rm' => [ '--stdin|LOCATION...', 'remove a message from the index and prevent reindexing', 'stdin|', # /|\z/ must be first for lone dash - qw(in-format|F=s lock=s@), @net_opt, @c_opt ], + qw(in-format|F=s lock=s@ commit-delay=i), @net_opt, @c_opt ], 'plonk' => [ '--threads|--from=IDENT', 'exclude mail matching From: or threads from non-Message-ID searches', qw(stdin| threads|t from|f=s mid=s oid=s), @c_opt ], -'tag' => [ 'KEYWORDS...', +tag => [ 'KEYWORDS... LOCATION...|--stdin', 'set/unset keywords and/or labels on message(s)', - qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@), + qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@ commit-delay=i), @net_opt, @c_opt, pass_through('-kw:foo for delete') ], 'purge-mailsource' => [ 'LOCATION|--all', 'remove imported messages from IMAP, Maildirs, and MH', qw(exact! all jobs:i indexed), @c_opt ], -'add-watch' => [ 'LOCATION...', 'watch for new messages and flag changes', +'add-watch' => [ 'LOCATION... [LABELS...]', + 'watch for new messages and flag changes', qw(poll-interval=s state=s recursive|r), @c_opt ], 'rm-watch' => [ 'LOCATION...', 'remove specified watch(es)', qw(recursive|r), @c_opt ], @@ -253,14 +259,17 @@ our %CMD = ( # sorted in order of importance/use: 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch', qw(prune), @c_opt ], -'index' => [ 'LOCATION...', 'one-time index from URL or filesystem', +'reindex' => [ '', 'reindex all locally-indexed messages', @c_opt ], + +'index' => [ 'LOCATION... [LABELS...]', 'one-time index from URL or filesystem', qw(in-format|F=s kw! offset=i recursive|r exclude=s include|I=s verbose|v+ incremental!), @net_opt, # mainly for --proxy= @c_opt ], -'import' => [ 'LOCATION...|--stdin', +import => [ 'LOCATION...|--stdin [LABELS...]', 'one-time import/update from URL or filesystem', qw(stdin| offset=i recursive|r exclude=s include|I=s new-only - lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!), + lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync! + commit-delay=i sort|s:s@), @net_opt, @c_opt ], 'forget-mail-sync' => [ 'LOCATION...', 'forget sync information for a mail folder', @c_opt ], @@ -272,13 +281,15 @@ our %CMD = ( # sorted in order of importance/use: qw(all:s mode=s), @net_opt, @c_opt ], 'convert' => [ 'LOCATION...|--stdin', 'one-time conversion from URL or filesystem to another format', - qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw!), + qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw! + rsyncable sort|s:s@), @net_opt, @c_opt ], 'p2q' => [ 'LOCATION_OR_COMMIT...|--stdin', "use a patch to generate a query for `lei q --stdin'", qw(stdin| in-format|F=s want|w=s@ uri debug), @net_opt, @c_opt ], 'config' => [ '[...]', sub { - 'git-config(1) wrapper for '._config_path($_[0]); + 'git-config(1) wrapper for '._config_path($_[0]). "\n" . + '-l/--list and other common git-config uses are supported' }, qw(config-file|system|global|file|f=s), # for conflict detection qw(edit|e c=s@ C=s@), pass_through('git config') ], 'inspect' => [ 'ITEMS...|--stdin', 'inspect lei/store and/or local external', @@ -312,6 +323,9 @@ our %CMD = ( # sorted in order of importance/use: my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo', 'specify message input format' ]; my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ]; +my $sort_out = [ 'VAL|received|relevance|docid', + "order of results is `--output'-dependent"]; +my $sort_in = [ 'sequence|mtime|size', 'sort input (format-dependent)' ]; # we use \x{a0} (non-breaking SP) to avoid wrapping in PublicInbox::LeiHelp my %OPTDESC = ( @@ -344,6 +358,7 @@ my %OPTDESC = ( 'no-torsocks' => 'alias for --torsocks=no', 'save!' => "do not save a search for `lei up'", 'import-remote!' => 'do not memoize remote messages into local store', +'import-before!' => 'do not import before writing to output (DANGEROUS)', 'type=s' => [ 'any|mid|git', 'disambiguate type' ], @@ -397,8 +412,10 @@ my %OPTDESC = ( 'include specified external(s) in search' ], 'only|O=s@ q' => [ 'LOCATION', 'only use specified external(s) for search' ], -'jobs=s q' => [ '[SEARCH_JOBS][,WRITER_JOBS]', - 'control number of search and writer jobs' ], +'jobs|j=s' => [ 'JOBSPEC', + 'control number of query and writer jobs' . + "integers delimited by `,', either of which may be omitted" + ], 'jobs|j=i add-external' => 'set parallelism when indexing after --mirror', 'in-format|F=s' => $stdin_formats, @@ -416,8 +433,10 @@ my %OPTDESC = ( 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ], 'offset=i' => ['OFF', 'search result offset (default: 0)'], -'sort|s=s' => [ 'VAL|received|relevance|docid', - "order of results is `--output'-dependent"], +'sort|s=s q' => $sort_out, +'sort|s=s lcat' => $sort_out, +'sort|s:s@ convert' => $sort_in, +'sort|s:s@ import' => $sort_in, 'reverse|r' => 'reverse search results', # like sort(1) 'boost=i' => 'increase/decrease priority of results (default: 0)', @@ -451,6 +470,7 @@ my %OPTDESC = ( 'z|0' => 'use NUL \\0 instead of newline (CR) to delimit lines', 'signal|s=s' => [ 'SIG', 'signal to send lei-daemon (default: TERM)' ], +'edit|e config' => 'open an editor to modify the lei config file', ); # %OPTDESC my %CONFIG_KEYS = ( @@ -462,7 +482,7 @@ my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers sub _drop_wq { my ($self) = @_; for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) { - $wq->wq_kill('-TERM'); + $wq->wq_kill(-POSIX::SIGTERM()); $wq->DESTROY; } } @@ -470,17 +490,18 @@ sub _drop_wq { # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE sub x_it ($$) { my ($self, $code) = @_; - local $current_lei = $self; # make sure client sees stdout before exit $self->{1}->autoflush(1) if $self->{1}; stop_pager($self); if ($self->{pkt_op_p}) { # worker => lei-daemon $self->{pkt_op_p}->pkt_do('x_it', $code); + exit($code >> 8) if $$ != $daemon_pid; } elsif ($self->{sock}) { # lei->daemon => lei(1) client - send($self->{sock}, "x_it $code", MSG_EOR); + send($self->{sock}, "x_it $code", 0); } elsif ($quit == \&CORE::exit) { # an admin (one-shot) command exit($code >> 8); } # else ignore if client disconnected + $self->dclose if $$ == $daemon_pid; } sub err ($;@) { @@ -489,7 +510,7 @@ sub err ($;@) { my @eor = (substr($_[-1]//'', -1, 1) eq "\n" ? () : ("\n")); print $err @_, @eor and return; my $old_err = delete $self->{2}; - close($old_err) if $! == EPIPE && $old_err; + $old_err->close if $! == EPIPE && $old_err; $err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB}; print $err @_, @eor or print STDERR @_, @eor; } @@ -504,8 +525,7 @@ sub qfin { # show message on finalization (LeiFinmsg) sub fail_handler ($;$$) { my ($lei, $code, $io) = @_; - local $current_lei = $lei; - close($io) if $io; # needed to avoid warnings on SIGPIPE + $io->close if $io; # needed to avoid warnings on SIGPIPE _drop_wq($lei); x_it($lei, $code // (1 << 8)); } @@ -514,13 +534,17 @@ sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers fail_handler($_[0], 13, delete $_[0]->{1}); } -sub fail ($$;$) { - my ($self, $msg, $exit_code) = @_; - local $current_lei = $self; - $self->{failed}++; - warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg; - $self->{pkt_op_p}->pkt_do('fail_handler') if $self->{pkt_op_p}; - x_it($self, ($exit_code // 1) << 8); +sub fail ($;@) { + my ($lei, @msg) = @_; + my $exit_code = ($msg[0]//'') =~ /\A-?[0-9]+\z/ ? shift(@msg) : undef; + local $current_lei = $lei; + $lei->{failed}++; + if (@msg) { + push @msg, "\n" if substr($msg[-1], -1, 1); + warn @msg; + } + $lei->{pkt_op_p}->pkt_do('fail_handler') if $lei->{pkt_op_p}; + x_it($lei, $exit_code // (1 << 8)); undef; } @@ -540,18 +564,17 @@ sub child_error { # passes non-fatal curl exit codes to user local $current_lei = $self; $child_error ||= 1 << 8; warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg; + $self->{child_error} ||= $child_error; if ($self->{pkt_op_p}) { # to top lei-daemon $self->{pkt_op_p}->pkt_do('child_error', $child_error); } elsif ($self->{sock}) { # to lei(1) client - send($self->{sock}, "child_error $child_error", MSG_EOR); - } else { # non-lei admin command - $self->{child_error} ||= $child_error; + send($self->{sock}, "child_error $child_error", 0); } # else noop if client disconnected } sub note_sigpipe { # triggers sigpipe_handler my ($self, $fd) = @_; - close(delete($self->{$fd})); # explicit close silences Perl warning + delete($self->{$fd})->close; # explicit close silences Perl warning $self->{pkt_op_p}->pkt_do('sigpipe_handler') if $self->{pkt_op_p}; x_it($self, 13); } @@ -559,20 +582,21 @@ sub note_sigpipe { # triggers sigpipe_handler sub _lei_atfork_child { my ($self, $persist) = @_; # we need to explicitly close things which are on stack + my $cfg = $self->{cfg}; + delete @$cfg{qw(-watches -lei_note_event)}; if ($persist) { - open $self->{3}, '<', '/' or die "open(/) $!"; + open $self->{3}, '<', '/'; fchdir($self); close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)})); - if (my $cfg = $self->{cfg}) { - delete @$cfg{qw(-lei_store -watches -lei_note_event)}; - } + delete $cfg->{-lei_store}; } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly - open STDERR, '+>&='.fileno($self->{2}) or warn "open $!"; + open STDERR, '+>&='.fileno($self->{2}); # idempotent w/ fileno STDERR->autoflush(1); + $self->{2} = \*STDERR; POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!"; } close($_) for (grep(defined, delete @$self{qw(old_1 au_done)})); - delete $self->{-socks}; + close($_) for (@{delete($self->{-socks}) // []}); if (my $op_c = delete $self->{pkt_op_c}) { close(delete $op_c->{sock}); } @@ -584,7 +608,7 @@ sub _lei_atfork_child { $dir_idle->force_close if $dir_idle; undef $dir_idle; %PATH2CFG = (); - $MDIR2CFGPATH = {}; + $MDIR2CFGPATH = undef; eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush'; undef $errors_log; $quit = \&CORE::exit; @@ -609,16 +633,16 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die sub pkt_op_pair { my ($self) = @_; - require PublicInbox::OnDestroy; require PublicInbox::PktOp; - my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self); + my $end = on_destroy \&_delete_pkt_op, $self; @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair; $end; } sub incr { - my ($self, $field, $nr) = @_; - $self->{counters}->{$field} += $nr; + my $lei = shift; + $lei->{incr_pid} = $$ if @_; + while (my ($f, $n) = splice(@_, 0, 2)) { $lei->{$f} += $n } } sub pkt_ops { @@ -641,12 +665,12 @@ sub workers_start { my $end = $lei->pkt_op_pair; my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker"; $flds->{lei} = $lei; - $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); + $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds, + $wq->can('_wq_done_wait') // \&wq_done_wait, $lei); delete $lei->{pkt_op_p}; my $op_c = delete $lei->{pkt_op_c}; @$end = (); $lei->event_step_init; - $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); ($op_c, $ops); } @@ -680,7 +704,7 @@ sub optparse ($$$) { # allow _complete --help to complete, not show help return 1 if substr($cmd, 0, 1) eq '_'; $self->{cmd} = $cmd; - $OPT = $self->{opt} //= {}; + local $OPT = $self->{opt} //= {}; my $info = $CMD{$cmd} // [ '[...]' ]; my ($proto, undef, @spec) = @$info; my $glp = ref($spec[-1]) eq ref($GLP) ? pop(@spec) : $GLP; @@ -700,6 +724,12 @@ sub optparse ($$$) { # "-" aliases "stdin" or "clear" $OPT->{$lone_dash} = ${$OPT->{$lone_dash}} if defined $lone_dash; + if ($proto =~ s/\s*\[?(?:KEYWORDS|LABELS)\.\.\.\]?\s*//g) { + require PublicInbox::LeiInput; + my @err = PublicInbox::LeiInput::vmd_mod_extract($self, $argv); + return $self->fail(join("\n", @err)) if @err; + } + my $i = 0; my $POS_ARG = '[A-Z][A-Z0-9_]+'; my ($err, $inf); @@ -727,11 +757,10 @@ sub optparse ($$$) { # w/o args means stdin if ($sw eq 'stdin' && !@$argv && (-p $self->{0} || - -f _) && -r _) { + -f _)) { $OPT->{stdin} //= 1; } - $ok = defined($OPT->{$sw}); - last if $ok; + $ok = defined($OPT->{$sw}) and last; } elsif (defined($argv->[$i])) { $ok = 1; $i++; @@ -752,38 +781,7 @@ sub optparse ($$$) { $err ? fail($self, "usage: lei $cmd $proto\nE: $err") : 1; } -sub _tmp_cfg { # for lei -c <name>=<value> ... - my ($self) = @_; - my $cfg = _lei_cfg($self, 1); - require File::Temp; - my $ft = File::Temp->new(TEMPLATE => 'lei_cfg-XXXX', TMPDIR => 1); - my $tmp = { '-f' => $ft->filename, -tmp => $ft }; - $ft->autoflush(1); - print $ft <<EOM or return fail($self, "$tmp->{-f}: $!"); -[include] - path = $cfg->{-f} -EOM - $tmp = $self->{cfg} = bless { %$cfg, %$tmp }, ref($cfg); - for (@{$self->{opt}->{c}}) { - /\A([^=\.]+\.[^=]+)(?:=(.*))?\z/ or return fail($self, <<EOM); -`-c $_' is not of the form -c <name>=<value>' -EOM - my $name = $1; - my $value = $2 // 1; - _config($self, '--add', $name, $value); - if (defined(my $v = $tmp->{$name})) { - if (ref($v) eq 'ARRAY') { - push @$v, $value; - } else { - $tmp->{$name} = [ $v, $value ]; - } - } else { - $tmp->{$name} = $value; - } - } -} - -sub lazy_cb ($$$) { +sub lazy_cb ($$$) { # $pfx is _complete_ or lei_ my ($self, $cmd, $pfx) = @_; my $ucmd = $cmd; $ucmd =~ tr/-/_/; @@ -796,11 +794,19 @@ sub lazy_cb ($$$) { $pkg->can($pfx.$ucmd) : undef; } +sub do_env { + my $lei = shift; + fchdir($lei); + my $cb = shift // return ($lei, %{$lei->{env}}) ; + local ($current_lei, %ENV) = ($lei, %{$lei->{env}}); + $cb = $lei->can($cb) if !ref($cb); # $cb may be a scalar sub name + eval { $cb->($lei, @_) }; + $lei->fail($@) if $@; +} + sub dispatch { my ($self, $cmd, @argv) = @_; - fchdir($self); - local %ENV = %{$self->{env}}; - local $current_lei = $self; # for __WARN__ + local ($current_lei, %ENV) = do_env($self); $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY return _help($self, 'no command given') unless defined($cmd); # do not support Getopt bundling for this @@ -812,14 +818,12 @@ sub dispatch { } if (my $cb = lazy_cb(__PACKAGE__, $cmd, 'lei_')) { optparse($self, $cmd, \@argv) or return; - $self->{opt}->{c} and (_tmp_cfg($self) // return); if (my $chdir = $self->{opt}->{C}) { for my $d (@$chdir) { next if $d eq ''; # same as git(1) - chdir $d or return fail($self, "cd $d: $!"); + chdir $d; } - open $self->{3}, '<', '.' or - return fail($self, "open . $!"); + open($self->{3}, '<', '.'); } $cb->($self, @argv); } elsif (grep(/\A-/, $cmd, @argv)) { # --help or -h only @@ -837,28 +841,30 @@ sub _lei_cfg ($;$) { my $f = _config_path($self); my @st = stat($f); my $cur_st = @st ? pack('dd', $st[10], $st[7]) : ''; # 10:ctime, 7:size - my ($sto, $sto_dir, $watches, $lne); - if (my $cfg = $PATH2CFG{$f}) { # reuse existing object in common case - return ($self->{cfg} = $cfg) if $cur_st eq $cfg->{-st}; + my ($sto, $sto_dir, $watches, $lne, $cfg); + if ($cfg = $PATH2CFG{$f}) { # reuse existing object in common case + ($cur_st eq $cfg->{-st} && !$self->{opt}->{c}) and + return ($self->{cfg} = $cfg); + # reuse some fields below if they match: ($sto, $sto_dir, $watches, $lne) = @$cfg{qw(-lei_store leistore.dir -watches -lei_note_event)}; } if (!@st) { - unless ($creat) { - delete $self->{cfg}; - return bless {}, 'PublicInbox::Config'; + unless ($creat) { # any commands which write to cfg must creat + $cfg = PublicInbox::Config->git_config_dump( + '/dev/null', $self); + return ($self->{cfg} = $cfg); } my ($cfg_dir) = ($f =~ m!(.*?/)[^/]+\z!); - -d $cfg_dir or mkpath($cfg_dir) or die "mkpath($cfg_dir): $!\n"; - open my $fh, '>>', $f or die "open($f): $!\n"; + File::Path::mkpath($cfg_dir); + open my $fh, '>>', $f; @st = stat($fh) or die "fstat($f): $!\n"; $cur_st = pack('dd', $st[10], $st[7]); qerr($self, "# $f created") if $self->{cmd} ne 'config'; } - my $cfg = PublicInbox::Config->git_config_dump($f, $self->{2}); + $cfg = PublicInbox::Config->git_config_dump($f, $self); $cfg->{-st} = $cur_st; - $cfg->{'-f'} = $f; if ($sto && canonpath_harder($sto_dir // store_path($self)) eq canonpath_harder($cfg->{'leistore.dir'} // store_path($self))) { @@ -870,7 +876,7 @@ sub _lei_cfg ($;$) { # FIXME: use inotify/EVFILT_VNODE to detect unlinked configs delete(@PATH2CFG{grep(!-f, keys %PATH2CFG)}); } - $self->{cfg} = $PATH2CFG{$f} = $cfg; + $self->{cfg} = $self->{opt}->{c} ? $cfg : ($PATH2CFG{$f} = $cfg); refresh_watches($self); $cfg; } @@ -886,16 +892,49 @@ sub _lei_store ($;$) { }; } +# returns true on success, undef +# argv[0] eq `+e' means errors do not ->fail # (like `sh +e') sub _config { my ($self, @argv) = @_; - my %env = (%{$self->{env}}, GIT_CONFIG => undef); + my $err_ok = ($argv[0] // '') eq '+e' ? shift(@argv) : undef; + my %env; + my %opt = map { $_ => $self->{$_} } (0..2); my $cfg = _lei_cfg($self, 1); - my $cmd = [ qw(git config -f), $cfg->{'-f'}, @argv ]; - my %rdr = map { $_ => $self->{$_} } (0..2); - waitpid(spawn($cmd, \%env, \%rdr), 0); + my $opt_c = delete local $cfg->{-opt_c}; + my @file_arg; + if ($opt_c) { + my ($set, $get, $nondash); + for (@argv) { # order matters for git-config + if (!$nondash) { + if (/\A--(?:add|rename-section|remove-section| + replace-all| + unset-all|unset)\z/x) { + ++$set; + } elsif ($_ eq '-l' || $_ eq '--list' || + /\A--get/) { + ++$get; + } elsif (/\A-/) { # -z and such + } else { + ++$nondash; + } + } else { + ++$nondash; + } + } + if ($set || ($nondash//0) > 1 && !$get) { + @file_arg = ('-f', $cfg->{-f}); + $env{GIT_CONFIG} = $file_arg[1]; + } else { # OK, we can use `-c n=v' for read-only + $cfg->{-opt_c} = $opt_c; + $env{GIT_CONFIG} = undef; + } + } + my $cmd = $cfg->config_cmd(\%env, \%opt); + push @$cmd, @file_arg, @argv; + run_wait($cmd, \%env, \%opt) ? ($err_ok ? undef : fail($self, $?)) : 1; } -sub lei_daemon_pid { puts shift, $$ } +sub lei_daemon_pid { puts shift, $daemon_pid } sub lei_daemon_kill { my ($self) = @_; @@ -1000,9 +1039,10 @@ sub start_mua { $io->[0] = $self->{1} if $self->{opt}->{stdin} && -t $self->{1}; send_exec_cmd($self, $io, \@cmd, {}); } - if ($self->{lxs} && $self->{au_done}) { # kick wait_startq - syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0)); - } + + # kick wait_startq: + syswrite($self->{au_done}, 'q') if $self->{lxs} && $self->{au_done}; + return unless -t $self->{2}; # XXX how to determine non-TUI MUAs? $self->{opt}->{quiet} = 1; delete $self->{-progress}; @@ -1011,9 +1051,11 @@ sub start_mua { sub send_exec_cmd { # tell script/lei to execute a command my ($self, $io, $cmd, $env) = @_; - my $sock = $self->{sock} // die 'lei client gone'; - my $fds = [ map { fileno($_) } @$io ]; - $send_cmd->($sock, $fds, exec_buf($cmd, $env), MSG_EOR); + $PublicInbox::IPC::send_cmd->( + $self->{sock} // die('lei client gone'), + [ map { fileno($_) } @$io ], + exec_buf($cmd, $env), 0) // + Carp::croak("sendmsg: $!"); } sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail @@ -1023,7 +1065,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail while (my $op = shift(@$alerts)) { if ($op eq ':WINCH') { # hit the process group that started the MUA - send($sock, '-WINCH', MSG_EOR) if $sock; + send($sock, '-WINCH', 0) if $sock; } elsif ($op eq ':bell') { out($self, "\a"); } elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,') @@ -1032,7 +1074,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail my $cmd = $1; # run an arbitrary command require Text::ParseWords; $cmd = [ Text::ParseWords::shellwords($cmd) ]; - send($sock, exec_buf($cmd, {}), MSG_EOR) if $sock; + send($sock, exec_buf($cmd, {}), 0) if $sock; } else { warn("W: unsupported --alert=$op\n"); # non-fatal } @@ -1056,16 +1098,15 @@ sub path_to_fd { # caller needs to "-t $self->{1}" to check if tty sub start_pager { my ($self, $new_env) = @_; - my $fh = popen_rd([qw(git var GIT_PAGER)]); - chomp(my $pager = <$fh> // ''); - close($fh) or warn "`git var PAGER' error: \$?=$?"; + chomp(my $pager = run_qx([qw(git var GIT_PAGER)])); + warn "`git var PAGER' error: \$?=$?" if $?; return if $pager eq 'cat' || $pager eq ''; $new_env //= {}; $new_env->{LESS} //= 'FRX'; $new_env->{LV} //= '-c'; $new_env->{MORE} = $new_env->{LESS} if $^O eq 'freebsd'; - pipe(my ($r, $wpager)) or return warn "pipe: $!"; - my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; + my $rdr = { 1 => $self->{1}, 2 => $self->{2} }; + CORE::pipe($rdr->{0}, my $wpager) or return warn "pipe: $!"; my $pgr = [ undef, @$rdr{1, 2} ]; my $env = $self->{env}; if ($self->{sock}) { # lei(1) process runs it @@ -1085,17 +1126,17 @@ sub pgr_err { my ($self, @msg) = @_; return warn(@msg) unless $self->{sock} && -t $self->{2}; start_pager($self, { LESS => 'RX' }); # no 'F' so we prompt - print { $self->{2} } @msg; + say { $self->{2} } @msg, '# -quit pager to continue-'; $self->{2}->autoflush(1); stop_pager($self); - send($self->{sock}, 'wait', MSG_EOR); # wait for user to quit pager + send($self->{sock}, 'wait', 0); # wait for user to quit pager } sub stop_pager { my ($self) = @_; my $pgr = delete($self->{pgr}) or return; $self->{2} = $pgr->[2]; - close(delete($self->{1})) if $self->{1}; + delete($self->{1})->close if $self->{1}; $self->{1} = $pgr->[1]; } @@ -1105,24 +1146,22 @@ sub accept_dispatch { # Listener {post_accept} callback my $self = bless { sock => $sock }, __PACKAGE__; vec(my $rvec = '', fileno($sock), 1) = 1; select($rvec, undef, undef, 60) or - return send($sock, 'timed out waiting to recv FDs', MSG_EOR); + return send($sock, 'timed out waiting to recv FDs', 0); # (4096 * 33) >MAX_ARG_STRLEN - my @fds = $recv_cmd->($sock, my $buf, 4096 * 33) or return; # EOF + my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or + return; # EOF if (!defined($fds[0])) { warn(my $msg = "recv_cmd failed: $!"); - return send($sock, $msg, MSG_EOR); + return send($sock, $msg, 0); } else { my $i = 0; - for my $fd (@fds) { - open($self->{$i++}, '+<&=', $fd) and next; - send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR); - } - $i == 4 or return send($sock, 'not enough FDs='.($i-1), MSG_EOR) + open($self->{$i++}, '+<&=', $_) for @fds; + $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0) } # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV); # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0"; substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z// - return send($sock, 'request command truncated', MSG_EOR); + return send($sock, 'request command truncated', 0); my ($argc, @argv) = split(/\0/, $buf, -1); undef $buf; my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); @@ -1145,11 +1184,11 @@ sub event_step { local %ENV = %{$self->{env}}; local $current_lei = $self; eval { - my @fds = $recv_cmd->($self->{sock} // return, my $buf, 4096); + my @fds = $PublicInbox::IPC::recv_cmd->( + $self->{sock} // return, my $buf, 4096); if (scalar(@fds) == 1 && !defined($fds[0])) { return if $! == EAGAIN; die "recvmsg: $!" if $! != ECONNRESET; - $buf = ''; @fds = (); # for open loop below: } for (@fds) { open my $rfh, '+<&=', $_ } @@ -1167,7 +1206,7 @@ sub event_step { die "unrecognized client signal: $buf"; } my $s = $self->{-socks} // []; # lei up --all - @$s = grep { send($_, $buf, MSG_EOR) } @$s; + @$s = grep { send($_, $buf, 0) } @$s; }; if (my $err = $@) { eval { $self->fail($err) }; @@ -1185,8 +1224,6 @@ sub event_step_init { }; } -sub noop {} - sub oldset { $oldset } sub dump_and_clear_log { @@ -1203,48 +1240,87 @@ sub dump_and_clear_log { sub cfg2lei ($) { my ($cfg) = @_; my $lei = bless { env => { %{$cfg->{-env}} } }, __PACKAGE__; - open($lei->{0}, '<&', \*STDIN) or die "dup 0: $!"; - open($lei->{1}, '>>&', \*STDOUT) or die "dup 1: $!"; - open($lei->{2}, '>>&', \*STDERR) or die "dup 2: $!"; - open($lei->{3}, '<', '/') or die "open /: $!"; - my ($x, $y); - socketpair($x, $y, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; + open($lei->{0}, '<&', \*STDIN); + open($lei->{1}, '>>&', \*STDOUT); + open($lei->{2}, '>>&', \*STDERR); + open($lei->{3}, '<', '/'); + socketpair(my $x, my $y, AF_UNIX, SOCK_SEQPACKET, 0); $lei->{sock} = $x; require PublicInbox::LeiSelfSocket; PublicInbox::LeiSelfSocket->new($y); # adds to event loop $lei; } +sub note_event ($@) { # runs lei_note_event for a given config file + my ($cfg_f, @args) = @_; + my $cfg = $PATH2CFG{$cfg_f} // return; + eval { cfg2lei($cfg)->dispatch('note-event', @args) }; + carp "E: note-event $cfg_f: $@\n" if $@; +} + sub dir_idle_handler ($) { # PublicInbox::DirIdle callback my ($ev) = @_; # Linux::Inotify2::Event or duck type my $fn = $ev->fullname; if ($fn =~ m!\A(.+)/(new|cur)/([^/]+)\z!) { # Maildir file - my ($mdir, $nc, $bn) = ($1, $2, $3); - $nc = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM; - for my $f (keys %{$MDIR2CFGPATH->{$mdir} // {}}) { - my $cfg = $PATH2CFG{$f} // next; - eval { - my $lei = cfg2lei($cfg); - $lei->dispatch('note-event', - "maildir:$mdir", $nc, $bn, $fn); - }; - warn "E: note-event $f: $@\n" if $@; + my ($loc, $new_cur, $bn) = ("maildir:$1", $2, $3); + $new_cur = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM; + for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) { + note_event($cfg_f, $loc, $new_cur, $bn, $fn); } - } + } elsif ($fn =~ m!\A(.+)/([0-9]+)\z!) { # MH mail message file + my ($loc, $n, $new_cur) = ("mh:$1", $2, '+'); + $new_cur = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM; + for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) { + note_event($cfg_f, $loc, $new_cur, $n, $fn); + } + } elsif ($fn =~ m!\A(.+)/\.mh_sequences\z!) { # reread flags + my $loc = "mh:$1"; + for my $cfg_f (keys %{$MDIR2CFGPATH->{$loc} // {}}) { + note_event($cfg_f, $loc, '.mh_sequences') + } + } # else we don't care if ($ev->can('cancel') && ($ev->IN_IGNORE || $ev->IN_UNMOUNT)) { $ev->cancel; } if ($fn =~ m!\A(.+)/(?:new|cur)\z! && !-e $fn) { - delete $MDIR2CFGPATH->{$1}; + delete $MDIR2CFGPATH->{"maildir:$1"}; } - if (!-e $fn) { # config file or Maildir gone - for my $cfgpaths (values %$MDIR2CFGPATH) { - delete $cfgpaths->{$fn}; - } + if (!-e $fn) { # config file, Maildir, or MH dir gone + delete $_->{$fn} for values %$MDIR2CFGPATH; # config file + delete @$MDIR2CFGPATH{"maildir:$fn", "mh:$fn"}; delete $PATH2CFG{$fn}; } } +sub can_stay_alive { # PublicInbox::DS::post_loop_do cb + my ($path, $dev_ino_expect) = @_; + if (my @st = defined($$path) ? stat($$path) : ()) { + if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) { + warn "$$path dev/ino changed, quitting\n"; + $$path = undef; + } + } elsif (defined($$path)) { # ENOENT is common + warn "stat($$path): $!, quitting ...\n" if $! != ENOENT; + undef $$path; + $quit->(); + } + return 1 if defined($$path); + my $n = PublicInbox::DS::close_non_busy() or do { + eval 'PublicInbox::LeiNoteEvent::flush_task()'; + # drop stores only if no clients + for my $cfg (values %PATH2CFG) { + my $lne = delete($cfg->{-lei_note_event}); + $lne->wq_close if $lne; + my $sto = delete($cfg->{-lei_store}) // next; + eval { $sto->wq_do('done') if $sto->{-wq_s1} }; + warn "E: $@ (dropping store for $cfg->{-f})" if $@; + $sto->wq_close; + } + }; + # returns true: continue, false: stop + $n + scalar(keys(%PublicInbox::DS::AWAIT_PIDS)); +} + # lei(1) calls this when it can't connect sub lazy_start { my ($path, $errno, $narg) = @_; @@ -1252,106 +1328,66 @@ sub lazy_start { my ($sock_dir) = ($path =~ m!\A(.+?)/[^/]+\z!); $errors_log = "$sock_dir/errors.log"; my $addr = pack_sockaddr_un($path); - my $lk = bless { lock_path => $errors_log }, 'PublicInbox::Lock'; + my $lk = PublicInbox::Lock->new($errors_log); umask(077) // die("umask(077): $!"); $lk->lock_acquire; - socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!"; + socket($listener, AF_UNIX, SOCK_SEQPACKET, 0); if ($errno == ECONNREFUSED || $errno == ENOENT) { return if connect($listener, $addr); # another process won - if ($errno == ECONNREFUSED && -S $path) { - unlink($path) or die "unlink($path): $!"; - } + unlink($path) if $errno == ECONNREFUSED && -S $path; } else { $! = $errno; # allow interpolation to stringify in die die "connect($path): $!"; } - bind($listener, $addr) or die "bind($path): $!"; + bind($listener, $addr); $lk->lock_release; undef $lk; my @st = stat($path) or die "stat($path): $!"; my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino - local $oldset = PublicInbox::DS::block_signals(); - if ($narg == 5) { - $send_cmd = PublicInbox::Spawn->can('send_cmd4'); - $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do { - require PublicInbox::CmdIPC4; - $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4'); - PublicInbox::CmdIPC4->can('recv_cmd4'); - } // do { - $send_cmd = PublicInbox::Syscall->can('send_cmd4'); - PublicInbox::Syscall->can('recv_cmd4'); - }; - } - $recv_cmd or die <<""; + local $oldset = PublicInbox::DS::block_signals(POSIX::SIGALRM); + die "incompatible narg=$narg" if $narg != 5; + $PublicInbox::IPC::send_cmd or die <<""; (Socket::MsgHdr || Inline::C) missing/unconfigured (narg=$narg); require PublicInbox::Listener; require PublicInbox::PktOp; (-p STDOUT) or die "E: stdout must be a pipe\n"; - open(STDIN, '+>>', $errors_log) or die "open($errors_log): $!"; + open(STDIN, '+>>', $errors_log); STDIN->autoflush(1); dump_and_clear_log(); POSIX::setsid() > 0 or die "setsid: $!"; - my $pid = fork // die "fork: $!"; + my $pid = PublicInbox::OnDestroy::fork_tmp; return if $pid; $0 = "lei-daemon $path"; - local %PATH2CFG; - local $MDIR2CFGPATH; + local (%PATH2CFG, $MDIR2CFGPATH); + local $daemon_pid = $$; $listener->blocking(0); my $exit_code; my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch); local $quit = do { my (undef, $eof_p) = PublicInbox::PktOp->pair; sub { - $exit_code //= shift; + $exit_code //= eval("POSIX::SIG$_[0] + 128") if @_; + $dir_idle->close if $dir_idle; # EPOLL_CTL_DEL + $dir_idle = undef; # let RC take care of it eval 'PublicInbox::LeiNoteEvent::flush_task()'; - my $lis = $pil or exit($exit_code); + my $lis = $pil or exit($exit_code // 0); # closing eof_p triggers \&noop wakeup $listener = $eof_p = $pil = $path = undef; $lis->close; # DS::close - PublicInbox::DS->SetLoopTimeout(1000); }; }; - my $sig = { - CHLD => \&PublicInbox::DS::enqueue_reap, - QUIT => $quit, - INT => $quit, - TERM => $quit, - HUP => \&noop, - USR1 => \&noop, - USR2 => \&noop, - }; + my $sig = { CHLD => \&PublicInbox::DS::enqueue_reap }; + $sig->{$_} = $quit for qw(QUIT INT TERM); + $sig->{$_} = \&PublicInbox::Config::noop for qw(HUP USR1 USR2); require PublicInbox::DirIdle; local $dir_idle = PublicInbox::DirIdle->new(sub { - # just rely on wakeup to hit PostLoopCallback set below + # just rely on wakeup to hit post_loop_do dir_idle_handler($_[0]) if $_[0]->fullname ne $path; }); $dir_idle->add_watches([$sock_dir]); - PublicInbox::DS->SetPostLoopCallback(sub { - my ($dmap, undef) = @_; - if (@st = defined($path) ? stat($path) : ()) { - if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) { - warn "$path dev/ino changed, quitting\n"; - $path = undef; - } - } elsif (defined($path)) { # ENOENT is common - warn "stat($path): $!, quitting ...\n" if $! != ENOENT; - undef $path; - $quit->(); - } - return 1 if defined($path); - my $n = 0; - for my $s (values %$dmap) { - $s->can('busy') or next; - if ($s->busy) { - ++$n; - } else { - $s->close; - } - } - $n; # true: continue, false: stop - }); - + local @PublicInbox::DS::post_loop_do = (\&can_stay_alive, + \$path, $dev_ino_expect); # STDIN was redirected to /dev/null above, closing STDERR and # STDOUT will cause the calling `lei' client process to finish # reading the <$daemon> pipe. @@ -1359,13 +1395,13 @@ sub lazy_start { $current_lei ? err($current_lei, @_) : warn( strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(time))," $$ ", @_); }; - open STDERR, '>&STDIN' or die "redirect stderr failed: $!"; - open STDOUT, '>&STDIN' or die "redirect stdout failed: $!"; + local $SIG{PIPE} = 'IGNORE'; + local $SIG{ALRM} = 'IGNORE'; + open STDERR, '>&STDIN'; + open STDOUT, '>&STDIN'; # $daemon pipe to `lei' closed, main loop begins: eval { PublicInbox::DS::event_loop($sig, $oldset) }; warn "event loop error: $@\n" if $@; - # exit() may trigger waitpid via various DESTROY, ensure interruptible - PublicInbox::DS::sig_setmask($oldset); dump_and_clear_log(); exit($exit_code // 0); } @@ -1376,9 +1412,10 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected # can immediately reread it sub DESTROY { my ($self) = @_; - if (my $counters = delete $self->{counters}) { - for my $k (sort keys %$counters) { - my $nr = $counters->{$k}; + if (defined($self->{incr_pid}) && $self->{incr_pid} == $$) { + for my $k (sort(grep(/\A-nr_/, keys %$self))) { + my $nr = $self->{$k}; + substr($k, 0, length('-nr_'), ''); $self->child_error(0, "$nr $k messages"); } } @@ -1388,9 +1425,8 @@ sub DESTROY { # preserve $? for ->fail or ->x_it code } -sub wq_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($wq, $lei) = @$arg; +sub wq_done_wait { # awaitpid cb (via wq_eof) + my ($pid, $wq, $lei) = @_; local $current_lei = $lei; my $err_type = $lei->{-err_type}; $? and $lei->child_error($?, @@ -1400,15 +1436,14 @@ sub wq_done_wait { # dwaitpid callback sub fchdir { my ($lei) = @_; - my $dh = $lei->{3} // die 'BUG: lei->{3} (CWD) gone'; - chdir($dh) || die "fchdir: $!"; + chdir($lei->{3} // die 'BUG: lei->{3} (CWD) gone'); } sub wq_eof { # EOF callback for main daemon my ($lei, $wq_fld) = @_; local $current_lei = $lei; my $wq = delete $lei->{$wq_fld // 'wq1'}; - $lei->sto_done_request($wq); + $lei->sto_barrier_request($wq); $wq // $lei->fail; # already failed } @@ -1417,19 +1452,22 @@ sub watch_state_ok ($) { $state =~ /\Apause|(?:import|index|tag)-(?:ro|rw)\z/; } -sub cancel_maildir_watch ($$) { - my ($d, $cfg_f) = @_; - my $w = delete $MDIR2CFGPATH->{$d}->{$cfg_f}; - scalar(keys %{$MDIR2CFGPATH->{$d}}) or - delete $MDIR2CFGPATH->{$d}; - for my $x (@{$w // []}) { $x->cancel } +sub cancel_dir_watch ($$$) { + my ($type, $d, $cfg_f) = @_; + my $loc = "$type:".canonpath_harder($d); + my $w = delete $MDIR2CFGPATH->{$loc}->{$cfg_f}; + delete $MDIR2CFGPATH->{$loc} if !(keys %{$MDIR2CFGPATH->{$loc}}); + $_->cancel for @$w; } -sub add_maildir_watch ($$) { - my ($d, $cfg_f) = @_; - if (!exists($MDIR2CFGPATH->{$d}->{$cfg_f})) { - my @w = $dir_idle->add_watches(["$d/cur", "$d/new"], 1); - push @{$MDIR2CFGPATH->{$d}->{$cfg_f}}, @w if @w; +sub add_dir_watch ($$$) { + my ($type, $d, $cfg_f) = @_; + $d = canonpath_harder($d); + my $loc = "$type:$d"; + my @dirs = $type eq 'mh' ? ($d) : ("$d/cur", "$d/new"); + if (!exists($MDIR2CFGPATH->{$loc}->{$cfg_f})) { + my @w = $dir_idle->add_watches(\@dirs, 1); + push @{$MDIR2CFGPATH->{$loc}->{$cfg_f}}, @w if @w; } } @@ -1442,24 +1480,20 @@ sub refresh_watches { my %seen; my $cfg_f = $cfg->{'-f'}; for my $w (grep(/\Awatch\..+\.state\z/, keys %$cfg)) { - my $url = substr($w, length('watch.'), -length('.state')); + my $loc = substr($w, length('watch.'), -length('.state')); require PublicInbox::LeiWatch; - $watches->{$url} //= PublicInbox::LeiWatch->new($url); - $seen{$url} = undef; - my $state = $cfg->get_1("watch.$url.state"); + $watches->{$loc} //= PublicInbox::LeiWatch->new($loc); + $seen{$loc} = undef; + my $state = $cfg->get_1("watch.$loc.state"); if (!watch_state_ok($state)) { - warn("watch.$url.state=$state not supported\n"); - next; - } - if ($url =~ /\Amaildir:(.+)/i) { - my $d = canonpath_harder($1); - if ($state eq 'pause') { - cancel_maildir_watch($d, $cfg_f); - } else { - add_maildir_watch($d, $cfg_f); - } + warn("watch.$loc.state=$state not supported\n"); + } elsif ($loc =~ /\A(maildir|mh):(.+)\z/i) { + my ($type, $d) = ($1, $2); + $state eq 'pause' ? + cancel_dir_watch($type, $d, $cfg_f) : + add_dir_watch($type, $d, $cfg_f); } else { # TODO: imap/nntp/jmap - $lei->child_error(0, "E: watch $url not supported, yet") + $lei->child_error(0, "E: watch $loc not supported, yet") } } @@ -1467,29 +1501,28 @@ sub refresh_watches { my $lms = $lei->lms; if ($lms) { $lms->lms_write_prepare; - for my $d ($lms->folders('maildir:')) { - substr($d, 0, length('maildir:')) = ''; - + for my $loc ($lms->folders(qr/\A(?:maildir|mh):/)) { + my $old = $loc; + my ($type, $d) = split /:/, $loc, 2; # fixup old bugs while we're iterating: - my $cd = canonpath_harder($d); - my $f = "maildir:$cd"; - $lms->rename_folder("maildir:$d", $f) if $d ne $cd; - next if $watches->{$f}; # may be set to pause + $d = canonpath_harder($d); + $loc = "$type:$d"; + $lms->rename_folder($old, $loc) if $old ne $loc; + next if $watches->{$loc}; # may be set to pause require PublicInbox::LeiWatch; - $watches->{$f} = PublicInbox::LeiWatch->new($f); - $seen{$f} = undef; - add_maildir_watch($cd, $cfg_f); + $watches->{$loc} = PublicInbox::LeiWatch->new($loc); + $seen{$loc} = undef; + add_dir_watch($type, $d, $cfg_f); } } if ($old) { # cull old non-existent entries - for my $url (keys %$old) { - next if exists $seen{$url}; - delete $old->{$url}; - if ($url =~ /\Amaildir:(.+)/i) { - my $d = canonpath_harder($1); - cancel_maildir_watch($d, $cfg_f); + for my $loc (keys %$old) { + next if exists $seen{$loc}; + delete $old->{$loc}; + if ($loc =~ /\A(maildir|mh):(.+)\z/i) { + cancel_dir_watch($1, $2, $cfg_f); } else { # TODO: imap/nntp/jmap - $lei->child_error(0, "E: watch $url TODO"); + $lei->child_error(0, "E: watch $loc TODO"); } } } @@ -1515,24 +1548,24 @@ sub lms { (-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef; } -sub sto_done_request { +sub sto_barrier_request { my ($lei, $wq) = @_; - return unless $lei->{sto}; + return unless $lei->{sto} && $lei->{sto}->{-wq_s1}; local $current_lei = $lei; - my $sock = $wq ? $wq->{lei_sock} : undef; - eval { - if ($sock //= $lei->{sock}) { # issue, async wait - $lei->{sto}->wq_io_do('done', [ $sock ]); - } else { # forcibly wait - my $wait = $lei->{sto}->wq_do('done'); - } - }; + if (my $n = $lei->{opt}->{'commit-delay'}) { + eval { $lei->{sto}->wq_do('schedule_commit', $n) }; + } else { + my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock}; + my $errfh = $lei->{2} // *STDERR{GLOB}; + my @io = $s ? ($errfh, $s) : ($errfh); + eval { $lei->{sto}->wq_io_do('barrier', \@io, 1) }; + } warn($@) if $@; } sub cfg_dump ($$) { my ($lei, $f) = @_; - my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei->{2}) }; + my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei) }; return $ret if !$@; warn($@); undef; @@ -1541,7 +1574,7 @@ sub cfg_dump ($$) { sub request_umask { my ($lei) = @_; my $s = $lei->{sock} // return; - send($s, 'umask', MSG_EOR) // die "send: $!"; + send($s, 'umask', 0) // die "send: $!"; vec(my $rvec = '', fileno($s), 1) = 1; select($rvec, undef, undef, 2) or die 'timeout waiting for umask'; recv($s, my $v, 5, 0) // die "recv: $!"; @@ -1549,4 +1582,24 @@ sub request_umask { $u eq 'u' or warn "E: recv $v has no umask"; } +sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin + my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf + $_[1] // return $lei->fail("error reading stdin: $!"); + $lei->{stdin_buf} .= $_[-1]; + do_env($lei, $cb) if $_[-1] eq ''; +} + +sub slurp_stdin { + my ($lei, $cb) = @_; + require PublicInbox::InputPipe; + my $in = $lei->{0}; + if (-t $in) { # run cat via script/lei and read from it + $in = undef; + pipe($in, my $wr); + say { $lei->{2} } '# enter query, Ctrl-D when done'; + send_exec_cmd($lei, [ $lei->{0}, $wr ], ['cat'], {}); + } + PublicInbox::InputPipe::consume($in, \&_stdin_cb, $lei, $cb); +} + 1; |