From bbf4159b7694241d2139be641ced4c485303714a Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 27 Jun 2020 10:03:40 +0000 Subject: watch: support IMAP polling Not all IMAP servers support IDLE, and IDLE may be prohibitively expensive for some IMAP servers with many inboxes. So allow configuring a imap.$IMAP_URL.pollInterval=SECONDS to poll mailboxes. We'll also need to poll for NNTP servers in the future. --- lib/PublicInbox/WatchMaildir.pm | 64 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 5 deletions(-) (limited to 'lib/PublicInbox/WatchMaildir.pm') diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 431350be..ac980d9b 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -202,8 +202,9 @@ sub quit { if (my $imap_pid = $self->{-imap_pid}) { kill('QUIT', $imap_pid); } - if (my $idle_pids = $self->{idle_pids}) { - kill('QUIT', $_) for (keys %$idle_pids); + for (qw(idle_pids poll_pids)) { + my $pids = $self->{$_} or next; + kill('QUIT', $_) for (keys %$pids); } if (my $idle_mic = $self->{idle_mic}) { eval { $idle_mic->done }; @@ -237,12 +238,12 @@ sub imap_section ($) { sub cfg_intvl ($$) { my ($cfg, $key) = @_; defined(my $v = $cfg->{lc($key)}) or return; - $v =~ /\A[0-9]+\z/s and return $v + 0; + $v =~ /\A[0-9]+(?:\.[0-9]+)?\z/s and return $v + 0; if (ref($v) eq 'ARRAY') { $v = join(', ', @$v); warn "W: $key has multiple values: $v\nW: $key ignored\n"; } else { - warn "W: $key=$v is not an integer value in seconds\n"; + warn "W: $key=$v is not a numeric value in seconds\n"; } } @@ -460,6 +461,7 @@ sub watch_imap_idle_1 ($$$) { sub watch_atfork_child ($) { my ($self) = @_; delete $self->{idle_pids}; + delete $self->{poll_pids}; PublicInbox::DS->Reset; PublicInbox::Sigfd::sig_setmask($self->{oldset}); %SIG = (%SIG, %{$self->{sig}}); @@ -504,6 +506,52 @@ sub event_step { goto(&fs_scan_step) if $self->{mdre}; } +sub watch_imap_fetch_all ($$) { + my ($self, $uris) = @_; + for my $uri (@$uris) { + my $sec = imap_section($uri); + my $mic_arg = $self->{mic_arg}->{$sec} or + die "BUG: no Mail::IMAPClient->new arg for $sec"; + my $mic = PublicInbox::IMAPClient->new(%$mic_arg) or next; + my $err = imap_fetch_all($self, $mic, $uri); + last if $self->{quit}; + warn $err, "\n" if $err; + } +} + +sub imap_fetch_fork ($$$) { + my ($self, $intvl, $uris) = @_; + return if $self->{quit}; + $self->{mics} = {}; # going to be forking, so disconnect + defined(my $pid = fork) or die "fork: $!"; + if ($pid == 0) { + watch_atfork_child($self); + watch_imap_fetch_all($self, $uris); + _exit(0); + } + $self->{poll_pids}->{$pid} = [ $intvl, $uris ]; + PublicInbox::DS::dwaitpid($pid, \&imap_fetch_reap, $self); +} + +sub imap_fetch_cb ($$$) { + my ($self, $intvl, $uris) = @_; + sub { imap_fetch_fork($self, $intvl, $uris) }; +} + +sub imap_fetch_reap { # PublicInbox::DS::dwaitpid callback + my ($self, $pid) = @_; + my $intvl_uris = delete $self->{poll_pids}->{$pid} or + die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; + return if $self->{quit}; + my ($intvl, $uris) = @$intvl_uris; + if ($?) { + warn "W: PID=$pid died: \$?=$?\n", + map { $_->as_string."\n" } @$uris; + } + warn('I: will check ', $_->as_string, " in ${intvl}s\n") for @$uris; + PublicInbox::DS::add_timer($intvl, imap_fetch_cb($self, $intvl, $uris)); +} + sub watch_imap_init ($) { my ($self) = @_; eval { require PublicInbox::IMAPClient } or @@ -542,7 +590,13 @@ sub watch_imap_init ($) { $self->{idle_todo} = $idle; PublicInbox::DS::requeue($self); # ->event_step to fork } - # TODO: polling + return unless scalar keys %$poll; + $self->{poll_pids} = {}; + + # poll all URIs for a given interval sequentially + while (my ($intvl, $uris) = each %$poll) { + PublicInbox::DS::requeue(imap_fetch_cb($self, $intvl, $uris)); + } } sub watch { -- cgit v1.2.3-24-ge0c7