user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 3/4] lei: q: results output to Maildir and mbox* working
  2021-01-16 11:36  7% [PATCH 0/4] lei q: outputs to Maildir and mbox* working Eric Wong
@ 2021-01-16 11:36  3% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-01-16 11:36 UTC (permalink / raw)
  To: meta

All the augment and deduplication code appears to be working,
based on the unit tests.  OpPipe is a nice general addition that
will probably make future state machines easier.
---
 MANIFEST                       |   1 +
 lib/PublicInbox/LEI.pm         |  27 +++++---
 lib/PublicInbox/LeiDedupe.pm   |   3 +-
 lib/PublicInbox/LeiOverview.pm |  44 ++++++++----
 lib/PublicInbox/LeiQuery.pm    |  14 ++--
 lib/PublicInbox/LeiToMail.pm   |  89 ++++++++++++++++---------
 lib/PublicInbox/LeiXSearch.pm  | 118 ++++++++++++++++++++++++---------
 lib/PublicInbox/OpPipe.pm      |  41 ++++++++++++
 t/lei.t                        |  20 ++++++
 t/lei_to_mail.t                |   4 +-
 10 files changed, 266 insertions(+), 95 deletions(-)
 create mode 100644 lib/PublicInbox/OpPipe.pm

diff --git a/MANIFEST b/MANIFEST
index 0ebdaccc..0de1de4a 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -193,6 +193,7 @@ lib/PublicInbox/NNTPD.pm
 lib/PublicInbox/NNTPdeflate.pm
 lib/PublicInbox/NewsWWW.pm
 lib/PublicInbox/OnDestroy.pm
