about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2021-01-10 12:15:19 +0000
committer Eric Wong <e@80x24.org> 2021-01-12 03:51:43 +0000
commit b0898d18efbc8f646b736088f9600b87be88f91e (patch)
tree 724cf4c2649eaf880087b2fbe2bf2b47ecde1675 /lib
parent 3019046b3ab9736922762df111d60ef7647e36a3 (diff)
download public-inbox-b0898d18efbc8f646b736088f9600b87be88f91e.tar.gz
This internal API is better suited for fork-friendliness (but
locking + dedupe still needs to be re-added).

Normal "json" is the default, though stream-friendly "concatjson"
and "jsonl" (AKA "ndjson" AKA "ldjson") all seem working
(though tests aren't working, yet).

For normal "json", the biggest downside is the necessity of a
trailing "null" element at the end of the array because of
parallel processes, since (AFAIK) regular JSON doesn't allow
trailing commas, unlike JavaScript.
Diffstat (limited to 'lib')
-rw-r--r-- lib/PublicInbox/LeiOverview.pm 188
-rw-r--r-- lib/PublicInbox/LeiQuery.pm 66
-rw-r--r-- lib/PublicInbox/LeiXSearch.pm 25
3 files changed, 216 insertions, 63 deletions
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
new file mode 100644
index 00000000..8a1f4f82
--- /dev/null
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -0,0 +1,188 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# per-mitem/smsg iterators for search results
+# "ovv" => "Overview viewer"
+package PublicInbox::LeiOverview;
+use strict;
+use v5.10.1;
+use POSIX qw(strftime);
+use File::Spec;
+use PublicInbox::MID qw($MID_EXTRACT);
+use PublicInbox::Address qw(pairs);
+use PublicInbox::Config;
+use PublicInbox::Search qw(get_pct);
+
+# cf. https://en.wikipedia.org/wiki/JSON_streaming
+my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
+
+sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
+
+sub new {
+        my ($class, $lei) = @_;
+        my $opt = $lei->{opt};
+        my $out = $opt->{output} // '-';
+        $out = '/dev/stdout' if $out eq '-';
+
+        my $fmt = $opt->{'format'};
+        $fmt = lc($fmt) if defined $fmt;
+        if ($out =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+                my $ofmt = lc $1;
+                $fmt //= $ofmt;
+                return $lei->fail(<<"") if $fmt ne $ofmt;
+--format=$fmt and --output=$ofmt conflict
+
+        }
+        $fmt //= 'json' if $out eq '/dev/stdout';
+        $fmt //= 'maildir'; # TODO
+
+        if (index($out, '://') < 0) { # not a URL, so assume path
+                 $out = File::Spec->canonpath($out);
+        } # else URL
+
+        my $self = bless { fmt => $fmt, out => $out }, $class;
+        my $json;
+        if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
+                $json = $self->{json} = ref(PublicInbox::Config->json);
+        }
+        my ($isatty, $seekable);
+        if ($out eq '/dev/stdout') {
+                $isatty = -t $lei->{1};
+                $lei->start_pager if $isatty;
+                $opt->{pretty} //= $isatty;
+        } elsif ($json) {
+                return $lei->fail('JSON formats only output to stdout');
+        }
+        $self;
+}
+
+# called once by parent
+sub ovv_begin {
+        my ($self, $lei) = @_;
+        if ($self->{fmt} eq 'json') {
+                print { $lei->{1} } '[';
+        } # TODO HTML/Atom/...
+}
+
+# called once by parent (via PublicInbox::EOFpipe)
+sub ovv_end {
+        my ($self, $lei) = @_;
+        if ($self->{fmt} eq 'json') {
+                # JSON doesn't allow trailing commas, and preventing
+                # trailing commas is a PITA when parallelizing outputs
+                print { $lei->{1} } "null]\n";
+        } elsif ($self->{fmt} eq 'concatjson') {
+                print { $lei->{1} } "\n";
+        }
+}
+
+sub ovv_atfork_child {
+        my ($self) = @_;
+        # reopen dedupe here
+}
+
+# prepares an smsg for JSON
+sub _unbless_smsg {
+        my ($smsg, $mitem) = @_;
+
+        delete @$smsg{qw(lines bytes num tid)};
+        $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
+        $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
+        $smsg->{relevance} = get_pct($mitem) if $mitem;
+
+        if (my $r = delete $smsg->{references}) {
+                $smsg->{references} = [
+                                map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
+        }
+        if (my $m = delete($smsg->{mid})) {
+                $smsg->{'m'} = "<$m>";
+        }
+        for my $f (qw(from to cc)) {
+                my $v = delete $smsg->{$f} or next;
+                $smsg->{substr($f, 0, 1)} = pairs($v);
+        }
+        $smsg->{'s'} = delete $smsg->{subject};
+        # can we be bothered to parse From/To/Cc into arrays?
+        scalar { %$smsg }; # unbless
+}
+
+sub ovv_atexit_child {
+        my ($self, $lei) = @_;
+        my $bref = delete $lei->{ovv_buf} or return;
+        print { $lei->{1} } $$bref;
+}
+
+# JSON module ->pretty output wastes too much vertical white space,
+# this (IMHO) provides better use of screen real-estate while not
+# being excessively compact:
+sub _json_pretty {
+        my ($json, $k, $v) = @_;
+        if (ref $v eq 'ARRAY') {
+                if (@$v) {
+                        my $sep = ",\n" . (' ' x (length($k) + 7));
+                        if (ref($v->[0])) { # f/t/c
+                                $v = '[' . join($sep, map {
+                                        my $pair = $json->encode($_);
+                                        $pair =~ s/(null|"),"/$1, "/g;
+                                        $pair;
+                                } @$v) . ']';
+                        } else { # references
+                                $v = '[' . join($sep, map {
+                                        substr($json->encode([$_]), 1, -1);
+                                } @$v) . ']';
+                        }
+                } else {
+                        $v = '[]';
+                }
+        }
+        qq{  "$k": }.$v;
+}
+
+sub ovv_each_smsg_cb {
+        my ($self, $lei) = @_;
+        $lei->{ovv_buf} = \(my $buf = '');
+        my $json = $self->{json}->new;
+        if ($json) {
+                $json->utf8->canonical;
+                $json->ascii(1) if $lei->{opt}->{ascii};
+        }
+        if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
+                my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+                sub { # DIY prettiness :P
+                        my ($smsg, $mitem) = @_;
+                        $smsg = _unbless_smsg($smsg, $mitem);
+                        $buf .= "{\n";
+                        $buf .= join(",\n", map {
+                                my $v = $smsg->{$_};
+                                if (ref($v)) {
+                                        _json_pretty($json, $_, $v);
+                                } else {
+                                        $v = $json->encode([$v]);
+                                        qq{  "$_": }.substr($v, 1, -1);
+                                }
+                        } sort keys %$smsg);
+                        $buf .= $EOR;
+                        if (length($buf) > 65536) {
+                                print { $lei->{1} } $buf;
+                                $buf = '';
+                        }
+                }
+        } elsif ($json) {
+                my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+                sub {
+                        my ($smsg, $mitem) = @_;
+                        delete @$smsg{qw(tid num)};
+                        $buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
+                        if (length($buf) > 65536) {
+                                print { $lei->{1} } $buf;
+                                $buf = '';
+                        }
+                }
+        } elsif ($self->{fmt} eq 'oid') {
+                sub {
+                        my ($smsg, $mitem) = @_;
+                }
+        } # else { ...
+}
+
+1;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 2f4b99e5..7ca01454 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -5,43 +5,8 @@
 package PublicInbox::LeiQuery;
 use strict;
 use v5.10.1;
-use PublicInbox::MID qw($MID_EXTRACT);
-use POSIX qw(strftime);
-use PublicInbox::Address qw(pairs);
 use PublicInbox::DS qw(dwaitpid);
 
-sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
-
-# prepares an smsg for JSON
-sub _smsg_unbless ($) {
-        my ($smsg) = @_;
-
-        delete @$smsg{qw(lines bytes)};
-        $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
-        $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
-
-        if (my $r = delete $smsg->{references}) {
-                $smsg->{references} = [
-                                map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
-        }
-        if (my $m = delete($smsg->{mid})) {
-                $smsg->{'m'} = "<$m>";
-        }
-        # XXX breaking to/cc, into structured arrays or tables which
-        # distinguish "$phrase <$address>" causes pretty printing JSON
-        # to take up too much vertical space.  I can't get either
-        # Cpanel::JSON::XS or JSON::XS or jq(1) only indent when
-        # wrapping is necessary, rather than blindly indenting and
-        # adding vertical space everywhere.
-        for my $f (qw(from to cc)) {
-                my $v = delete $smsg->{$f} or next;
-                $smsg->{substr($f, 0, 1)} = $v;
-        }
-        $smsg->{'s'} = delete $smsg->{subject};
-        # can we be bothered to parse From/To/Cc into arrays?
-        scalar { %$smsg }; # unbless
-}
-
 sub _vivify_external { # _externals_each callback
         my ($src, $dir) = @_;
         if (-f "$dir/ei.lock") {
@@ -68,6 +33,7 @@ sub lei_q {
         # src: LeiXSearch || LeiSearch || Inbox
         my @srcs;
         require PublicInbox::LeiXSearch;
+        require PublicInbox::LeiOverview;
         my $lxs = PublicInbox::LeiXSearch->new;
 
         # --external is enabled by default, but allow --no-external
@@ -83,23 +49,9 @@ sub lei_q {
                         // $lxs->wq_workers($j);
         }
         unshift(@srcs, $sto->search) if $opt->{'local'};
-        my $out = $opt->{output} // '-';
-        $out = 'json:/dev/stdout' if $out eq '-';
-        my $isatty = -t $self->{1};
         # no forking workers after this
-        $self->start_pager if $isatty;
-        my $json = substr($out, 0, 5) eq 'json:' ?
-                ref(PublicInbox::Config->json)->new : undef;
-        if ($json) {
-                if ($opt->{pretty} //= $isatty) {
-                        $json->pretty(1)->space_before(0);
-                        $json->indent_length($opt->{indent} // 2);
-                }
-                $json->utf8; # avoid Wide character in print warnings
-                $json->ascii(1) if $opt->{ascii}; # for "\uXXXX"
-                $json->canonical;
-        }
-
+        require PublicInbox::LeiOverview;
+        $self->{ovv} = PublicInbox::LeiOverview->new($self);
         my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
         $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
         $mset_opt{qstr} = join(' ', map {;
@@ -124,7 +76,17 @@ sub lei_q {
         $mset_opt{relevance} //= -2 if $opt->{thread};
         # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
         $self->{mset_opt} = \%mset_opt;
-        $lxs->do_query($self, \@srcs);
+        $self->{ovv}->ovv_begin($self);
+        pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
+        require PublicInbox::EOFpipe;
+        my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
+        $lxs->do_query($self, $qry_done, \@srcs);
+        $eof->event_step unless $self->{sock};
+}
+
+sub query_done { # PublicInbox::EOFpipe callback
+        my ($self) = @_;
+        $self->{ovv}->ovv_end($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 94f7c2bc..c030b2b2 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,7 +8,6 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::Search qw(get_pct);
 use Sys::Syslog qw(syslog);
 
 sub new {
@@ -102,26 +101,26 @@ sub query_thread_mset { # for --thread
         }
         my $mo = { %{$lei->{mset_opt}} };
         my $mset;
+        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
         do {
                 $mset = $srch->mset($mo->{qstr}, $mo);
                 my $ids = $srch->mset_to_artnums($mset, $mo);
                 my $ctx = { ids => $ids };
                 my $i = 0;
-                my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+                my %n2item = map { ($ids->[$i++], $_) } $mset->items;
                 while ($over->expand_thread($ctx)) {
                         for my $n (@{$ctx->{xids}}) {
                                 my $smsg = $over->get_art($n) or next;
                                 # next if $dd->is_smsg_dup($smsg); TODO
-                                if (my $p = delete $n2p{$smsg->{num}}) {
-                                        $smsg->{relevance} = $p;
-                                }
-                                print { $self->{1} } Dumper($smsg);
+                                my $mitem = delete $n2item{$smsg->{num}};
+                                $each_smsg->($smsg, $mitem);
                                 # $self->out($buf .= $ORS);
                                 # $emit_cb->($smsg);
                         }
                         @{$ctx->{xids}} = ();
                 }
         } while (_mset_more($mset, $mo));
+        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub query_mset { # non-parallel for non-"--thread" users
@@ -130,23 +129,24 @@ sub query_mset { # non-parallel for non-"--thread" users
         my $mset;
         local %SIG = (%SIG, $lei->atfork_child_wq($self));
         $self->attach_external($_) for @$srcs;
+        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
         do {
                 $mset = $self->mset($mo->{qstr}, $mo);
                 for my $it ($mset->items) {
                         my $smsg = smsg_for($self, $it) or next;
                         # next if $dd->is_smsg_dup($smsg);
-                        $smsg->{relevance} = get_pct($it);
-                        use Data::Dumper;
-                        print { $self->{1} } Dumper($smsg);
+                        $each_smsg->($smsg, $it);
                         # $self->out($buf .= $ORS) if defined $buf;
                         #$emit_cb->($smsg);
                 }
         } while (_mset_more($mset, $mo));
+        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub do_query {
-        my ($self, $lei_orig, $srcs) = @_;
+        my ($self, $lei_orig, $qry_done, $srcs) = @_;
         my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+        $io[0] = $qry_done; # don't need stdin
         $io[1]->autoflush(1);
         $io[2]->autoflush(1);
         if ($lei->{opt}->{thread}) {
@@ -160,6 +160,9 @@ sub do_query {
         for my $rmt (@{$self->{remotes} // []}) {
                 $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
         }
+
+        # sent off to children, they will drop remaining references to it
+        close $qry_done;
 }
 
 sub ipc_atfork_child {
@@ -170,7 +173,7 @@ sub ipc_atfork_child {
 
 sub ipc_atfork_prepare {
         my ($self) = @_;
-        $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]);
+        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
         $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }