user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 01/12] lei: simplify PktOp callers
Date: Sun, 28 Mar 2021 09:01:13 +0000	[thread overview]
Message-ID: <20210328090124.3541-2-e@80x24.org> (raw)
In-Reply-To: <20210328090124.3541-1-e@80x24.org>

Provide a consistent ->op_wait_event method instead of
forcing callers to loop (or not) at each callsite.
This also avoids a possible leak by avoiding circular
references.
---
 lib/PublicInbox/LEI.pm        | 11 +++++------
 lib/PublicInbox/LeiBlob.pm    |  4 ++--
 lib/PublicInbox/LeiConvert.pm |  4 ++--
 lib/PublicInbox/LeiImport.pm  |  4 ++--
 lib/PublicInbox/LeiMark.pm    |  4 ++--
 lib/PublicInbox/LeiMirror.pm  |  4 ++--
 lib/PublicInbox/LeiP2q.pm     |  4 ++--
 lib/PublicInbox/LeiXSearch.pm |  8 +++-----
 lib/PublicInbox/PktOp.pm      | 20 +++++++++++++++-----
 9 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 478912cd..9cacb142 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -494,11 +494,11 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
 }
 
 sub pkt_op_pair {
-	my ($self, $ops) = @_;
+	my ($self) = @_;
 	require PublicInbox::OnDestroy;
 	require PublicInbox::PktOp;
 	my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
-	@$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+	@$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair;
 	$end;
 }
 
@@ -512,14 +512,13 @@ sub workers_start {
 		($ops ? %$ops : ()),
 	};
 	$ops->{''} //= [ \&dclose, $lei ];
-	my $end = $lei->pkt_op_pair($ops);
+	my $end = $lei->pkt_op_pair;
 	$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
 	delete $lei->{pkt_op_p};
-	my $op = delete $lei->{pkt_op_c};
+	my $op_c = delete $lei->{pkt_op_c};
 	@$end = ();
 	$lei->event_step_init;
