about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--MANIFEST1
-rw-r--r--lib/PublicInbox/LeiOverview.pm35
-rw-r--r--lib/PublicInbox/LeiXSearch.pm9
-rw-r--r--lib/PublicInbox/Lock.pm2
-rw-r--r--t/lei_overview.t33
5 files changed, 75 insertions, 5 deletions
diff --git a/MANIFEST b/MANIFEST
index 2ca240fc..0ebdaccc 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -338,6 +338,7 @@ t/kqnotify.t
 t/lei-oneshot.t
 t/lei.t
 t/lei_dedupe.t
+t/lei_overview.t
 t/lei_store.t
 t/lei_to_mail.t
 t/lei_xsearch.t
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index ec0921ba..ef5f27c1 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -6,8 +6,11 @@
 package PublicInbox::LeiOverview;
 use strict;
 use v5.10.1;
+use parent qw(PublicInbox::Lock);
 use POSIX qw(strftime);
+use Fcntl qw(F_GETFL O_APPEND);
 use File::Spec;
+use File::Temp ();
 use PublicInbox::MID qw($MID_EXTRACT);
 use PublicInbox::Address qw(pairs);
 use PublicInbox::Config;
@@ -18,6 +21,21 @@ my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
+# we open this in the parent process before ->wq_do handoff
+sub ovv_out_lk_init ($) {
+        my ($self) = @_;
+        $self->{tmp_lk_id} = "$self.$$";
+        my $tmp = File::Temp->new("lei-ovv.out.$$.lock-XXXXXX",
+                                        TMPDIR => 1, UNLINK => 0);
+        $self->{lock_path} = $tmp->filename;
+}
+
+sub ovv_out_lk_cancel ($) {
+        my ($self) = @_;
+        ($self->{tmp_lk_id}//'') eq "$self.$$" and
+                unlink(delete($self->{lock_path}));
+}
+
 sub new {
         my ($class, $lei) = @_;
         my $opt = $lei->{opt};
@@ -50,8 +68,17 @@ sub new {
                 $isatty = -t $lei->{1};
                 $lei->start_pager if $isatty;
                 $opt->{pretty} //= $isatty;
+                if (!$isatty && -f _) {
+                        my $fl = fcntl($lei->{1}, F_GETFL, 0) //
+                                return $lei->fail("fcntl(stdout): $!");
+                        ovv_out_lk_init($self) unless ($fl & O_APPEND);
+                } else {
+                        ovv_out_lk_init($self);
+                }
         } elsif ($json) {
                 return $lei->fail('JSON formats only output to stdout');
+        } else {
+                return $lei->fail("TODO: $out -f $fmt");
         }
         $self;
 }
@@ -109,6 +136,7 @@ sub _unbless_smsg {
 sub ovv_atexit_child {
         my ($self, $lei) = @_;
         if (my $bref = delete $lei->{ovv_buf}) {
+                my $lk = $self->lock_for_scope;
                 print { $lei->{1} } $$bref;
         }
 }
@@ -142,7 +170,9 @@ sub _json_pretty {
 sub ovv_each_smsg_cb {
         my ($self, $lei) = @_;
         $lei->{ovv_buf} = \(my $buf = '');
+        delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
         my $json = $self->{json}->new;
+        $lei->{1}->autoflush(1);
         if ($json) {
                 $json->utf8->canonical;
                 $json->ascii(1) if $lei->{opt}->{ascii};
@@ -164,6 +194,7 @@ sub ovv_each_smsg_cb {
                         } sort keys %$smsg);
                         $buf .= $EOR;
                         if (length($buf) > 65536) {
+                                my $lk = $self->lock_for_scope;
                                 print { $lei->{1} } $buf;
                                 $buf = '';
                         }
@@ -175,6 +206,7 @@ sub ovv_each_smsg_cb {
                         delete @$smsg{qw(tid num)};
                         $buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
                         if (length($buf) > 65536) {
+                                my $lk = $self->lock_for_scope;
                                 print { $lei->{1} } $buf;
                                 $buf = '';
                         }
@@ -186,4 +218,7 @@ sub ovv_each_smsg_cb {
         } # else { ...
 }
 
+no warnings 'once';
+*DESTROY = \&ovv_out_lk_cancel;
+
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 80e7a7f7..ee93e074 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -158,20 +158,21 @@ sub query_done { # PublicInbox::EOFpipe callback
 sub do_query {
         my ($self, $lei_orig, $srcs) = @_;
         my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-
+        my $remotes = $self->{remotes} // [];
         pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
         $io[0] = $qry_done; # don't need stdin
-        $io[1]->autoflush(1);
-        $io[2]->autoflush(1);
+
         if ($lei->{opt}->{thread}) {
+                $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
                 for my $ibxish (@$srcs) {
                         $self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
                 }
         } else {
+                $lei->{-parallel} = scalar(@$remotes);
                 $self->wq_do('query_mset', \@io, $lei, $srcs);
         }
         # TODO
-        for my $rmt (@{$self->{remotes} // []}) {
+        for my $rmt (@$remotes) {
                 $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
         }
         @io = ();
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index 2c5ebf27..bb213de4 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -37,7 +37,7 @@ sub lock_release {
 # caller must use return value
 sub lock_for_scope {
         my ($self, @single_pid) = @_;
-        $self->lock_acquire;
+        lock_acquire($self) or return; # lock_path not set
         PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self);
 }
 
diff --git a/t/lei_overview.t b/t/lei_overview.t
new file mode 100644
index 00000000..896cc01a
--- /dev/null
+++ b/t/lei_overview.t
@@ -0,0 +1,33 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(_exit);
+require_ok 'PublicInbox::LeiOverview';
+
+my $ovv = bless {}, 'PublicInbox::LeiOverview';
+$ovv->ovv_out_lk_init;
+my $lock_path = $ovv->{lock_path};
+ok(-f $lock_path, 'lock init');
+undef $ovv;
+ok(!-f $lock_path, 'lock DESTROY');
+
+$ovv = bless {}, 'PublicInbox::LeiOverview';
+$ovv->ovv_out_lk_init;
+$lock_path = $ovv->{lock_path};
+ok(-f $lock_path, 'lock init #2');
+my $pid = fork // BAIL_OUT "fork $!";
+if ($pid == 0) {
+        undef $ovv;
+        _exit(0);
+}
+is(waitpid($pid, 0), $pid, 'child exited');
+is($?, 0, 'no error in child process');
+ok(-f $lock_path, 'lock was not destroyed by child');
+undef $ovv;
+ok(!-f $lock_path, 'lock DESTROY #2');
+
+done_testing;