diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/LeiOverview.pm | 35 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 9 | ||||
-rw-r--r-- | lib/PublicInbox/Lock.pm | 2 |
3 files changed, 41 insertions, 5 deletions
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index ec0921ba..ef5f27c1 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -6,8 +6,11 @@ package PublicInbox::LeiOverview; use strict; use v5.10.1; +use parent qw(PublicInbox::Lock); use POSIX qw(strftime); +use Fcntl qw(F_GETFL O_APPEND); use File::Spec; +use File::Temp (); use PublicInbox::MID qw($MID_EXTRACT); use PublicInbox::Address qw(pairs); use PublicInbox::Config; @@ -18,6 +21,21 @@ my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } +# we open this in the parent process before ->wq_do handoff +sub ovv_out_lk_init ($) { + my ($self) = @_; + $self->{tmp_lk_id} = "$self.$$"; + my $tmp = File::Temp->new("lei-ovv.out.$$.lock-XXXXXX", + TMPDIR => 1, UNLINK => 0); + $self->{lock_path} = $tmp->filename; +} + +sub ovv_out_lk_cancel ($) { + my ($self) = @_; + ($self->{tmp_lk_id}//'') eq "$self.$$" and + unlink(delete($self->{lock_path})); +} + sub new { my ($class, $lei) = @_; my $opt = $lei->{opt}; @@ -50,8 +68,17 @@ sub new { $isatty = -t $lei->{1}; $lei->start_pager if $isatty; $opt->{pretty} //= $isatty; + if (!$isatty && -f _) { + my $fl = fcntl($lei->{1}, F_GETFL, 0) // + return $lei->fail("fcntl(stdout): $!"); + ovv_out_lk_init($self) unless ($fl & O_APPEND); + } else { + ovv_out_lk_init($self); + } } elsif ($json) { return $lei->fail('JSON formats only output to stdout'); + } else { + return $lei->fail("TODO: $out -f $fmt"); } $self; } @@ -109,6 +136,7 @@ sub _unbless_smsg { sub ovv_atexit_child { my ($self, $lei) = @_; if (my $bref = delete $lei->{ovv_buf}) { + my $lk = $self->lock_for_scope; print { $lei->{1} } $$bref; } } @@ -142,7 +170,9 @@ sub _json_pretty { sub ovv_each_smsg_cb { my ($self, $lei) = @_; $lei->{ovv_buf} = \(my $buf = ''); + delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel}; my $json = $self->{json}->new; + $lei->{1}->autoflush(1); if ($json) { $json->utf8->canonical; $json->ascii(1) if $lei->{opt}->{ascii}; @@ -164,6 +194,7 @@ sub ovv_each_smsg_cb { } sort keys %$smsg); $buf .= $EOR; if (length($buf) > 65536) { + my $lk = $self->lock_for_scope; print { $lei->{1} } $buf; $buf = ''; } @@ -175,6 +206,7 @@ sub ovv_each_smsg_cb { delete @$smsg{qw(tid num)}; $buf .= $json->encode(_unbless_smsg(@_)) . $ORS; if (length($buf) > 65536) { + my $lk = $self->lock_for_scope; print { $lei->{1} } $buf; $buf = ''; } @@ -186,4 +218,7 @@ sub ovv_each_smsg_cb { } # else { ... } +no warnings 'once'; +*DESTROY = \&ovv_out_lk_cancel; + 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 80e7a7f7..ee93e074 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -158,20 +158,21 @@ sub query_done { # PublicInbox::EOFpipe callback sub do_query { my ($self, $lei_orig, $srcs) = @_; my ($lei, @io) = $lei_orig->atfork_parent_wq($self); - + my $remotes = $self->{remotes} // []; pipe(my ($eof_wait, $qry_done)) or die "pipe $!"; $io[0] = $qry_done; # don't need stdin - $io[1]->autoflush(1); - $io[2]->autoflush(1); + if ($lei->{opt}->{thread}) { + $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1; for my $ibxish (@$srcs) { $self->wq_do('query_thread_mset', \@io, $lei, $ibxish); } } else { + $lei->{-parallel} = scalar(@$remotes); $self->wq_do('query_mset', \@io, $lei, $srcs); } # TODO - for my $rmt (@{$self->{remotes} // []}) { + for my $rmt (@$remotes) { $self->wq_do('query_thread_mbox', \@io, $lei, $rmt); } @io = (); diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm index 2c5ebf27..bb213de4 100644 --- a/lib/PublicInbox/Lock.pm +++ b/lib/PublicInbox/Lock.pm @@ -37,7 +37,7 @@ sub lock_release { # caller must use return value sub lock_for_scope { my ($self, @single_pid) = @_; - $self->lock_acquire; + lock_acquire($self) or return; # lock_path not set PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self); } |