diff options
Diffstat (limited to 'lib/PublicInbox/SearchIdxShard.pm')
-rw-r--r-- | lib/PublicInbox/SearchIdxShard.pm | 53 |
1 files changed, 47 insertions, 6 deletions
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index f23d23d0..8e24aa1b 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -7,6 +7,7 @@ package PublicInbox::SearchIdxShard; use strict; use v5.10.1; use parent qw(PublicInbox::SearchIdx); +use bytes qw(length); use IO::Handle (); # autoflush use PublicInbox::Eml; @@ -47,6 +48,13 @@ sub spawn_worker { close $r or die "failed to close: $!"; } +sub eml ($$) { + my ($r, $len) = @_; + my $n = read($r, my $bref, $len) or die "read: $!\n"; + $n == $len or die "short read: $n != $len\n"; + PublicInbox::Eml->new(\$bref); +} + # this reads all the writes to $self->{w} from the parent process sub shard_worker_loop ($$$$$) { my ($self, $v2w, $r, $shard, $bnote) = @_; @@ -65,25 +73,32 @@ sub shard_worker_loop ($$$$$) { die "write failed for barrier $!\n"; } elsif ($line =~ /\AD ([a-f0-9]{40,}) ([0-9]+)\n\z/s) { $self->remove_by_oid($1, $2 + 0); + } elsif ($line =~ s/\A\+X //) { + my ($len, $docid, $xnum, $oid, $ng_or_dir) = + split(/ /, $line, 5); + $self->add_xref3($docid, $xnum, $oid, $ng_or_dir, + eml($r, $len)); + } elsif ($line =~ s/\A-X //) { + my ($len, $docid, $xnum, $oid, $ng_or_dir) = + split(/ /, $line, 5); + $self->remove_xref3($docid, $xnum, $oid, + $ng_or_dir, eml($r, $len)); } else { chomp $line; # n.b. $mid may contain spaces(!) - my ($to_read, $bytes, $num, $blob, $ds, $ts, $tid, $mid) + my ($len, $bytes, $num, $oid, $ds, $ts, $tid, $mid) = split(/ /, $line, 8); $self->begin_txn_lazy; - my $n = read($r, my $msg, $to_read) or die "read: $!\n"; - $n == $to_read or die "short read: $n != $to_read\n"; - my $mime = PublicInbox::Eml->new(\$msg); my $smsg = bless { bytes => $bytes, num => $num + 0, - blob => $blob, + blob => $oid, mid => $mid, tid => $tid, ds => $ds, ts => $ts, }, 'PublicInbox::Smsg'; - $self->add_message($mime, $smsg); + $self->add_message(eml($r, $len), $smsg); } } $self->worker_done; @@ -107,6 +122,32 @@ sub index_raw { } } +sub shard_add_xref3 { + my ($self, $docid, $xnum, $oid, $xibx, $eml) = @_; + my $ng_or_dir = $xibx->{newsgroup} // $xibx->{inboxdir}; + if (my $w = $self->{w}) { + my $hdr = $eml->header_obj->as_string; + my $len = length($hdr); + print $w "+X $len $docid $xnum $oid $ng_or_dir\n", $hdr or + die "failed to write shard: $!"; + } else { + $self->add_xref3($docid, $xnum, $oid, $ng_or_dir, $eml); + } +} + +sub shard_remove_xref3 { + my ($self, $docid, $oid, $xibx, $eml) = @_; + my $ng_or_dir = $xibx->{newsgroup} // $xibx->{inboxdir}; + if (my $w = $self->{w}) { + my $hdr = $eml->header_obj->as_string; + my $len = length($hdr); + print $w "-X $len $docid $oid $ng_or_dir\n", $hdr or + die "failed to write shard: $!"; + } else { + $self->remove_xref3($docid, $oid, $ng_or_dir, $eml); + } +} + sub atfork_child { close $_[0]->{w} or die "failed to close write pipe: $!\n"; } |