From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id DF0F41FB09 for ; Thu, 14 Jan 2021 07:06:28 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 11/14] lei: q: lock stdout on overview output Date: Wed, 13 Jan 2021 19:06:24 -1200 Message-Id: <20210114070627.18195-12-e@80x24.org> In-Reply-To: <20210114070627.18195-1-e@80x24.org> References: <20210114070627.18195-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Most writes to stdout aren't atomic and we need locking to prevent workers from interleaving and corrupting JSON output. The one case stdout won't require locking is if it's pointed to a regular file with O_APPEND; as POSIX O_APPEND semantics guarantees atomicity. --- MANIFEST | 1 + lib/PublicInbox/LeiOverview.pm | 34 ++++++++++++++++++++++++++++++++++ lib/PublicInbox/LeiXSearch.pm | 9 +++++---- lib/PublicInbox/Lock.pm | 2 +- t/lei_overview.t | 33 +++++++++++++++++++++++++++++++++ 5 files changed, 74 insertions(+), 5 deletions(-) create mode 100644 t/lei_overview.t diff --git a/MANIFEST b/MANIFEST index 2ca240fc..0ebdaccc 100644 --- a/MANIFEST +++ b/MANIFEST @@ -338,6 +338,7 @@ t/kqnotify.t t/lei-oneshot.t t/lei.t t/lei_dedupe.t +t/lei_overview.t t/lei_store.t t/lei_to_mail.t t/lei_xsearch.t diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index ec0921ba..44c21837 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -6,8 +6,11 @@ package PublicInbox::LeiOverview; use strict; use v5.10.1; +use parent qw(PublicInbox::Lock); use POSIX qw(strftime); +use Fcntl qw(F_GETFL O_APPEND); use File::Spec; +use File::Temp (); use PublicInbox::MID qw($MID_EXTRACT); use PublicInbox::Address qw(pairs); use PublicInbox::Config; @@ -18,6 +21,23 @@ my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } +# we open this in the parent process before ->wq_do handoff +sub ovv_out_lk_init ($) { + my ($self) = @_; + $self->{tmp_lk_id} = "$self.$$"; + my $tmp = File::Temp->new("lei-ovv.out.$$.lock-XXXXXX", + TMPDIR => 1, UNLINK => 0); + $self->{lock_path} = $tmp->filename; +} + +sub ovv_out_lk_cancel ($) { + my ($self) = @_; + ($self->{tmp_lk_id}//'') eq "$self.$$" and + unlink(delete($self->{lock_path})); +} + +*DESTROY = \&ovv_out_lk_cancel; + sub new { my ($class, $lei) = @_; my $opt = $lei->{opt}; @@ -50,8 +70,17 @@ sub new { $isatty = -t $lei->{1}; $lei->start_pager if $isatty; $opt->{pretty} //= $isatty; + if (!$isatty && -f _) { + my $fl = fcntl($lei->{1}, F_GETFL, 0) // + return $lei->fail("fcntl(stdout): $!"); + ovv_out_lk_init($self) unless ($fl & O_APPEND); + } else { + ovv_out_lk_init($self); + } } elsif ($json) { return $lei->fail('JSON formats only output to stdout'); + } else { + return $lei->fail("TODO: $out -f $fmt"); } $self; } @@ -109,6 +138,7 @@ sub _unbless_smsg { sub ovv_atexit_child { my ($self, $lei) = @_; if (my $bref = delete $lei->{ovv_buf}) { + my $lk = $self->lock_for_scope; print { $lei->{1} } $$bref; } } @@ -142,7 +172,9 @@ sub _json_pretty { sub ovv_each_smsg_cb { my ($self, $lei) = @_; $lei->{ovv_buf} = \(my $buf = ''); + delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel}; my $json = $self->{json}->new; + $lei->{1}->autoflush(1); if ($json) { $json->utf8->canonical; $json->ascii(1) if $lei->{opt}->{ascii}; @@ -164,6 +196,7 @@ sub ovv_each_smsg_cb { } sort keys %$smsg); $buf .= $EOR; if (length($buf) > 65536) { + my $lk = $self->lock_for_scope; print { $lei->{1} } $buf; $buf = ''; } @@ -175,6 +208,7 @@ sub ovv_each_smsg_cb { delete @$smsg{qw(tid num)}; $buf .= $json->encode(_unbless_smsg(@_)) . $ORS; if (length($buf) > 65536) { + my $lk = $self->lock_for_scope; print { $lei->{1} } $buf; $buf = ''; } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 80e7a7f7..ee93e074 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -158,20 +158,21 @@ sub query_done { # PublicInbox::EOFpipe callback sub do_query { my ($self, $lei_orig, $srcs) = @_; my ($lei, @io) = $lei_orig->atfork_parent_wq($self); - + my $remotes = $self->{remotes} // []; pipe(my ($eof_wait, $qry_done)) or die "pipe $!"; $io[0] = $qry_done; # don't need stdin - $io[1]->autoflush(1); - $io[2]->autoflush(1); + if ($lei->{opt}->{thread}) { + $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1; for my $ibxish (@$srcs) { $self->wq_do('query_thread_mset', \@io, $lei, $ibxish); } } else { + $lei->{-parallel} = scalar(@$remotes); $self->wq_do('query_mset', \@io, $lei, $srcs); } # TODO - for my $rmt (@{$self->{remotes} // []}) { + for my $rmt (@$remotes) { $self->wq_do('query_thread_mbox', \@io, $lei, $rmt); } @io = (); diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm index 2c5ebf27..bb213de4 100644 --- a/lib/PublicInbox/Lock.pm +++ b/lib/PublicInbox/Lock.pm @@ -37,7 +37,7 @@ sub lock_release { # caller must use return value sub lock_for_scope { my ($self, @single_pid) = @_; - $self->lock_acquire; + lock_acquire($self) or return; # lock_path not set PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self); } diff --git a/t/lei_overview.t b/t/lei_overview.t new file mode 100644 index 00000000..896cc01a --- /dev/null +++ b/t/lei_overview.t @@ -0,0 +1,33 @@ +#!perl -w +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ +use strict; +use v5.10.1; +use Test::More; +use PublicInbox::TestCommon; +use POSIX qw(_exit); +require_ok 'PublicInbox::LeiOverview'; + +my $ovv = bless {}, 'PublicInbox::LeiOverview'; +$ovv->ovv_out_lk_init; +my $lock_path = $ovv->{lock_path}; +ok(-f $lock_path, 'lock init'); +undef $ovv; +ok(!-f $lock_path, 'lock DESTROY'); + +$ovv = bless {}, 'PublicInbox::LeiOverview'; +$ovv->ovv_out_lk_init; +$lock_path = $ovv->{lock_path}; +ok(-f $lock_path, 'lock init #2'); +my $pid = fork // BAIL_OUT "fork $!"; +if ($pid == 0) { + undef $ovv; + _exit(0); +} +is(waitpid($pid, 0), $pid, 'child exited'); +is($?, 0, 'no error in child process'); +ok(-f $lock_path, 'lock was not destroyed by child'); +undef $ovv; +ok(!-f $lock_path, 'lock DESTROY #2'); + +done_testing;