about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--MANIFEST3
-rw-r--r--lib/PublicInbox/Daemon.pm6
-rw-r--r--lib/PublicInbox/LeiDaemon.pm303
-rwxr-xr-xscript/lei58
-rw-r--r--t/lei.t80
5 files changed, 448 insertions, 2 deletions
diff --git a/MANIFEST b/MANIFEST
index ac442606..7536b7c2 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -159,6 +159,7 @@ lib/PublicInbox/InboxIdle.pm
 lib/PublicInbox/InboxWritable.pm
 lib/PublicInbox/Isearch.pm
 lib/PublicInbox/KQNotify.pm
+lib/PublicInbox/LeiDaemon.pm
 lib/PublicInbox/Linkify.pm
 lib/PublicInbox/Listener.pm
 lib/PublicInbox/Lock.pm
@@ -226,6 +227,7 @@ sa_config/Makefile
 sa_config/README
 sa_config/root/etc/spamassassin/public-inbox.pre
 sa_config/user/.spamassassin/user_prefs
+script/lei
 script/public-inbox-compact
 script/public-inbox-convert
 script/public-inbox-edit
@@ -316,6 +318,7 @@ t/indexlevels-mirror.t
 t/init.t
 t/iso-2202-jp.eml
 t/kqnotify.t
+t/lei.t
 t/linkify.t
 t/main-bin/spamc
 t/mda-mime.eml
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index a2171535..6b92b60d 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -1,7 +1,9 @@
 # Copyright (C) 2015-2020 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# contains common daemon code for the httpd, imapd, and nntpd servers.
-# This may be used for read-only IMAP server if we decide to implement it.
+#
+# Contains common daemon code for the httpd, imapd, and nntpd servers
+# and designed for handling thousands of untrusted clients over slow
+# and/or lossy connections.
 package PublicInbox::Daemon;
 use strict;
 use warnings;
