From b85909064e7209de45680150b5fb457736fa618d Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 27 Jun 2020 10:03:54 +0000 Subject: watch: add NNTP support This is similar to IMAP support, but only supports polling. Automatic altid support is not yet supported, yet; but may be in the future. v2: small grammar fix by Kyle Meyer Link: https://public-inbox.org/meta/87sgeg5nxf.fsf@kyleam.com/ --- lib/PublicInbox/WatchMaildir.pm | 357 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 321 insertions(+), 36 deletions(-) (limited to 'lib/PublicInbox/WatchMaildir.pm') diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 535dadd5..4c8fac12 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -41,7 +41,7 @@ sub compile_watchheaders ($) { sub new { my ($class, $config) = @_; my (%mdmap, $spamc); - my %imap; # url => [inbox objects] or 'watchspam' + my (%imap, %nntp); # url => [inbox objects] or 'watchspam' # "publicinboxwatch" is the documented namespace # "publicinboxlearn" is legacy but may be supported @@ -51,11 +51,14 @@ sub new { defined(my $dirs = $config->{$k}) or next; $dirs = PublicInbox::Config::_array($dirs); for my $dir (@$dirs) { + my $url; if (is_maildir($dir)) { # skip "new", no MUA has seen it, yet. $mdmap{"$dir/cur"} = 'watchspam'; - } elsif (my $url = imap_url($dir)) { + } elsif ($url = imap_url($dir)) { $imap{$url} = 'watchspam'; + } elsif ($url = nntp_url($dir)) { + $nntp{$url} = 'watchspam'; } else { warn "unsupported $k=$dir\n"; } @@ -74,6 +77,7 @@ sub new { my $watches = $ibx->{watch} or return; $watches = PublicInbox::Config::_array($watches); for my $watch (@$watches) { + my $url; if (is_maildir($watch)) { compile_watchheaders($ibx); my ($new, $cur) = ("$watch/new", "$watch/cur"); @@ -81,10 +85,14 @@ sub new { return if is_watchspam($cur, $cur_dst, $ibx); push @{$mdmap{$new} //= []}, $ibx; push @$cur_dst, $ibx; - } elsif (my $url = imap_url($watch)) { + } elsif ($url = imap_url($watch)) { return if is_watchspam($url, $imap{$url}, $ibx); compile_watchheaders($ibx); push @{$imap{$url} ||= []}, $ibx; + } elsif ($url = nntp_url($watch)) { + return if is_watchspam($url, $nntp{$url}, $ibx); + compile_watchheaders($ibx); + push @{$nntp{$url} ||= []}, $ibx; } else { warn "watch unsupported: $k=$watch\n"; } @@ -96,13 +104,15 @@ sub new { $mdre = join('|', map { quotemeta($_) } keys %mdmap); $mdre = qr!\A($mdre)/!; } - return unless $mdre || scalar(keys %imap); + return unless $mdre || scalar(keys %imap) || scalar(keys %nntp); + bless { spamcheck => $spamcheck, mdmap => \%mdmap, mdre => $mdre, config => $config, imap => scalar keys %imap ? \%imap : undef, + nntp => scalar keys %nntp? \%nntp : undef, importers => {}, opendirs => {}, # dirname => dirhandle (in progress scans) ops => [], # 'quit', 'full' @@ -230,7 +240,7 @@ sub watch_fs_init ($) { # returns the git config section name, e.g [imap "imaps://user@example.com"] # without the mailbox, so we can share connections between different inboxes -sub imap_section ($) { +sub uri_section ($) { my ($uri) = @_; $uri->scheme . '://' . $uri->authority; } @@ -247,6 +257,14 @@ sub cfg_intvl ($$$) { } } +sub cfg_bool ($$$) { + my ($cfg, $key, $url) = @_; + my $orig = $cfg->urlmatch($key, $url) // return; + my $bool = PublicInbox::Config::_git_config_bool($orig); + warn "W: $key=$orig for $url is not boolean\n" unless defined($bool); + $bool; +} + # flesh out common IMAP-specific data structures sub imap_common_init ($) { my ($self) = @_; @@ -254,24 +272,17 @@ sub imap_common_init ($) { my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg for my $url (sort keys %{$self->{imap}}) { my $uri = PublicInbox::URIimap->new($url); - my $sec = imap_section($uri); - for my $f (qw(Starttls Debug Compress)) { - my $k = "imap.$f"; - my $orig = $cfg->urlmatch($k, $url) // next; - my $v = PublicInbox::Config::_git_config_bool($orig); - if (defined($v)) { - $mic_args->{$sec}->{$f} = $v; - } else { - warn "W: $k=$orig for $url is not boolean\n"; - } + my $sec = uri_section($uri); + for my $k (qw(Starttls Debug Compress)) { + my $bool = cfg_bool($cfg, "imap.$k", $url) // next; + $mic_args->{$sec}->{$k} = $bool; } my $to = cfg_intvl($cfg, 'imap.timeout', $url); $mic_args->{$sec}->{Timeout} = $to if $to; - $to = cfg_intvl($cfg, 'imap.pollInterval', $url); - $self->{imap_opt}->{$sec}->{poll_intvl} = $to if $to; - $to = cfg_intvl($cfg, 'imap.IdleInterval', $url); - $self->{imap_opt}->{$sec}->{idle_intvl} = $to if $to; - + for my $k (qw(pollInterval idleInterval)) { + $to = cfg_intvl($cfg, "imap.$k", $url) // next; + $self->{imap_opt}->{$sec}->{$k} = $to; + } my $k = 'imap.fetchBatchSize'; my $bs = $cfg->urlmatch($k, $url) // next; if ($bs =~ /\A([0-9]+)\z/) { @@ -295,7 +306,7 @@ sub mic_for ($$$) { # mic = Mail::IMAPClient username => $uri->user, password => $uri->password, }; - my $common = $mic_args->{imap_section($uri)} // {}; + my $common = $mic_args->{uri_section($uri)} // {}; my $host = $cred->{host}; my $mic_arg = { Port => $uri->port, @@ -331,7 +342,7 @@ sub mic_for ($$$) { # mic = Mail::IMAPClient } if ($mic->login && $mic->IsAuthenticated) { # success! keep IMAPClient->new arg in case we get disconnected - $self->{mic_arg}->{imap_section($uri)} = $mic_arg; + $self->{mic_arg}->{uri_section($uri)} = $mic_arg; } else { warn "E: <$url> LOGIN: $@\n"; $mic = undef; @@ -362,7 +373,7 @@ sub imap_import_msg ($$$$) { sub imap_fetch_all ($$$) { my ($self, $mic, $uri) = @_; - my $sec = imap_section($uri); + my $sec = uri_section($uri); my $mbx = $uri->mailbox; my $url = $uri->as_string; $mic->Clear(1); # trim results history @@ -479,7 +490,7 @@ sub imap_idle_once ($$$$) { # idles on a single URI sub watch_imap_idle_1 ($$$) { my ($self, $uri, $intvl) = @_; - my $sec = imap_section($uri); + my $sec = uri_section($uri); my $mic_arg = $self->{mic_arg}->{$sec} or die "BUG: no Mail::IMAPClient->new arg for $sec"; my $mic; @@ -555,7 +566,7 @@ sub event_step { sub watch_imap_fetch_all ($$) { my ($self, $uris) = @_; for my $uri (@$uris) { - my $sec = imap_section($uri); + my $sec = uri_section($uri); my $mic_arg = $self->{mic_arg}->{$sec} or die "BUG: no Mail::IMAPClient->new arg for $sec"; my $mic = PublicInbox::IMAPClient->new(%$mic_arg) or next; @@ -565,21 +576,56 @@ sub watch_imap_fetch_all ($$) { } } -sub imap_fetch_fork ($) { # DS::add_timer callback +sub watch_nntp_fetch_all ($$) { + my ($self, $uris) = @_; + for my $uri (@$uris) { + my $sec = uri_section($uri); + my $nn_arg = $self->{nn_arg}->{$sec} or + die "BUG: no Net::NNTP->new arg for $sec"; + my $nntp_opt = $self->{nntp_opt}->{$sec}; + my $url = $uri->as_string; + my $nn = nn_new($nn_arg, $nntp_opt, $url); + unless ($nn) { + warn "E: $url: \$!=$!\n"; + next; + } + last if $self->{quit}; + if (my $postconn = $nntp_opt->{-postconn}) { + for my $m_arg (@$postconn) { + my ($method, @args) = @$m_arg; + $nn->$method(@args) and next; + warn "E: <$url> $method failed\n"; + $nn = undef; + last; + } + } + last if $self->{quit}; + if ($nn) { + my $err = nntp_fetch_all($self, $nn, $uri); + warn $err, "\n" if $err; + } + } +} + +sub poll_fetch_fork ($) { # DS::add_timer callback my ($self, $intvl, $uris) = @{$_[0]}; return if $self->{quit}; watch_atfork_parent($self); defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { watch_atfork_child($self); - watch_imap_fetch_all($self, $uris); + if ($uris->[0]->scheme =~ /\Aimaps?\z/) { + watch_imap_fetch_all($self, $uris); + } else { + watch_nntp_fetch_all($self, $uris); + } _exit(0); } $self->{poll_pids}->{$pid} = [ $intvl, $uris ]; - PublicInbox::DS::dwaitpid($pid, \&imap_fetch_reap, $self); + PublicInbox::DS::dwaitpid($pid, \&poll_fetch_reap, $self); } -sub imap_fetch_reap { # PublicInbox::DS::dwaitpid callback +sub poll_fetch_reap { # PublicInbox::DS::dwaitpid callback my ($self, $pid) = @_; my $intvl_uris = delete $self->{poll_pids}->{$pid} or die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; @@ -590,7 +636,7 @@ sub imap_fetch_reap { # PublicInbox::DS::dwaitpid callback map { $_->as_string."\n" } @$uris; } warn('I: will check ', $_->as_string, " in ${intvl}s\n") for @$uris; - PublicInbox::DS::add_timer($intvl, \&imap_fetch_fork, + PublicInbox::DS::add_timer($intvl, \&poll_fetch_fork, [$self, $intvl, $uris]); } @@ -610,18 +656,18 @@ sub watch_imap_init ($) { my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj for my $url (sort keys %{$self->{imap}}) { my $uri = PublicInbox::URIimap->new($url); - $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args); + $mics->{uri_section($uri)} //= mic_for($self, $uri, $mic_args); } my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ] my $poll = {}; # intvl_seconds => [ uri1, uri2 ] for my $url (keys %{$self->{imap}}) { my $uri = PublicInbox::URIimap->new($url); - my $sec = imap_section($uri); + my $sec = uri_section($uri); my $mic = $mics->{$sec}; - my $intvl = $self->{imap_opt}->{$sec}->{poll_intvl}; + my $intvl = $self->{imap_opt}->{$sec}->{pollInterval}; if ($mic->has_capability('IDLE') && !$intvl) { - $intvl = $self->{imap_opt}->{$sec}->{idle_intvl}; + $intvl = $self->{imap_opt}->{$sec}->{idleInterval}; push @$idle, [ $uri, $intvl // () ]; } else { push @{$poll->{$intvl || 120}}, $uri; @@ -633,11 +679,238 @@ sub watch_imap_init ($) { PublicInbox::DS::requeue($self); # ->event_step to fork } return unless scalar keys %$poll; - $self->{poll_pids} = {}; + $self->{poll_pids} //= {}; + + # poll all URIs for a given interval sequentially + while (my ($intvl, $uris) = each %$poll) { + PublicInbox::DS::add_timer(0, \&poll_fetch_fork, + [$self, $intvl, $uris]); + } +} + +# flesh out common NNTP-specific data structures +sub nntp_common_init ($) { + my ($self) = @_; + my $cfg = $self->{config}; + my $nn_args = {}; # scheme://authority => Net::NNTP->new arg + for my $url (sort keys %{$self->{nntp}}) { + my $sec = uri_section(URI->new($url)); + + # Debug and Timeout are passed to Net::NNTP->new + my $v = cfg_bool($cfg, 'nntp.Debug', $url); + $nn_args->{$sec}->{Debug} = $v if defined $v; + my $to = cfg_intvl($cfg, 'nntp.Timeout', $url); + $nn_args->{$sec}->{Timeout} = $to if $to; + + # Net::NNTP post-connect commands + for my $k (qw(starttls compress)) { + $v = cfg_bool($cfg, "nntp.$k", $url) // next; + $self->{nntp_opt}->{$sec}->{$k} = $v; + } + + # internal option + for my $k (qw(pollInterval)) { + $to = cfg_intvl($cfg, "nntp.$k", $url) // next; + $self->{nntp_opt}->{$sec}->{$k} = $to; + } + } + $nn_args; +} + +# Net::NNTP doesn't support CAPABILITIES, yet +sub try_starttls ($) { + my ($host) = @_; + return if $host =~ /\.onion\z/s; + return if $host =~ /\A127\.[0-9]+\.[0-9]+\.[0-9]+\z/s; + return if $host eq '::1'; + 1; +} + +sub nn_new ($$$) { + my ($nn_arg, $nntp_opt, $url) = @_; + my $nn = Net::NNTP->new(%$nn_arg) or die "E: <$url> new: $!\n"; + + # default to using STARTTLS if it's available, but allow + # it to be disabled for localhost/VPN users + if (!$nn_arg->{SSL} && $nn->can('starttls')) { + if (!defined($nntp_opt->{starttls}) && + try_starttls($nn_arg->{Host})) { + # soft fail by default + $nn->starttls or warn <<""; +W: <$url> STARTTLS tried and failed (not requested) + + } elsif ($nntp_opt->{starttls}) { + # hard fail if explicitly configured + $nn->starttls or die <<""; +E: <$url> STARTTLS requested and failed + + } + } elsif ($nntp_opt->{starttls}) { + $nn->can('starttls') or + die "E: <$url> Net::NNTP too old for STARTTLS\n"; + $nn->starttls or die <<""; +E: <$url> STARTTLS requested and failed + + } + $nn; +} + +sub nn_for ($$$) { # nn = Net::NNTP + my ($self, $uri, $nn_args) = @_; + my $url = $uri->as_string; + my $sec = uri_section($uri); + my $nntp_opt = $self->{nntp_opt}->{$sec} //= {}; + my $cred; + my ($u, $p); + if (defined(my $ui = $uri->userinfo)) { + $cred = { + url => $sec, + protocol => $uri->scheme, + host => $uri->host, + }; + ($u, $p) = split(/:/, $ui, 2); + ($cred->{username}, $cred->{password}) = ($u, $p); + } + my $common = $nn_args->{$sec} // {}; + my $nn_arg = { + Port => $uri->port, + # Net::NNTP mishandles `0', so we pass `127.0.0.1' + Host => $uri->host eq '0' ? '127.0.0.1' : $uri->host, + SSL => $uri->secure, # snews == nntps + %$common, # may Debug .... + }; + my $nn = nn_new($nn_arg, $nntp_opt, $url); + + if ($cred) { + Git::credential($cred, 'fill'); # may prompt user here + if ($nn->authinfo($u, $p)) { + push @{$nntp_opt->{-postconn}}, [ 'authinfo', $u, $p ]; + } else { + warn "E: <$url> AUTHINFO $u XXXX failed\n"; + $nn = undef; + } + } + + if ($nntp_opt->{compress}) { + # https://rt.cpan.org/Ticket/Display.html?id=129967 + if ($nn->can('compress')) { + if ($nn->compress) { + push @{$nntp_opt->{-postconn}}, [ 'compress' ]; + } else { + warn "W: <$url> COMPRESS failed\n"; + } + } else { + delete $nntp_opt->{compress}; + warn <<""; +W: <$url> COMPRESS not supported by Net::NNTP +W: see https://rt.cpan.org/Ticket/Display.html?id=129967 for updates + + } + } + + $self->{nn_arg}->{$sec} = $nn_arg; + Git::credential($cred, $nn ? 'approve' : 'reject') if $cred; + $nn; +} + +sub nntp_fetch_all ($$$) { + my ($self, $nn, $uri) = @_; + my ($group, $num_a, $num_b) = $uri->group; + my $sec = uri_section($uri); + my $url = $uri->as_string; + my ($nr, $beg, $end) = $nn->group($group); + unless (defined($nr)) { + chomp(my $msg = $nn->message); + return "E: GROUP $group <$sec> $msg"; + } + + # IMAPTracker is also used for tracking NNTP, UID == article number + # LIST.ACTIVE can get the equivalent of UIDVALIDITY, but that's + # expensive. So we assume newsgroups don't change: + my $itrk = PublicInbox::IMAPTracker->new($url); + my (undef, $l_art) = $itrk->get_last; + $l_art //= $beg; # initial import + + # allow users to specify articles to refetch + # cf. https://tools.ietf.org/id/draft-gilman-news-url-01.txt + # nntp://example.com/inbox.foo/$num_a-$num_b + $l_art = $num_a if defined($num_a) && $num_a < $l_art; + $end = $num_b if defined($num_b) && $num_b < $end; + + return if $l_art >= $end; # nothing to do + $beg = $l_art + 1; + + warn "I: $url fetching ARTICLE $beg..$end\n"; + my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; + my ($err, $art); + local $SIG{__WARN__} = sub { + $warn_cb->("$url ", $art ? ("ARTICLE $art") : (), "\n", @_); + }; + my $inboxes = $self->{nntp}->{$url}; + my $last_art; + for ($beg..$end) { + last if $self->{quit}; + $art = $_; + my $raw = $nn->article($art); + unless (defined($raw)) { + my $msg = $nn->message; + if ($nn->code == 421) { # pseudo response from Net::Cmd + $err = "E: $msg"; + last; + } else { # probably just a deleted message (spam) + warn "W: $msg"; + next; + } + } + s/\r\n/\n/ for @$raw; + $raw = join('', @$raw); + if (ref($inboxes)) { + for my $ibx (@$inboxes) { + my $eml = PublicInbox::Eml->new($raw); + import_eml($self, $ibx, $eml); + } + } elsif ($inboxes eq 'watchspam') { + my $eml = PublicInbox::Eml->new(\$raw); + my $arg = [ $self, $eml, "$url ARTICLE $art" ]; + $self->{config}->each_inbox(\&remove_eml_i, $arg); + } else { + die "BUG: destination unknown $inboxes"; + } + $last_art = $art; + } + $itrk->update_last(0, $last_art) if defined $last_art; + _done_for_now($self); + $err; +} + +sub watch_nntp_init ($) { + my ($self) = @_; + eval { require Net::NNTP } or + die "Net::NNTP is required for NNTP:\n$@\n"; + eval { require Git } or + die "Git (Perl module) is required for NNTP:\n$@\n"; + eval { require PublicInbox::IMAPTracker } or + die "DBD::SQLite is required for NNTP\n:$@\n"; + + my $nn_args = nntp_common_init($self); # read args from config + + # make sure we can connect and cache the credentials in memory + $self->{nn_arg} = {}; # schema://authority => Net::NNTP->new args + for my $url (sort keys %{$self->{nntp}}) { + nn_for($self, URI->new($url), $nn_args); + } + my $poll = {}; # intvl_seconds => [ uri1, uri2 ] + for my $url (keys %{$self->{nntp}}) { + my $uri = URI->new($url); + my $sec = uri_section($uri); + my $intvl = $self->{nntp_opt}->{$sec}->{pollInterval}; + push @{$poll->{$intvl || 120}}, $uri; + } + $self->{poll_pids} //= {}; # poll all URIs for a given interval sequentially while (my ($intvl, $uris) = each %$poll) { - PublicInbox::DS::add_timer(0, \&imap_fetch_fork, + PublicInbox::DS::add_timer(0, \&poll_fetch_fork, [$self, $intvl, $uris]); } } @@ -647,6 +920,7 @@ sub watch { $self->{oldset} = $oldset; $self->{sig} = $sig; watch_imap_init($self) if $self->{imap}; + watch_nntp_init($self) if $self->{nntp}; watch_fs_init($self) if $self->{mdre}; PublicInbox::DS->SetPostLoopCallback(sub {}); PublicInbox::DS->EventLoop until $self->{quit}; @@ -754,4 +1028,15 @@ sub imap_url { $uri ? $uri->canonical->as_string : undef; } +my %IS_NNTP = (news => 1, snews => 1, nntp => 1); +sub nntp_url { + my ($url) = @_; + require URI; + # URI::snews exists, URI::nntps does not, so use URI::snews + $url =~ s!\Anntps://!snews://!i; + my $uri = URI->new($url); + return unless $uri && $IS_NNTP{$uri->scheme}; + $uri->group ? $uri->canonical->as_string : undef; +} + 1; -- cgit v1.2.3-24-ge0c7