about summary refs log tree commit homepage
path: root/lib/PublicInbox/SearchIdxShard.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/SearchIdxShard.pm')
-rw-r--r--lib/PublicInbox/SearchIdxShard.pm53
1 files changed, 47 insertions, 6 deletions
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index f23d23d0..8e24aa1b 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -7,6 +7,7 @@ package PublicInbox::SearchIdxShard;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::SearchIdx);
+use bytes qw(length);
 use IO::Handle (); # autoflush
 use PublicInbox::Eml;
 
@@ -47,6 +48,13 @@ sub spawn_worker {
         close $r or die "failed to close: $!";
 }
 
+sub eml ($$) {
+        my ($r, $len) = @_;
+        my $n = read($r, my $bref, $len) or die "read: $!\n";
+        $n == $len or die "short read: $n != $len\n";
+        PublicInbox::Eml->new(\$bref);
+}
+
 # this reads all the writes to $self->{w} from the parent process
 sub shard_worker_loop ($$$$$) {
         my ($self, $v2w, $r, $shard, $bnote) = @_;
@@ -65,25 +73,32 @@ sub shard_worker_loop ($$$$$) {
                                         die "write failed for barrier $!\n";
                 } elsif ($line =~ /\AD ([a-f0-9]{40,}) ([0-9]+)\n\z/s) {
                         $self->remove_by_oid($1, $2 + 0);
+                } elsif ($line =~ s/\A\+X //) {
+                        my ($len, $docid, $xnum, $oid, $ng_or_dir) =
+                                                        split(/ /, $line, 5);
+                        $self->add_xref3($docid, $xnum, $oid, $ng_or_dir,
+                                                eml($r, $len));
+                } elsif ($line =~ s/\A-X //) {
+                        my ($len, $docid, $xnum, $oid, $ng_or_dir) =
+                                                        split(/ /, $line, 5);
+                        $self->remove_xref3($docid, $xnum, $oid,
+                                                $ng_or_dir, eml($r, $len));
                 } else {
                         chomp $line;
                         # n.b. $mid may contain spaces(!)
-                        my ($to_read, $bytes, $num, $blob, $ds, $ts, $tid, $mid)
+                        my ($len, $bytes, $num, $oid, $ds, $ts, $tid, $mid)
                                 = split(/ /, $line, 8);
                         $self->begin_txn_lazy;
-                        my $n = read($r, my $msg, $to_read) or die "read: $!\n";
-                        $n == $to_read or die "short read: $n != $to_read\n";
-                        my $mime = PublicInbox::Eml->new(\$msg);
                         my $smsg = bless {
                                 bytes => $bytes,
                                 num => $num + 0,
-                                blob => $blob,
+                                blob => $oid,
                                 mid => $mid,
                                 tid => $tid,
                                 ds => $ds,
                                 ts => $ts,
                         }, 'PublicInbox::Smsg';
-                        $self->add_message($mime, $smsg);
+                        $self->add_message(eml($r, $len), $smsg);
                 }
         }
         $self->worker_done;
@@ -107,6 +122,32 @@ sub index_raw {
         }
 }
 
+sub shard_add_xref3 {
+        my ($self, $docid, $xnum, $oid, $xibx, $eml) = @_;
+        my $ng_or_dir = $xibx->{newsgroup} // $xibx->{inboxdir};
+        if (my $w = $self->{w}) {
+                my $hdr = $eml->header_obj->as_string;
+                my $len = length($hdr);
+                print $w "+X $len $docid $xnum $oid $ng_or_dir\n", $hdr or
+                        die "failed to write shard: $!";
+        } else {
+                $self->add_xref3($docid, $xnum, $oid, $ng_or_dir, $eml);
+        }
+}
+
+sub shard_remove_xref3 {
+        my ($self, $docid, $oid, $xibx, $eml) = @_;
+        my $ng_or_dir = $xibx->{newsgroup} // $xibx->{inboxdir};
+        if (my $w = $self->{w}) {
+                my $hdr = $eml->header_obj->as_string;
+                my $len = length($hdr);
+                print $w "-X $len $docid $oid $ng_or_dir\n", $hdr or
+                        die "failed to write shard: $!";
+        } else {
+                $self->remove_xref3($docid, $oid, $ng_or_dir, $eml);
+        }
+}
+
 sub atfork_child {
         close $_[0]->{w} or die "failed to close write pipe: $!\n";
 }