From c1abb946e53e4179666ebb290e31c2d9ddc40711 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 15 Jun 2016 00:14:28 +0000 Subject: emergency: implement new emergency Maildir delivery This is transactional and hopefully safer in case we hit SIGSEGV or SIGKILL during processing, as the tmp/ copy will remain on the FS even if DESTROY/END handlers are not called. --- lib/PublicInbox/Emergency.pm | 96 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 lib/PublicInbox/Emergency.pm (limited to 'lib/PublicInbox/Emergency.pm') diff --git a/lib/PublicInbox/Emergency.pm b/lib/PublicInbox/Emergency.pm new file mode 100644 index 00000000..e402d30f --- /dev/null +++ b/lib/PublicInbox/Emergency.pm @@ -0,0 +1,96 @@ +# Copyright (C) 2016 all contributors +# License: AGPL-3.0+ +# +# Emergency Maildir delivery for MDA +package PublicInbox::Emergency; +use strict; +use warnings; +use Fcntl qw(:DEFAULT SEEK_SET); +use Sys::Hostname qw(hostname); +use IO::Handle; + +sub new { + my ($class, $dir) = @_; + + foreach (qw(new tmp cur)) { + my $d = "$dir/$_"; + next if -d $d; + require File::Path; + File::Path::mkpath($d); # croaks on fatal errors + } + bless { dir => $dir, files => {}, t => 0, cnt => 0 }, $class; +} + +sub _fn_in { + my ($self, $dir) = @_; + my @host = split(/\./, hostname); + my $now = time; + if ($self->{t} != $now) { + $self->{t} = $now; + $self->{cnt} = 0; + } else { + $self->{cnt}++; + } + + my $f; + do { + $f = "$self->{dir}/$dir/$self->{t}.$$"."_$self->{cnt}.$host[0]"; + $self->{cnt}++; + } while (-e $f); + $f; +} + +sub prepare { + my ($self, $strref) = @_; + + die "already in transaction: $self->{tmp}" if $self->{tmp}; + my ($tmp, $fh); + do { + $tmp = _fn_in($self, 'tmp'); + $! = undef; + } while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_RDWR) && $!{EEXIST}); + print $fh $$strref or die "write failed: $!"; + $fh->flush or die "flush failed: $!"; + $fh->autoflush(1); + $self->{fh} = $fh; + $self->{tmp} = $tmp; +} + +sub abort { + my ($self) = @_; + delete $self->{fh}; + my $tmp = delete $self->{tmp} or return; + + unlink($tmp) or warn "Failed to unlink $tmp: $!"; + undef; +} + +sub fh { + my ($self) = @_; + my $fh = $self->{fh} or die "{fh} not open!\n"; + seek($fh, 0, SEEK_SET) or die "seek(fh) failed: $!"; + sysseek($fh, 0, SEEK_SET) or die "sysseek(fh) failed: $!"; + $fh; +} + +sub commit { + my ($self) = @_; + + delete $self->{fh}; + my $tmp = delete $self->{tmp} or return; + my $new; + do { + $new = _fn_in($self, 'new'); + } while (!link($tmp, $new) && $!{EEXIST}); + my @sn = stat($new) or die "stat $new failed: $!"; + my @st = stat($tmp) or die "stat $tmp failed: $!"; + if ($st[0] == $sn[0] && $st[1] == $sn[1]) { + unlink($tmp) or warn "Failed to unlink $tmp: $!"; + } else { + warn "stat($new) and stat($tmp) differ"; + } +} + +sub DESTROY { commit($_[0]) } + +1; -- cgit v1.2.3-24-ge0c7