From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 263CC1F4B8 for ; Tue, 30 Jan 2024 06:31:11 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1706596271; bh=XG2duiScm9zxXSrQwH1LIzN2yTtfXIot9R7J/8uWDIE=; h=From:To:Subject:Date:In-Reply-To:References:From; b=M0gDvbqt+5KODDiZKWOig+3oCVbeuHESKIKLNebxMuALwRzuXYlcNsita7H2ab04m IyFHZBWECtpfyKnP3hLsEk7Ll1GmuPAlCFUNg6JRCjwSoVHh5Eof9GyfADQKhbArF7 jXQjoiyPfLH809rS+7FSV2zA+ZJ7aQGeKbARoaRs= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 1/2] watch: support incremental updates from MH Date: Tue, 30 Jan 2024 06:31:09 +0000 Message-ID: <20240130063110.1116767-2-e@80x24.org> In-Reply-To: <20240130063110.1116767-1-e@80x24.org> References: <20240130063110.1116767-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: The good news (compared to lei) is we only have to worry about imports and don't care about the filename nor keywords, so it's immune to .mh_sequences writing inconsistencies across MH implementations and sequence number packing. We still assume the writer will write the mail file with one of: * rename(2) to create the final sequence number filename * a single write(2) if not relying on rename(2) mlmmj and mutt satisfy these requirements. Python's Lib/mailbox.py may, I'm not sure... 
--- Documentation/public-inbox-watch.pod | 16 ++-- MANIFEST | 1 + lib/PublicInbox/Watch.pm | 92 ++++++++++++++------ t/watch_maildir.t | 2 +- t/watch_mh.t | 120 +++++++++++++++++++++++++++ 5 files changed, 198 insertions(+), 33 deletions(-) create mode 100644 t/watch_mh.t diff --git a/Documentation/public-inbox-watch.pod b/Documentation/public-inbox-watch.pod index 6f812966..6e2142fe 100644 --- a/Documentation/public-inbox-watch.pod +++ b/Documentation/public-inbox-watch.pod @@ -48,9 +48,11 @@ of large Maildirs. Upon startup, it scans the mailbox for new messages to be imported while it was not running. -As of public-inbox 1.6.0, Maildirs, IMAP folders, and NNTP -newsgroups are supported. Previous versions of public-inbox -only supported Maildirs. +All versions of public-inbox-watch support Maildirs. public-inbox +1.6.0 added support for IMAP folders and NNTP newsgroups. +public-inbox 2.0 adds support for MH directories. There are no +plans to support the mbox family since new messages are expensive +to detect in large mboxes. public-inbox-watch should be run inside a L session or as a L service. Errors are emitted to stderr. @@ -84,12 +86,16 @@ C and C URLs: watch = nntp://news.example.com/inbox.test.group watch = imaps://user@mail.example.com/INBOX.test +2.0+ supports MH: + + watch = mh:/path/to/MH/inbox.test + This may be specified multiple times to combine several mailboxes into a single public-inbox. URLs requiring authentication will require L and/or L (preferred) to fill in the username and password. -public-inbox 2.0+ supports boolean C to prevent the global +public-inbox 2.0+ also supports boolean C to prevent the global L directive from writing to the inbox. Default: none @@ -127,7 +133,7 @@ Messages without the (S)een flag are not considered for hiding. This hiding affects all configured public-inboxes in PI_CONFIG. As with C, C and C URLs -are supported in public-inbox 1.6.0+. +are supported in public-inbox 1.6.0+, and C in 2.0+. 
Default: none; only for L users diff --git a/MANIFEST b/MANIFEST index 051cd6f9..2223cfb4 100644 --- a/MANIFEST +++ b/MANIFEST @@ -627,6 +627,7 @@ t/watch_filter_rubylang.t t/watch_imap.t t/watch_maildir.t t/watch_maildir_v2.t +t/watch_mh.t t/watch_multiple_headers.t t/www_altid.t t/www_listing.t diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index b83a77eb..1ec574ea 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -16,6 +16,7 @@ use PublicInbox::DS qw(now add_timer awaitpid); use PublicInbox::MID qw(mids); use PublicInbox::ContentHash qw(content_hash); use POSIX qw(_exit WNOHANG); +use constant { D_MAILDIR => 1, D_MH => 2 }; sub compile_watchheaders ($) { my ($ibx) = @_; @@ -40,9 +41,22 @@ sub compile_watchheaders ($) { $ibx->{-watchheaders} = $watch_hdrs if scalar @$watch_hdrs; } +sub d_type_set ($$$) { + my ($d_type, $dir, $is) = @_; + my $isnt = D_MAILDIR; + if ($is == D_MAILDIR) { + $isnt = D_MH; + $d_type->{"$dir/cur"} |= $is; + $d_type->{"$dir/new"} |= $is; + } + warn <<EOM if ($d_type->{$dir} |= $is) & $isnt; +W: `$dir' is both Maildir and MH (non-fatal) +EOM +} + sub new { my ($class, $cfg) = @_; - my (%mdmap); + my (%d_map, %d_type); my (%imap, %nntp); # url => [inbox objects] or 'watchspam' my (@imap, @nntp); PublicInbox::Import::load_config($cfg); @@ -57,7 +71,11 @@ sub new { my $uri; if (is_maildir($dir)) { # skip "new", no MUA has seen it, yet. 
- $mdmap{"$dir/cur"} = 'watchspam'; + $d_map{"$dir/cur"} = 'watchspam'; + d_type_set \%d_type, $dir, D_MAILDIR; + } elsif (is_mh($dir)) { + $d_map{$dir} = 'watchspam'; + d_type_set \%d_type, $dir, D_MH; } elsif ($uri = imap_uri($dir)) { $imap{$$uri} = 'watchspam'; push @imap, $uri; @@ -69,7 +87,6 @@ sub new { } } } - my $k = 'publicinboxwatch.spamcheck'; my $default = undef; my $spamcheck = PublicInbox::Spamcheck::get($cfg, $k, $default); @@ -91,10 +108,17 @@ sub new { } elsif (is_maildir($watch)) { compile_watchheaders($ibx); my ($new, $cur) = ("$watch/new", "$watch/cur"); - my $cur_dst = $mdmap{$cur} //= []; + my $cur_dst = $d_map{$cur} //= []; return if is_watchspam($cur, $cur_dst, $ibx); - push @{$mdmap{$new} //= []}, $ibx; + push @{$d_map{$new} //= []}, $ibx; push @$cur_dst, $ibx; + d_type_set \%d_type, $watch, D_MAILDIR; + } elsif (is_mh($watch)) { + my $cur_dst = $d_map{$watch} //= []; + return if is_watchspam($watch, $cur_dst, $ibx); + compile_watchheaders($ibx); + push(@$cur_dst, $ibx); + d_type_set \%d_type, $watch, D_MH; } elsif ($uri = imap_uri($watch)) { my $cur_dst = $imap{$$uri} //= []; return if is_watchspam($uri, $cur_dst, $ibx); @@ -111,18 +135,19 @@ sub new { } }); - my $mdre; - if (scalar keys %mdmap) { - $mdre = join('|', map { quotemeta($_) } keys %mdmap); - $mdre = qr!\A($mdre)/!; + my $d_re; + if (scalar keys %d_map) { + $d_re = join('|', map quotemeta, keys %d_map); + $d_re = qr!\A($d_re)/!; } - return unless $mdre || scalar(keys %imap) || scalar(keys %nntp); + return unless $d_re || scalar(keys %imap) || scalar(keys %nntp); bless { max_batch => 10, # avoid hogging locks for too long spamcheck => $spamcheck, - mdmap => \%mdmap, - mdre => $mdre, + d_map => \%d_map, + d_re => $d_re, + d_type => \%d_type, pi_cfg => $cfg, imap => scalar keys %imap ? \%imap : undef, nntp => scalar keys %nntp? 
\%nntp : undef, @@ -220,17 +245,23 @@ sub import_eml ($$$) { sub _try_path { my ($self, $path) = @_; - my $fl = PublicInbox::MdirReader::maildir_path_flags($path) // return; - return if $fl =~ /[DT]/; # no Drafts or Trash - if ($path !~ $self->{mdre}) { - warn "unrecognized path: $path\n"; - return; - } - my $inboxes = $self->{mdmap}->{$1}; - unless ($inboxes) { - warn "unmappable dir: $1\n"; - return; - } + $path =~ $self->{d_re} or + return warn("BUG? unrecognized path: $path\n"); + my $dir = $1; + my $inboxes = $self->{d_map}->{$dir} // + return warn("W: unmappable dir: $dir\n"); + my ($md_fl, $mh_seq); + if ($self->{d_type}->{$dir} & D_MH) { + $path =~ m!/([0-9]+)\z! ? ($mh_seq = $1) : return; + } + $self->{d_type}->{$dir} & D_MAILDIR and + $md_fl = PublicInbox::MdirReader::maildir_path_flags($path); + $md_fl // $mh_seq // return; + return if ($md_fl // '') =~ /[DT]/; # no Drafts or Trash + # n.b. none of the MH keywords are relevant for public mail, + # mh_seq is only used to validate we're reading an email + # and not treating .mh_sequences as an email + my $warn_cb = $SIG{__WARN__} || \&CORE::warn; local $SIG{__WARN__} = sub { my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? 
$1 : ''; @@ -288,7 +319,7 @@ sub watch_fs_init ($) { require PublicInbox::DirIdle; # inotify_create + EPOLL_CTL_ADD my $dir_idle = $self->{dir_idle} = PublicInbox::DirIdle->new($cb); - $dir_idle->add_watches([keys %{$self->{mdmap}}]); + $dir_idle->add_watches([keys %{$self->{d_map}}]); } sub net_cb { # NetReader::(nntp|imap)_each callback @@ -437,7 +468,7 @@ sub event_step { }; die $@ if $@; } - fs_scan_step($self) if $self->{mdre}; + fs_scan_step($self) if $self->{d_re}; } sub watch_imap_fetch_all ($$) { @@ -541,7 +572,7 @@ sub watch { # main entry point # poll all URIs for a given interval sequentially add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris); } - watch_fs_init($self) if $self->{mdre}; + watch_fs_init($self) if $self->{d_re}; local @PublicInbox::DS::post_loop_do = (\&quit_inprogress, $self); PublicInbox::DS::event_loop($first_sig); # calls ->event_step _done_for_now($self); @@ -572,7 +603,7 @@ sub fs_scan_step { $opendirs->{$dir} = $dh if $n < 0; } if ($op && $op eq 'full') { - foreach my $dir (keys %{$self->{mdmap}}) { + foreach my $dir (keys %{$self->{d_map}}) { next if $opendirs->{$dir}; # already in progress my $ok = opendir(my $dh, $dir); unless ($ok) { @@ -647,6 +678,13 @@ sub is_maildir { $_[0]; } +sub is_mh { + $_[0] =~ s!\Amh:!!i or return; + $_[0] =~ tr!/!/!s; + $_[0] =~ s!/\z!!; + $_[0]; +} + sub is_watchspam { my ($cur, $ws, $ibx) = @_; if ($ws && !ref($ws) && $ws eq 'watchspam') { diff --git a/t/watch_maildir.t b/t/watch_maildir.t index d7f01b1a..a12ceefd 100644 --- a/t/watch_maildir.t +++ b/t/watch_maildir.t @@ -46,7 +46,7 @@ my $sem = PublicInbox::Emergency->new($spamdir); # create dirs EOF my $wm = PublicInbox::Watch->new($cfg); is(scalar grep(/is a spam folder/, @w), 1, 'got warning about spam'); - is_deeply($wm->{mdmap}, { "$spamdir/cur" => 'watchspam' }, + is_deeply($wm->{d_map}, { "$spamdir/cur" => 'watchspam' }, 'only got the spam folder to watch'); } diff --git a/t/watch_mh.t b/t/watch_mh.t new file mode 100644 index 
00000000..04793750 --- /dev/null +++ b/t/watch_mh.t @@ -0,0 +1,120 @@ +#!perl -w +# Copyright (C) all contributors +# License: AGPL-3.0+ +use v5.12; +use PublicInbox::Eml; +use PublicInbox::TestCommon; +use PublicInbox::Import; +use PublicInbox::IO qw(write_file); +use POSIX qw(mkfifo); +use File::Copy qw(cp); +use autodie qw(rename mkdir); + +my $tmpdir = tmpdir; +my $git_dir = "$tmpdir/test.git"; +my $mh = "$tmpdir/mh"; +my $spamdir = "$tmpdir/mh-spam"; +mkdir $_ for ($mh, $spamdir); +use_ok 'PublicInbox::Watch'; +my $addr = 'test-public@example.com'; +my $default_branch = PublicInbox::Import::default_branch; +PublicInbox::Import::init_bare($git_dir); +my $msg = < +Date: Sat, 18 Jun 2016 00:00:00 +0000 + +something +EOF + +cp 't/plack-qp.eml', "$mh/1"; +mkfifo("$mh/5", 0777) or xbail "mkfifo: $!"; # FIFO to ensure no stuckage +my $cfg = cfg_new $tmpdir, <new($cfg)->scan('full'); +my $git = PublicInbox::Git->new($git_dir); +{ + my @list = $git->qx('rev-list', $default_branch); + is(scalar @list, 1, 'one revision in rev-list'); + $git->cleanup; +} + +# end-to-end test which actually uses inotify/kevent +{ + my $env = { PI_CONFIG => $cfg->{-f} }; + # n.b. 
--no-scan is only intended for testing atm + my $wm = start_script([qw(-watch --no-scan)], $env); + no_pollerfd($wm->{pid}); + + my $eml = eml_load 't/data/binary.patch'; + $eml->header_set('Cc', $addr); + write_file '>', "$mh/2.tmp", $eml->as_string; + + use_ok 'PublicInbox::InboxIdle'; + use_ok 'PublicInbox::DS'; + my $delivered = 0; + my $cb = sub { + my ($ibx) = @_; + diag "message delivered to `$ibx->{name}'"; + $delivered++; + }; + PublicInbox::DS->Reset; + my $ii = PublicInbox::InboxIdle->new($cfg); + my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup'; + $cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) }); + local @PublicInbox::DS::post_loop_do = (sub { $delivered == 0 }); + + # wait for -watch to setup inotify watches + my $sleep = 1; + if (eval { require PublicInbox::Inotify } && -d "/proc/$wm->{pid}/fd") { + my $end = time + 2; + my (@ino, @ino_info); + do { + @ino = grep { + (readlink($_)//'') =~ /\binotify\b/ + } glob("/proc/$wm->{pid}/fd/*"); + } until (@ino || time > $end || !tick); + if (scalar(@ino) == 1) { + my $ino_fd = (split(m'/', $ino[0]))[-1]; + my $ino_fdinfo = "/proc/$wm->{pid}/fdinfo/$ino_fd"; + while (time < $end && open(my $fh, '<', $ino_fdinfo)) { + @ino_info = grep(/^inotify wd:/, <$fh>); + last if @ino_info >= 2; + tick; + } + $sleep = undef if @ino_info >= 2; + } + } + if ($sleep) { + diag "waiting ${sleep}s for -watch to start up"; + sleep $sleep; + } + rename "$mh/2.tmp", "$mh/2"; + diag 'waiting for -watch to import new message'; + PublicInbox::DS::event_loop(); + + my $subj = $eml->header_raw('Subject'); + my $head = $git->qx(qw(cat-file commit HEAD)); + like $head, qr/^\Q$subj\E/sm, 'new commit made'; + + $wm->kill; + $wm->join; + $ii->close; + PublicInbox::DS->Reset; +} + +my $is_mh = sub { PublicInbox::Watch::is_mh(my $val = shift) }; + +is $is_mh->('mh:/hello//world'), '/hello/world', 'extra slash gone'; +is $is_mh->('MH:/hello/world/'), '/hello/world', 'trailing slash gone'; +is 
$is_mh->('maildir:/hello/world/'), undef, 'non-MH rejected'; + +done_testing;