user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 3/4] lei: q: results output to Maildir and mbox* working
Date: Fri, 15 Jan 2021 23:36:23 -1200	[thread overview]
Message-ID: <20210116113624.19930-4-e@80x24.org> (raw)
In-Reply-To: <20210116113624.19930-1-e@80x24.org>

All the augment and deduplication stuff seems to be working
based on unit tests.  OpPipe is a nice general addition that
will probably make future state machines easier.
---
 MANIFEST                       |   1 +
 lib/PublicInbox/LEI.pm         |  27 +++++---
 lib/PublicInbox/LeiDedupe.pm   |   3 +-
 lib/PublicInbox/LeiOverview.pm |  44 ++++++++----
 lib/PublicInbox/LeiQuery.pm    |  14 ++--
 lib/PublicInbox/LeiToMail.pm   |  89 ++++++++++++++++---------
 lib/PublicInbox/LeiXSearch.pm  | 118 ++++++++++++++++++++++++---------
 lib/PublicInbox/OpPipe.pm      |  41 ++++++++++++
 t/lei.t                        |  20 ++++++
 t/lei_to_mail.t                |   4 +-
 10 files changed, 266 insertions(+), 95 deletions(-)
 create mode 100644 lib/PublicInbox/OpPipe.pm

diff --git a/MANIFEST b/MANIFEST
index 0ebdaccc..0de1de4a 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -193,6 +193,7 @@ lib/PublicInbox/NNTPD.pm
 lib/PublicInbox/NNTPdeflate.pm
 lib/PublicInbox/NewsWWW.pm
 lib/PublicInbox/OnDestroy.pm
