about summary refs log tree commit homepage
path: root/lib/PublicInbox/ProcessPipe.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-12-31 13:51:25 +0000
committerEric Wong <e@80x24.org>2021-01-01 05:00:38 +0000
commitda089825419835c37afbdf7b7c5f0448766bdb27 (patch)
tree83aab587e43e6d1ea1b2eb5b4b3b939feb216754 /lib/PublicInbox/ProcessPipe.pm
parenta7539312d51443c9a705e64b16ac4fdcd4b17a6e (diff)
downloadpublic-inbox-da089825419835c37afbdf7b7c5f0448766bdb27.tar.gz
We'll allow using multiple workers to write to a single
mbox (which could be compressed).  This is can be done
safely with O_APPEND + syswrite for uncompressed files,
and using a lock when piping to pigz/gzip/bzip2/xz.
Diffstat (limited to 'lib/PublicInbox/ProcessPipe.pm')
-rw-r--r--lib/PublicInbox/ProcessPipe.pm21
1 files changed, 16 insertions, 5 deletions
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 2ce7eb8f..c9234f42 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -4,28 +4,39 @@
 # a tied handle for auto reaping of children tied to a pipe, see perltie(1)
 package PublicInbox::ProcessPipe;
 use strict;
-use warnings;
+use v5.10.1;
 
 sub TIEHANDLE {
-        my ($class, $pid, $fh) = @_;
-        bless { pid => $pid, fh => $fh }, $class;
+        my ($class, $pid, $fh, $cb, $arg) = @_;
+        bless { pid => $pid, fh => $fh, cb => $cb, arg => $arg }, $class;
 }
 
 sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) }
 
 sub READLINE { readline($_[0]->{fh}) }
 
+sub WRITE {
+        use bytes qw(length);
+        syswrite($_[0]->{fh}, $_[1], $_[2] // length($_[1]), $_[3] // 0);
+}
+
+sub PRINT {
+        my $self = shift;
+        print { $self->{fh} } @_;
+}
+
 sub CLOSE {
         my $fh = delete($_[0]->{fh});
         my $ret = defined $fh ? close($fh) : '';
-        my $pid = delete $_[0]->{pid};
+        my ($pid, $cb, $arg) = delete @{$_[0]}{qw(pid cb arg)};
         if (defined $pid) {
                 # PublicInbox::DS may not be loaded
-                eval { PublicInbox::DS::dwaitpid($pid, undef, undef) };
+                eval { PublicInbox::DS::dwaitpid($pid, $cb, $arg) };
 
                 if ($@) { # ok, not in the event loop, work synchronously
                         waitpid($pid, 0);
                         $ret = '' if $?;
+                        $cb->($arg, $pid) if $cb;
                 }
         }
         $ret;