user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 22/22] lei: query: restore JSON output overview
Date: Sun, 10 Jan 2021 12:15:19 +0000	[thread overview]
Message-ID: <20210110121519.17044-23-e@80x24.org> (raw)
In-Reply-To: <20210110121519.17044-1-e@80x24.org>

This internal API is better suited for fork-friendliness (but
locking + dedupe still need to be re-added).

Normal "json" is the default, though stream-friendly "concatjson"
and "jsonl" (AKA "ndjson" AKA "ldjson") all seem to work
(though tests aren't working yet).

For normal "json", the biggest downside is the need for a
trailing "null" element at the end of the array: output comes
from parallel processes, and (AFAIK) regular JSON doesn't allow
trailing commas, unlike JavaScript.
---
 MANIFEST                       |   1 +
 lib/PublicInbox/LeiOverview.pm | 188 +++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiQuery.pm    |  66 +++---------
 lib/PublicInbox/LeiXSearch.pm  |  25 +++--
 4 files changed, 217 insertions(+), 63 deletions(-)
 create mode 100644 lib/PublicInbox/LeiOverview.pm

diff --git a/MANIFEST b/MANIFEST
index caddd8df..810aec42 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -166,6 +166,7 @@ lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiSearch.pm
 lib/PublicInbox/LeiStore.pm
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
new file mode 100644
index 00000000..8a1f4f82
--- /dev/null
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -0,0 +1,188 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# per-mitem/smsg iterators for search results
+# "ovv" => "Overview viewer"
+package PublicInbox::LeiOverview;
+use strict;
+use v5.10.1;
+use POSIX qw(strftime);
+use File::Spec;
+use PublicInbox::MID qw($MID_EXTRACT);
+use PublicInbox::Address qw(pairs);
+use PublicInbox::Config;
+use PublicInbox::Search qw(get_pct);
+
+# line-delimited JSON, cf. https://en.wikipedia.org/wiki/JSON_streaming
+my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing (regex alternation)
+
+sub _iso8601 ($) { my ($epoch) = @_; strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($epoch)) } # epoch secs => UTC ISO 8601
+
+sub new { # parse --format/--output; returns $lei->fail(...)'s result on error
+	my ($class, $lei) = @_;
+	my $opt = $lei->{opt};
+	my $out = $opt->{output} // '-';
+	# "-" is mapped to stdout below, after any "FMT:" prefix is stripped
+	my $fmt = $opt->{'format'};
+	$fmt = lc($fmt) if defined $fmt;
+	if ($out =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+		my $ofmt = lc $1;
+		$fmt //= $ofmt;
+		return $lei->fail(<<"") if $fmt ne $ofmt;
+--format=$fmt and --output=$ofmt conflict
+
+	}
+	$out = '/dev/stdout' if $out eq '-'; # also covers "json:-"
+	$fmt //= 'json' if $out eq '/dev/stdout';
+	$fmt //= 'maildir'; # TODO
+	# NOTE(review): "scheme:" was stripped above, so "imaps://h" is "//h" here
+	if (index($out, '://') < 0) { # not a URL, so assume path
+		 $out = File::Spec->canonpath($out);
+	} # else URL
+	# JSON formats keep the JSON class name for ovv_each_smsg_cb's ->new
+	my $self = bless { fmt => $fmt, out => $out }, $class;
+	my $json;
+	if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
+		$json = $self->{json} = ref(PublicInbox::Config->json);
+	}
+	my $isatty;
+	if ($out eq '/dev/stdout') {
+		$isatty = -t $lei->{1};
+		$lei->start_pager if $isatty;
+		$opt->{pretty} //= $isatty;
+	} elsif ($json) {
+		return $lei->fail('JSON formats only output to stdout');
+	}
+	$self;
+}
+
+# called once by the parent, before any worker output is written
+sub ovv_begin {
+	my ($ovv, $lei) = @_;
+	return if $ovv->{fmt} ne 'json'; # TODO HTML/Atom/...
+	my $out = $lei->{1};
+	print $out '['; # opens the array that ovv_end closes
+}
+
+# called once by parent (via PublicInbox::EOFpipe)
+sub ovv_end {
+	my ($ovv, $lei) = @_;
+	my $fmt = $ovv->{fmt};
+	# JSON doesn't allow trailing commas, and each parallel worker ends
+	# every "json" record with a comma, so close the array with a null;
+	# concatjson just gets a final newline.  Other formats: nothing.
+	my $tail = $fmt eq 'json' ? "null]\n" :
+		$fmt eq 'concatjson' ? "\n" : undef;
+	print { $lei->{1} } $tail if defined $tail;
+}
+
+sub ovv_atfork_child { # presumably a per-child hook after fork; currently a no-op
+	my ($self) = @_;
+	# TODO: reopen dedupe here (dedupe has not been re-added yet)
+}
+
+# prepares an smsg for JSON; $mitem (optional) adds a "relevance" field
+sub _unbless_smsg {
+	my ($smsg, $mitem) = @_;
+	# NB: $smsg itself is modified in place; a plain-hash copy is returned
+	delete @$smsg{qw(lines bytes num tid)}; # dropped from JSON output
+	$smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
+	$smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
+	$smsg->{relevance} = get_pct($mitem) if $mitem;
+	# Message-IDs (references, mid => m) are emitted wrapped in "<...>"
+	if (my $r = delete $smsg->{references}) {
+		$smsg->{references} = [
+				map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
+	}
+	if (my $m = delete($smsg->{mid})) {
+		$smsg->{'m'} = "<$m>";
+	}
+	for my $f (qw(from to cc)) { # renamed to f/t/c
+		my $v = delete $smsg->{$f} or next;
+		$smsg->{substr($f, 0, 1)} = pairs($v);
+	}
+	$smsg->{'s'} = delete $smsg->{subject};
+	# f/t/c now hold pairs() output (presumably [name, address] arrays)
+	scalar { %$smsg }; # unbless (shallow copy)
+}
+
+sub ovv_atexit_child { # end of a query_*mset run: flush what's left buffered
+	my ($ovv, $lei) = @_;
+	my $buf_ref = delete $lei->{ovv_buf};
+	print { $lei->{1} } $$buf_ref if $buf_ref;
+}
+
+# JSON module ->pretty output wastes too much vertical white space,
+# this (IMHO) provides better use of screen real-estate while not
+# being excessively compact: arrays get one element per line, with
+# continuation lines aligned just past the opening "[".
+# Returns a '  "$k": ...' line; a non-ARRAY $v is appended verbatim.
+sub _json_pretty {
+	my ($json, $k, $v) = @_;
+	my $prefix = qq{  "$k": };
+	return $prefix.$v if ref($v) ne 'ARRAY';
+	return $prefix.'[]' unless @$v;
+
+	# length($prefix) + 1 for "[" == length($k) + 7
+	my $sep = ",\n" . (' ' x (length($k) + 7));
+	my @elems;
+	if (ref($v->[0])) { # f/t/c (pairs() output)
+		for my $pair (@$v) {
+			my $enc = $json->encode($pair);
+			$enc =~ s/(null|"),"/$1, "/g; # space after inner commas
+			push @elems, $enc;
+		}
+	} else { # references: encode as a 1-element array, drop the []
+		@elems = map { substr($json->encode([$_]), 1, -1) } @$v;
+	}
+	$prefix.'['.join($sep, @elems).']';
+}
+
+sub ovv_each_smsg_cb { # returns a ($smsg, $mitem) callback for the format
+	my ($self, $lei) = @_;
+	$lei->{ovv_buf} = \(my $buf = ''); # remainder flushed by ovv_atexit_child
+	my $json = $self->{json} ? $self->{json}->new : undef; # only JSON fmts set {json}
+	if ($json) {
+		$json->utf8->canonical;
+		$json->ascii(1) if $lei->{opt}->{ascii};
+	}
+	if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
+		my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+		sub { # DIY prettiness :P
+			my ($smsg, $mitem) = @_;
+			$smsg = _unbless_smsg($smsg, $mitem);
+			$buf .= "{\n";
+			$buf .= join(",\n", map {
+				my $v = $smsg->{$_};
+				if (ref($v)) {
+					_json_pretty($json, $_, $v);
+				} else {
+					$v = $json->encode([$v]);
+					qq{  "$_": }.substr($v, 1, -1);
+				}
+			} sort keys %$smsg);
+			$buf .= $EOR;
+			if (length($buf) > 65536) { # write out in ~64K chunks
+				print { $lei->{1} } $buf;
+				$buf = '';
+			}
+		}
+	} elsif ($json) {
+		my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+		sub {
+			my ($smsg, $mitem) = @_;
+			# (_unbless_smsg already drops tid, num and other internal fields)
+			$buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
+			if (length($buf) > 65536) {
+				print { $lei->{1} } $buf;
+				$buf = '';
+			}
+		}
+	} elsif ($self->{fmt} eq 'oid') {
+		sub {
+			my ($smsg, $mitem) = @_;
+		}
+	} # else { ... TODO: other formats (e.g. maildir) get no callback yet
+}
+
+1;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 2f4b99e5..7ca01454 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -5,43 +5,8 @@
 package PublicInbox::LeiQuery;
 use strict;
 use v5.10.1;
-use PublicInbox::MID qw($MID_EXTRACT);
-use POSIX qw(strftime);
-use PublicInbox::Address qw(pairs);
 use PublicInbox::DS qw(dwaitpid);
 
-sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
-
-# prepares an smsg for JSON
-sub _smsg_unbless ($) {
-	my ($smsg) = @_;
-
-	delete @$smsg{qw(lines bytes)};
-	$smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
-	$smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
-
-	if (my $r = delete $smsg->{references}) {
-		$smsg->{references} = [
-				map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
-	}
-	if (my $m = delete($smsg->{mid})) {
-		$smsg->{'m'} = "<$m>";
-	}
-	# XXX breaking to/cc, into structured arrays or tables which
-	# distinguish "$phrase <$address>" causes pretty printing JSON
-	# to take up too much vertical space.  I can't get either
-	# Cpanel::JSON::XS or JSON::XS or jq(1) only indent when
-	# wrapping is necessary, rather than blindly indenting and
-	# adding vertical space everywhere.
-	for my $f (qw(from to cc)) {
-		my $v = delete $smsg->{$f} or next;
-		$smsg->{substr($f, 0, 1)} = $v;
-	}
-	$smsg->{'s'} = delete $smsg->{subject};
-	# can we be bothered to parse From/To/Cc into arrays?
-	scalar { %$smsg }; # unbless
-}
-
 sub _vivify_external { # _externals_each callback
 	my ($src, $dir) = @_;
 	if (-f "$dir/ei.lock") {
@@ -68,6 +33,7 @@ sub lei_q {
 	# src: LeiXSearch || LeiSearch || Inbox
 	my @srcs;
 	require PublicInbox::LeiXSearch;
+	require PublicInbox::LeiOverview;
 	my $lxs = PublicInbox::LeiXSearch->new;
 
 	# --external is enabled by default, but allow --no-external
@@ -83,23 +49,9 @@ sub lei_q {
 			// $lxs->wq_workers($j);
 	}
 	unshift(@srcs, $sto->search) if $opt->{'local'};
-	my $out = $opt->{output} // '-';
-	$out = 'json:/dev/stdout' if $out eq '-';
-	my $isatty = -t $self->{1};
 	# no forking workers after this
-	$self->start_pager if $isatty;
-	my $json = substr($out, 0, 5) eq 'json:' ?
-		ref(PublicInbox::Config->json)->new : undef;
-	if ($json) {
-		if ($opt->{pretty} //= $isatty) {
-			$json->pretty(1)->space_before(0);
-			$json->indent_length($opt->{indent} // 2);
-		}
-		$json->utf8; # avoid Wide character in print warnings
-		$json->ascii(1) if $opt->{ascii}; # for "\uXXXX"
-		$json->canonical;
-	}
-
+	require PublicInbox::LeiOverview;
+	$self->{ovv} = PublicInbox::LeiOverview->new($self);
 	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
 	$mset_opt{qstr} = join(' ', map {;
@@ -124,7 +76,17 @@ sub lei_q {
 	$mset_opt{relevance} //= -2 if $opt->{thread};
 	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
-	$lxs->do_query($self, \@srcs);
+	$self->{ovv}->ovv_begin($self);
+	pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
+	require PublicInbox::EOFpipe;
+	my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
+	$lxs->do_query($self, $qry_done, \@srcs);
+	$eof->event_step unless $self->{sock};
+}
+
+sub query_done { # PublicInbox::EOFpipe callback: all $qry_done copies closed
+	my $lei = shift;
+	$lei->{ovv}->ovv_end($lei);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 94f7c2bc..c030b2b2 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,7 +8,6 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::Search qw(get_pct);
 use Sys::Syslog qw(syslog);
 
 sub new {
@@ -102,26 +101,26 @@ sub query_thread_mset { # for --thread
 	}
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
 	do {
 		$mset = $srch->mset($mo->{qstr}, $mo);
 		my $ids = $srch->mset_to_artnums($mset, $mo);
 		my $ctx = { ids => $ids };
 		my $i = 0;
-		my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+		my %n2item = map { ($ids->[$i++], $_) } $mset->items;
 		while ($over->expand_thread($ctx)) {
 			for my $n (@{$ctx->{xids}}) {
 				my $smsg = $over->get_art($n) or next;
 				# next if $dd->is_smsg_dup($smsg); TODO
-				if (my $p = delete $n2p{$smsg->{num}}) {
-					$smsg->{relevance} = $p;
-				}
-				print { $self->{1} } Dumper($smsg);
+				my $mitem = delete $n2item{$smsg->{num}};
+				$each_smsg->($smsg, $mitem);
 				# $self->out($buf .= $ORS);
 				# $emit_cb->($smsg);
 			}
 			@{$ctx->{xids}} = ();
 		}
 	} while (_mset_more($mset, $mo));
+	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub query_mset { # non-parallel for non-"--thread" users
@@ -130,23 +129,24 @@ sub query_mset { # non-parallel for non-"--thread" users
 	my $mset;
 	local %SIG = (%SIG, $lei->atfork_child_wq($self));
 	$self->attach_external($_) for @$srcs;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
 	do {
 		$mset = $self->mset($mo->{qstr}, $mo);
 		for my $it ($mset->items) {
 			my $smsg = smsg_for($self, $it) or next;
 			# next if $dd->is_smsg_dup($smsg);
-			$smsg->{relevance} = get_pct($it);
-			use Data::Dumper;
-			print { $self->{1} } Dumper($smsg);
+			$each_smsg->($smsg, $it);
 			# $self->out($buf .= $ORS) if defined $buf;
 			#$emit_cb->($smsg);
 		}
 	} while (_mset_more($mset, $mo));
+	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub do_query {
-	my ($self, $lei_orig, $srcs) = @_;
+	my ($self, $lei_orig, $qry_done, $srcs) = @_;
 	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+	$io[0] = $qry_done; # don't need stdin
 	$io[1]->autoflush(1);
 	$io[2]->autoflush(1);
 	if ($lei->{opt}->{thread}) {
@@ -160,6 +160,9 @@ sub do_query {
 	for my $rmt (@{$self->{remotes} // []}) {
 		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
 	}
+
+	# sent off to children, they will drop remaining references to it
+	close $qry_done;
 }
 
 sub ipc_atfork_child {
@@ -170,7 +173,7 @@ sub ipc_atfork_child {
 
 sub ipc_atfork_prepare {
 	my ($self) = @_;
-	$self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]);
+	$self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]); # io[0] is now the qry_done pipe (do_query), not stdin
 	$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 

      parent reply	other threads:[~2021-01-10 12:15 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
2021-01-10 12:14 ` [PATCH 01/22] lei query + pagination sorta working Eric Wong
2021-01-10 12:14 ` [PATCH 02/22] lei q: deduplicate smsg Eric Wong
2021-01-10 12:15 ` [PATCH 03/22] ds: block signals when reaping Eric Wong
2021-01-10 12:15 ` [PATCH 04/22] ipc: add support for asynchronous callbacks Eric Wong
2021-01-10 12:15 ` [PATCH 05/22] cmd_ipc: send FDs with buffer payload Eric Wong
2021-01-10 12:15 ` [PATCH 06/22] ipc: avoid excessive evals Eric Wong
2021-01-10 12:15 ` [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET Eric Wong
2021-01-10 12:15 ` [PATCH 08/22] ipc: eliminate ipc_worker_stop method Eric Wong
2021-01-10 12:15 ` [PATCH 09/22] ipc: wq: support dynamic worker count change Eric Wong
2021-01-10 12:15 ` [PATCH 10/22] ipc: drop -ipc_parent_pid field Eric Wong
2021-01-10 12:15 ` [PATCH 11/22] ipc: DESTROY and wq_workers methods Eric Wong
2021-01-10 12:15 ` [PATCH 12/22] lei: rename $w to $wpager for warning message Eric Wong
2021-01-10 12:15 ` [PATCH 13/22] lei: fix oneshot TTY detection by passing STD*{GLOB} Eric Wong
2021-01-10 12:15 ` [PATCH 14/22] lei: query: ensure pager exit is instantaneous Eric Wong
2021-01-10 12:15 ` [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs Eric Wong
2021-01-10 12:15 ` [PATCH 16/22] ipc: fix IO::FDPass use with a worker limit of 1 Eric Wong
2021-01-10 12:15 ` [PATCH 17/22] ipc: drop unused fields, default sighandlers for wq Eric Wong
2021-01-10 12:15 ` [PATCH 18/22] lei: get rid of client {pid} field Eric Wong
2021-01-10 12:15 ` [PATCH 19/22] lei: fork + FD cleanup Eric Wong
2021-01-10 12:15 ` [PATCH 20/22] lei: run pager in client script Eric Wong
2021-01-10 12:15 ` [PATCH 21/22] lei_xsearch: transfer 4 FDs internally, drop IO::FDPass Eric Wong
2021-01-10 12:15 ` Eric Wong [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210110121519.17044-23-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).