-	# oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
-	$lei->{oneshot} ? $op : undef;
+	($op_c, $ops);
 }
 
 sub _help {
diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm
index 2facbad3..97747220 100644
--- a/lib/PublicInbox/LeiBlob.pm
+++ b/lib/PublicInbox/LeiBlob.pm
@@ -103,12 +103,12 @@ sub lei_blob {
 	my $lxs = $lei->lxs_prepare or return;
 	require PublicInbox::SolverGit;
 	my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
-	my $op = $lei->workers_start($self, 'lei_solve', 1,
+	my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1,
 		{ '' => [ \&sol_done, $lei ] });
 	$lei->{sol} = $self;
 	$self->wq_io_do('do_solve_blob', []);
 	$self->wq_close(1);
-	while ($op && $op->{sock}) { $op->event_step }
+	$op_c->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 083ecc33..5d0adb14 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -53,11 +53,11 @@ sub lei_convert { # the main "lei convert" method
 	my $devfd = $lei->path_to_fd($ovv->{dst}) // return;
 	$lei->{opt}->{augment} = 1 if $devfd < 0;
 	$self->prepare_inputs($lei, \@inputs) or return;
-	my $op = $lei->workers_start($self, 'lei_convert', 1);
+	my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
 	$lei->{cnv} = $self;
 	$self->wq_io_do('do_convert', []);
 	$self->wq_close(1);
-	while ($op && $op->{sock}) { $op->event_step }
+	$op_c->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 7c5b7d09..803b5cda 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -76,11 +76,11 @@ sub lei_import { # the main "lei import" method
 	my $ops = { '' => [ \&import_done, $lei ] };
 	$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
 	$self->{-wq_nr_workers} = $j // 1; # locked
-	my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
+	my ($op_c, undef) = $lei->workers_start($self, 'lei_import', $j, $ops);
 	$lei->{imp} = $self;
 	$self->wq_io_do('input_stdin', []) if $self->{0};
 	net_merge_complete($self) unless $lei->{auth};
-	while ($op && $op->{sock}) { $op->event_step }
+	$op_c->op_wait_event($ops);
 }
 
 no warnings 'once';
diff --git a/lib/PublicInbox/LeiMark.pm b/lib/PublicInbox/LeiMark.pm
index 34846b84..6e611318 100644
--- a/lib/PublicInbox/LeiMark.pm
+++ b/lib/PublicInbox/LeiMark.pm
@@ -116,11 +116,11 @@ sub lei_mark { # the "lei mark" method
 	my $ops = { '' => [ \&mark_done, $lei ] };
 	$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
 	$self->{vmd_mod} = $vmd_mod;
-	my $op = $lei->workers_start($self, 'lei_mark', 1, $ops);
+	my ($op_c, undef) = $lei->workers_start($self, 'lei_mark', 1, $ops);
 	$lei->{mark} = $self;
 	$self->wq_io_do('input_stdin', []) if $self->{0};
 	net_merge_complete($self) unless $lei->{auth};
-	while ($op && $op->{sock}) { $op->event_step }
+	$op_c->op_wait_event($ops);
 }
 
 sub note_missing {
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c83386c6..89574d28 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -282,13 +282,13 @@ sub start {
 	require PublicInbox::Inbox;
 	require PublicInbox::Admin;
 	require PublicInbox::InboxWritable;
-	my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+	my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, {
 		'' => [ \&mirror_done, $lei ]
 	});
 	$lei->{mrr} = $self;
 	$self->wq_io_do('do_mirror', []);
 	$self->wq_close(1);
-	while ($op && $op->{sock}) { $op->event_step }
+	$op->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm
index 25f63a10..a8a3dd2c 100644
--- a/lib/PublicInbox/LeiP2q.pm
+++ b/lib/PublicInbox/LeiP2q.pm
@@ -185,11 +185,11 @@ sub lei_p2q { # the "lei patch-to-query" entry point
 	} else {
 		$self->{input} = $input;
 	}
-	my $op = $lei->workers_start($self, 'lei_p2q', 1);
+	my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1);
 	$lei->{p2q} = $self;
 	$self->wq_io_do('do_p2q', []);
 	$self->wq_close(1);
-	while ($op && $op->{sock}) { $op->event_step }
+	$op->op_wait_event($ops);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b41daffe..1a194f1c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -427,7 +427,7 @@ sub do_query {
 		'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
 	};
 	$lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
-	my $end = $lei->pkt_op_pair($ops);
+	my $end = $lei->pkt_op_pair;
 	$lei->{1}->autoflush(1);
 	$lei->start_pager if delete $lei->{need_pager};
 	$lei->{ovv}->ovv_begin($lei);
@@ -445,7 +445,7 @@ sub do_query {
 	}
 	$self->wq_workers_start('lei_xsearch', undef,
 				$lei->oldset, { lei => $lei });
-	my $op = delete $lei->{pkt_op_c};
+	my $op_c = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};
 	@$end = ();
 	$self->{threads} = $lei->{opt}->{threads};
@@ -455,9 +455,7 @@ sub do_query {
 		start_query($self);
 	}
 	$lei->event_step_init; # wait for shutdowns
-	if ($lei->{oneshot}) {
-		while ($op->{sock}) { $op->event_step }
-	}
+	$op_c->op_wait_event($ops);
 }
 
 sub add_uri {
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index 5d8e78ea..c3221735 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -16,21 +16,23 @@ use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
 our @EXPORT_OK = qw(pkt_do);
 
 sub new {
-	my ($cls, $r, $ops) = @_;
-	my $self = bless { sock => $r, ops => $ops }, $cls;
+	my ($cls, $r) = @_;
+	my $self = bless { sock => $r }, $cls;
 	if ($PublicInbox::DS::in_loop) { # iff using DS->EventLoop
 		$r->blocking(0);
 		$self->SUPER::new($r, EPOLLIN|EPOLLET);
+	} else {
+		$self->{blocking} = 1;
 	}
 	$self;
 }
 
 # returns a blessed object as the consumer, and a GLOB/IO for the producer
 sub pair {
-	my ($cls, $ops) = @_;
+	my ($cls) = @_;
 	my ($c, $p);
 	socketpair($c, $p, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
-	(new($cls, $c, $ops), $p);
+	(new($cls, $c), $p);
 }
 
 sub pkt_do { # for the producer to trigger event_step in consumer
@@ -41,7 +43,7 @@ sub pkt_do { # for the producer to trigger event_step in consumer
 sub close {
 	my ($self) = @_;
 	my $c = $self->{sock} or return;
-	$c->blocking ? delete($self->{sock}) : $self->SUPER::close;
+	$self->{blocking} ? delete($self->{sock}) : $self->SUPER::close;
 }
 
 sub event_step {
@@ -73,4 +75,12 @@ sub event_step {
 	}
 }
 
+# call this when we're ready to wait on events,
+# returns immediately if non-blocking
+sub op_wait_event {
+	my ($self, $ops) = @_;
+	$self->{ops} = $ops;
+	while ($self->{blocking} && $self->{sock}) { event_step($self) }
+}
+
 1;

  reply	other threads:[~2021-03-28  9:01 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-03-28  9:01 [PATCH 00/12] lei blob and some yak-shaving Eric Wong
2021-03-28  9:01 ` Eric Wong [this message]
2021-03-28  9:01 ` [PATCH 02/12] lei init: split out into separate file Eric Wong
2021-03-28  9:01 ` [PATCH 03/12] lei blob: dclose if already failed Eric Wong
2021-03-28  9:01 ` [PATCH 04/12] lei blob: support --no-mail switch Eric Wong
2021-03-28  9:01 ` [PATCH 05/12] lei blob: fail early if no git dirs Eric Wong
2021-03-28  9:01 ` [PATCH 06/12] lei blob: some extra tests Eric Wong
2021-03-28  9:01 ` [PATCH 07/12] lei help: show "NAME=VALUE" properly for -c Eric Wong
2021-03-28  9:01 ` [PATCH 08/12] lei blob: flesh out help text Eric Wong
2021-03-28  9:01 ` [PATCH 09/12] t/lei_store: ensure LeiSearch responds to ->isrch Eric Wong
2021-03-28  9:01 ` [PATCH 10/12] lei blob: add remote external support Eric Wong
2021-03-28  9:01 ` [PATCH 11/12] lei: drop coderepo placeholders, submodule TODO Eric Wong
2021-03-28  9:31   ` Eric Wong
2021-03-28  9:01 ` [PATCH 12/12] treewide: shorten temporary filename Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210328090124.3541-2-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    --subject='Re: [PATCH 01/12] lei: simplify PktOp callers' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

user/dev discussion of public-inbox itself

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://public-inbox.org/meta
	git clone --mirror http://czquwvybam4bgbro.onion/meta
	git clone --mirror http://hjrcffqmbrq6wope.onion/meta
	git clone --mirror http://ou63pmih66umazou.onion/meta

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V1 meta meta/ https://public-inbox.org/meta \
		meta@public-inbox.org
	public-inbox-index meta

Example config snippet for mirrors.
Newsgroups are available over NNTP:
	nntp://news.public-inbox.org/inbox.comp.mail.public-inbox.meta
	nntp://7fh6tueqddpjyxjmgtdiueylzoqt6pt7hec3pukyptlmohoowvhde4yd.onion/inbox.comp.mail.public-inbox.meta
	nntp://ie5yzdi7fg72h7s4sdcztq5evakq23rdt33mfyfcddc5u3ndnw24ogqd.onion/inbox.comp.mail.public-inbox.meta
	nntp://4uok3hntl7oi7b4uf4rtfwefqeexfzil2w6kgk2jn5z2f764irre7byd.onion/inbox.comp.mail.public-inbox.meta
	nntp://news.gmane.io/gmane.mail.public-inbox.general
 note: .onion URLs require Tor: https://www.torproject.org/

code repositories for project(s) associated with this inbox:

	https://80x24.org/public-inbox.git

AGPL code for this site: git clone https://public-inbox.org/public-inbox.git