user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 13/34] watch: wire up IMAP IDLE reapers to DS
  2020-06-27 10:03  7% [PATCH 00/34] watch: add IMAP and NNTP support Eric Wong
@ 2020-06-27 10:03  4% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2020-06-27 10:03 UTC (permalink / raw)
  To: meta

We can avoid synchronous `waitpid(-1, 0)' and save a process
when simultaneously watching Maildirs.

One DS bug is fixed: ->Reset needs to clear the DS $in_loop flag
in forked children so dwaitpid() fails and allows git processes
to be reaped synchronously.  TestCommon also calls DS->Reset
when spawning new processes, since t/imapd.t uses DS->EventLoop
while waiting on -watch to write.
---
 lib/PublicInbox/DS.pm           |   2 +-
 lib/PublicInbox/TestCommon.pm   |   1 +
 lib/PublicInbox/WatchMaildir.pm | 170 +++++++++++++-------------------
 script/public-inbox-watch       |   6 +-
 4 files changed, 77 insertions(+), 102 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index da68802dda9..c46b20cba27 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -68,7 +68,7 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
-    $wait_pids = $later_queue = undef;
+    $in_loop = $wait_pids = $later_queue = undef;
     $EXPMAP = {};
     $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default
diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm
index b252810fca5..14ebba10563 100644
--- a/lib/PublicInbox/TestCommon.pm
+++ b/lib/PublicInbox/TestCommon.pm
@@ -350,6 +350,7 @@ sub start_script {
 	}
 	defined(my $pid = fork) or die "fork: $!\n";
 	if ($pid == 0) {
+		eval { PublicInbox::DS->Reset };
 		# pretend to be systemd (cf. sd_listen_fds(3))
 		# 3 == SD_LISTEN_FDS_START
 		my $fd;
diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm
index 4d3cd032e5a..431350be277 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/WatchMaildir.pm
@@ -12,7 +12,7 @@ use PublicInbox::Filter::Base qw(REJECT);
 use PublicInbox::Spamcheck;
 use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
-use POSIX qw(_exit WNOHANG);
+use POSIX qw(_exit);
 *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
 
 sub compile_watchheaders ($) {
@@ -213,9 +213,8 @@ sub quit {
 	}
 }
 
-sub watch_fs {
+sub watch_fs_init ($) {
 	my ($self) = @_;
-	require PublicInbox::DirIdle;
 	my $done = sub {
 		delete $self->{done_timer};
 		_done_for_now($self);
@@ -224,10 +223,8 @@ sub watch_fs {
 		_try_path($self, $_[0]->fullname);
 		$self->{done_timer} //= PublicInbox::DS::requeue($done);
 	};
-	my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb);
-	PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} });
-	PublicInbox::DS->EventLoop;
-	_done_for_now($self);
+	require PublicInbox::DirIdle;
+	PublicInbox::DirIdle->new($self->{mdir}, $cb); # EPOLL_CTL_ADD
 }
 
 # returns the git config section name, e.g [imap "imaps://user@example.com"]
@@ -334,25 +331,6 @@ sub mic_for ($$$) { # mic = Mail::IMAPClient
 	$mic;
 }
 
-sub imap_start ($) {
-	my ($self) = @_;
-	eval { require PublicInbox::IMAPClient } or
-		die "Mail::IMAPClient is required for IMAP:\n$@\n";
-	eval { require Git } or
-		die "Git (Perl module) is required for IMAP:\n$@\n";
-	eval { require PublicInbox::IMAPTracker } or
-		die "DBD::SQLite is required for IMAP\n:$@\n";
-
-	my $mic_args = imap_common_init($self);
-	# make sure we can connect and cache the credentials in memory
-	$self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
-	my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
-	for my $url (sort keys %{$self->{imap}}) {
-		my $uri = PublicInbox::URIimap->new($url);
-		$mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
-	}
-}
-
 sub imap_fetch_all ($$$) {
 	my ($self, $mic, $uri) = @_;
 	my $sec = imap_section($uri);
@@ -481,74 +459,76 @@ sub watch_imap_idle_1 ($$$) {
 
 sub watch_atfork_child ($) {
 	my ($self) = @_;
+	delete $self->{idle_pids};
+	PublicInbox::DS->Reset;
 	PublicInbox::Sigfd::sig_setmask($self->{oldset});
 	%SIG = (%SIG, %{$self->{sig}});
 }
 
-sub watch_imap_idle_all ($$) {
-	my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
-	$self->{mics} = {}; # going to be forking, so disconnect
-	my $idle_pids = $self->{idle_pids} = {};
-	until ($self->{quit}) {
-		while (my $uri_intvl = shift @$idle) {
-			my ($uri, $intvl) = @$uri_intvl;
-			defined(my $pid = fork) or die "fork: $!";
-			if ($pid == 0) {
-				watch_atfork_child($self);
-				delete $self->{idle_pids};
-				watch_imap_idle_1($self, $uri, $intvl);
-				_exit(0);
-			}
-			$idle_pids->{$pid} = $uri_intvl;
-		}
-		my $pid = waitpid(-1, 0) or next;
-		if ($pid < 0) {
-			warn "W: no idling children: $!";
-			if (@$idle) {
-				sleep 60;
-			} else {
-				warn "W: nothing to respawn, quitting IDLE\n";
-				last;
-			}
-		}
-		if (my $uri_intvl = delete $idle_pids->{$pid}) {
-			my ($uri, $intvl) = @$uri_intvl;
-			my $url = $uri->as_string;
-			if ($? || !$self->{quit}) {
-				warn "W: PID=$pid on $url died: \$?=$?\n";
-			}
-			push @$idle, $uri_intvl;
-		} else {
-			warn "W: PID=$pid (unknown) reaped: \$?=$?\n";
-		}
+sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
+	my ($self, $pid) = @_;
+	my $uri_intvl = delete $self->{idle_pids}->{$pid} or
+		die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+
+	my ($uri, $intvl) = @$uri_intvl;
+	my $url = $uri->as_string;
+	return if $self->{quit};
+	warn "W: PID=$pid on $url died: \$?=$?\n" if $?;
+	push @{$self->{idle_todo}}, $uri_intvl;
+	PublicInbox::DS::requeue($self); # call ->event_step to respawn
+}
+
+sub imap_idle_fork ($$) {
+	my ($self, $uri_intvl) = @_;
+	my ($uri, $intvl) = @$uri_intvl;
+	defined(my $pid = fork) or die "fork: $!";
+	if ($pid == 0) {
+		watch_atfork_child($self);
+		watch_imap_idle_1($self, $uri, $intvl);
+		_exit(0);
 	}
+	$self->{idle_pids}->{$pid} = $uri_intvl;
+	PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self);
+}
 
-	# tear it all down
-	kill('QUIT', $_) for (keys %$idle_pids);
-	while (scalar keys %$idle_pids) {
-		if (my $pid = waitpid(-1, WNOHANG)) {
-			if ($pid < 0) {
-				warn "E: no children? $! (PIDs: ",
-					join(', ', keys %$idle_pids),")\n";
-				last;
-			} else {
-				delete $idle_pids->{$pid};
-			}
-		} else { # signals aren't that reliable w/o signalfd/kevent
-			sleep 1;
-			kill('QUIT', $_) for (keys %$idle_pids);
+sub event_step {
+	my ($self) = @_;
+	return if $self->{quit};
+	my $idle_todo = $self->{idle_todo};
+	if ($idle_todo && @$idle_todo) {
+		$self->{mics} = {}; # going to be forking, so disconnect
+		while (my $uri_intvl = shift(@$idle_todo)) {
+			imap_idle_fork($self, $uri_intvl);
 		}
 	}
+	goto(&fs_scan_step) if $self->{mdre};
 }
 
-sub watch_imap ($) {
+sub watch_imap_init ($) {
 	my ($self) = @_;
-	my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ];
+	eval { require PublicInbox::IMAPClient } or
+		die "Mail::IMAPClient is required for IMAP:\n$@\n";
+	eval { require Git } or
+		die "Git (Perl module) is required for IMAP:\n$@\n";
+	eval { require PublicInbox::IMAPTracker } or
+		die "DBD::SQLite is required for IMAP\n:$@\n";
+
+	my $mic_args = imap_common_init($self); # read args from config
+
+	# make sure we can connect and cache the credentials in memory
+	$self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
+	my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
+	for my $url (sort keys %{$self->{imap}}) {
+		my $uri = PublicInbox::URIimap->new($url);
+		$mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
+	}
+
+	my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
 	my $poll = {}; # intvl_seconds => [ uri1, uri2 ]
 	for my $url (keys %{$self->{imap}}) {
 		my $uri = PublicInbox::URIimap->new($url);
 		my $sec = imap_section($uri);
-		my $mic = $self->{mics}->{$sec};
+		my $mic = $mics->{$sec};
 		my $intvl = $self->{imap_opt}->{$sec}->{poll_intvl};
 		if ($mic->has_capability('IDLE') && !$intvl) {
 			$intvl = $self->{imap_opt}->{$sec}->{idle_intvl};
@@ -557,9 +537,10 @@ sub watch_imap ($) {
 			push @{$poll->{$intvl || 120}}, $uri;
 		}
 	}
-	my $nr_poll = scalar keys %$poll;
-	if (scalar @$idle && !$nr_poll) { # multiple idlers, need fork
-		watch_imap_idle_all($self, $idle);
+	if (scalar @$idle) {
+		$self->{idle_pids} = {};
+		$self->{idle_todo} = $idle;
+		PublicInbox::DS::requeue($self); # ->event_step to fork
 	}
 	# TODO: polling
 }
@@ -568,21 +549,11 @@ sub watch {
 	my ($self, $sig, $oldset) = @_;
 	$self->{oldset} = $oldset;
 	$self->{sig} = $sig;
-	if ($self->{mdre} && $self->{imap}) {
-		defined(my $pid = fork) or die "fork: $!";
-		if ($pid == 0) {
-			watch_atfork_child($self);
-			imap_start($self);
-			goto &watch_imap;
-		}
-		$self->{-imap_pid} = $pid;
-	} elsif ($self->{imap}) {
-		# not a child process, but no signalfd, yet:
-		watch_atfork_child($self);
-		imap_start($self);
-		goto &watch_imap;
-	}
-	goto &watch_fs;
+	watch_imap_init($self) if $self->{imap};
+	watch_fs_init($self) if $self->{mdre};
+	PublicInbox::DS->SetPostLoopCallback(sub {});
+	PublicInbox::DS->EventLoop until $self->{quit};
+	_done_for_now($self);
 }
 
 sub trigger_scan {
@@ -591,8 +562,7 @@ sub trigger_scan {
 	PublicInbox::DS::requeue($self);
 }
 
-# called directly, and by PublicInbox::DS
-sub event_step ($) {
+sub fs_scan_step {
 	my ($self) = @_;
 	return if $self->{quit};
 	my $op = shift @{$self->{ops}};
@@ -634,7 +604,7 @@ sub event_step ($) {
 sub scan {
 	my ($self, $op) = @_;
 	push @{$self->{ops}}, $op;
-	goto &event_step;
+	goto &fs_scan_step;
 }
 
 sub _importer_for {
diff --git a/script/public-inbox-watch b/script/public-inbox-watch
index b6d545adad7..ae7b70be355 100755
--- a/script/public-inbox-watch
+++ b/script/public-inbox-watch
@@ -22,7 +22,11 @@ if ($watch_md) {
 		$watch_md->quit if $watch_md;
 		$watch_md = undef;
 	};
-	my $sig = { HUP => $reload, USR1 => $scan };
+	my $sig = {
+		HUP => $reload,
+		USR1 => $scan,
+		CHLD => \&PublicInbox::DS::enqueue_reap,
+	};
 	$sig->{QUIT} = $sig->{TERM} = $sig->{INT} = $quit;
 
 	# --no-scan is only intended for testing atm, undocumented.

^ permalink raw reply related	[relevance 4%]

* [PATCH 00/34] watch: add IMAP and NNTP support
@ 2020-06-27 10:03  7% Eric Wong
  2020-06-27 10:03  4% ` [PATCH 13/34] watch: wire up IMAP IDLE reapers to DS Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2020-06-27 10:03 UTC (permalink / raw)
  To: meta

