From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 11DB01FA12 for ; Sat, 23 Jan 2021 10:27:56 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 02/10] lei: support remote externals Date: Sat, 23 Jan 2021 10:27:47 +0000 Message-Id: <20210123102755.425-3-e@80x24.org> In-Reply-To: <20210123102755.425-1-e@80x24.org> References: <20210123102755.425-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Via curl(1), since that lets us easily use tor on a per-connection basis via LD_PRELOAD (torsocks) or proxy. We'll eventually support more curl options which can allow users to get past firewalls and deal with other odd network configurations. --- lib/PublicInbox/LEI.pm | 19 ++++++++++-- lib/PublicInbox/LeiOverview.pm | 10 +++++- lib/PublicInbox/LeiToMail.pm | 20 +++++++----- lib/PublicInbox/LeiXSearch.pm | 57 +++++++++++++++++++++++++++++++++- lib/PublicInbox/ProcessPipe.pm | 2 ++ script/lei | 2 ++ t/lei.t | 39 +++++++++++++++++++++++ 7 files changed, 137 insertions(+), 12 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index ef3f90fc..f6bc920d 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -84,6 +84,7 @@ our %CMD = ( # sorted in order of importance/use: 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw( save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a sort|s=s reverse|r offset=i remote local! external! pretty mua-cmd=s + verbose|v since|after=s until|before=s), opt_dash('limit|n=i', '[0-9]+') ], 'show' => [ 'MID|OID', 'show a given object (Message-ID or object ID)', @@ -278,6 +279,16 @@ sub fail ($$;$) { undef; } +sub child_error { # passes non-fatal curl exit codes to user + my ($self, $child_error) = @_; # child_error is $? + if (my $sock = $self->{sock}) { # send to lei(1) client + send($sock, "child_error $child_error", MSG_EOR); + } else { # oneshot + $self->{child_error} = $child_error; + } + undef; +} + sub atfork_prepare_wq { my ($self, $wq) = @_; my $tcafc = $wq->{-ipc_atfork_child_close} //= [ $listener // () ]; @@ -959,19 +970,21 @@ sub lazy_start { exit($exit_code // 0); } -# for users w/o Socket::Msghdr +# for users w/o Socket::Msghdr installed or Inline::C enabled sub oneshot { my ($main_pkg) = @_; my $exit = $main_pkg->can('exit'); # caller may override exit() local $quit = $exit if $exit; local %PATH2CFG; umask(077) // die("umask(077): $!"); - dispatch((bless { + my $self = bless { 0 => *STDIN{GLOB}, 1 => *STDOUT{GLOB}, 2 => *STDERR{GLOB}, env => \%ENV - }, __PACKAGE__), @ARGV); + }, __PACKAGE__; + dispatch($self, @ARGV); + x_it($self, $self->{child_error}) if $self->{child_error}; } # ensures stdout hits the FS before sock disconnects so a client diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 7a4fa857..49538a60 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -209,7 +209,15 @@ sub ovv_each_smsg_cb { # runs in wq worker usually $json->ascii(1) if $lei->{opt}->{ascii}; } my $l2m = $lei->{l2m}; - if ($l2m && $l2m->{-wq_s1}) { + if ($l2m && $ibxish->can('scheme')) { # remote https?:// mboxrd + delete $l2m->{-wq_s1}; + my $g2m = $l2m->can('git_to_mail'); + my $wcb = $l2m->write_cb($lei); + sub { + my ($smsg, undef, $eml) = @_; # no mitem in $_[1] + $wcb->(undef, $smsg, $eml); + }; + } elsif ($l2m && $l2m->{-wq_s1}) { my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m); # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout, # $io[4] becomes a notification pipe that triggers EOF diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index cea68319..43c59da0 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -251,9 +251,9 @@ sub _mbox_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe; sub { # for git_to_mail - my ($buf, $smsg) = @_; + my ($buf, $smsg, $eml) = @_; return unless $out; - my $eml = PublicInbox::Eml->new($buf); + $eml //= PublicInbox::Eml->new($buf); if (!$dedupe->is_dup($eml, $smsg->{blob})) { $buf = $eml2mbox->($eml, $smsg); my $lk = $ovv->lock_for_scope; @@ -286,18 +286,23 @@ sub _augment_file { # _maildir_each_file cb # _maildir_each_file callback, \&CORE::unlink doesn't work with it sub _unlink { unlink($_[0]) } +sub _rand () { + state $seq = 0; + sprintf('%x,%x,%x,%x', rand(0xffffffff), time, $$, ++$seq); +} + sub _buf2maildir { my ($dst, $buf, $smsg) = @_; my $kw = $smsg->{kw} // []; my $sfx = join('', sort(map { $kw2char{$_} // () } @$kw)); my $rand = ''; # chosen by die roll :P my ($tmp, $fh, $final); - my $common = $smsg->{blob}; + my $common = $smsg->{blob} // _rand; if (defined(my $pct = $smsg->{pct})) { $common .= "=$pct" } do { $tmp = $dst.'tmp/'.$rand.$common; } while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) && - $! == EEXIST && ($rand = int(rand 0x7fffffff).',')); + $! == EEXIST && ($rand = _rand.',')); if (print $fh $$buf and close($fh)) { # ignore new/ and write only to cur/, otherwise MUAs # with R/W access to the Maildir will end up doing @@ -308,7 +313,7 @@ sub _buf2maildir { do { $final = $dst.$rand.$common.':2,'.$sfx; } while (!link($tmp, $final) && $! == EEXIST && - ($rand = int(rand 0x7fffffff).',')); + ($rand = _rand.',')); unlink($tmp) or warn "W: failed to unlink $tmp: $!\n"; } else { my $err = $!; @@ -323,9 +328,10 @@ sub _maildir_write_cb ($$) { $dedupe->prepare_dedupe; my $dst = $lei->{ovv}->{dst}; sub { # for git_to_mail - my ($buf, $smsg) = @_; + my ($buf, $smsg, $eml) = @_; + $buf //= \($eml->as_string); return _buf2maildir($dst, $buf, $smsg) if !$dedupe; - my $eml = PublicInbox::Eml->new($$buf); # copy buf + $eml //= PublicInbox::Eml->new($$buf); # copy buf return if $dedupe->is_dup($eml, $smsg->{blob}); undef $eml; _buf2maildir($dst, $buf, $smsg); diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 10c25246..d32fe09a 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -14,6 +14,7 @@ use PublicInbox::Import; use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); use PublicInbox::Search qw(xap_terms); +use PublicInbox::Spawn qw(popen_rd); sub new { my ($class) = @_; @@ -169,8 +170,58 @@ sub query_mset { # non-parallel for non-"--thread" users $lei->{ovv}->ovv_atexit_child($lei); } +sub each_eml { # callback for MboxReader->mboxrd + my ($eml, $self, $lei, $each_smsg) = @_; + my $smsg = bless {}, 'PublicInbox::Smsg'; + $smsg->populate($eml); + $smsg->{$_} //= '' for qw(from to cc ds subject references mid); + delete @$smsg{qw(From Subject -ds -ts)}; + if (my $startq = delete($self->{5})) { wait_startq($startq) } + return if !$lei->{l2m} && $lei->{dedupe}->is_smsg_dup($smsg); + $each_smsg->($smsg, undef, $eml); +} + sub query_remote_mboxrd { my ($self, $lei, $uri) = @_; + local $0 = "$0 query_remote_mboxrd"; + my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq + local @SIG{keys %sig} = values %sig; + my $opt = $lei->{opt}; + $uri->query_form(q => $lei->{mset_opt}->{qstr}, x => 'm', + $opt->{thread} ? (t => 1) : ()); + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $uri); + my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing'; + $dedupe->prepare_dedupe; + my @cmd = qw(curl -XPOST -sSf); + my $tor = $opt->{torsocks} //= 'auto'; + if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' && + (($lei->{env}->{LD_PRELOAD}//'') !~ /torsocks/)) { + unshift @cmd, 'torsocks'; + } elsif (PublicInbox::Config::git_bool($tor)) { + unshift @cmd, 'torsocks'; + } + my $verbose = $opt->{verbose}; + push @cmd, '-v' if $verbose; + push @cmd, $uri->as_string; + $lei->err("# @cmd") if $verbose; + $? = 0; + my $fh = popen_rd(\@cmd, $lei->{env}, { 2 => $lei->{2} }); + $fh = IO::Uncompress::Gunzip->new($fh); + eval { + PublicInbox::MboxReader->mboxrd($fh, \&each_eml, + $self, $lei, $each_smsg); + }; + return $lei->fail("E: @cmd: $@") if $@; + if (($? >> 8) == 22) { # HTTP 404 from curl(1) + $uri->query_form(q => $lei->{mset_opt}->{qstr}); + $lei->err('# no results from '.$uri->as_string); + } elsif ($?) { + $uri->query_form(q => $lei->{mset_opt}->{qstr}); + $lei->err('E: '.$uri->as_string); + $lei->child_error($?); + } + undef $each_smsg; + $lei->{ovv}->ovv_atexit_child($lei); } sub git { @@ -230,7 +281,6 @@ sub start_query { # always runs in main (lei-daemon) process } else { $self->wq_do('query_mset', $io, $lei); } - # TODO for my $uri (remotes($self)) { $self->wq_do('query_remote_mboxrd', $io, $lei, $uri); } @@ -263,6 +313,7 @@ sub do_query { my ($lei, @io) = $lei_orig->atfork_parent_wq($self); $io[0] = undef; pipe(my $done, $io[0]) or die "pipe $!"; + $lei_orig->{1}->autoflush(1); $lei_orig->event_step_init; # wait for shutdowns my $done_op = { @@ -296,6 +347,10 @@ sub do_query { sub ipc_atfork_prepare { my ($self) = @_; + if (exists $self->{remotes}) { + require PublicInbox::MboxReader; + require IO::Uncompress::Gunzip; + } # (0: done_wr, 1: stdout|mbox, 2: stderr, # 3: sock, 4: $l2m->{-wq_s1}, 5: $startq) $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]); diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index e540dc22..97e9c268 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -13,6 +13,8 @@ sub TIEHANDLE { $class; } +sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip + sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) } sub READLINE { readline($_[0]->{fh}) } diff --git a/script/lei b/script/lei index 8dcea562..8c40bf12 100755 --- a/script/lei +++ b/script/lei @@ -93,6 +93,8 @@ Falling back to (slow) one-shot mode if ($buf =~ /\Ax_it ([0-9]+)\z/) { $x_it_code = $1 + 0; last; + } elsif ($buf =~ /\Achild_error ([0-9]+)\z/) { + $x_it_code = $1 + 0; } elsif ($buf =~ /\Aexec (.+)\z/) { exec_cmd(\@fds, split(/\0/, $1)); } else { diff --git a/t/lei.t b/t/lei.t index 50ad2bb1..6b45f5b7 100644 --- a/t/lei.t +++ b/t/lei.t @@ -8,11 +8,15 @@ use PublicInbox::TestCommon; use PublicInbox::Config; use File::Path qw(rmtree); use Fcntl qw(SEEK_SET); +use PublicInbox::Spawn qw(which); require_git 2.6; require_mods(qw(json DBD::SQLite Search::Xapian)); my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') }; my ($home, $for_destroy) = tmpdir(); my $err_filter; +my @onions = qw(http://hjrcffqmbrq6wope.onion/meta/ + http://czquwvybam4bgbro.onion/meta/ + http://ou63pmih66umazou.onion/meta/); my $lei = sub { my ($cmd, $env, $xopt) = @_; $out = $err = ''; @@ -155,6 +159,32 @@ my $setup_publicinboxes = sub { $seen || BAIL_OUT 'no imports'; }; +my $test_external_remote = sub { + my ($url, $k) = @_; +SKIP: { + my $nr = 4; + skip "$k unset", $nr if !$url; + which('curl') or skip 'no curl', $nr; + which('torsocks') or skip 'no torsocks', $nr if $url =~ m!\.onion/!; + $lei->('ls-external'); + for my $e (split(/^/ms, $out)) { + $e =~ s/\s+boost.*//s; + $lei->('forget-external', '-q', $e) or + fail "error forgetting $e: $err" + } + $lei->('add-external', $url); + my $mid = '20140421094015.GA8962@dcvr.yhbt.net'; + ok($lei->('q', "m:$mid"), "query $url"); + is($err, '', "no errors on $url"); + my $res = PublicInbox::Config->json->decode($out); + is($res->[0]->{'m'}, "<$mid>", "got expected mid from $url"); + ok($lei->('q', "m:$mid", 'd:..20101002'), 'no results, no error'); + like($err, qr/404/, 'noted 404'); + is($out, "[null]\n", 'got null results'); + $lei->('forget-external', $url); +} # /SKIP +}; # /sub + my $test_external = sub { $setup_publicinboxes->(); $cleanup->(); @@ -243,6 +273,15 @@ my $test_external = sub { } ok(!$lei->('q', '-o', "$home/mbox", 's:nope'), 'fails if mbox format unspecified'); + my %e = ( + TEST_LEI_EXTERNAL_HTTPS => 'https://public-inbox.org/meta/', + TEST_LEI_EXTERNAL_ONION => $onions[int(rand(scalar(@onions)))], + ); + for my $k (keys %e) { + my $url = $ENV{$k} // ''; + $url = $e{$k} if $url eq '1'; + $test_external_remote->($url, $k); + } }; my $test_lei_common = sub {