diff options
-rw-r--r-- | MANIFEST | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 16 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 1 | ||||
-rw-r--r-- | lib/PublicInbox/MboxLock.pm | 121 | ||||
-rw-r--r-- | t/lei-q-remote-import.t | 12 | ||||
-rw-r--r-- | t/mbox_lock.t | 90 |
7 files changed, 239 insertions, 5 deletions
@@ -201,6 +201,7 @@ lib/PublicInbox/MIME.pm lib/PublicInbox/ManifestJsGz.pm lib/PublicInbox/Mbox.pm lib/PublicInbox/MboxGz.pm +lib/PublicInbox/MboxLock.pm lib/PublicInbox/MboxReader.pm lib/PublicInbox/MdirReader.pm lib/PublicInbox/MiscIdx.pm @@ -383,6 +384,7 @@ t/lei_to_mail.t t/lei_xsearch.t t/linkify.t t/main-bin/spamc +t/mbox_lock.t t/mbox_reader.t t/mda-mime.eml t/mda.t diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 5cdaabc6..b5bdda21 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -112,7 +112,7 @@ our %CMD = ( # sorted in order of importance/use: save-as=s output|mfolder|o=s format|f=s dedupe|d=s threads|t+ augment|a sort|s=s reverse|r offset=i remote! local! external! pretty include|I=s@ exclude=s@ only=s@ jobs|j=s globoff|g stdin| - import-remote! + import-remote! lock=s@ alert=s@ mua=s no-torsocks torsocks=s verbose|v+ quiet|q C=s@), PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ], diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 630da67c..de640657 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -463,11 +463,19 @@ sub _pre_augment_mbox { my ($self, $lei) = @_; my $dst = $lei->{ovv}->{dst}; if ($dst ne '/dev/stdout') { - my $mode = -p $dst ? '>' : '+>>'; - if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) { - $! == ENOENT or die "unlink($dst): $!"; + my $out; + if (-p $dst) { + open $out, '>', $dst or die "open($dst): $!"; + } elsif (-f _ || !-e _) { + require PublicInbox::MboxLock; + my $m = $lei->{opt}->{'lock'} // + PublicInbox::MboxLock->defaults; + $self->{mbl} = PublicInbox::MboxLock->acq($dst, 1, $m); + $out = $self->{mbl}->{fh}; + if (!$lei->{opt}->{augment} and !truncate($out, 0)) { + die "truncate($dst): $!"; + } } - open my $out, $mode, $dst or die "open($dst): $!"; $lei->{old_1} = $lei->{1}; # keep for spawning MUA $lei->{1} = $out; } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index eb015978..7ec696f4 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -338,6 +338,7 @@ Error closing $lei->{ovv}->{dst}: $! $l2m->poke_dst; $lei->poke_mua; } else { # mbox users + delete $l2m->{mbl}; # drop dotlock $lei->start_mua; } } diff --git a/lib/PublicInbox/MboxLock.pm b/lib/PublicInbox/MboxLock.pm new file mode 100644 index 00000000..4e2a2d9a --- /dev/null +++ b/lib/PublicInbox/MboxLock.pm @@ -0,0 +1,121 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# Various mbox locking methods +package PublicInbox::MboxLock; +use strict; +use v5.10.1; +use PublicInbox::OnDestroy; +use Fcntl qw(:flock F_SETLK F_SETLKW F_RDLCK F_WRLCK + O_CREAT O_EXCL O_WRONLY SEEK_SET); +use Carp qw(croak); +use PublicInbox::DS qw(now); # ugh... + +our $TMPL = do { + if ($^O eq 'linux') { \'s @32' } + elsif ($^O =~ /bsd/) { \'@20 s @256' } # n.b. @32 may be enough... + else { eval { require File::FcntlLock; 1 } } +}; + +# This order matches Debian policy on Linux systems. +# See policy/ch-customized-programs.rst in +# https://salsa.debian.org/dbnpolicy/policy.git +sub defaults { [ qw(fcntl dotlock) ] } + +sub acq_fcntl { + my ($self) = @_; + my $op = $self->{nb} ? F_SETLK : F_SETLKW; + my $t = $self->{rw} ? F_WRLCK : F_RDLCK; + my $end = now + $self->{timeout}; + $TMPL or die <<EOF; +"struct flock" layout not available on $^O, install File::FcntlLock? +EOF + do { + if (ref $TMPL) { + return if fcntl($self->{fh}, $op, pack($$TMPL, $t)); + } else { + my $fl = File::FcntlLock->new; + $fl->l_type($t); + $fl->l_whence(SEEK_SET); + $fl->l_start(0); + $fl->l_len(0); + return if $fl->lock($self->{fh}, $op); + } + select(undef, undef, undef, $self->{delay}); + } while (now < $end); + croak "fcntl lock $self->{f}: $!"; +} + +sub acq_dotlock { + my ($self) = @_; + my $dot_lock = "$self->{f}.lock"; + my ($pfx, $base) = ($self->{f} =~ m!(\A.*?/)([^/]+)\z!); + $pfx //= ''; + my $pid = $$; + my $end = now + $self->{timeout}; + do { + my $tmp = "$pfx.$base-".sprintf('%x,%x,%x', + rand(0xffffffff), $pid, time); + if (sysopen(my $fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) { + if (link($tmp, $dot_lock)) { + unlink($tmp) or die "unlink($tmp): $!"; + $self->{".lock$pid"} = $dot_lock; + return; + } + unlink($tmp) or die "unlink($tmp): $!"; + select(undef, undef, undef, $self->{delay}); + } else { + croak "open $tmp (for $dot_lock): $!" if !$!{EXIST}; + } + } while (now < $end); + croak "dotlock $dot_lock"; +} + +sub acq_flock { + my ($self) = @_; + my $op = $self->{rw} ? LOCK_EX : LOCK_SH; + $op |= LOCK_NB if $self->{nb}; + my $end = now + $self->{timeout}; + do { + return if flock($self->{fh}, $op); + select(undef, undef, undef, $self->{delay}); + } while (now < $end); + croak "flock $self->{f}: $!"; +} + +sub acq { + my ($cls, $f, $rw, $methods) = @_; + my $fh; + unless (open $fh, $rw ? '+>>' : '<', $f) { + croak "open($f): $!" if $rw || !$!{ENOENT}; + } + my $self = bless { f => $f, fh => $fh, rw => $rw }, $cls; + my $m = "@$methods"; + if ($m ne 'none') { + my @m = map { + if (/\A(timeout|delay)=([0-9\.]+)s?\z/) { + $self->{$1} = $2 + 0; + (); + } else { + $cls->can("acq_$_") // $_ + } + } split(/[, ]/, $m); + my @bad = grep { !ref } @m; + croak "Unsupported lock methods: @bad\n" if @bad; + croak "No lock methods supplied with $m\n" if !@m; + $self->{nb} = $#m || defined($self->{timeout}); + $self->{delay} //= 0.1; + $self->{timeout} //= 5; + $_->($self) for @m; + } + $self; +} + +sub DESTROY { + my ($self) = @_; + if (my $f = $self->{".lock$$"}) { + unlink($f) or die "unlink($f): $! (lock stolen?)"; + } +} + +1; diff --git a/t/lei-q-remote-import.t b/t/lei-q-remote-import.t index f73524cf..4088b6ad 100644 --- a/t/lei-q-remote-import.t +++ b/t/lei-q-remote-import.t @@ -46,5 +46,17 @@ test_lei({ tmpdir => $tmpdir }, sub { unlink $o or BAIL_OUT $!; lei_ok(@cmd); ok(-f $o && !-s _, '--no-import-remote did not memoize'); + + open my $fh, '>', "$o.lock"; + $cmd[-1] = 'm:qp@example.com'; + unlink $o or BAIL_OUT $!; + lei_ok(@cmd, '--lock=none'); + ok(-f $o && -s _, '--lock=none respected'); + unlink $o or BAIL_OUT $!; + ok(!lei(@cmd, '--lock=dotlock,timeout=0.000001'), 'dotlock fails'); + ok(-f $o && !-s _, 'nothing output on lock failure'); + unlink "$o.lock" or BAIL_OUT $!; + lei_ok(@cmd, '--lock=dotlock,timeout=0.000001', + \'succeeds after lock removal'); }); done_testing; diff --git a/t/mbox_lock.t b/t/mbox_lock.t new file mode 100644 index 00000000..3dc3b449 --- /dev/null +++ b/t/mbox_lock.t @@ -0,0 +1,90 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +use POSIX qw(_exit); +use PublicInbox::DS qw(now); +use Errno qw(EAGAIN); +use_ok 'PublicInbox::MboxLock'; +my ($tmpdir, $for_destroy) = tmpdir(); +my $f = "$tmpdir/f"; +my $mbl = PublicInbox::MboxLock->acq($f, 1, ['dotlock']); +ok(-f "$f.lock", 'dotlock created'); +undef $mbl; +ok(!-f "$f.lock", 'dotlock gone'); +$mbl = PublicInbox::MboxLock->acq($f, 1, ['none']); +ok(!-f "$f.lock", 'no dotlock with none'); +undef $mbl; + +eval { + PublicInbox::MboxLock->acq($f, 1, ['bogus']); + fail "should not succeed with `bogus'"; +}; +ok($@, "fails on `bogus' lock method"); +eval { + PublicInbox::MboxLock->acq($f, 1, ['timeout=1']); + fail "should not succeed with only timeout"; +}; +ok($@, "fails with only `timeout=' and no lock method"); + +my $defaults = PublicInbox::MboxLock->defaults; +is(ref($defaults), 'ARRAY', 'default lock methods'); +my $test_rw_lock = sub { + my ($func) = @_; + my $m = ["$func,timeout=0.000001"]; + for my $i (1..2) { + pipe(my ($r, $w)) or BAIL_OUT "pipe: $!"; + my $t0 = now; + my $pid = fork // BAIL_OUT "fork $!"; + if ($pid == 0) { + eval { PublicInbox::MboxLock->acq($f, 1, $m) }; + my $err = $@; + syswrite $w, "E: $err"; + _exit($err ? 0 : 1); + } + undef $w; + waitpid($pid, 0); + is($?, 0, "$func r/w lock behaved as expected #$i"); + my $d = now - $t0; + ok($d < 1, "$func r/w timeout #$i") or diag "elapsed=$d"; + my $err = do { local $/; <$r> }; + $! = EAGAIN; + my $msg = "$!"; + like($err, qr/\Q$msg\E/, "got EAGAIN in child #$i"); + } +}; + +my $test_ro_lock = sub { + my ($func) = @_; + for my $i (1..2) { + my $t0 = now; + my $pid = fork // BAIL_OUT "fork $!"; + if ($pid == 0) { + eval { PublicInbox::MboxLock->acq($f, 0, [ $func ]) }; + _exit($@ ? 1 : 0); + } + waitpid($pid, 0); + is($?, 0, "$func ro lock behaved as expected #$i"); + my $d = now - $t0; + ok($d < 1, "$func timeout respected #$i") or diag "elapsed=$d"; + } +}; + +SKIP: { + grep(/fcntl/, @$defaults) or skip 'File::FcntlLock not available', 1; + my $top = PublicInbox::MboxLock->acq($f, 1, $defaults); + ok($top, 'fcntl lock acquired'); + $test_rw_lock->('fcntl'); + undef $top; + $top = PublicInbox::MboxLock->acq($f, 0, $defaults); + ok($top, 'fcntl read lock acquired'); + $test_ro_lock->('fcntl'); +} +$mbl = PublicInbox::MboxLock->acq($f, 1, ['flock']); +ok($mbl, 'flock acquired'); +$test_rw_lock->('flock'); +undef $mbl; +$mbl = PublicInbox::MboxLock->acq($f, 0, ['flock']); +$test_ro_lock->('flock'); + +done_testing; |