about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2021-02-24 17:31:53 +0600
committer Eric Wong <e@80x24.org> 2021-02-24 23:26:29 +0000
commit 48bae7dc634583b58fd747bd557392cac60123b3 (patch)
tree 4b170b92548c85fd8ba14b4f6838b3a90ce6b5d0
parent 2f9f6c9eca667dfde9e267e946498eaad0b0c8da (diff)
download public-inbox-48bae7dc634583b58fd747bd557392cac60123b3.tar.gz
NetReader::<imap|nntp>_each were based on the -watch
code they now replace.

v2: do not warn on EINTR if user quit to fix occasional
    test failure in t/imapd.t
-rw-r--r-- lib/PublicInbox/NetReader.pm 20
-rw-r--r-- lib/PublicInbox/Watch.pm 249
-rw-r--r-- t/imapd.t 2
-rw-r--r-- t/nntpd.t 2
4 files changed, 67 insertions, 206 deletions
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 2a453217..785211bf 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -366,6 +366,13 @@ sub _imap_do_msg ($$$$$) {
         $eml_cb->($uri, $uid, $kw, PublicInbox::Eml->new($raw), @args);
 }
 
+sub run_commit_cb ($) {
+        my ($self) = @_;
+        my $cmt_cb_args = $self->{on_commit} or return;
+        my ($cb, @args) = @$cmt_cb_args;
+        $cb->(@args);
+}
+
 sub _imap_fetch_all ($$$) {
         my ($self, $mic, $uri) = @_;
         my $sec = uri_section($uri);
@@ -414,8 +421,10 @@ sub _imap_fetch_all ($$$) {
                 # I wish "UID FETCH $START:*" could work, but:
                 # 1) servers do not need to return results in any order
                 # 2) Mail::IMAPClient doesn't offer a streaming API
-                $uids = $mic->search("UID $l_uid:*") or
+                unless ($uids = $mic->search("UID $l_uid:*")) {
+                        return if $!{EINTR} && $self->{quit};
                         return "E: $uri UID SEARCH $l_uid:* error: $!";
+                }
                 return if scalar(@$uids) == 0;
 
                 # RFC 3501 doesn't seem to indicate order of UID SEARCH
@@ -437,6 +446,7 @@ sub _imap_fetch_all ($$$) {
                         local $0 = "UID:$batch $mbx $sec";
                         my $r = $mic->fetch_hash($batch, $req, 'FLAGS');
                         unless ($r) { # network error?
+                                last if $!{EINTR} && $self->{quit};
                                 $err = "E: $uri UID FETCH $batch error: $!";
                                 last;
                         }
@@ -451,6 +461,7 @@ sub _imap_fetch_all ($$$) {
                         }
                         last if $self->{quit};
                 }
+                run_commit_cb($self);
                 $itrk->update_last($r_uidval, $last_uid) if $itrk;
         } until ($err || $self->{quit});
         $err;
@@ -490,7 +501,7 @@ sub imap_each {
                 local $self->{eml_each} = [ $eml_cb, @args ];
                 $err = _imap_fetch_all($self, $mic, $uri);
         } else {
-                $err = "E: not connected: $!";
+                $err = "E: <$uri> not connected: $!";
         }
         warn $err if $err;
         $mic;
@@ -555,6 +566,7 @@ sub _nntp_fetch_all ($$$) {
                 last if $self->{quit};
                 $art = $_;
                 if (--$n < 0) {
+                        run_commit_cb($self);
                         $itrk->update_last(0, $last_art) if $itrk;
                         $n = $self->{max_batch};
                 }
@@ -575,6 +587,7 @@ sub _nntp_fetch_all ($$$) {
                 $eml_cb->($uri, $art, [], PublicInbox::Eml->new(\$raw), @args);
                 $last_art = $art;
         }
+        run_commit_cb($self);
         $itrk->update_last(0, $last_art) if $itrk;
         $err;
 }
@@ -585,12 +598,13 @@ sub nntp_each {
         my $sec = uri_section($uri);
         local $0 = $uri->group ." $sec";
         my $nn = nn_get($self, $uri);
+        return if $self->{quit};
         my $err;
         if ($nn) {
                 local $self->{eml_each} = [ $eml_cb, @args ];
                 $err = _nntp_fetch_all($self, $nn, $uri);
         } else {
-                $err = "E: not connected: $!";
+                $err = "E: <$uri> not connected: $!";
         }
         warn $err if $err;
         $nn;
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index 4b009a28..0b72bd16 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -309,102 +309,38 @@ sub imap_import_msg ($$$$$) {
         }
 }
 
-sub imap_fetch_all ($$$) {
-        my ($self, $mic, $uri) = @_;
-        my $sec = uri_section($uri);
-        my $mbx = $uri->mailbox;
-        $mic->Clear(1); # trim results history
-        $mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!";
-        my ($r_uidval, $r_uidnext);
-        for ($mic->Results) {
-                /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ and $r_uidval = $1;
-                /^\* OK \[UIDNEXT ([0-9]+)\].*/ and $r_uidnext = $1;
-                last if $r_uidval && $r_uidnext;
-        }
-        $r_uidval //= $mic->uidvalidity($mbx) //
-                return "E: $uri cannot get UIDVALIDITY";
-        $r_uidnext //= $mic->uidnext($mbx) //
-                return "E: $uri cannot get UIDNEXT";
-        my $itrk = PublicInbox::IMAPTracker->new($$uri);
-        my ($l_uidval, $l_uid) = $itrk->get_last;
-        $l_uidval //= $r_uidval; # first time
-        $l_uid //= 1;
-        if ($l_uidval != $r_uidval) {
-                return "E: $uri UIDVALIDITY mismatch\n".
-                        "E: local=$l_uidval != remote=$r_uidval";
-        }
-        my $r_uid = $r_uidnext - 1;
-        if ($l_uid != 1 && $l_uid > $r_uid) {
-                return "E: $uri local UID exceeds remote ($l_uid > $r_uid)\n".
-                        "E: $uri strangely, UIDVALIDLITY matches ($l_uidval)\n";
-        }
-        return if $l_uid >= $r_uid; # nothing to do
-
-        warn "I: $uri fetching UID $l_uid:$r_uid\n";
-        $mic->Uid(1); # the default, we hope
-        my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
-        my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
-
-        # TODO: FLAGS may be useful for personal use
-        my $key = $req;
-        $key =~ s/\.PEEK//;
-        my ($uids, $batch);
+sub net_cb { # NetReader::(nntp|imap)_each callback
+        my ($uri, $art, $kw, $eml, $self, $inboxes) = @_;
+        local $self->{cur_uid} = $art; # IMAP UID or NNTP article
+        if (ref($inboxes)) {
+                my @ibx = @$inboxes;
+                my $last = pop @ibx;
+                for my $ibx (@ibx) {
+                        my $tmp = PublicInbox::Eml->new(\($eml->as_string));
+                        import_eml($self, $ibx, $tmp);
+                }
+                import_eml($self, $last, $eml);
+        } elsif ($inboxes eq 'watchspam') {
+                $self->{pi_cfg}->each_inbox(\&remove_eml_i,
+                                $self, $eml, "$uri #$art");
+        } else {
+                die "BUG: destination unknown $inboxes";
+        }
+}
+
+sub imap_fetch_all ($$) {
+        my ($self, $uri) = @_;
         my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
+        $self->{incremental} = 1;
+        $self->{on_commit} = [ \&_done_for_now, $self ];
+        local $self->{cur_uid};
         local $SIG{__WARN__} = sub {
-                my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : '';
-                $batch //= '?';
-                $warn_cb->("$pfx$uri UID:$batch\n", @_);
+                my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : '';
+                my $uid = $self->{cur_uid};
+                $warn_cb->("$pfx$uri", $uid ? ("UID:$uid") : (), "\n", @_);
         };
-        my $err;
-        do {
-                # I wish "UID FETCH $START:*" could work, but:
-                # 1) servers do not need to return results in any order
-                # 2) Mail::IMAPClient doesn't offer a streaming API
-                $uids = $mic->search("UID $l_uid:*") or
-                        return "E: $uri UID SEARCH $l_uid:* error: $!";
-                return if scalar(@$uids) == 0;
-
-                # RFC 3501 doesn't seem to indicate order of UID SEARCH
-                # responses, so sort it ourselves.  Order matters so
-                # IMAPTracker can store the newest UID.
-                @$uids = sort { $a <=> $b } @$uids;
-
-                # Did we actually get new messages?
-                return if $uids->[0] < $l_uid;
-
-                $l_uid = $uids->[-1] + 1; # for next search
-                my $last_uid;
-                my $n = $self->{max_batch};
-
-                while (scalar @$uids) {
-                        if (--$n < 0) {
-                                _done_for_now($self);
-                                $itrk->update_last($r_uidval, $last_uid);
-                                $n = $self->{max_batch};
-                        }
-                        my @batch = splice(@$uids, 0, $bs);
-                        $batch = join(',', @batch);
-                        local $0 = "UID:$batch $mbx $sec";
-                        my $r = $mic->fetch_hash($batch, $req, 'FLAGS');
-                        unless ($r) { # network error?
-                                $err = "E: $uri UID FETCH $batch error: $!";
-                                last;
-                        }
-                        for my $uid (@batch) {
-                                # messages get deleted, so holes appear
-                                my $per_uid = delete $r->{$uid} // next;
-                                my $raw = delete($per_uid->{$key}) // next;
-                                my $fl = $per_uid->{FLAGS} // '';
-                                imap_import_msg($self, $uri, $uid, \$raw, $fl);
-                                $last_uid = $uid;
-                                last if $self->{quit};
-                        }
-                        last if $self->{quit};
-                }
-                _done_for_now($self);
-                $itrk->update_last($r_uidval, $last_uid);
-        } until ($err || $self->{quit});
-        $err;
+        PublicInbox::NetReader::imap_each($self, $uri, \&net_cb, $self,
+                                        $self->{imap}->{$$uri});
 }
 
 sub imap_idle_once ($$$$) {
@@ -444,8 +380,11 @@ sub watch_imap_idle_1 ($$$) {
                 $mic //= PublicInbox::IMAPClient->new(%$mic_arg);
                 my $err;
                 if ($mic && $mic->IsConnected) {
-                        $err = imap_fetch_all($self, $mic, $uri);
-                        $err //= imap_idle_once($self, $mic, $intvl, $uri);
+                        local $self->{mics_cached}->{$sec} = $mic;
+                        my $m = imap_fetch_all($self, $uri);
+                        $m == $mic or die "BUG: wrong mic";
+                        $mic->IsConnected and
+                                $err = imap_idle_once($self, $mic, $intvl, $uri)
                 } else {
                         $err = "E: not connected: $!";
                 }
@@ -540,43 +479,27 @@ sub event_step {
 sub watch_imap_fetch_all ($$) {
         my ($self, $uris) = @_;
         for my $uri (@$uris) {
-                my $sec = uri_section($uri);
-                my $mic_arg = $self->{mic_arg}->{$sec} or
-                        die "BUG: no Mail::IMAPClient->new arg for $sec";
-                my $mic = PublicInbox::IMAPClient->new(%$mic_arg) or next;
-                my $err = imap_fetch_all($self, $mic, $uri);
+                imap_fetch_all($self, $uri);
                 last if $self->{quit};
-                warn $err, "\n" if $err;
         }
 }
 
 sub watch_nntp_fetch_all ($$) {
         my ($self, $uris) = @_;
-        for my $uri (@$uris) {
-                my $sec = uri_section($uri);
-                my $nn_arg = $self->{nn_arg}->{$sec} or
-                        die "BUG: no Net::NNTP->new arg for $sec";
-                my $nntp_opt = $self->{nntp_opt}->{$sec};
-                my $nn = nn_new($nn_arg, $nntp_opt, $uri);
-                unless ($nn) {
-                        warn "E: $uri: \$!=$!\n";
-                        next;
-                }
-                last if $self->{quit};
-                if (my $postconn = $nntp_opt->{-postconn}) {
-                        for my $m_arg (@$postconn) {
-                                my ($method, @args) = @$m_arg;
-                                $nn->$method(@args) and next;
-                                warn "E: <$uri> $method failed\n";
-                                $nn = undef;
-                                last;
-                        }
-                }
+        $self->{incremental} = 1;
+        $self->{on_commit} = [ \&_done_for_now, $self ];
+        my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
+        local $self->{cur_uid};
+        my $uri = '';
+        local $SIG{__WARN__} = sub {
+                my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : '';
+                my $art = $self->{cur_uid};
+                $warn_cb->("$pfx$uri", $art ? ("ARTICLE $art") : (), "\n", @_);
+        };
+        for $uri (@$uris) {
+                PublicInbox::NetReader::nntp_each($self, $uri, \&net_cb, $self,
+                                        $self->{nntp}->{$$uri});
                 last if $self->{quit};
-                if ($nn) {
-                        my $err = nntp_fetch_all($self, $nn, $uri);
-                        warn $err, "\n" if $err;
-                }
         }
 }
 
@@ -640,82 +563,6 @@ sub watch_imap_init ($$) {
         }
 }
 
-sub nntp_fetch_all ($$$) {
-        my ($self, $nn, $uri) = @_;
-        my ($group, $num_a, $num_b) = $uri->group;
-        my $sec = uri_section($uri);
-        my ($nr, $beg, $end) = $nn->group($group);
-        unless (defined($nr)) {
-                chomp(my $msg = $nn->message);
-                return "E: GROUP $group <$sec> $msg";
-        }
-
-        # IMAPTracker is also used for tracking NNTP, UID == article number
-        # LIST.ACTIVE can get the equivalent of UIDVALIDITY, but that's
-        # expensive.  So we assume newsgroups don't change:
-        my $itrk = PublicInbox::IMAPTracker->new($$uri);
-        my (undef, $l_art) = $itrk->get_last;
-        $l_art //= $beg; # initial import
-
-        # allow users to specify articles to refetch
-        # cf. https://tools.ietf.org/id/draft-gilman-news-url-01.txt
-        # nntp://example.com/inbox.foo/$num_a-$num_b
-        $l_art = $num_a if defined($num_a) && $num_a < $l_art;
-        $end = $num_b if defined($num_b) && $num_b < $end;
-
-        return if $l_art >= $end; # nothing to do
-        $beg = $l_art + 1;
-
-        warn "I: $uri fetching ARTICLE $beg..$end\n";
-        my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
-        my ($err, $art);
-        local $SIG{__WARN__} = sub {
-                my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : '';
-                $warn_cb->("$pfx$uri ", $art ? ("ARTICLE $art") : (), "\n", @_);
-        };
-        my $inboxes = $self->{nntp}->{$$uri};
-        my $last_art;
-        my $n = $self->{max_batch};
-        for ($beg..$end) {
-                last if $self->{quit};
-                $art = $_;
-                if (--$n < 0) {
-                        _done_for_now($self);
-                        $itrk->update_last(0, $last_art);
-                        $n = $self->{max_batch};
-                }
-                my $raw = $nn->article($art);
-                unless (defined($raw)) {
-                        my $msg = $nn->message;
-                        if ($nn->code == 421) { # pseudo response from Net::Cmd
-                                $err = "E: $msg";
-                                last;
-                        } else { # probably just a deleted message (spam)
-                                warn "W: $msg";
-                                next;
-                        }
-                }
-                s/\r\n/\n/ for @$raw;
-                $raw = join('', @$raw);
-                if (ref($inboxes)) {
-                        for my $ibx (@$inboxes) {
-                                my $eml = PublicInbox::Eml->new($raw);
-                                import_eml($self, $ibx, $eml);
-                        }
-                } elsif ($inboxes eq 'watchspam') {
-                        my $eml = PublicInbox::Eml->new(\$raw);
-                        $self->{pi_cfg}->each_inbox(\&remove_eml_i,
-                                        $self, $eml, "$uri ARTICLE $art");
-                } else {
-                        die "BUG: destination unknown $inboxes";
-                }
-                $last_art = $art;
-        }
-        _done_for_now($self);
-        $itrk->update_last(0, $last_art);
-        $err;
-}
-
 sub watch_nntp_init ($$) {
         my ($self, $poll) = @_;
         nntp_common_init($self); # read args from config
diff --git a/t/imapd.t b/t/imapd.t
index 0583dfdd..f1b498a7 100644
--- a/t/imapd.t
+++ b/t/imapd.t
@@ -507,7 +507,7 @@ SKIP: {
         $ii->close;
         PublicInbox::DS->Reset;
         seek($err, 0, 0);
-        my @err = grep(!/^I:/, <$err>);
+        my @err = grep(!/^(?:I:|#)/, <$err>);
         is(@err, 0, 'no warnings/errors from -watch'.join(' ', @err));
 
         if ($ENV{TEST_KILL_IMAPD}) { # not sure how reliable this test can be
diff --git a/t/nntpd.t b/t/nntpd.t
index 18aaccbe..16a2ab76 100644
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -459,7 +459,7 @@ sub test_watch {
         $cfg->each_inbox(sub { shift->unsubscribe_unlock('ident') });
         $ii->close;
         PublicInbox::DS->Reset;
-        my @err = grep(!/^I:/, <$err>);
+        my @err = grep(!/^(?:I:|#)/, <$err>);
         is(@err, 0, 'no warnings/errors from -watch'.join(' ', @err));
         my @ls = xqx(['git', "--git-dir=$inboxdir", qw(ls-tree -r HEAD)]);
         isnt(scalar(@ls), 0, 'imported something');