From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id A6B111F934 for ; Tue, 2 Feb 2021 11:47:02 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 01/16] lei: switch to use SEQPACKET socketpair instead of pipe Date: Tue, 2 Feb 2021 11:46:47 +0000 Message-Id: <20210202114702.29886-2-e@80x24.org> In-Reply-To: <20210202114702.29886-1-e@80x24.org> References: <20210202114702.29886-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This will allow us to send larger messages and to report progress, with the reports accumulating in the main daemon. --- MANIFEST | 2 +- lib/PublicInbox/LEI.pm | 8 ++-- lib/PublicInbox/LeiXSearch.pm | 27 ++++++------ lib/PublicInbox/OpPipe.pm | 41 ------------------ lib/PublicInbox/PktOp.pm | 79 +++++++++++++++++++++++++++++++++++ 5 files changed, 98 insertions(+), 59 deletions(-) delete mode 100644 lib/PublicInbox/OpPipe.pm create mode 100644 lib/PublicInbox/PktOp.pm diff --git a/MANIFEST b/MANIFEST index 017dc7f2..bcb9d08e 100644 --- a/MANIFEST +++ b/MANIFEST @@ -205,9 +205,9 @@ lib/PublicInbox/NNTPD.pm lib/PublicInbox/NNTPdeflate.pm lib/PublicInbox/NewsWWW.pm lib/PublicInbox/OnDestroy.pm -lib/PublicInbox/OpPipe.pm lib/PublicInbox/Over.pm lib/PublicInbox/OverIdx.pm +lib/PublicInbox/PktOp.pm lib/PublicInbox/ProcessPipe.pm lib/PublicInbox/Qspawn.pm lib/PublicInbox/Reply.pm diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 17ad18b9..737db1e1 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -306,7 +306,7 @@ sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) } sub fail ($$;$) { my ($self, $buf, $exit_code) = @_; err($self, $buf) if defined $buf; - 
syswrite($self->{op_pipe}, '!') if $self->{op_pipe}; # fail_handler + send($self->{pkt_op}, '!', MSG_EOR) if $self->{pkt_op}; # fail_handler x_it($self, ($exit_code // 1) << 8); undef; } @@ -369,14 +369,14 @@ sub io_restore ($$) { sub note_sigpipe { # triggers sigpipe_handler my ($self, $fd) = @_; close(delete($self->{$fd})); # explicit close silences Perl warning - syswrite($self->{op_pipe}, '|') if $self->{op_pipe}; + send($self->{pkt_op}, '|', MSG_EOR) if $self->{pkt_op}; x_it($self, 13); } sub atfork_child_wq { my ($self, $wq) = @_; io_restore($self, $wq); - -p $self->{op_pipe} or die 'BUG: {op_pipe} expected'; + -S $self->{pkt_op} or die 'BUG: {pkt_op} expected'; io_restore($self->{l2m}, $wq); %PATH2CFG = (); undef $errors_log; @@ -408,7 +408,7 @@ sub atfork_parent_wq { $self->{env} = $env; delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m my @io = (delete(@$lei{qw(0 1 2)}), - io_extract($lei, qw(sock op_pipe startq))); + io_extract($lei, qw(sock pkt_op startq))); my $l2m = $lei->{l2m}; if ($l2m && $l2m != $wq) { # $wq == lxs if (my $wq_s1 = $l2m->{-wq_s1}) { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index f630e79a..e577ab09 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -9,10 +9,11 @@ use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); use PublicInbox::DS qw(dwaitpid); -use PublicInbox::OpPipe; +use PublicInbox::PktOp; use PublicInbox::Import; use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); +use Socket qw(MSG_EOR); use PublicInbox::Search qw(xap_terms); use PublicInbox::Spawn qw(popen_rd spawn which); use PublicInbox::MID qw(mids); @@ -353,7 +354,8 @@ sub query_prepare { # called by wq_do delete $lei->{l2m}->{-wq_s1}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; - syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!" + send($lei->{pkt_op}, '.', MSG_EOR) == 1 or + die "do_post_augment trigger: $!" 
} sub fail_handler ($;$$) { @@ -380,20 +382,19 @@ sub do_query { fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; $zpipe = $l2m->pre_augment($lei); } - pipe(my $done, $lei->{op_pipe}) or die "pipe $!"; + my $in_loop = exists $lei->{sock}; + my $ops = { + '|' => [ \&sigpipe_handler, $lei ], + '!' => [ \&fail_handler, $lei ], + '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ], + '' => [ \&query_done, $lei ], + }; + (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop); my ($lei_ipc, @io) = $lei->atfork_parent_wq($self); - delete($lei->{op_pipe}); + delete($lei->{pkt_op}); $lei->event_step_init; # wait for shutdowns - my $done_op = { - '' => [ \&query_done, $lei ], - '|' => [ \&sigpipe_handler, $lei ], - '!' => [ \&fail_handler, $lei ] - }; - my $in_loop = exists $lei->{sock}; - $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop); if ($l2m) { - $done_op->{'.'} = [ \&do_post_augment, $lei, $zpipe, $au_done ]; $self->wq_do('query_prepare', \@io, $lei_ipc); $io[1] = $zpipe->[1] if $zpipe; } @@ -401,7 +402,7 @@ sub do_query { $self->wq_close(1); unless ($in_loop) { # for the $lei_ipc->atfork_child_wq PIPE handler: - while ($done->{sock}) { $done->event_step } + while ($op->{sock}) { $op->event_step } } } diff --git a/lib/PublicInbox/OpPipe.pm b/lib/PublicInbox/OpPipe.pm deleted file mode 100644 index 295a8aa5..00000000 --- a/lib/PublicInbox/OpPipe.pm +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright (C) 2021 all contributors -# License: AGPL-3.0+ - -# bytecode dispatch pipe, reads a byte, runs a sub -# byte => [ sub, @operands ] -package PublicInbox::OpPipe; -use strict; -use v5.10.1; -use parent qw(PublicInbox::DS); -use PublicInbox::Syscall qw(EPOLLIN); - -sub new { - my ($cls, $rd, $op_map, $in_loop) = @_; - my $self = bless { sock => $rd, op_map => $op_map }, $cls; - # 1031: F_SETPIPE_SZ, 4096: page size - fcntl($rd, 1031, 4096) if $^O eq 'linux'; - if ($in_loop) { # iff using DS->EventLoop - $rd->blocking(0); - $self->SUPER::new($rd, EPOLLIN); 
- } - $self; -} - -sub event_step { - my ($self) = @_; - my $rd = $self->{sock}; - my $byte; - until (defined(sysread($rd, $byte, 1))) { - return if $!{EAGAIN}; - next if $!{EINTR}; - die "read \$rd: $!"; - } - my $op = $self->{op_map}->{$byte} or die "BUG: unknown byte `$byte'"; - if ($byte eq '') { # close on EOF - $rd->blocking ? delete($self->{sock}) : $self->close; - } - my ($sub, @args) = @$op; - $sub->(@args); -} - -1; diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm new file mode 100644 index 00000000..d5b95a73 --- /dev/null +++ b/lib/PublicInbox/PktOp.pm @@ -0,0 +1,79 @@ +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ + +# op dispatch socket, reads a message, runs a sub +# There may be multiple producers, but (for now) only one consumer +# Used for lei_xsearch and maybe other things +# "literal" => [ sub, @operands ] +# /regexp/ => [ sub, @operands ] +package PublicInbox::PktOp; +use strict; +use v5.10.1; +use parent qw(PublicInbox::DS); +use Errno qw(EAGAIN EINTR); +use PublicInbox::Syscall qw(EPOLLIN EPOLLET); +use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET); + +sub new { + my ($cls, $r, $ops, $in_loop) = @_; + my $self = bless { sock => $r, ops => $ops, re => [] }, $cls; + if (ref($ops) eq 'ARRAY') { + my %ops; + for my $op (@$ops) { + if (ref($op->[0])) { + push @{$self->{re}}, $op; + } else { + $ops{$op->[0]} = $op->[1]; + } + } + $self->{ops} = \%ops; + } + if ($in_loop) { # iff using DS->EventLoop + $r->blocking(0); + $self->SUPER::new($r, EPOLLIN|EPOLLET); + } + $self; +} + +# returns a blessed object as the consumer, and a GLOB/IO for the producer +sub pair { + my ($cls, $ops, $in_loop) = @_; + my ($c, $p); + socketpair($c, $p, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; + (new($cls, $c, $ops, $in_loop), $p); +} + +sub close { + my ($self) = @_; + my $c = $self->{sock} or return; + $c->blocking ? 
delete($self->{sock}) : $self->SUPER::close; +} + +sub event_step { + my ($self) = @_; + my $c = $self->{sock}; + my $msg; + do { + my $n = recv($c, $msg, 128, 0); + unless (defined $n) { + return if $! == EAGAIN; + next if $! == EINTR; + $self->close; + die "recv: $!"; + } + my $op = $self->{ops}->{$msg}; + unless ($op) { + for my $re_op (@{$self->{re}}) { + $msg =~ $re_op->[0] or next; + $op = $re_op->[1]; + last; + } + } + die "BUG: unknown message: `$msg'" unless $op; + my ($sub, @args) = @$op; + $sub->(@args); + return $self->close if $msg eq ''; # close on EOF + } while (1); +} + +1;