Some fairly major changes to -watch.  Filesys::Notify::Simple is
no longer used, and -watch now uses inotify, signalfd or kevent
like the read-only daemons.

Credentials are handled via Net::Netrc (Perl standard library)
or "git-credential", so we do no password storage on our own.

NNTP (and non-IDLE IMAP) may allow more parallelization in the
future.

One significant project-wide change is getting rid of "use
fields".  It gets in my way more than it helps, and it's
probably alien to a fair number of Perl hackers.  AFAIK, it's
never really been popular outside of Danga::Socket-based
projects.


Eric W. Biederman (1):
  IMAPTracker: Add a helper to track our place in reading imap mailboxes

Eric Wong (33):
  inboxwritable: ensure ssoma.lock exists on init
  inbox: warn on ->on_inbox_unlock exception
  imaptracker: use ~/.local/share/public-inbox/imap.sqlite3
  watchmaildir: hoist out compile_watchheaders
  watchmaildir: fix check for spam vs ham inbox conflicts
  URI IMAP support
  watch: preliminary IMAP support
  kqnotify|fake_inotify: detect Maildir write ops
  watch: remove Filesys::Notify::Simple dependency
  watch: use signalfd for Maildir watching
  ds: remove fields.pm usage
  watch: wire up IMAP IDLE reapers to DS
  watch: support IMAP polling
  config: support ->urlmatch method for -watch
  watch: stop importers before forking
  watch: use UID SEARCH to avoid empty UID FETCH
  ds: add_timer: allow passing arg to callback.
  imaptracker: add {url} field to reduce args
  imaptracker: drop {dbname} field
  watch: avoid long transaction when writing to IMAPTracker
  watch: support imap.fetchBatchSize parameter
  watch: imap: be quieter about disconnecting on quit
  watch: support multiple watch: directives per-inbox
  watch: remove {mdir} array
  watch: just use ->urlmatch
  testcommon: $ENV{TAIL} supports non-@ARGV redirects
  watch: add NNTP support
  watch: show user-specified URL consistently.
  watch: enable autoflush for STDOUT and STDERR
  watch: use our own "git credential" wrapper
  watch: support ~/.netrc via Net::Netrc
  imaptracker: use flock(2) around writes
  watch: simplify internal structures

 Documentation/public-inbox-watch.pod |   3 +-
 INSTALL                              |   8 -
 MANIFEST                             |  11 +
 Makefile.PL                          |   4 -
 ci/deps.perl                         |   1 -
 lib/PublicInbox/Config.pm            |  21 +-
 lib/PublicInbox/DS.pm                |  29 +-
 lib/PublicInbox/Daemon.pm            |  19 +-
 lib/PublicInbox/DirIdle.pm           |  49 ++
 lib/PublicInbox/FakeInotify.pm       |  56 +-
 lib/PublicInbox/GitAsyncCat.pm       |   4 +-
 lib/PublicInbox/GitCredential.pm     |  55 ++
 lib/PublicInbox/HTTP.pm              |  23 +-
 lib/PublicInbox/HTTPD/Async.pm       |  22 +-
 lib/PublicInbox/IMAP.pm              |  19 +-
 lib/PublicInbox/IMAPTracker.pm       |  82 +++
 lib/PublicInbox/In2Tie.pm            |  13 +
 lib/PublicInbox/Inbox.pm             |   1 +
 lib/PublicInbox/InboxIdle.pm         |  20 +-
 lib/PublicInbox/InboxWritable.pm     |   3 +
 lib/PublicInbox/KQNotify.pm          |  38 +-
 lib/PublicInbox/Listener.pm          |   8 +-
 lib/PublicInbox/NNTP.pm              |  12 +-
 lib/PublicInbox/NNTPdeflate.pm       |   5 +-
 lib/PublicInbox/ParentPipe.pm        |   8 +-
 lib/PublicInbox/Sigfd.pm             |  21 +-
 lib/PublicInbox/TestCommon.pm        |  40 +-
 lib/PublicInbox/URIimap.pm           | 113 +++
 lib/PublicInbox/WatchMaildir.pm      | 998 +++++++++++++++++++++++----
 script/public-inbox-watch            |  33 +-
 t/config.t                           |  18 +
 t/dir_idle.t                         |   6 +
 t/fake_inotify.t                     |  45 ++
 t/imap_tracker.t                     |  54 ++
 t/imapd.t                            |  74 ++
 t/kqnotify.t                         |  41 ++
 t/nntpd.t                            |  52 ++
 t/uri_imap.t                         |  65 ++
 t/watch_filter_rubylang.t            |   2 +-
 t/watch_imap.t                       |  21 +
 t/watch_maildir.t                    |  96 ++-
 t/watch_maildir_v2.t                 |   4 +-
 t/watch_multiple_headers.t           |   2 +-
 t/watch_nntp.t                       |  17 +
 xt/mem-imapd-tls.t                   |  18 +-
 45 files changed, 1944 insertions(+), 290 deletions(-)
 create mode 100644 lib/PublicInbox/DirIdle.pm
 create mode 100644 lib/PublicInbox/GitCredential.pm
 create mode 100644 lib/PublicInbox/IMAPTracker.pm
 create mode 100644 lib/PublicInbox/URIimap.pm
 create mode 100644 t/dir_idle.t
 create mode 100644 t/fake_inotify.t
 create mode 100644 t/imap_tracker.t
 create mode 100644 t/kqnotify.t
 create mode 100644 t/uri_imap.t
 create mode 100644 t/watch_imap.t
 create mode 100644 t/watch_nntp.t


^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2020-06-27 10:03  7% [PATCH 00/34] watch: add IMAP and NNTP support Eric Wong
2020-06-27 10:03  4% ` [PATCH 13/34] watch: wire up IMAP IDLE reapers to DS Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).