about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-03-28 09:01:13 +0000
committerEric Wong <e@80x24.org>2021-03-28 23:01:36 +0000
commit954581b8e575966a8bddc35e3b23d81d16a52833 (patch)
treed5e87f75313f827411796d82871fd6b1d5388e9c /lib/PublicInbox
parent29792d70a5d8305f68521664a7fa2e0fe54ff291 (diff)
downloadpublic-inbox-954581b8e575966a8bddc35e3b23d81d16a52833.tar.gz
Provide a consistent ->op_wait_event method instead of
forcing callers to loop (or not) at each callsite.
This also avoid a leak possibility by avoiding circular
references.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/LEI.pm11
-rw-r--r--lib/PublicInbox/LeiBlob.pm4
-rw-r--r--lib/PublicInbox/LeiConvert.pm4
-rw-r--r--lib/PublicInbox/LeiImport.pm4
-rw-r--r--lib/PublicInbox/LeiMark.pm4
-rw-r--r--lib/PublicInbox/LeiMirror.pm4
-rw-r--r--lib/PublicInbox/LeiP2q.pm4
-rw-r--r--lib/PublicInbox/LeiXSearch.pm8
-rw-r--r--lib/PublicInbox/PktOp.pm20
9 files changed, 35 insertions, 28 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 478912cd..9cacb142 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -494,11 +494,11 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
 }
 
 sub pkt_op_pair {
-        my ($self, $ops) = @_;
+        my ($self) = @_;
         require PublicInbox::OnDestroy;
         require PublicInbox::PktOp;
         my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
-        @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+        @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair;
         $end;
 }
 
@@ -512,14 +512,13 @@ sub workers_start {
                 ($ops ? %$ops : ()),
         };
         $ops->{''} //= [ \&dclose, $lei ];
-        my $end = $lei->pkt_op_pair($ops);
+        my $end = $lei->pkt_op_pair;
         $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
         delete $lei->{pkt_op_p};
-        my $op = delete $lei->{pkt_op_c};
+        my $op_c = delete $lei->{pkt_op_c};
         @$end = ();
         $lei->event_step_init;