+lib/PublicInbox/OpPipe.pm
 lib/PublicInbox/Over.pm
 lib/PublicInbox/OverIdx.pm
 lib/PublicInbox/ProcessPipe.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 5568904d..f849c9df 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -256,7 +256,9 @@ sub puts ($;@) { print { shift->{1} } map { "$_\n" } @_ }
 sub out ($;@) { print { shift->{1} } @_ }
 
 sub err ($;@) {
-	print { shift->{2} } @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+	my $self = shift;
+	my $err = $self->{2} // *STDERR{IO};
+	print $err @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
 }
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
@@ -270,8 +272,11 @@ sub fail ($$;$) {
 
 sub atfork_prepare_wq {
 	my ($self, $wq) = @_;
-	push @{$wq->{-ipc_atfork_child_close}}, @TO_CLOSE_ATFORK_CHILD,
-				grep { defined } @$self{qw(0 1 2 sock)}
+	my $tcafc = $wq->{-ipc_atfork_child_close};
+	push @$tcafc, @TO_CLOSE_ATFORK_CHILD;
+	if (my $sock = $self->{sock}) {
+		push @$tcafc, @$self{qw(0 1 2)}, $sock;
+	}
 }
 
 # usage: my %sig = $lei->atfork_child_wq($wq);
@@ -286,7 +291,9 @@ sub atfork_child_wq {
 	PIPE => sub {
 		$self->x_it(13); # SIGPIPE = 13
 		# we need to close explicitly to avoid Perl warning on SIGPIPE
-		close($_) for (delete @$self{1..2});
+		close(delete $self->{1});
+		# regular files and /dev/null (-c) won't trigger SIGPIPE
+		close(delete $self->{2}) unless (-f $self->{2} || -c _);
 		syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait
 		die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
 	});
@@ -641,7 +648,7 @@ sub start_pager {
 	$new_env{MORE} = 'FRX' if $^O eq 'freebsd';
 	pipe(my ($r, $wpager)) or return warn "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
-	my $pgr = [ undef, @$rdr{1, 2} ];
+	my $pgr = [ undef, @$rdr{1, 2}, $$ ];
 	if (my $sock = $self->{sock}) { # lei(1) process runs it
 		delete @new_env{keys %$env}; # only set iff unset
 		my $buf = "exec 1\0".$pager;
@@ -664,7 +671,7 @@ sub stop_pager {
 	# do not restore original stdout, just close it so we error out
 	close(delete($self->{1})) if $self->{1};
 	my $pid = $pgr->[0];
-	dwaitpid($pid, undef, $self->{sock}) if $pid;
+	dwaitpid($pid, undef, $self->{sock}) if $pid && $pgr->[3] == $$;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -706,7 +713,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 sub dclose {
 	my ($self) = @_;
 	delete $self->{lxs}; # stops LeiXSearch queries
-	$self->close; # PublicInbox::DS::close
+	$self->close if $self->{sock}; # PublicInbox::DS::close
 }
 
 # for long-running results
@@ -737,8 +744,10 @@ sub event_step {
 
 sub event_step_init {
 	my ($self) = @_;
-	$self->{sock}->blocking(0);
-	$self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+	if (my $sock = $self->{sock}) { # using DS->EventLoop
+		$sock->blocking(0);
+		$self->SUPER::new($sock, EPOLLIN|EPOLLET);
+	}
 }
 
 sub noop {}
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 81754361..3f478aa4 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -89,8 +89,9 @@ sub true { 1 }
 sub dedupe_none ($) { (\&true, \&true) }
 
 sub new {
-	my ($cls, $lei, $dst) = @_;
+	my ($cls, $lei) = @_;
 	my $dd = $lei->{opt}->{dedupe} // 'content';
+	my $dst = $lei->{ovv}->{dst};
 
 	# allow "none" to bypass Eml->new if writing to directory:
 	return if ($dd eq 'none' && substr($dst // '', -1) eq '/');
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 9846bc8a..c0b423f6 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -15,6 +15,8 @@ use PublicInbox::MID qw($MID_EXTRACT);
 use PublicInbox::Address qw(pairs);
 use PublicInbox::Config;
 use PublicInbox::Search qw(get_pct);
+use PublicInbox::LeiDedupe;
+use PublicInbox::LeiToMail;
 
 # cf. https://en.wikipedia.org/wiki/JSON_streaming
 my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
@@ -44,7 +46,7 @@ sub new {
 
 	my $fmt = $opt->{'format'};
 	$fmt = lc($fmt) if defined $fmt;
-	if ($dst =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+	if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/
 		my $ofmt = lc $1;
 		$fmt //= $ofmt;
 		return $lei->fail(<<"") if $fmt ne $ofmt;
@@ -52,13 +54,14 @@ sub new {
 
 	}
 	$fmt //= 'json' if $dst eq '/dev/stdout';
-	$fmt //= 'maildir'; # TODO
+	$fmt //= 'maildir';
 
 	if (index($dst, '://') < 0) { # not a URL, so assume path
 		 $dst = File::Spec->canonpath($dst);
 	} # else URL
 
 	my $self = bless { fmt => $fmt, dst => $dst }, $class;
+	$lei->{ovv} = $self;
 	my $json;
 	if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
 		$json = $self->{json} = ref(PublicInbox::Config->json);
@@ -75,11 +78,13 @@ sub new {
 		} else {
 			ovv_out_lk_init($self);
 		}
-	} elsif ($json) {
-		return $lei->fail('JSON formats only output to stdout');
-	} else {
-		return $lei->fail("TODO: $dst -f $fmt");
 	}
+	if (!$json) {
+		# default to the cheapest sort since MUA usually resorts
+		$lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+		$lei->{l2m} = PublicInbox::LeiToMail->new($lei);
+	}
+	$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
@@ -135,9 +140,13 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
 	my ($self, $lei) = @_;
+	if (my $git = delete $self->{git}) {
+		$git->async_wait_all;
+	}
 	if (my $bref = delete $lei->{ovv_buf}) {
+		my $out = $lei->{1} or return;
 		my $lk = $self->lock_for_scope;
-		print { $lei->{1} } $$bref;
+		print $out $$bref;
 	}
 }
 
@@ -167,17 +176,28 @@ sub _json_pretty {
 	qq{  "$k": }.$v;
 }
 
-sub ovv_each_smsg_cb {
-	my ($self, $lei) = @_;
+sub ovv_each_smsg_cb { # runs in wq worker usually
+	my ($self, $lei, $ibxish) = @_;
 	$lei->{ovv_buf} = \(my $buf = '');
 	delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
-	my $json = $self->{json}->new;
+	my $json;
 	$lei->{1}->autoflush(1);
-	if ($json) {
+	if (my $pkg = $self->{json}) {
+		$json = $pkg->new;
 		$json->utf8->canonical;
 		$json->ascii(1) if $lei->{opt}->{ascii};
 	}
-	if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
+	if (my $l2m = $lei->{l2m}) {
+		my $wcb = $l2m->write_cb($lei);
+		my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
+		$self->{git} = $git; # for ovv_atexit_child
+		my $g2m = $l2m->can('git_to_mail');
+		sub {
+			my ($smsg, $mitem) = @_;
+			my $kw = []; # TODO get from mitem
+			$git->cat_async($smsg->{blob}, $g2m, [ $wcb, $kw ]);
+		};
+	} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
 		my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
 		sub { # DIY prettiness :P
 			my ($smsg, $mitem) = @_;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 69d2f9a6..a80d5887 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -23,8 +23,6 @@ sub _vivify_external { # _externals_each callback
 # the main "lei q SEARCH_TERMS" method
 sub lei_q {
 	my ($self, @argv) = @_;
-	my $sto = $self->_lei_store(1);
-	my $cfg = $self->_lei_cfg(1);
 	my $opt = $self->{opt};
 
 	# --local is enabled by default
@@ -32,7 +30,7 @@ sub lei_q {
 	my @srcs;
 	require PublicInbox::LeiXSearch;
 	require PublicInbox::LeiOverview;
-	require PublicInbox::LeiDedupe;
+	PublicInbox::Config->json;
 	my $lxs = PublicInbox::LeiXSearch->new;
 
 	# --external is enabled by default, but allow --no-external
@@ -46,10 +44,10 @@ sub lei_q {
 	$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
 		// $lxs->wq_workers($j);
 
-	unshift(@srcs, $sto->search) if $opt->{'local'};
 	# no forking workers after this
-	$self->{ovv} = PublicInbox::LeiOverview->new($self);
-	$self->{dd} = PublicInbox::LeiDedupe->new($self);
+	my $ovv = PublicInbox::LeiOverview->new($self) or return;
+	my $sto = $self->_lei_store(1);
+	unshift(@srcs, $sto->search) if $opt->{'local'};
 	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
 	$mset_opt{qstr} = join(' ', map {;
@@ -69,12 +67,10 @@ sub lei_q {
 			die "unrecognized --sort=$sort\n";
 		}
 	}
-	# $self->out($json->encode(\%mset_opt));
 	# descending docid order
 	$mset_opt{relevance} //= -2 if $opt->{thread};
-	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
-	$self->{ovv}->ovv_begin($self);
+	$ovv->ovv_begin($self);
 	$lxs->do_query($self, \@srcs);
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 5d4b7978..744f331d 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -187,8 +187,9 @@ sub zsfx2cmd ($$$) {
 	\@cmd;
 }
 
-sub compress_dst {
-	my ($self, $zsfx, $lei) = @_;
+sub _post_augment_mbox { # open a compressor process
+	my ($self, $lei) = @_;
+	my $zsfx = $self->{zsfx} or return;
 	my $cmd = zsfx2cmd($zsfx, undef, $lei);
 	pipe(my ($r, $w)) or die "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
@@ -209,7 +210,9 @@ sub decompress_src ($$$) {
 
 sub dup_src ($) {
 	my ($in) = @_;
-	open my $dup, '+>>&', $in or die "dup: $!";
+	# fileno needed because wq_set_recv_modes only used ">&=" for {1}
+	# and Perl blindly trusts that to reject the '+' (readability flag)
+	open my $dup, '+>>&=', fileno($in) or die "dup: $!";
 	$dup;
 }
 
@@ -321,11 +324,13 @@ sub new {
 	} else {
 		die "bad mail --format=$fmt\n";
 	}
-	my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst);
+	$lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
-sub _prepare_maildir {
+sub _pre_augment_maildir {} # noop
+
+sub _do_augment_maildir {
 	my ($self, $lei) = @_;
 	my $dst = $lei->{ovv}->{dst};
 	if ($lei->{opt}->{augment}) {
@@ -338,6 +343,11 @@ sub _prepare_maildir {
 	} else { # clobber existing Maildir
 		_maildir_each_file($dst, \&_unlink);
 	}
+}
+
+sub _post_augment_maildir {
+	my ($self, $lei) = @_;
+	my $dst = $lei->{ovv}->{dst};
 	for my $x (qw(tmp new cur)) {
 		my $d = $dst.$x;
 		next if -d $d;
@@ -347,45 +357,64 @@ sub _prepare_maildir {
 	}
 }
 
-sub _prepare_mbox {
+sub _pre_augment_mbox {
 	my ($self, $lei) = @_;
 	my $dst = $lei->{ovv}->{dst};
-	my ($out, $seekable);
-	if ($dst eq '/dev/stdout') {
-		$out = $lei->{1};
-	} else {
+	if ($dst ne '/dev/stdout') {
 		my $mode = -p $dst ? '>' : '+>>';
 		if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) {
 			$! == ENOENT or die "unlink($dst): $!";
 		}
-		open $out, $mode, $dst or die "open($dst): $!";
-		# Perl does SEEK_END even with O_APPEND :<
-		$seekable = seek($out, 0, SEEK_SET);
-		die "seek($dst): $!\n" if !$seekable && $! != ESPIPE;
+		open my $out, $mode, $dst or die "open($dst): $!";
 		$lei->{1} = $out;
 	}
+	# Perl does SEEK_END even with O_APPEND :<
+	$self->{seekable} = seek($lei->{1}, 0, SEEK_SET);
+	if (!$self->{seekable} && $! != ESPIPE && $dst ne '/dev/stdout') {
+		die "seek($dst): $!\n";
+	}
 	state $zsfx_allow = join('|', keys %zsfx2cmd);
-	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
+	($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/);
+}
+
+sub _do_augment_mbox {
+	my ($self, $lei) = @_;
+	return if !$lei->{opt}->{augment};
 	my $dedupe = $lei->{dedupe};
-	if ($lei->{opt}->{augment}) {
-		die "cannot augment $dst, not seekable\n" if !$seekable;
-		if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
-			my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
-					dup_src($out);
-			my $fmt = $lei->{ovv}->{fmt};
-			require PublicInbox::MboxReader;
-			PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
-		}
-		# maybe some systems don't honor O_APPEND, Perl does this:
-		seek($out, 0, SEEK_END) or die "seek $dst: $!";
-		$dedupe->pause_dedupe if $dedupe;
+	my $dst = $lei->{ovv}->{dst};
+	die "cannot augment $dst, not seekable\n" if !$self->{seekable};
+	my $out = $lei->{1};
+	if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
+		my $zsfx = $self->{zsfx};
+		my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+				dup_src($out);
+		my $fmt = $lei->{ovv}->{fmt};
+		require PublicInbox::MboxReader;
+		PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
 	}
-	compress_dst($self, $zsfx, $lei) if $zsfx;
+	# maybe some systems don't honor O_APPEND, Perl does this:
+	seek($out, 0, SEEK_END) or die "seek $dst: $!";
+	$dedupe->pause_dedupe if $dedupe;
+}
+
+sub pre_augment { # fast (1 disk seek), runs in main daemon
+	my ($self, $lei) = @_;
+	# _pre_augment_maildir, _pre_augment_mbox
+	my $m = "_pre_augment_$self->{base_type}";
+	$self->$m($lei);
+}
+
+sub do_augment { # slow, runs in wq worker
+	my ($self, $lei) = @_;
+	# _do_augment_maildir, _do_augment_mbox
+	my $m = "_do_augment_$self->{base_type}";
+	$self->$m($lei);
 }
 
-sub do_prepare {
+sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
 	my ($self, $lei) = @_;
-	my $m = "_prepare_$self->{base_type}";
+	# _post_augment_maildir, _post_augment_mbox
+	my $m = "_post_augment_$self->{base_type}";
 	$self->$m($lei);
 }
 
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 8b70167c..9563ad63 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -9,6 +9,10 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
 use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::OpPipe;
+use PublicInbox::Import;
+use File::Temp 0.19 (); # 0.19 for ->newdir
+use File::Spec ();
 
 sub new {
 	my ($class) = @_;
@@ -103,9 +107,9 @@ sub query_thread_mset { # for --thread
 	}
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
-	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-	my $dd = $lei->{dd};
-	$dd->prepare_dedupe;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
 	do {
 		$mset = $srch->mset($mo->{qstr}, $mo);
 		my $ids = $srch->mset_to_artnums($mset, $mo);
@@ -115,7 +119,7 @@ sub query_thread_mset { # for --thread
 		while ($over->expand_thread($ctx)) {
 			for my $n (@{$ctx->{xids}}) {
 				my $smsg = $over->get_art($n) or next;
-				next if $dd->is_smsg_dup($smsg);
+				next if $dedupe->is_smsg_dup($smsg);
 				my $mitem = delete $n2item{$smsg->{num}};
 				$each_smsg->($smsg, $mitem);
 			}
@@ -132,65 +136,113 @@ sub query_mset { # non-parallel for non-"--thread" users
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	$self->attach_external($_) for @$srcs;
-	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-	my $dd = $lei->{dd};
-	$dd->prepare_dedupe;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
 	do {
 		$mset = $self->mset($mo->{qstr}, $mo);
 		for my $it ($mset->items) {
 			my $smsg = smsg_for($self, $it) or next;
-			next if $dd->is_smsg_dup($smsg);
+			next if $dedupe->is_smsg_dup($smsg);
 			$each_smsg->($smsg, $it);
 		}
 	} while (_mset_more($mset, $mo));
 	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
-sub query_done { # PublicInbox::EOFpipe callback
+sub git {
+	my ($self) = @_;
+	my (%seen, @dirs);
+	my $tmp = File::Temp->newdir('lei_xsrch_git-XXXXXXXX', TMPDIR => 1);
+	for my $ibx (@{$self->{shard2ibx} // []}) {
+		my $d = File::Spec->canonpath($ibx->git->{git_dir});
+		$seen{$d} //= push @dirs, "$d/objects\n"
+	}
+	my $git_dir = $tmp->dirname;
+	PublicInbox::Import::init_bare($git_dir);
+	my $f = "$git_dir/objects/info/alternates";
+	open my $alt, '>', $f or die "open($f): $!";
+	print $alt @dirs or die "print $f: $!";
+	close $alt or die "close $f: $!";
+	my $git = PublicInbox::Git->new($git_dir);
+	$git->{-tmp} = $tmp;
+	$git;
+}
+
+sub query_done { # EOF callback
 	my ($lei) = @_;
 	$lei->{ovv}->ovv_end($lei);
 	$lei->dclose;
 }
 
-sub do_query {
-	my ($self, $lei_orig, $srcs) = @_;
-	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+sub start_query { # always runs in main (lei-daemon) process
+	my ($self, $io, $lei, $srcs) = @_;
+	if (my $l2m = $lei->{l2m}) {
+		$lei->{1} = $io->[1];
+		$l2m->post_augment($lei);
+		$io->[1] = delete $lei->{1};
+	}
 	my $remotes = $self->{remotes} // [];
-	pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
-	$io[0] = $qry_done; # don't need stdin
-
 	if ($lei->{opt}->{thread}) {
 		$lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
 		for my $ibxish (@$srcs) {
-			$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
+			$self->wq_do('query_thread_mset', $io, $lei, $ibxish);
 		}
 	} else {
 		$lei->{-parallel} = scalar(@$remotes);
-		$self->wq_do('query_mset', \@io, $lei, $srcs);
+		$self->wq_do('query_mset', $io, $lei, $srcs);
 	}
 	# TODO
 	for my $rmt (@$remotes) {
-		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
+		$self->wq_do('query_thread_mbox', $io, $lei, $rmt);
 	}
-	@io = ();
-	close $qry_done; # fully closed when children are done
-
-	# query_done will run when query_*mset close $qry_done
-	if ($lei_orig->{sock}) { # watch for client premature exit
-		require PublicInbox::EOFpipe;
-		PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
-		$lei_orig->{lxs} = $self;
-		$lei_orig->event_step_init;
+	close $io->[0]; # qry_status_wr
+	@$io = ();
+}
+
+sub query_prepare { # wq_do
+	my ($self, $lei) = @_;
+	my %sig = $lei->atfork_child_wq($self);
+	local @SIG{keys %sig} = values %sig;
+	if (my $l2m = $lei->{l2m}) {
+		eval { $l2m->do_augment($lei) };
+		return $lei->fail($@) if $@;
+	}
+	# trigger PublicInbox::OpPipe->event_step
+	my $qry_status_wr = $lei->{0} or
+		return $lei->fail('BUG: qry_status_wr missing');
+	$qry_status_wr->autoflush(1);
+	print $qry_status_wr '.' or # this should never fail...
+		return $lei->fail("BUG? print qry_status_wr: $!");
+}
+
+sub do_query {
+	my ($self, $lei_orig, $srcs) = @_;
+	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+	$io[0] = undef;
+	pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+
+	$lei_orig->{lxs} = $self;
+	$lei_orig->event_step_init; # wait for shutdowns
+	my $op_map = { '' => [ \&query_done, $lei_orig ] };
+	my $in_loop = exists $lei_orig->{sock};
+	my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+	if (my $l2m = $lei->{l2m}) {
+		$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
+		$io[1] = $lei_orig->{1};
+		$op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
+		$self->wq_do('query_prepare', \@io, $lei);
+		$opp->event_step if !$in_loop;
 	} else {
+		start_query($self, \@io, $lei, $srcs);
+	}
+	unless ($in_loop) {
 		my @pids = $self->wq_close;
-		# wait for close($lei->{0})
-		if (read($eof_wait, my $buf, 1)) {
-			# if we get a SIGPIPE from one, kill the rest
-			kill('TERM', @pids) if $buf eq '!';
-		}
+		# for the $lei->atfork_child_wq PIPE handler:
+		$op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+		$opp->event_step;
 		my $ipc_worker_reap = $self->can('ipc_worker_reap');
 		dwaitpid($_, $ipc_worker_reap, $self) for @pids;
-		query_done($lei_orig); # may SIGPIPE
 	}
 }
 
diff --git a/lib/PublicInbox/OpPipe.pm b/lib/PublicInbox/OpPipe.pm
new file mode 100644
index 00000000..295a8aa5
--- /dev/null
+++ b/lib/PublicInbox/OpPipe.pm
@@ -0,0 +1,41 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# bytecode dispatch pipe, reads a byte, runs a sub
+# byte => [ sub, @operands ]
+package PublicInbox::OpPipe;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN);
+
+sub new {
+	my ($cls, $rd, $op_map, $in_loop) = @_;
+	my $self = bless { sock => $rd, op_map => $op_map }, $cls;
+	# 1031: F_SETPIPE_SZ, 4096: page size
+	fcntl($rd, 1031, 4096) if $^O eq 'linux';
+	if ($in_loop) { # iff using DS->EventLoop
+		$rd->blocking(0);
+		$self->SUPER::new($rd, EPOLLIN);
+	}
+	$self;
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $rd = $self->{sock};
+	my $byte;
+	until (defined(sysread($rd, $byte, 1))) {
+		return if $!{EAGAIN};
+		next if $!{EINTR};
+		die "read \$rd: $!";
+	}
+	my $op = $self->{op_map}->{$byte} or die "BUG: unknown byte `$byte'";
+	if ($byte eq '') { # close on EOF
+		$rd->blocking ? delete($self->{sock}) : $self->close;
+	}
+	my ($sub, @args) = @$op;
+	$sub->(@args);
+}
+
+1;
diff --git a/t/lei.t b/t/lei.t
index 2349dca4..c4692217 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -7,6 +7,7 @@ use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Config;
 use File::Path qw(rmtree);
+use Fcntl qw(SEEK_SET);
 require_git 2.6;
 require_mods(qw(json DBD::SQLite Search::Xapian));
 my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
@@ -188,6 +189,25 @@ my $test_external = sub {
 	# No double-quoting should be imposed on users on the CLI
 	$lei->('q', 's:use boolean prefix');
 	like($out, qr/search: use boolean prefix/, 'phrase search got result');
+
+	$lei->('q', '-o', "mboxcl2:$home/mbox", 's:use boolean prefix');
+	open my $mb, '<', "$home/mbox" or fail "no mbox: $!";
+	my @s = grep(/^Subject:/, <$mb>);
+	is(scalar(@s), 1, '1 result in mbox');
+	$lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:see attachment');
+	is($err, '', 'no errors from augment');
+	seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!";
+	@s = grep(/^Subject:/, <$mb>);
+	is(scalar(@s), 2, '2 results in mbox');
+
+	$lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:nonexistent');
+	is($err, '', 'no errors on no results');
+	seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!";
+	my @s2 = grep(/^Subject:/, <$mb>);
+	is_deeply(\@s2, \@s, 'same 2 old results w/ --augment and bad search');
+
+	$lei->('q', '-o', "mboxcl2:$home/mbox", 's:nonexistent');
+	is(-s "$home/mbox", 0, 'clobber w/o --augment');
 };
 
 my $test_lei_common = sub {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index d5beb3d2..083e0df4 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -94,7 +94,9 @@ my $wcb_get = sub {
 		my $dup = Storable::thaw(Storable::freeze($l2m));
 		is_deeply($dup, $l2m, "$fmt round-trips through storable");
 	}
-	$l2m->do_prepare($lei);
+	$l2m->pre_augment($lei);
+	$l2m->do_augment($lei);
+	$l2m->post_augment($lei);
 	my $cb = $l2m->write_cb($lei);
 	delete $lei->{1};
 	$cb;

  parent reply	other threads:[~2021-01-16 11:36 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-16 11:36 [PATCH 0/4] lei q: outputs to Maildir and mbox* working Eric Wong
2021-01-16 11:36 ` [PATCH 1/4] lei_to_mail: prepare for worker offload Eric Wong
2021-01-16 11:36 ` [PATCH 2/4] ipc: children don't kill on DESTROY, reduce FD sharing Eric Wong
2021-01-16 11:36 ` Eric Wong [this message]
2021-01-16 11:36 ` [PATCH 4/4] lei: pager: pass correct env in oneshot mode Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210116113624.19930-4-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).