+lib/PublicInbox/OpPipe.pm
 lib/PublicInbox/Over.pm
 lib/PublicInbox/OverIdx.pm
 lib/PublicInbox/ProcessPipe.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 5568904d..f849c9df 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -256,7 +256,9 @@ sub puts ($;@) { print { shift->{1} } map { "$_\n" } @_ }
 sub out ($;@) { print { shift->{1} } @_ }
 
 sub err ($;@) {
-	print { shift->{2} } @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+	my $self = shift;
+	my $err = $self->{2} // *STDERR{IO};
+	print $err @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
 }
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
@@ -270,8 +272,11 @@ sub fail ($$;$) {
 
 sub atfork_prepare_wq {
 	my ($self, $wq) = @_;
-	push @{$wq->{-ipc_atfork_child_close}}, @TO_CLOSE_ATFORK_CHILD,
-				grep { defined } @$self{qw(0 1 2 sock)}
+	my $tcafc = $wq->{-ipc_atfork_child_close};
+	push @$tcafc, @TO_CLOSE_ATFORK_CHILD;
+	if (my $sock = $self->{sock}) {
+		push @$tcafc, @$self{qw(0 1 2)}, $sock;
+	}
 }
 
 # usage: my %sig = $lei->atfork_child_wq($wq);
@@ -286,7 +291,9 @@ sub atfork_child_wq {
 	PIPE => sub {
 		$self->x_it(13); # SIGPIPE = 13
 		# we need to close explicitly to avoid Perl warning on SIGPIPE
-		close($_) for (delete @$self{1..2});
+		close(delete $self->{1});
+		# regular files and /dev/null (-c) won't trigger SIGPIPE
+		close(delete $self->{2}) unless (-f $self->{2} || -c _);
 		syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait
 		die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
 	});
@@ -641,7 +648,7 @@ sub start_pager {
 	$new_env{MORE} = 'FRX' if $^O eq 'freebsd';
 	pipe(my ($r, $wpager)) or return warn "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
-	my $pgr = [ undef, @$rdr{1, 2} ];
+	my $pgr = [ undef, @$rdr{1, 2}, $$ ];
 	if (my $sock = $self->{sock}) { # lei(1) process runs it
 		delete @new_env{keys %$env}; # only set iff unset
 		my $buf = "exec 1\0".$pager;
@@ -664,7 +671,7 @@ sub stop_pager {
 	# do not restore original stdout, just close it so we error out
 	close(delete($self->{1})) if $self->{1};
 	my $pid = $pgr->[0];
-	dwaitpid($pid, undef, $self->{sock}) if $pid;
+	dwaitpid($pid, undef, $self->{sock}) if $pid && $pgr->[3] == $$;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -706,7 +713,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 sub dclose {
 	my ($self) = @_;
 	delete $self->{lxs}; # stops LeiXSearch queries
-	$self->close; # PublicInbox::DS::close
+	$self->close if $self->{sock}; # PublicInbox::DS::close
 }
 
 # for long-running results
@@ -737,8 +744,10 @@ sub event_step {
 
 sub event_step_init {
 	my ($self) = @_;
-	$self->{sock}->blocking(0);
-	$self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+	if (my $sock = $self->{sock}) { # using DS->EventLoop
+		$sock->blocking(0);
+		$self->SUPER::new($sock, EPOLLIN|EPOLLET);
+	}
 }
 
 sub noop {}
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 81754361..3f478aa4 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -89,8 +89,9 @@ sub true { 1 }
 sub dedupe_none ($) { (\&true, \&true) }
 
 sub new {
-	my ($cls, $lei, $dst) = @_;
+	my ($cls, $lei) = @_;
 	my $dd = $lei->{opt}->{dedupe} // 'content';
+	my $dst = $lei->{ovv}->{dst};
 
 	# allow "none" to bypass Eml->new if writing to directory:
 	return if ($dd eq 'none' && substr($dst // '', -1) eq '/');
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 9846bc8a..c0b423f6 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -15,6 +15,8 @@ use PublicInbox::MID qw($MID_EXTRACT);
 use PublicInbox::Address qw(pairs);
 use PublicInbox::Config;
 use PublicInbox::Search qw(get_pct);
+use PublicInbox::LeiDedupe;
+use PublicInbox::LeiToMail;
 
 # cf. https://en.wikipedia.org/wiki/JSON_streaming
 my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
@@ -44,7 +46,7 @@ sub new {
 
 	my $fmt = $opt->{'format'};
 	$fmt = lc($fmt) if defined $fmt;
-	if ($dst =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+	if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/
 		my $ofmt = lc $1;
 		$fmt //= $ofmt;
 		return $lei->fail(<<"") if $fmt ne $ofmt;
@@ -52,13 +54,14 @@ sub new {
 
 	}
 	$fmt //= 'json' if $dst eq '/dev/stdout';
-	$fmt //= 'maildir'; # TODO
+	$fmt //= 'maildir';
 
 	if (index($dst, '://') < 0) { # not a URL, so assume path
 		 $dst = File::Spec->canonpath($dst);
 	} # else URL
 
 	my $self = bless { fmt => $fmt, dst => $dst }, $class;
+	$lei->{ovv} = $self;
 	my $json;
 	if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
 		$json = $self->{json} = ref(PublicInbox::Config->json);
@@ -75,11 +78,13 @@ sub new {
 		} else {
 			ovv_out_lk_init($self);
 		}
-	} elsif ($json) {
-		return $lei->fail('JSON formats only output to stdout');
-	} else {
-		return $lei->fail("TODO: $dst -f $fmt");
 	}
+	if (!$json) {
+		# default to the cheapest sort since MUA usually resorts
+		$lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+		$lei->{l2m} = PublicInbox::LeiToMail->new($lei);
+	}
+	$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
@@ -135,9 +140,13 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
 	my ($self, $lei) = @_;
+	if (my $git = delete $self->{git}) {
+		$git->async_wait_all;
+	}
 	if (my $bref = delete $lei->{ovv_buf}) {
+		my $out = $lei->{1} or return;
 		my $lk = $self->lock_for_scope;
-		print { $lei->{1} } $$bref;
+		print $out $$bref;
 	}
 }
 
@@ -167,17 +176,28 @@ sub _json_pretty {
 	qq{  "$k": }.$v;
 }
 
-sub ovv_each_smsg_cb {
-	my ($self, $lei) = @_;
+sub ovv_each_smsg_cb { # runs in wq worker usually
+	my ($self, $lei, $ibxish) = @_;
 	$lei->{ovv_buf} = \(my $buf = '');
 	delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
-	my $json = $self->{json}->new;
+	my $json;
 	$lei->{1}->autoflush(1);
-	if ($json) {
+	if (my $pkg = $self->{json}) {
+		$json = $pkg->new;
 		$json->utf8->canonical;
 		$json->ascii(1) if $lei->{opt}->{ascii};
 	}
-	if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
+	if (my $l2m = $lei->{l2m}) {
+		my $wcb = $l2m->write_cb($lei);
+		my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
+		$self->{git} = $git; # for ovv_atexit_child
+		my $g2m = $l2m->can('git_to_mail');
+		sub {
+			my ($smsg, $mitem) = @_;
+			my $kw = []; # TODO get from mitem
+			$git->cat_async($smsg->{blob}, $g2m, [ $wcb, $kw ]);
+		};
+	} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
 		my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
 		sub { # DIY prettiness :P
 			my ($smsg, $mitem) = @_;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 69d2f9a6..a80d5887 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -23,8 +23,6 @@ sub _vivify_external { # _externals_each callback
 # the main "lei q SEARCH_TERMS" method
 sub lei_q {
 	my ($self, @argv) = @_;
-	my $sto = $self->_lei_store(1);
-	my $cfg = $self->_lei_cfg(1);
 	my $opt = $self->{opt};
 
 	# --local is enabled by default
@@ -32,7 +30,7 @@ sub lei_q {
 	my @srcs;
 	require PublicInbox::LeiXSearch;
 	require PublicInbox::LeiOverview;
-	require PublicInbox::LeiDedupe;
+	PublicInbox::Config->json;
 	my $lxs = PublicInbox::LeiXSearch->new;
 
 	# --external is enabled by default, but allow --no-external
@@ -46,10 +44,10 @@ sub lei_q {
 	$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
 		// $lxs->wq_workers($j);
 
-	unshift(@srcs, $sto->search) if $opt->{'local'};
 	# no forking workers after this
-	$self->{ovv} = PublicInbox::LeiOverview->new($self);
-	$self->{dd} = PublicInbox::LeiDedupe->new($self);
+	my $ovv = PublicInbox::LeiOverview->new($self) or return;
+	my $sto = $self->_lei_store(1);
+	unshift(@srcs, $sto->search) if $opt->{'local'};
 	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
 	$mset_opt{qstr} = join(' ', map {;
@@ -69,12 +67,10 @@ sub lei_q {
 			die "unrecognized --sort=$sort\n";
 		}
 	}
-	# $self->out($json->encode(\%mset_opt));
 	# descending docid order
 	$mset_opt{relevance} //= -2 if $opt->{thread};
-	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
-	$self->{ovv}->ovv_begin($self);
+	$ovv->ovv_begin($self);
 	$lxs->do_query($self, \@srcs);
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 5d4b7978..744f331d 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -187,8 +187,9 @@ sub zsfx2cmd ($$$) {
 	\@cmd;
 }
 
-sub compress_dst {
-	my ($self, $zsfx, $lei) = @_;
+sub _post_augment_mbox { # open a compressor process
+	my ($self, $lei) = @_;
+	my $zsfx = $self->{zsfx} or return;
 	my $cmd = zsfx2cmd($zsfx, undef, $lei);
 	pipe(my ($r, $w)) or die "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
@@ -209,7 +210,9 @@ sub decompress_src ($$$) {
 
 sub dup_src ($) {
 	my ($in) = @_;
-	open my $dup, '+>>&', $in or die "dup: $!";
+	# fileno needed because wq_set_recv_modes only used ">&=" for {1}
+	# and Perl blindly trusts that to reject the '+' (readability flag)
+	open my $dup, '+>>&=', fileno($in) or die "dup: $!";
 	$dup;
 }
 
@@ -321,11 +324,13 @@ sub new {
 	} else {
 		die "bad mail --format=$fmt\n";
 	}
-	my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst);
+	$lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
-sub _prepare_maildir {
+sub _pre_augment_maildir {} # noop
+
+sub _do_augment_maildir {
 	my ($self, $lei) = @_;
 	my $dst = $lei->{ovv}->{dst};
 	if ($lei->{opt}->{augment}) {
@@ -338,6 +343,11 @@ sub _prepare_maildir {
 	} else { # clobber existing Maildir
 		_maildir_each_file($dst, \&_unlink);
 	}
+}
+
+sub _post_augment_maildir {
+	my ($self, $lei) = @_;
+	my $dst = $lei->{ovv}->{dst};
 	for my $x (qw(tmp new cur)) {
 		my $d = $dst.$x;
 		next if -d $d;
@@ -347,45 +357,64 @@ sub _prepare_maildir {
 	}
 }
 
-sub _prepare_mbox {
+sub _pre_augment_mbox {
 	my ($self, $lei) = @_;
 	my $dst = $lei->{ovv}->{dst};
-	my ($out, $seekable);
-	if ($dst eq '/dev/stdout') {
-		$out = $lei->{1};
-	} else {
+	if ($dst ne '/dev/stdout') {
 		my $mode = -p $dst ? '>' : '+>>';
 		if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) {
 			$! == ENOENT or die "unlink($dst): $!";
 		}
-		open $out, $mode, $dst or die "open($dst): $!";
-		# Perl does SEEK_END even with O_APPEND :<
-		$seekable = seek($out, 0, SEEK_SET);
-		die "seek($dst): $!\n" if !$seekable && $! != ESPIPE;
+		open my $out, $mode, $dst or die "open($dst): $!";
 		$lei->{1} = $out;
 	}
+	# Perl does SEEK_END even with O_APPEND :<
+	$self->{seekable} = seek($lei->{1}, 0, SEEK_SET);
+	if (!$self->{seekable} && $! != ESPIPE && $dst ne '/dev/stdout') {
+		die "seek($dst): $!\n";
+	}
 	state $zsfx_allow = join('|', keys %zsfx2cmd);
-	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
+	($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/);
+}
+
+sub _do_augment_mbox {
+	my ($self, $lei) = @_;
+	return if !$lei->{opt}->{augment};
 	my $dedupe = $lei->{dedupe};
-	if ($lei->{opt}->{augment}) {
-		die "cannot augment $dst, not seekable\n" if !$seekable;
-		if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
-			my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
-					dup_src($out);
-			my $fmt = $lei->{ovv}->{fmt};
-			require PublicInbox::MboxReader;
-			PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
-		}
-		# maybe some systems don't honor O_APPEND, Perl does this:
-		seek($out, 0, SEEK_END) or die "seek $dst: $!";
-		$dedupe->pause_dedupe if $dedupe;
+	my $dst = $lei->{ovv}->{dst};
+	die "cannot augment $dst, not seekable\n" if !$self->{seekable};
+	my $out = $lei->{1};
+	if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
+		my $zsfx = $self->{zsfx};
+		my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+				dup_src($out);
+		my $fmt = $lei->{ovv}->{fmt};
+		require PublicInbox::MboxReader;
+		PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
 	}
-	compress_dst($self, $zsfx, $lei) if $zsfx;
+	# maybe some systems don't honor O_APPEND, Perl does this:
+	seek($out, 0, SEEK_END) or die "seek $dst: $!";
+	$dedupe->pause_dedupe if $dedupe;
+}
+
+sub pre_augment { # fast (1 disk seek), runs in main daemon
+	my ($self, $lei) = @_;
+	# _pre_augment_maildir, _pre_augment_mbox
+	my $m = "_pre_augment_$self->{base_type}";
+	$self->$m($lei);
+}
+
+sub do_augment { # slow, runs in wq worker
+	my ($self, $lei) = @_;
+	# _do_augment_maildir, _do_augment_mbox
+	my $m = "_do_augment_$self->{base_type}";
+	$self->$m($lei);
 }
 
-sub do_prepare {
+sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
 	my ($self, $lei) = @_;
-	my $m = "_prepare_$self->{base_type}";
+	# _post_augment_maildir, _post_augment_mbox
+	my $m = "_post_augment_$self->{base_type}";
 	$self->$m($lei);
 }
 
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 8b70167c..9563ad63 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -9,6 +9,10 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
 use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::OpPipe;
+use PublicInbox::Import;
+use File::Temp 0.19 (); # 0.19 for ->newdir
+use File::Spec ();
 
 sub new {
 	my ($class) = @_;
@@ -103,9 +107,9 @@ sub query_thread_mset { # for --thread
 	}
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
-	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-	my $dd = $lei->{dd};
-	$dd->prepare_dedupe;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
 	do {
 		$mset = $srch->mset($mo->{qstr}, $mo);
 		my $ids = $srch->mset_to_artnums($mset, $mo);
@@ -115,7 +119,7 @@ sub query_thread_mset { # for --thread
 		while ($over->expand_thread($ctx)) {
 			for my $n (@{$ctx->{xids}}) {
 				my $smsg = $over->get_art($n) or next;
-				next if $dd->is_smsg_dup($smsg);
+				next if $dedupe->is_smsg_dup($smsg);
 				my $mitem = delete $n2item{$smsg->{num}};
 				$each_smsg->($smsg, $mitem);
 			}
@@ -132,65 +136,113 @@ sub query_mset { # non-parallel for non-"--thread" users
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	$self->attach_external($_) for @$srcs;
-	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-	my $dd = $lei->{dd};
-	$dd->prepare_dedupe;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
 	do {
 		$mset = $self->mset($mo->{qstr}, $mo);
 		for my $it ($mset->items) {
 			my $smsg = smsg_for($self, $it) or next;
-			next if $dd->is_smsg_dup($smsg);
+			next if $dedupe->is_smsg_dup($smsg);
 			$each_smsg->($smsg, $it);
 		}
 	} while (_mset_more($mset, $mo));
 	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
-sub query_done { # PublicInbox::EOFpipe callback
+sub git {
+	my ($self) = @_;
+	my (%seen, @dirs);
+	my $tmp = File::Temp->newdir('lei_xsrch_git-XXXXXXXX', TMPDIR => 1);
+	for my $ibx (@{$self->{shard2ibx} // []}) {
+		my $d = File::Spec->canonpath($ibx->git->{git_dir});
+		$seen{$d} //= push @dirs, "$d/objects\n"
+	}
+	my $git_dir = $tmp->dirname;
+	PublicInbox::Import::init_bare($git_dir);
+	my $f = "$git_dir/objects/info/alternates";
+	open my $alt, '>', $f or die "open($f): $!";
+	print $alt @dirs or die "print $f: $!";
+	close $alt or die "close $f: $!";
+	my $git = PublicInbox::Git->new($git_dir);
+	$git->{-tmp} = $tmp;
+	$git;
+}
+
+sub query_done { # EOF callback
 	my ($lei) = @_;
 	$lei->{ovv}->ovv_end($lei);
 	$lei->dclose;
 }
 
-sub do_query {
-	my ($self, $lei_orig, $srcs) = @_;
-	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+sub start_query { # always runs in main (lei-daemon) process
+	my ($self, $io, $lei, $srcs) = @_;
+	if (my $l2m = $lei->{l2m}) {
+		$lei->{1} = $io->[1];
+		$l2m->post_augment($lei);
+		$io->[1] = delete $lei->{1};
+	}
 	my $remotes = $self->{remotes} // [];
-	pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
-	$io[0] = $qry_done; # don't need stdin
-
 	if ($lei->{opt}->{thread}) {
 		$lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
 		for my $ibxish (@$srcs) {
-			$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
+			$self->wq_do('query_thread_mset', $io, $lei, $ibxish);
 		}
 	} else {
 		$lei->{-parallel} = scalar(@$remotes);
-		$self->wq_do('query_mset', \@io, $lei, $srcs);
+		$self->wq_do('query_mset', $io, $lei, $srcs);
 	}
 	# TODO
 	for my $rmt (@$remotes) {
-		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
+		$self->wq_do('query_thread_mbox', $io, $lei, $rmt);
 	}
-	@io = ();
-	close $qry_done; # fully closed when children are done
-
-	# query_done will run when query_*mset close $qry_done
-	if ($lei_orig->{sock}) { # watch for client premature exit
-		require PublicInbox::EOFpipe;
-		PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
-		$lei_orig->{lxs} = $self;
-		$lei_orig->event_step_init;
+	close $io->[0]; # qry_status_wr
+	@$io = ();
+}
+
+sub query_prepare { # wq_do
+	my ($self, $lei) = @_;
+	my %sig = $lei->atfork_child_wq($self);
+	local @SIG{keys %sig} = values %sig;
+	if (my $l2m = $lei->{l2m}) {
+		eval { $l2m->do_augment($lei) };
+		return $lei->fail($@) if $@;
+	}
+	# trigger PublicInbox::OpPipe->event_step
+	my $qry_status_wr = $lei->{0} or
+		return $lei->fail('BUG: qry_status_wr missing');
+	$qry_status_wr->autoflush(1);
+	print $qry_status_wr '.' or # this should never fail...
+		return $lei->fail("BUG? print qry_status_wr: $!");
+}
+
+sub do_query {
+	my ($self, $lei_orig, $srcs) = @_;
+	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+	$io[0] = undef;
+	pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+
+	$lei_orig->{lxs} = $self;
+	$lei_orig->event_step_init; # wait for shutdowns
+	my $op_map = { '' => [ \&query_done, $lei_orig ] };
+	my $in_loop = exists $lei_orig->{sock};
+	my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+	if (my $l2m = $lei->{l2m}) {
+		$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
+		$io[1] = $lei_orig->{1};
+		$op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
+		$self->wq_do('query_prepare', \@io, $lei);
+		$opp->event_step if !$in_loop;
 	} else {
+		start_query($self, \@io, $lei, $srcs);
+	}
+	unless ($in_loop) {
 		my @pids = $self->wq_close;
-		# wait for close($lei->{0})
-		if (read($eof_wait, my $buf, 1)) {
-			# if we get a SIGPIPE from one, kill the rest
-			kill('TERM', @pids) if $buf eq '!';
-		}
+		# for the $lei->atfork_child_wq PIPE handler:
+		$op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+		$opp->event_step;
 		my $ipc_worker_reap = $self->can('ipc_worker_reap');
 		dwaitpid($_, $ipc_worker_reap, $self) for @pids;
-		query_done($lei_orig); # may SIGPIPE
 	}
 }
 
diff --git a/lib/PublicInbox/OpPipe.pm b/lib/PublicInbox/OpPipe.pm
new file mode 100644
index 00000000..295a8aa5
--- /dev/null
+++ b/lib/PublicInbox/OpPipe.pm
@@ -0,0 +1,41 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# bytecode dispatch pipe, reads a byte, runs a sub
+# byte => [ sub, @operands ]
+package PublicInbox::OpPipe;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN);
+
+sub new {
+	my ($cls, $rd, $op_map, $in_loop) = @_;
+	my $self = bless { sock => $rd, op_map => $op_map }, $cls;
+	# 1031: F_SETPIPE_SZ, 4096: page size
+	fcntl($rd, 1031, 4096) if $^O eq 'linux';
+	if ($in_loop) { # iff using DS->EventLoop
+		$rd->blocking(0);
+		$self->SUPER::new($rd, EPOLLIN);
+	}
+	$self;
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $rd = $self->{sock};
+	my $byte;
+	until (defined(sysread($rd, $byte, 1))) {
+		return if $!{EAGAIN};
+		next if $!{EINTR};
+		die "read \$rd: $!";
+	}
+	my $op = $self->{op_map}->{$byte} or die "BUG: unknown byte `$byte'";
+	if ($byte eq '') { # close on EOF
+		$rd->blocking ? delete($self->{sock}) : $self->close;
+	}
+	my ($sub, @args) = @$op;
+	$sub->(@args);
+}
+
+1;
diff --git a/t/lei.t b/t/lei.t
index 2349dca4..c4692217 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -7,6 +7,7 @@ use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Config;
 use File::Path qw(rmtree);
+use Fcntl qw(SEEK_SET);
 require_git 2.6;
 require_mods(qw(json DBD::SQLite Search::Xapian));
 my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
@@ -188,6 +189,25 @@ my $test_external = sub {
 	# No double-quoting should be imposed on users on the CLI
 	$lei->('q', 's:use boolean prefix');
 	like($out, qr/search: use boolean prefix/, 'phrase search got result');
+
+	$lei->('q', '-o', "mboxcl2:$home/mbox", 's:use boolean prefix');
+	open my $mb, '<', "$home/mbox" or fail "no mbox: $!";
+	my @s = grep(/^Subject:/, <$mb>);
+	is(scalar(@s), 1, '1 result in mbox');
+	$lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:see attachment');
+	is($err, '', 'no errors from augment');
+	seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!";
+	@s = grep(/^Subject:/, <$mb>);
+	is(scalar(@s), 2, '2 results in mbox');
+
+	$lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:nonexistent');
+	is($err, '', 'no errors on no results');
+	seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!";
+	my @s2 = grep(/^Subject:/, <$mb>);
+	is_deeply(\@s2, \@s, 'same 2 old results w/ --augment and bad search');
+
+	$lei->('q', '-o', "mboxcl2:$home/mbox", 's:nonexistent');
+	is(-s "$home/mbox", 0, 'clobber w/o --augment');
 };
 
 my $test_lei_common = sub {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index d5beb3d2..083e0df4 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -94,7 +94,9 @@ my $wcb_get = sub {
 		my $dup = Storable::thaw(Storable::freeze($l2m));
 		is_deeply($dup, $l2m, "$fmt round-trips through storable");
 	}
-	$l2m->do_prepare($lei);
+	$l2m->pre_augment($lei);
+	$l2m->do_augment($lei);
+	$l2m->post_augment($lei);
 	my $cb = $l2m->write_cb($lei);
 	delete $lei->{1};
 	$cb;

^ permalink raw reply related	[relevance 3%]

* [PATCH 0/4] lei q: outputs to Maildir and mbox* working
@ 2021-01-16 11:36  7% Eric Wong
  2021-01-16 11:36  3% ` [PATCH 3/4] lei: q: results output " Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-01-16 11:36 UTC (permalink / raw)
  To: meta

This is only lightly tested, but it is the key "inspired by mairix"
part.  It's slow compared to mairix because of git storage and not
being able to use hardlinks, but git blob extraction will be
parallelizable.

Eric Wong (4):
  lei_to_mail: prepare for worker offload
  ipc: children don't kill on DESTROY, reduce FD sharing
  lei: q: results output to Maildir and mbox* working
  lei: pager: pass correct env in oneshot mode

 MANIFEST                       |   1 +
 lib/PublicInbox/IPC.pm         |  21 ++--
 lib/PublicInbox/LEI.pm         |  30 +++--
 lib/PublicInbox/LeiDedupe.pm   |   3 +-
 lib/PublicInbox/LeiOverview.pm |  60 ++++++----
 lib/PublicInbox/LeiQuery.pm    |  14 +--
 lib/PublicInbox/LeiToMail.pm   | 206 +++++++++++++++++++++------------
 lib/PublicInbox/LeiXSearch.pm  | 119 ++++++++++++++-----
 lib/PublicInbox/OpPipe.pm      |  41 +++++++
 t/lei.t                        |  20 ++++
 t/lei_to_mail.t                |  64 +++++-----
 11 files changed, 398 insertions(+), 181 deletions(-)
 create mode 100644 lib/PublicInbox/OpPipe.pm

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-01-16 11:36  7% [PATCH 0/4] lei q: outputs to Maildir and mbox* working Eric Wong
2021-01-16 11:36  3% ` [PATCH 3/4] lei: q: results output " Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).