From d07ba9c30800225052d17ccca458afbfa05a8ff0 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 23 Jul 2022 04:41:53 +0000 Subject: ds: share long_step between NNTP and IMAP It's not actually used by our POP3 code at the moment, but it may be soon to reduce memory usage when loading 50K smsg objects into memory. --- lib/PublicInbox/DS.pm | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) (limited to 'lib/PublicInbox/DS.pm') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index f0181b54..fee31e3d 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -650,18 +650,56 @@ sub shutdn ($) { sub zflush {} # overridden by NNTPdeflate and IMAPdeflate +sub long_response_done {} # overridden by Net::NNTP + +sub long_step { + my ($self) = @_; + # wbuf is unset or empty, here; {long} may add to it + my ($fd, $cb, $t0, @args) = @{$self->{long_cb}}; + my $more = eval { $cb->($self, @args) }; + if ($@ || !$self->{sock}) { # something bad happened... + delete $self->{long_cb}; + my $elapsed = now() - $t0; + $@ and $self->err("%s during long response[$fd] - %0.6f", + $@, $elapsed); + $self->out(" deferred[$fd] aborted - %0.6f", $elapsed); + $self->close; + } elsif ($more) { # $self->{wbuf}: + # control passed to ibx_async_cat if $more == \undef + requeue_once($self) if !ref($more); + } else { # all done! + delete $self->{long_cb}; + $self->long_response_done; + my $elapsed = now() - $t0; + my $fd = fileno($self->{sock}); + $self->out(" deferred[$fd] done - %0.6f", $elapsed); + my $wbuf = $self->{wbuf}; # do NOT autovivify + requeue($self) unless $wbuf && @$wbuf; + } +} + sub requeue_once { my ($self) = @_; # COMPRESS users all share the same DEFLATE context. - # Flush it here to ensure clients don't see - # each other's data + # Flush it here to ensure clients don't see each other's data $self->zflush; # no recursion, schedule another call ASAP, # but only after all pending writes are done. # autovivify wbuf. wbuf may be populated by $cb, # no need to rearm if so: (push returns new size of array) - requeue($self) if push(@{$self->{wbuf}}, $self->can('long_step')) == 1; + requeue($self) if push(@{$self->{wbuf}}, \&long_step) == 1; +} + +sub long_response ($$;@) { + my ($self, $cb, @args) = @_; # cb returns true if more, false if done + my $sock = $self->{sock} or return; + # make sure we disable reading during a long response, + # clients should not be sending us stuff and making us do more + # work while we are stream a response to them + $self->{long_cb} = [ fileno($sock), $cb, now(), @args ]; + long_step($self); # kick off! + undef; } sub dwaitpid ($;$$) { -- cgit v1.2.3-24-ge0c7