-        # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
-        $lei->{oneshot} ? $op : undef;
+        ($op_c, $ops);
 }
 
 sub _help {
diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm
index 2facbad3..97747220 100644
--- a/lib/PublicInbox/LeiBlob.pm
+++ b/lib/PublicInbox/LeiBlob.pm
@@ -103,12 +103,12 @@ sub lei_blob {
         my $lxs = $lei->lxs_prepare or return;
         require PublicInbox::SolverGit;
         my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
-        my $op = $lei->workers_start($self, 'lei_solve', 1,
+        my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1,
                 { '' => [ \&sol_done, $lei ] });
         $lei->{sol} = $self;
         $self->wq_io_do('do_solve_blob', []);
         $self->wq_close(1);
-        while ($op && $op->{sock}) { $op->event_step }
+        $op_c->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 083ecc33..5d0adb14 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -53,11 +53,11 @@ sub lei_convert { # the main "lei convert" method
         my $devfd = $lei->path_to_fd($ovv->{dst}) // return;
         $lei->{opt}->{augment} = 1 if $devfd < 0;
         $self->prepare_inputs($lei, \@inputs) or return;
-        my $op = $lei->workers_start($self, 'lei_convert', 1);
+        my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
         $lei->{cnv} = $self;
         $self->wq_io_do('do_convert', []);
         $self->wq_close(1);
-        while ($op && $op->{sock}) { $op->event_step }
+        $op_c->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 7c5b7d09..803b5cda 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -76,11 +76,11 @@ sub lei_import { # the main "lei import" method
         my $ops = { '' => [ \&import_done, $lei ] };
         $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
         $self->{-wq_nr_workers} = $j // 1; # locked
-        my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
+        my ($op_c, undef) = $lei->workers_start($self, 'lei_import', $j, $ops);
         $lei->{imp} = $self;
         $self->wq_io_do('input_stdin', []) if $self->{0};
         net_merge_complete($self) unless $lei->{auth};
-        while ($op && $op->{sock}) { $op->event_step }
+        $op_c->op_wait_event($ops);
 }
 
 no warnings 'once';
diff --git a/lib/PublicInbox/LeiMark.pm b/lib/PublicInbox/LeiMark.pm
index 34846b84..6e611318 100644
--- a/lib/PublicInbox/LeiMark.pm
+++ b/lib/PublicInbox/LeiMark.pm
@@ -116,11 +116,11 @@ sub lei_mark { # the "lei mark" method
         my $ops = { '' => [ \&mark_done, $lei ] };
         $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
         $self->{vmd_mod} = $vmd_mod;
-        my $op = $lei->workers_start($self, 'lei_mark', 1, $ops);
+        my ($op_c, undef) = $lei->workers_start($self, 'lei_mark', 1, $ops);
         $lei->{mark} = $self;
         $self->wq_io_do('input_stdin', []) if $self->{0};
         net_merge_complete($self) unless $lei->{auth};
-        while ($op && $op->{sock}) { $op->event_step }
+        $op_c->op_wait_event($ops);
 }
 
 sub note_missing {
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c83386c6..89574d28 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -282,13 +282,13 @@ sub start {
         require PublicInbox::Inbox;
         require PublicInbox::Admin;
         require PublicInbox::InboxWritable;
-        my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+        my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, {
                 '' => [ \&mirror_done, $lei ]
         });
         $lei->{mrr} = $self;
         $self->wq_io_do('do_mirror', []);
         $self->wq_close(1);
-        while ($op && $op->{sock}) { $op->event_step }
+        $op->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm
index 25f63a10..a8a3dd2c 100644
--- a/lib/PublicInbox/LeiP2q.pm
+++ b/lib/PublicInbox/LeiP2q.pm
@@ -185,11 +185,11 @@ sub lei_p2q { # the "lei patch-to-query" entry point
         } else {
                 $self->{input} = $input;
         }
-        my $op = $lei->workers_start($self, 'lei_p2q', 1);
+        my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1);
         $lei->{p2q} = $self;
         $self->wq_io_do('do_p2q', []);
         $self->wq_close(1);
-        while ($op && $op->{sock}) { $op->event_step }
+        $op->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b41daffe..1a194f1c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -427,7 +427,7 @@ sub do_query {
                 'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
         };
         $lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
-        my $end = $lei->pkt_op_pair($ops);
+        my $end = $lei->pkt_op_pair;
         $lei->{1}->autoflush(1);
         $lei->start_pager if delete $lei->{need_pager};
         $lei->{ovv}->ovv_begin($lei);
@@ -445,7 +445,7 @@ sub do_query {
         }
         $self->wq_workers_start('lei_xsearch', undef,
                                 $lei->oldset, { lei => $lei });
-        my $op = delete $lei->{pkt_op_c};
+        my $op_c = delete $lei->{pkt_op_c};
         delete $lei->{pkt_op_p};
         @$end = ();
         $self->{threads} = $lei->{opt}->{threads};
@@ -455,9 +455,7 @@ sub do_query {
                 start_query($self);
         }
         $lei->event_step_init; # wait for shutdowns
-        if ($lei->{oneshot}) {
-                while ($op->{sock}) { $op->event_step }
-        }
+        $op_c->op_wait_event($ops);
 }
 
 sub add_uri {
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index 5d8e78ea..c3221735 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -16,21 +16,23 @@ use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
 our @EXPORT_OK = qw(pkt_do);
 
 sub new {
-        my ($cls, $r, $ops) = @_;
-        my $self = bless { sock => $r, ops => $ops }, $cls;
+        my ($cls, $r) = @_;
+        my $self = bless { sock => $r }, $cls;
         if ($PublicInbox::DS::in_loop) { # iff using DS->EventLoop
                 $r->blocking(0);
                 $self->SUPER::new($r, EPOLLIN|EPOLLET);
+        } else {
+                $self->{blocking} = 1;
         }
         $self;
 }
 
 # returns a blessed object as the consumer, and a GLOB/IO for the producer
 sub pair {
-        my ($cls, $ops) = @_;
+        my ($cls) = @_;
         my ($c, $p);
         socketpair($c, $p, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
-        (new($cls, $c, $ops), $p);
+        (new($cls, $c), $p);
 }
 
 sub pkt_do { # for the producer to trigger event_step in consumer
@@ -41,7 +43,7 @@ sub pkt_do { # for the producer to trigger event_step in consumer
 sub close {
         my ($self) = @_;
         my $c = $self->{sock} or return;
-        $c->blocking ? delete($self->{sock}) : $self->SUPER::close;
+        $self->{blocking} ? delete($self->{sock}) : $self->SUPER::close;
 }
 
 sub event_step {
@@ -73,4 +75,12 @@ sub event_step {
         }
 }
 
+# call this when we're ready to wait on events,
+# returns immediately if non-blocking
+sub op_wait_event {
+        my ($self, $ops) = @_;
+        $self->{ops} = $ops;
+        while ($self->{blocking} && $self->{sock}) { event_step($self) }
+}
+
 1;