From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id EEF051F917 for ; Sat, 27 Jun 2020 10:04:02 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 14/34] watch: support IMAP polling Date: Sat, 27 Jun 2020 10:03:40 +0000 Message-Id: <20200627100400.9871-15-e@yhbt.net> In-Reply-To: <20200627100400.9871-1-e@yhbt.net> References: <20200627100400.9871-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Not all IMAP servers support IDLE, and IDLE may be prohibitively expensive for some IMAP servers with many inboxes. So allow configuring an imap.$IMAP_URL.pollInterval=SECONDS to poll mailboxes. We'll also need to poll for NNTP servers in the future. 
--- lib/PublicInbox/WatchMaildir.pm | 64 ++++++++++++++++++++++++++++++--- t/imapd.t | 39 ++++++++++++++++++-- 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 431350be277..ac980d9b0f1 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -202,8 +202,9 @@ sub quit { if (my $imap_pid = $self->{-imap_pid}) { kill('QUIT', $imap_pid); } - if (my $idle_pids = $self->{idle_pids}) { - kill('QUIT', $_) for (keys %$idle_pids); + for (qw(idle_pids poll_pids)) { + my $pids = $self->{$_} or next; + kill('QUIT', $_) for (keys %$pids); } if (my $idle_mic = $self->{idle_mic}) { eval { $idle_mic->done }; @@ -237,12 +238,12 @@ sub imap_section ($) { sub cfg_intvl ($$) { my ($cfg, $key) = @_; defined(my $v = $cfg->{lc($key)}) or return; - $v =~ /\A[0-9]+\z/s and return $v + 0; + $v =~ /\A[0-9]+(?:\.[0-9]+)?\z/s and return $v + 0; if (ref($v) eq 'ARRAY') { $v = join(', ', @$v); warn "W: $key has multiple values: $v\nW: $key ignored\n"; } else { - warn "W: $key=$v is not an integer value in seconds\n"; + warn "W: $key=$v is not a numeric value in seconds\n"; } } @@ -460,6 +461,7 @@ sub watch_imap_idle_1 ($$$) { sub watch_atfork_child ($) { my ($self) = @_; delete $self->{idle_pids}; + delete $self->{poll_pids}; PublicInbox::DS->Reset; PublicInbox::Sigfd::sig_setmask($self->{oldset}); %SIG = (%SIG, %{$self->{sig}}); @@ -504,6 +506,52 @@ sub event_step { goto(&fs_scan_step) if $self->{mdre}; } +sub watch_imap_fetch_all ($$) { + my ($self, $uris) = @_; + for my $uri (@$uris) { + my $sec = imap_section($uri); + my $mic_arg = $self->{mic_arg}->{$sec} or + die "BUG: no Mail::IMAPClient->new arg for $sec"; + my $mic = PublicInbox::IMAPClient->new(%$mic_arg) or next; + my $err = imap_fetch_all($self, $mic, $uri); + last if $self->{quit}; + warn $err, "\n" if $err; + } +} + +sub imap_fetch_fork ($$$) { + my ($self, $intvl, $uris) = @_; + return if $self->{quit}; + $self->{mics} 
= {}; # going to be forking, so disconnect + defined(my $pid = fork) or die "fork: $!"; + if ($pid == 0) { + watch_atfork_child($self); + watch_imap_fetch_all($self, $uris); + _exit(0); + } + $self->{poll_pids}->{$pid} = [ $intvl, $uris ]; + PublicInbox::DS::dwaitpid($pid, \&imap_fetch_reap, $self); +} + +sub imap_fetch_cb ($$$) { + my ($self, $intvl, $uris) = @_; + sub { imap_fetch_fork($self, $intvl, $uris) }; +} + +sub imap_fetch_reap { # PublicInbox::DS::dwaitpid callback + my ($self, $pid) = @_; + my $intvl_uris = delete $self->{poll_pids}->{$pid} or + die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; + return if $self->{quit}; + my ($intvl, $uris) = @$intvl_uris; + if ($?) { + warn "W: PID=$pid died: \$?=$?\n", + map { $_->as_string."\n" } @$uris; + } + warn('I: will check ', $_->as_string, " in ${intvl}s\n") for @$uris; + PublicInbox::DS::add_timer($intvl, imap_fetch_cb($self, $intvl, $uris)); +} + sub watch_imap_init ($) { my ($self) = @_; eval { require PublicInbox::IMAPClient } or @@ -542,7 +590,13 @@ sub watch_imap_init ($) { $self->{idle_todo} = $idle; PublicInbox::DS::requeue($self); # ->event_step to fork } - # TODO: polling + return unless scalar keys %$poll; + $self->{poll_pids} = {}; + + # poll all URIs for a given interval sequentially + while (my ($intvl, $uris) = each %$poll) { + PublicInbox::DS::requeue(imap_fetch_cb($self, $intvl, $uris)); + } } sub watch { diff --git a/t/imapd.t b/t/imapd.t index cc87a127851..ee3a3b26767 100644 --- a/t/imapd.t +++ b/t/imapd.t @@ -443,6 +443,7 @@ ok($mic->logout, 'logged out'); { use_ok 'PublicInbox::WatchMaildir'; use_ok 'PublicInbox::InboxIdle'; + my $old_env = { HOME => $ENV{HOME} }; my $home = "$tmpdir/watch_home"; mkdir $home or BAIL_OUT $!; mkdir "$home/.public-inbox" or BAIL_OUT $!; @@ -464,13 +465,45 @@ ok($mic->logout, 'logged out'); my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) }; my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup'; $cfg->each_inbox(sub { 
$_[0]->subscribe_unlock('ident', $obj) }); - open my $err, '+>', undef or BAIL_OUT $!; - my $w = start_script(['-watch'], undef, { 2 => $err }); + my $watcherr = "$tmpdir/watcherr"; + open my $err_wr, '>', $watcherr or BAIL_OUT $!; + open my $err, '<', $watcherr or BAIL_OUT $!; + my $w = start_script(['-watch'], undef, { 2 => $err_wr }); + + diag 'waiting for initial fetch...'; + PublicInbox::DS->EventLoop; + diag 'inbox unlocked on initial fetch, waiting for IDLE'; + + tick until (grep(/I: \S+ idling/, <$err>)); + open my $fh, '<', 't/iso-2202-jp.eml' or BAIL_OUT $!; + $old_env->{ORIGINAL_RECIPIENT} = $addr; + ok(run_script([qw(-mda --no-precheck)], $old_env, { 0 => $fh }), + 'delivered a message for IDLE to kick -watch'); + diag 'waiting for IMAP IDLE wakeup'; + PublicInbox::DS->SetPostLoopCallback(undef); + PublicInbox::DS->EventLoop; + diag 'inbox unlocked on IDLE wakeup'; + + # try again with polling + xsys(qw(git config), "--file=$home/.public-inbox/config", + "imap.imap://$ihost:$iport.PollInterval", 0.11) == 0 + or BAIL_OUT "git config $?"; + $w->kill('HUP'); + diag 'waiting for -watch reload + initial fetch'; + tick until (grep(/I: will check/, <$err>)); + + open $fh, '<', 't/psgi_attach.eml' or BAIL_OUT $!; + ok(run_script([qw(-mda --no-precheck)], $old_env, { 0 => $fh }), + 'delivered a message for -watch PollInterval'); + + diag 'waiting for PollInterval wakeup'; + PublicInbox::DS->SetPostLoopCallback(undef); PublicInbox::DS->EventLoop; - diag 'inbox unlocked'; + diag 'inbox unlocked (poll)'; $w->kill; $w->join; is($?, 0, 'no error in exited -watch process'); + $cfg->each_inbox(sub { shift->unsubscribe_unlock('ident') }); $ii->close; PublicInbox::DS->Reset;