diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/Config.pm | 14 | ||||
-rw-r--r-- | lib/PublicInbox/HTTP.pm | 27 | ||||
-rw-r--r-- | lib/PublicInbox/HTTPD/Async.pm | 31 | ||||
-rw-r--r-- | lib/PublicInbox/Inbox.pm | 75 | ||||
-rw-r--r-- | lib/PublicInbox/Qspawn.pm | 11 | ||||
-rw-r--r-- | lib/PublicInbox/Search.pm | 29 | ||||
-rw-r--r-- | lib/PublicInbox/SearchIdx.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/SearchMsg.pm | 36 | ||||
-rw-r--r-- | lib/PublicInbox/WatchMaildir.pm | 22 | ||||
-rw-r--r-- | lib/PublicInbox/WwwAtomStream.pm | 1 |
10 files changed, 92 insertions, 158 deletions
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm index 6e31df72..28b5bdb5 100644 --- a/lib/PublicInbox/Config.pm +++ b/lib/PublicInbox/Config.pm @@ -90,21 +90,11 @@ sub limiter { my ($self, $name) = @_; $self->{-limiters}->{$name} ||= do { require PublicInbox::Qspawn; - my $max; - # XXX "limiter.<name>.max" was a historical mistake - foreach my $pfx (qw(publicinboxlimiter limiter)) { - $max ||= $self->{"$pfx.$name.max"}; - } + my $max = $self->{"publicinboxlimiter.$name.max"}; PublicInbox::Qspawn::Limiter->new($max); }; } -sub get { - my ($self, $inbox, $key) = @_; - - $self->{"publicinbox.$inbox.$key"}; -} - sub config_dir { $ENV{PI_DIR} || "$ENV{HOME}/.public-inbox" } sub default_file { @@ -146,7 +136,7 @@ sub _fill { foreach my $k (qw(mainrepo address filter url newsgroup infourl watch watchheader httpbackendmax - feedmax)) { + feedmax nntpserver)) { my $v = $self->{"$pfx.$k"}; $rv->{$k} = $v if defined $v; } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index c4b74b45..3530f8ba 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -16,7 +16,6 @@ use Fcntl qw(:seek); use Plack::HTTPParser qw(parse_http_request); # XS or pure Perl use HTTP::Status qw(status_message); use HTTP::Date qw(time2str); -use Scalar::Util qw(weaken); use IO::Handle; use constant { CHUNK_START => -1, # [a-f0-9]+\r\n @@ -237,12 +236,14 @@ sub next_request ($) { } } -sub response_done ($$) { +sub response_done_cb ($$) { my ($self, $alive) = @_; - my $env = $self->{env}; - $self->{env} = undef; - $self->write("0\r\n\r\n") if $alive == 2; - $self->write(sub { $alive ? next_request($self) : $self->close }); + sub { + my $env = $self->{env}; + $self->{env} = undef; + $self->write("0\r\n\r\n") if $alive == 2; + $self->write(sub{$alive ? next_request($self) : $self->close}); + } } sub getline_cb ($$$) { @@ -283,10 +284,8 @@ sub getline_cb ($$$) { $close->(); } -sub getline_response { - my ($self, $body, $write, $close) = @_; - $self->{forward} = $body; - weaken($self); +sub getline_response ($$$) { + my ($self, $write, $close) = @_; my $pull = $self->{pull} = sub { getline_cb($self, $write, $close) }; $pull->(); } @@ -294,15 +293,15 @@ sub getline_response { sub response_write { my ($self, $env, $res) = @_; my $alive = response_header_write($self, $env, $res); - + my $close = response_done_cb($self, $alive); my $write = $alive == 2 ? chunked_wcb($self) : identity_wcb($self); - my $close = sub { response_done($self, $alive) }; if (defined(my $body = $res->[2])) { if (ref $body eq 'ARRAY') { $write->($_) foreach @$body; $close->(); } else { - getline_response($self, $body, $write, $close); + $self->{forward} = $body; + getline_response($self, $write, $close); } } else { # this is returned to the calling application: @@ -473,7 +472,7 @@ sub close { my $self = shift; my $forward = $self->{forward}; my $env = $self->{env}; - delete $env->{'psgix.io'} if $env; # prevent circular referernces + delete $env->{'psgix.io'} if $env; # prevent circular references $self->{pull} = $self->{forward} = $self->{env} = undef; if ($forward) { eval { $forward->close }; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 79951ca6..54b62451 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -10,7 +10,6 @@ use strict; use warnings; use base qw(Danga::Socket); use fields qw(cb cleanup); -use Scalar::Util qw(weaken); require PublicInbox::EvCleanup; sub new { @@ -29,22 +28,17 @@ sub restart_read_cb ($) { sub { $self->watch_read(1) } } -sub async_pass { - my ($self, $http, $fh, $bref) = @_; - # In case the client HTTP connection ($http) dies, it - # will automatically close this ($self) object. - $http->{forward} = $self; - $fh->write($$bref); - my $restart_read = restart_read_cb($self); - weaken($self); - $self->{cb} = sub { +sub main_cb ($$$) { + my ($http, $fh, $bref) = @_; + sub { + my ($self) = @_; my $r = sysread($self->{sock}, $$bref, 8192); if ($r) { $fh->write($$bref); return if $http->{closed}; if ($http->{write_buf_size}) { $self->watch_read(0); - $http->write($restart_read); # D::S::write + $http->write(restart_read_cb($self)); } # stay in watch_read, but let other clients # get some work done, too. @@ -60,9 +54,18 @@ sub async_pass { } } -sub event_read { $_[0]->{cb}->() } -sub event_hup { $_[0]->{cb}->() } -sub event_err { $_[0]->{cb}->() } +sub async_pass { + my ($self, $http, $fh, $bref) = @_; + # In case the client HTTP connection ($http) dies, it + # will automatically close this ($self) object. + $http->{forward} = $self; + $fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb + $self->{cb} = main_cb($http, $fh, $bref); +} + +sub event_read { $_[0]->{cb}->(@_) } +sub event_hup { $_[0]->{cb}->(@_) } +sub event_err { $_[0]->{cb}->(@_) } sub sysread { shift->{sock}->sysread(@_) } sub close { diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm index 15db929e..1a844e1c 100644 --- a/lib/PublicInbox/Inbox.pm +++ b/lib/PublicInbox/Inbox.pm @@ -5,28 +5,21 @@ package PublicInbox::Inbox; use strict; use warnings; -use Scalar::Util qw(weaken isweak); use PublicInbox::Git; use PublicInbox::MID qw(mid2path); -my $weakt; +my $cleanup_timer; eval { - $weakt = 'disabled'; + $cleanup_timer = 'disabled'; require PublicInbox::EvCleanup; - $weakt = undef; # OK if we get here + $cleanup_timer = undef; # OK if we get here }; -my $WEAKEN = {}; # string(inbox) -> inbox -sub weaken_task () { - $weakt = undef; - _weaken_fields($_) for values %$WEAKEN; - $WEAKEN = {}; -} - -sub _weaken_later ($) { - my ($self) = @_; - $weakt ||= PublicInbox::EvCleanup::later(*weaken_task); - $WEAKEN->{"$self"} = $self; +my $CLEANUP = {}; # string(inbox) -> inbox +sub cleanup_task () { + $cleanup_timer = undef; + delete $_->{git} for values %$CLEANUP; + $CLEANUP = {}; } sub _set_uint ($$$) { @@ -39,27 +32,11 @@ sub _set_uint ($$$) { $opts->{$field} = $val || $default; } -sub new { - my ($class, $opts) = @_; - my $v = $opts->{address} ||= 'public-inbox@example.com'; - my $p = $opts->{-primary_address} = ref($v) eq 'ARRAY' ? $v->[0] : $v; - $opts->{domain} = ($p =~ /\@(\S+)\z/) ? $1 : 'localhost'; - _set_uint($opts, 'feedmax', 25); - weaken($opts->{-pi_config}); - bless $opts, $class; -} - -sub _weaken_fields { - my ($self) = @_; - foreach my $f (qw(git mm search)) { - isweak($self->{$f}) or weaken($self->{$f}); - } -} - sub _set_limiter ($$$) { - my ($self, $git, $pfx) = @_; + my ($self, $pi_config, $pfx) = @_; my $lkey = "-${pfx}_limiter"; - $git->{$lkey} = $self->{$lkey} ||= eval { + $self->{$lkey} ||= eval { + # full key is: publicinbox.$NAME.httpbackendmax my $mkey = $pfx.'max'; my $val = $self->{$mkey} or return; my $lim; @@ -67,7 +44,7 @@ sub _set_limiter ($$$) { require PublicInbox::Qspawn; $lim = PublicInbox::Qspawn::Limiter->new($val); } elsif ($val =~ /\A[a-z][a-z0-9]*\z/) { - $lim = $self->{-pi_config}->limiter($val); + $lim = $pi_config->limiter($val); warn "$mkey limiter=$val not found\n" if !$lim; } else { warn "$mkey limiter=$val not understood\n"; @@ -76,28 +53,37 @@ sub _set_limiter ($$$) { } } +sub new { + my ($class, $opts) = @_; + my $v = $opts->{address} ||= 'public-inbox@example.com'; + my $p = $opts->{-primary_address} = ref($v) eq 'ARRAY' ? $v->[0] : $v; + $opts->{domain} = ($p =~ /\@(\S+)\z/) ? $1 : 'localhost'; + my $pi_config = delete $opts->{-pi_config}; + _set_limiter($opts, $pi_config, 'httpbackend'); + _set_uint($opts, 'feedmax', 25); + $opts->{nntpserver} ||= $pi_config->{'publicinbox.nntpserver'}; + bless $opts, $class; +} + sub git { my ($self) = @_; $self->{git} ||= eval { - _weaken_later($self); my $g = PublicInbox::Git->new($self->{mainrepo}); - _set_limiter($self, $g, 'httpbackend'); + $g->{-httpbackend_limiter} = $self->{-httpbackend_limiter}; + $cleanup_timer ||= PublicInbox::EvCleanup::later(*cleanup_task); + $CLEANUP->{"$self"} = $self; $g; }; } sub mm { my ($self) = @_; - $self->{mm} ||= eval { - _weaken_later($self); - PublicInbox::Msgmap->new($self->{mainrepo}); - }; + $self->{mm} ||= eval { PublicInbox::Msgmap->new($self->{mainrepo}) }; } sub search { my ($self) = @_; $self->{search} ||= eval { - _weaken_later($self); PublicInbox::Search->new($self->{mainrepo}, $self->{altid}); }; } @@ -163,7 +149,7 @@ sub nntp_url { $self->{-nntp_url} ||= do { # no checking for nntp_usable here, we can point entirely # to non-local servers or users run by a different user - my $ns = $self->{-pi_config}->{'publicinbox.nntpserver'}; + my $ns = $self->{nntpserver}; my $group = $self->{newsgroup}; my @urls; if ($ns && $group) { @@ -226,7 +212,8 @@ sub msg_by_smsg ($$) { # backwards compat to fallback to msg_by_mid # TODO: remove if we bump SCHEMA_VERSION in Search.pm: - defined(my $blob = $smsg->blob) or return msg_by_mid($self, $smsg->mid); + defined(my $blob = $smsg->{blob}) or + return msg_by_mid($self, $smsg->mid); my $str = git($self)->cat_file($blob); $$str =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s if $str; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 099c1f34..ed953002 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -32,14 +32,19 @@ sub _do_spawn { sub finish ($) { my ($self) = @_; my $limiter = $self->{limiter}; + my $running; if (delete $self->{rpipe}) { my $pid = delete $self->{pid}; $self->{err} = $pid == waitpid($pid, 0) ? $? : "PID:$pid still running?"; - $limiter->{running}--; + $running = --$limiter->{running}; } - if (my $next = shift @{$limiter->{run_queue}}) { - _do_spawn(@$next); + + # limiter->{max} may change dynamically + if (($running || $limiter->{running}) < $limiter->{max}) { + if (my $next = shift @{$limiter->{run_queue}}) { + _do_spawn(@$next); + } } $self->{err}; } diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index b59430d8..a1bae419 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -282,7 +282,7 @@ sub lookup_message { # raises on error: my $doc = $self->{xdb}->get_document($doc_id); $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); - $smsg->doc_id($doc_id); + $smsg->{doc_id} = $doc_id; } $smsg; } @@ -326,6 +326,7 @@ sub find_doc_ids_for_term { } # normalize subjects so they are suitable as pathnames for URLs +# XXX: consider for removal sub subject_path { my $subj = pop; $subj = subject_normalized($subj); @@ -343,32 +344,6 @@ sub subject_normalized { $subj; } -# for doc data -sub subject_summary { - my $subj = pop; - my $max = 68; - if (length($subj) > $max) { - my @subj = split(/\s+/, $subj); - $subj = ''; - my $l; - - while ($l = shift @subj) { - my $new = $subj . $l . ' '; - last if length($new) >= $max; - $subj = $new; - } - if ($subj ne '') { - my $r = scalar @subj ? ' ...' : ''; - $subj =~ s/ \z/$r/s; - } else { - # subject has one REALLY long word, and NOT spam? wtf - @subj = ($l =~ /\A(.{1,72})/); - $subj = $subj[0] . ' ...'; - } - } - $subj; -} - sub enquire { my ($self) = @_; $self->{enquire} ||= Search::Xapian::Enquire->new($self->{xdb}); diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 832d1cbf..87ee0d46 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -155,7 +155,7 @@ sub add_message { if ($smsg) { # convert a ghost to a regular message # it will also clobber any existing regular message - $doc_id = $smsg->doc_id; + $doc_id = $smsg->{doc_id}; $old_tid = $smsg->thread_id; } $smsg = PublicInbox::SearchMsg->new($mime); @@ -289,7 +289,7 @@ sub link_message { my ($self, $smsg, $old_tid) = @_; my $doc = $smsg->{doc}; my $mid = $smsg->mid; - my $mime = $smsg->mime; + my $mime = $smsg->{mime}; my $hdr = $mime->header_obj; my $refs = $hdr->header_raw('References'); my @refs = $refs ? ($refs =~ /<([^>]+)>/g) : (); diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm index 96406c6f..b8eee665 100644 --- a/lib/PublicInbox/SearchMsg.pm +++ b/lib/PublicInbox/SearchMsg.pm @@ -69,7 +69,7 @@ sub subject ($) { __hdr($_[0], 'subject') } sub to ($) { __hdr($_[0], 'to') } sub cc ($) { __hdr($_[0], 'cc') } -# no strftime, that is locale-dependent +# no strftime, that is locale-dependent and not for RFC822 my @DoW = qw(Sun Mon Tue Wed Thu Fri Sat); my @MoY = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec); @@ -103,7 +103,7 @@ sub from_name { sub ts { my ($self) = @_; - $self->{ts} ||= eval { str2time($self->mime->header('Date')) } || 0; + $self->{ts} ||= eval { str2time($self->{mime}->header('Date')) } || 0; } sub to_doc_data { @@ -146,36 +146,7 @@ sub mid ($;$) { } } -sub _extract_mid { mid_clean(mid_mime($_[0]->mime)) } - -sub blob { - my ($self, $x40) = @_; - if (defined $x40) { - $self->{blob} = $x40; - } else { - $self->{blob}; - } -} - -sub mime { - my ($self, $mime) = @_; - if (defined $mime) { - $self->{mime} = $mime; - } else { - # TODO load from git - $self->{mime}; - } -} - -sub doc_id { - my ($self, $doc_id) = @_; - if (defined $doc_id) { - $self->{doc_id} = $doc_id; - } else { - # TODO load from xapian - $self->{doc_id}; - } -} +sub _extract_mid { mid_clean(mid_mime($_[0]->{mime})) } sub thread_id { my ($self) = @_; @@ -184,6 +155,7 @@ sub thread_id { $self->{thread} = _get_term_val($self, 'G', qr/\AG/); # *G*roup } +# XXX: consider removing this, we can phrase match subject sub path { my ($self) = @_; my $path = $self->{path}; diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index c8ea3ed3..b7c2d17a 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -80,6 +80,7 @@ sub new { mdmap => \%mdmap, mdir => \@mdir, mdre => $mdre, + config => $config, importers => {}, }, $class; } @@ -99,15 +100,18 @@ sub _remove_spam { $path =~ /:2,[A-R]*S[T-Z]*\z/i or return; my $mime = _path_to_mime($path) or return; _force_mid($mime); - foreach my $inbox (values %{$self->{mdmap}}) { - next unless ref $inbox; - my $im = _importer_for($self, $inbox); - $im->remove($mime); - if (my $scrub = _scrubber_for($inbox)) { - my $scrubbed = $scrub->scrub($mime) or next; - $im->remove($scrubbed); - } - } + $self->{config}->each_inbox(sub { + my ($ibx) = @_; + eval { + my $im = _importer_for($self, $ibx); + $im->remove($mime); + if (my $scrub = _scrubber_for($ibx)) { + my $scrubbed = $scrub->scrub($mime) or return; + $im->remove($scrubbed); + } + }; + warn "error removing spam at $path from $ibx->{name}\n" if $@; + }) } # used to hash the relevant portions of a message when there are conflicts diff --git a/lib/PublicInbox/WwwAtomStream.pm b/lib/PublicInbox/WwwAtomStream.pm index a6817b31..5a10034b 100644 --- a/lib/PublicInbox/WwwAtomStream.pm +++ b/lib/PublicInbox/WwwAtomStream.pm @@ -6,7 +6,6 @@ package PublicInbox::WwwAtomStream; use strict; use warnings; -# FIXME: locale-independence: use POSIX qw(strftime); use Date::Parse qw(strptime); use Digest::SHA qw(sha1_hex); |