diff --git a/lib/PublicInbox/LeiDaemon.pm b/lib/PublicInbox/LeiDaemon.pm
new file mode 100644
index 00000000..ae40b3a6
--- /dev/null
+++ b/lib/PublicInbox/LeiDaemon.pm
@@ -0,0 +1,303 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Backend for `lei' (local email interface).  Unlike the C10K-oriented
+# PublicInbox::Daemon, this is designed exclusively to handle trusted
+# local clients with read/write access to the FS and use as many
+# system resources as the local user has access to.
+package PublicInbox::LeiDaemon;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use Getopt::Long ();
+use Errno qw(EAGAIN ECONNREFUSED ENOENT);
+use POSIX qw(setsid);
+use IO::Socket::UNIX;
+use IO::Handle ();
+use Sys::Syslog qw(syslog openlog);
+use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
+use PublicInbox::Sigfd;
+use PublicInbox::DS qw(now);
+use PublicInbox::Spawn qw(spawn);
+our $quit = sub { exit(shift // 0) };
+my $glp = Getopt::Long::Parser->new;
+$glp->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
+
+sub x_it ($$) { # pronounced "exit"
+        my ($client, $code) = @_;
+        if (my $sig = ($code & 127)) {
+                kill($sig, $client->{pid} // $$);
+        } else {
+                $code >>= 8;
+                if (my $sock = $client->{sock}) {
+                        say $sock "exit=$code";
+                } else { # for oneshot
+                        $quit->($code);
+                }
+        }
+}
+
+sub emit ($$$) {
+        my ($client, $channel, $buf) = @_;
+        print { $client->{$channel} } $buf or warn "print FD[$channel]: $!";
+}
+
+sub fail ($$;$) {
+        my ($client, $buf, $exit_code) = @_;
+        $buf .= "\n" unless $buf =~ /\n\z/s;
+        emit($client, 2, $buf);
+        x_it($client, ($exit_code // 1) << 8);
+        undef;
+}
+
+sub _help ($;$) {
+        my ($client, $channel) = @_;
+        emit($client, $channel //= 1, <<EOF);
+usage: lei COMMAND [OPTIONS]
+
+...
+EOF
+        x_it($client, $channel == 2 ? 1 << 8 : 0); # stderr => failure
+}
+
+sub assert_args ($$$;$@) {
+        my ($client, $argv, $proto, $opt, @spec) = @_;
+        $opt //= {};
+        push @spec, qw(help|h);
+        $glp->getoptionsfromarray($argv, $opt, @spec) or
+                return fail($client, 'bad arguments or options');
+        if ($opt->{help}) {
+                _help($client);
+                undef;
+        } else {
+                my ($nreq, $rest) = split(/;/, $proto);
+                $nreq = (($nreq // '') =~ tr/$/$/);
+                my $argc = scalar(@$argv);
+                my $tot = ($rest // '') eq '@' ? $argc : ($proto =~ tr/$/$/);
+                return 1 if $argc <= $tot && $argc >= $nreq;
+                _help($client, 2);
+                undef
+        }
+}
+
+sub dispatch {
+        my ($client, $cmd, @argv) = @_;
+        local $SIG{__WARN__} = sub { emit($client, 2, "@_") };
+        local $SIG{__DIE__} = 'DEFAULT';
+        if (defined $cmd) {
+                my $func = "lei_$cmd";
+                $func =~ tr/-/_/;
+                if (my $cb = __PACKAGE__->can($func)) {
+                        $client->{cmd} = $cmd;
+                        $cb->($client, \@argv);
+                } elsif (grep(/\A-/, $cmd, @argv)) {
+                        assert_args($client, [ $cmd, @argv ], '');
+                } else {
+                        fail($client, "`$cmd' is not an lei command");
+                }
+        } else {
+                _help($client, 2);
+        }
+}
+
+sub lei_daemon_pid {
+        my ($client, $argv) = @_;
+        assert_args($client, $argv, '') and emit($client, 1, "$$\n");
+}
+
+sub lei_DBG_pwd {
+        my ($client, $argv) = @_;
+        assert_args($client, $argv, '') and
+                emit($client, 1, "$client->{env}->{PWD}\n");
+}
+
+sub lei_DBG_cwd {
+        my ($client, $argv) = @_;
+        require Cwd;
+        assert_args($client, $argv, '') and emit($client, 1, Cwd::cwd()."\n");
+}
+
+sub lei_DBG_false { x_it($_[0], 1 << 8) }
+
+sub lei_daemon_stop {
+        my ($client, $argv) = @_;
+        assert_args($client, $argv, '') and $quit->(0);
+}
+
+sub lei_help { _help($_[0]) }
+
+sub reap_exec { # dwaitpid callback
+        my ($client, $pid) = @_;
+        x_it($client, $?);
+}
+
+sub lei_git { # support passing through random git commands
+        my ($client, $argv) = @_;
+        my %opt = map { $_ => $client->{$_} } (0..2);
+        my $pid = spawn(['git', @$argv], $client->{env}, \%opt);
+        PublicInbox::DS::dwaitpid($pid, \&reap_exec, $client);
+}
+
+sub accept_dispatch { # Listener {post_accept} callback
+        my ($sock) = @_; # ignore other
+        $sock->blocking(1);
+        $sock->autoflush(1);
+        my $client = { sock => $sock };
+        vec(my $rin = '', fileno($sock), 1) = 1;
+        # `say $sock' triggers "die" in lei(1)
+        for my $i (0..2) {
+                if (select(my $rout = $rin, undef, undef, 1)) {
+                        my $fd = IO::FDPass::recv(fileno($sock));
+                        if ($fd >= 0) {
+                                my $rdr = ($fd == 0 ? '<&=' : '>&=');
+                                if (open(my $fh, $rdr, $fd)) {
+                                        $client->{$i} = $fh;
+                                } else {
+                                        say $sock "open($rdr$fd) (FD=$i): $!";
+                                        return;
+                                }
+                        } else {
+                                say $sock "recv FD=$i: $!";
+                                return;
+                        }
+                } else {
+                        say $sock "timed out waiting to recv FD=$i";
+                        return;
+                }
+        }
+        # $ARGV_STR = join("]\0[", @ARGV);
+        # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV);
+        # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0";
+        my ($client_pid, $argv, $env) = do {
+                local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2
+                chomp(my $line = <$sock>);
+                split(/\0\0>/, $line, 3);
+        };
+        my %env = map { split(/=/, $_, 2) } split(/\0/, $env);
+        if (chdir($env{PWD})) {
+                $client->{env} = \%env;
+                $client->{pid} = $client_pid;
+                eval { dispatch($client, split(/\]\0\[/, $argv)) };
+                say $sock $@ if $@;
+        } else {
+                say $sock "chdir($env{PWD}): $!"; # implicit close
+        }
+}
+
+sub noop {}
+
+# lei(1) calls this when it can't connect
+sub lazy_start ($$) {
+        my ($path, $err) = @_;
+        if ($err == ECONNREFUSED) {
+                unlink($path) or die "unlink($path): $!";
+        } elsif ($err != ENOENT) {
+                die "connect($path): $!";
+        }
+        my $umask = umask(077) // die("umask(077): $!");
+        my $l = IO::Socket::UNIX->new(Local => $path,
+                                        Listen => 1024,
+                                        Type => SOCK_STREAM) or
+                $err = $!;
+        umask($umask) or die("umask(restore): $!");
+        $l or return $err;
+        my @st = stat($path) or die "stat($path): $!";
+        my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
+        pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
+        my $oldset = PublicInbox::Sigfd::block_signals();
+        my $pid = fork // die "fork: $!";
+        if ($pid) {
+                PublicInbox::Sigfd::sig_setmask($oldset);
+                return; # client will connect to $path
+        }
+        openlog($path, 'pid', 'user');
+        local $SIG{__DIE__} = sub {
+                syslog('crit', "@_");
+                exit $! if $!;
+                exit $? >> 8 if $? >> 8;
+                exit 255;
+        };
+        local $SIG{__WARN__} = sub { syslog('warning', "@_") };
+        open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n";
+        open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n";
+        open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n";
+        setsid();
+        $pid = fork // die "fork: $!";
+        exit if $pid;
+        $0 = "lei-daemon $path";
+        require PublicInbox::Listener;
+        require PublicInbox::EOFpipe;
+        $l->blocking(0);
+        $eof_w->blocking(0);
+        $eof_r->blocking(0);
+        my $listener = PublicInbox::Listener->new($l, \&accept_dispatch, $l);
+        my $exit_code;
+        local $quit = sub {
+                $exit_code //= shift;
+                my $tmp = $listener or exit($exit_code);
+                unlink($path) if defined($path);
+                syswrite($eof_w, '.');
+                $l = $listener = $path = undef;
+                $tmp->close if $tmp; # DS::close
+                PublicInbox::DS->SetLoopTimeout(1000);
+        };
+        PublicInbox::EOFpipe->new($eof_r, sub {}, undef);
+        my $sig = {
+                CHLD => \&PublicInbox::DS::enqueue_reap,
+                QUIT => $quit,
+                INT => $quit,
+                TERM => $quit,
+                HUP => \&noop,
+                USR1 => \&noop,
+                USR2 => \&noop,
+        };
+        my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK);
+        local %SIG = (%SIG, %$sig) if !$sigfd;
+        if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
+                PublicInbox::DS->SetLoopTimeout(5000);
+        } else {
+                # wake up every second to accept signals if we don't
+                # have signalfd or IO::KQueue:
+                PublicInbox::Sigfd::sig_setmask($oldset);
+                PublicInbox::DS->SetLoopTimeout(1000);
+        }
+        PublicInbox::DS->SetPostLoopCallback(sub {
+                my ($dmap, undef) = @_;
+                if (@st = defined($path) ? stat($path) : ()) {
+                        if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
+                                warn "$path dev/ino changed, quitting\n";
+                                $path = undef;
+                        }
+                } elsif (defined($path)) {
+                        warn "stat($path): $!, quitting ...\n";
+                        undef $path; # don't unlink
+                        $quit->();
+                }
+                return 1 if defined($path);
+                my $now = now();
+                my $n = 0;
+                for my $s (values %$dmap) {
+                        $s->can('busy') or next;
+                        if ($s->busy($now)) {
+                                ++$n;
+                        } else {
+                                $s->close;
+                        }
+                }
+                $n; # true: continue, false: stop
+        });
+        PublicInbox::DS->EventLoop;
+        exit($exit_code // 0);
+}
+
+# for users w/o IO::FDPass
+sub oneshot {
+        dispatch({
+                0 => *STDIN{IO},
+                1 => *STDOUT{IO},
+                2 => *STDERR{IO},
+                env => \%ENV
+        }, @ARGV);
+}
+
+1;
diff --git a/script/lei b/script/lei
new file mode 100755
index 00000000..1b5af3a1
--- /dev/null
+++ b/script/lei
@@ -0,0 +1,58 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Cwd qw(cwd);
+use IO::Socket::UNIX;
+
+if (eval { require IO::FDPass; 1 }) { # use daemon to reduce load time
+        my $path = do {
+                my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
+                if ($runtime_dir eq '/lei') {
+                        require File::Spec;
+                        $runtime_dir = File::Spec->tmpdir."/lei-$<";
+                }
+                unless (-d $runtime_dir && -w _) {
+                        require File::Path;
+                        File::Path::mkpath($runtime_dir, 0, 0700);
+                }
+                "$runtime_dir/sock";
+        };
+        my $sock = IO::Socket::UNIX->new(Peer => $path, Type => SOCK_STREAM);
+        unless ($sock) { # start the daemon if not started
+                my $err = $!;
+                require PublicInbox::LeiDaemon;
+                $err = PublicInbox::LeiDaemon::lazy_start($path, $err);
+                # try connecting again anyways, unlink+bind may be racy
+                $sock = IO::Socket::UNIX->new(Peer => $path,
+                                                Type => SOCK_STREAM) // die
+                        "connect($path): $! (bind($path): $err)";
+        }
+        my $pwd = $ENV{PWD};
+        my $cwd = cwd();
+        if ($pwd) { # prefer ENV{PWD} if it's a symlink to real cwd
+                my @st_cwd = stat($cwd) or die "stat(cwd=$cwd): $!\n";
+                my @st_pwd = stat($pwd);
+                # make sure st_dev/st_ino match for {PWD} to be valid
+                $pwd = $cwd if (!@st_pwd || $st_pwd[1] != $st_cwd[1] ||
+                                        $st_pwd[0] != $st_cwd[0]);
+        } else {
+                $pwd = $cwd;
+        }
+        local $ENV{PWD} = $pwd;
+        $sock->autoflush(1);
+        IO::FDPass::send(fileno($sock), $_) for (0..2);
+        my $buf = "$$\0\0>" . join("]\0[", @ARGV) . "\0\0>";
+        while (my ($k, $v) = each %ENV) { $buf .= "$k=$v\0" }
+        $buf .= "\0\0";
+        print $sock $buf or die "print(sock, buf): $!";
+        local $/ = "\n";
+        while (my $line = <$sock>) {
+                $line =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
+                die $line;
+        }
+} else { # for systems lacking IO::FDPass
+        require PublicInbox::LeiDaemon;
+        PublicInbox::LeiDaemon::oneshot();
+}
diff --git a/t/lei.t b/t/lei.t
new file mode 100644
index 00000000..feee9270
--- /dev/null
+++ b/t/lei.t
@@ -0,0 +1,80 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use PublicInbox::Config;
+my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
+require_mods(qw(DBD::SQLite Search::Xapian));
+my ($home, $for_destroy) = tmpdir();
+my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
+
+SKIP: {
+        require_mods('IO::FDPass', 51);
+        local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
+        mkdir "$home/xdg_run", 0700 or BAIL_OUT "mkdir: $!";
+        my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/sock";
+
+        ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid');
+        is($err, '', 'no error from daemon-pid');
+        like($out, qr/\A[0-9]+\n\z/s, 'pid returned') or BAIL_OUT;
+        chomp(my $pid = $out);
+        ok(kill(0, $pid), 'pid is valid');
+        ok(-S $sock, 'sock created');
+
+        ok(!run_script([qw(lei)], undef, $opt), 'no args fails');
+        is($? >> 8, 1, '$? is 1');
+        is($out, '', 'nothing in stdout');
+        like($err, qr/^usage:/sm, 'usage in stderr');
+
+        for my $arg (['-h'], ['--help'], ['help'], [qw(daemon-pid --help)]) {
+                $out = $err = '';
+                ok(run_script(['lei', @$arg], undef, $opt), "lei @$arg");
+                like($out, qr/^usage:/sm, "usage in stdout (@$arg)");
+                is($err, '', "nothing in stderr (@$arg)");
+        }
+
+        ok(!run_script([qw(lei DBG-false)], undef, $opt), 'false(1) emulation');
+        is($? >> 8, 1, '$? set correctly');
+        is($err, '', 'no error from false(1) emulation');
+
+        for my $arg ([''], ['--halp'], ['halp'], [qw(daemon-pid --halp)]) {
+                $out = $err = '';
+                ok(!run_script(['lei', @$arg], undef, $opt), "lei @$arg");
+                is($? >> 8, 1, '$? set correctly');
+                isnt($err, '', 'something in stderr');
+                is($out, '', 'nothing in stdout');
+        }
+
+        $out = '';
+        ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid');
+        chomp(my $pid_again = $out);
+        is($pid, $pid_again, 'daemon-pid idempotent');
+
+        ok(run_script([qw(lei daemon-stop)], undef, $opt), 'daemon-stop');
+        is($out, '', 'no output from daemon-stop');
+        is($err, '', 'no error from daemon-stop');
+        for (0..100) {
+                kill(0, $pid) or last;
+                tick();
+        }
+        ok(!-S $sock, 'sock gone');
+        ok(!kill(0, $pid), 'pid gone after stop');
+
+        ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid');
+        chomp(my $new_pid = $out);
+        ok(kill(0, $new_pid), 'new pid is running');
+        ok(-S $sock, 'sock exists again');
+        unlink $sock or BAIL_OUT "unlink $!";
+        for (0..100) {
+                kill('CHLD', $new_pid) or last;
+                tick();
+        }
+        ok(!kill(0, $new_pid), 'daemon exits after unlink');
+};
+
+require_ok 'PublicInbox::LeiDaemon';
+
+done_testing;