public-inbox.git  about / heads / tags
an "archives first" approach to mailing lists
blob 8d900d0c0cd96403b18204668a4498ce9945ffd7 4548 bytes (raw)
$ git show HEAD:lib/PublicInbox/LeiNoteEvent.pm	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
 
# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>

# internal command for dealing with inotify, kqueue vnodes, etc
# it is a semi-persistent worker
package PublicInbox::LeiNoteEvent;
use strict;
use v5.10.1;
use parent qw(PublicInbox::IPC);
use PublicInbox::DS;
use Errno qw(ENOENT);

# lei objects whose note-event workers are waiting for the next flush_task
# run.  lei_note_event adds an entry (keyed by $lei->{cfg}->{'-f'}) when it
# starts a worker pool; flush_task takes the whole hash and resets this to
# undef.
our $to_flush; # { cfgpath => $lei }

# Detach the note-event worker pool from $lei's config and close it.
# Does nothing (and returns nothing) when no pool is attached.
# $manual is true for an explicit flush request; in that case the
# requesting client's socket is recorded on the pool before closing.
# Returns whatever ->wq_close returns.
sub flush_lei ($;$) {
	my ($lei, $manual) = @_;
	my $workers = delete $lei->{cfg}->{-lei_note_event};
	return unless defined $workers;
	if ($manual) {
		$workers->{lei_sock} = $lei->{sock};
	}
	$workers->wq_close; # runs _lei_wq_eof;
}

# Writes are batched and flushed every 5s (matching Linux default
# writeback behavior) because MUAs can trigger a storm of inotify events.
# This takes ownership of everything queued in $to_flush, resets the
# queue, then flushes each queued lei in turn.
sub flush_task { # PublicInbox::DS timer callback
	my $batch = $to_flush;
	return unless defined $batch;
	undef $to_flush;
	foreach my $queued (values %$batch) {
		flush_lei($queued);
	}
}

# Queue the lei/store operation matching the watch $state for one message.
#   import-rw / import-ro: 'set_eml' with the message and $vmd
#   index-rw  / index-ro : 'index_eml_only', plus the xoids from ->ale
#   tag-rw    / tag-ro   : keywords only, see below
# Any other $state triggers a warning naming the config file.
sub eml_event ($$$$) {
	my ($self, $eml, $vmd, $state) = @_;
	my $sto = $self->{lei}->{sto};

	return $sto->wq_do('set_eml', $eml, $vmd)
		if $state =~ /\Aimport-(?:rw|ro)\z/;

	if ($state =~ /\Aindex-(?:rw|ro)\z/) {
		my $xoids = $self->{lei}->ale->xoids_for($eml);
		return $sto->wq_do('index_eml_only', $eml, $vmd, $xoids);
	}

	if ($state =~ /\Atag-(?:rw|ro)\z/) {
		my @docids; # filled in by kw_changed
		# a failing kw_changed means the message is too new, assume
		# changed since it is still to-be-committed.
		my $changed = eval {
			$self->{lse}->kw_changed($eml, $vmd->{kw}, \@docids);
		} // 1;
		if (@docids) { # already in lei/store
			$sto->wq_do('set_eml_vmd', undef, $vmd, \@docids)
				if $changed;
		} elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) {
			# it's in an external, only set kw, here
			$sto->wq_do('set_xvmd', $xoids, $eml, $vmd);
		} # else: totally unknown, ignore
	} else {
		warn "unknown state: $state (in $self->{lei}->{cfg}->{'-f'})\n";
	}
}

# Handle one Maildir file event inside a worker.
# $fn is the path of the message file, $vmd carries the keywords and
# sync_info prepared by lei_note_event, $state is the watch state.
# A readable message is forwarded to eml_event; a file which no longer
# exists has its sync_info entry cleared instead.
sub maildir_event { # via wq_nonblock_do
	my ($self, $fn, $vmd, $state) = @_;
	my $eml = PublicInbox::InboxWritable::eml_from_path($fn);
	return eml_event($self, $eml, $vmd, $state) if $eml;
	# anything other than ENOENT: eml_from_path already warns
	$! == ENOENT and $self->{lms}->clear_src(@{$vmd->{sync_info}});
}

# Callback given to MHreader->mh_read_one by mh_event.
# Arguments, in the order mh_read_one supplies them:
#   $dir   - MH directory (mh_event passes the folder without its
#            leading "mh:" to the reader; presumably that same path
#            comes back here -- TODO confirm against MHreader)
#   $bn    - basename (message number) of the file which was read
#   $kw    - keywords for the message
#   $eml   - the parsed message
#   $self, $state - extra arguments forwarded from mh_event
# This used to unpack its arguments and return, so every MH event was
# silently dropped after the message had been read.  It now hands the
# message to eml_event the same way maildir_event does.  sync_info uses
# the "mh:"-prefixed folder name and a numeric id, matching what
# lei_note_event passes to ->clear_src for non-Maildir folders.
sub _mh_cb { # mh_read_one cb
	my ($dir, $bn, $kw, $eml, $self, $state) = @_;
	my $vmd = { kw => $kw, sync_info => [ "mh:$dir", $bn + 0 ] };
	eml_event($self, $eml, $vmd, $state);
}

# Handle one MH file event inside a worker.
# $folder is the "mh:"-prefixed folder name, $bn the basename of the
# message file and $state the watch state.  The message is read through
# PublicInbox::MHreader, which calls _mh_cb with $self and $state.
sub mh_event { # via wq_nonblock_do
	my ($self, $folder, $bn, $state) = @_;
	require PublicInbox::MHreader; # if we forked early
	my $reader = PublicInbox::MHreader->new(
		substr($folder, 3), # path without the 3-character prefix
		$self->{lei}->{3});
	$reader->mh_read_one($bn, \&_mh_cb, $self, $state);
}

# Entry point for a single file event, run in the lei daemon.
# Arguments:
#   $lei     - the lei object for this request
#   $folder  - watched folder name ("maildir:..." or "mh:..."), or the
#              literal string 'done' to request an immediate flush
#   $new_cur - an empty string means only the folder's entry for $bn is
#              cleared (presumably the file went away -- confirm with
#              the caller); no other value is inspected here
#   $bn      - basename of the affected file
#   $fn      - full path of the affected file (used for Maildir only)
# Any further argument is a bug and is fatal.
# Events for existing files are handed to a lazily-started worker pool
# (maildir_event / mh_event) instead of being processed here.
sub lei_note_event {
	my ($lei, $folder, $new_cur, $bn, $fn, @rest) = @_;
	die "BUG: unexpected: @rest" if @rest;
	my $cfg = $lei->_lei_cfg or return; # gone (race)
	my $sto = $lei->_lei_store or return; # gone
	return flush_lei($lei, 1) if $folder eq 'done'; # special case
	my $lms = $lei->lms or return;
	$lms->lms_write_prepare if $new_cur eq ''; # for ->clear_src below
	$lei->{opt}->{quiet} = 1;
	# NOTE(review): arg2folder is given a fresh one-element array and
	# its result is discarded, so $folder itself is not reassigned here;
	# presumably it is called for validation -- confirm.
	$lms->arg2folder($lei, [ $folder ]);
	# per-folder watch state from the config, defaulting to 'tag-rw'
	my $state = $cfg->get_1("watch.$folder.state") // 'tag-rw';
	return if $state eq 'pause';
	if ($new_cur eq '') {
		# Maildir entries are identified by a reference to the
		# basename, everything else by the basename as a number
		my $id = $folder =~ /\Amaildir:/ ? \$bn : $bn + 0;
		return $lms->clear_src($folder, $id);
	}
	$lms->lms_pause;
	$lei->ale; # prepare
	$sto->write_prepare($lei);
	require PublicInbox::MHreader if $folder =~ /\Amh:/; # optimistic
	# start the worker pool on first use and remember it in the config
	# object; flush_lei removes it from there again
	my $self = $cfg->{-lei_note_event} //= do {
		my $wq = bless { lms => $lms }, __PACKAGE__;
		# MUAs such as mutt can trigger massive rename() storms so
		# use some CPU, but don't overwhelm slower storage, either
		my $jobs = $wq->detect_nproc // 1;
		$jobs = 4 if $jobs > 4; # same default as V2Writable
		my ($op_c, $ops) = $lei->workers_start($wq, $jobs);
		$lei->wait_wq_events($op_c, $ops);
		# schedule flush_task and queue this lei for it, keyed by
		# the config file path
		PublicInbox::DS::add_uniq_timer('flush_timer', 5, \&flush_task);
		$to_flush->{$lei->{cfg}->{'-f'}} //= $lei;
		$wq->prepare_nonblock;
		# the do-block yields $wq, which is also kept in $lei->{lne}
		$lei->{lne} = $wq;
	};
	# NOTE(review): this match is case-insensitive while the Maildir
	# check for $id above is not -- confirm which one is intended.
	if ($folder =~ /\Amaildir:/i) {
		require PublicInbox::MdirReader;
		# no flags can be extracted from the basename: nothing to do
		my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn)
			// return;
		# skip files carrying the 'T' flag (presumably Maildir
		# "trashed" -- confirm)
		return if index($fl, 'T') >= 0;
		my $kw = PublicInbox::MdirReader::flags2kw($fl);
		my $vmd = { kw => $kw, sync_info => [ $folder, \$bn ] };
		$self->wq_nonblock_do('maildir_event', $fn, $vmd, $state);
	} elsif ($folder =~ /\Amh:/) {
		$self->wq_nonblock_do('mh_event', $folder, $bn, $state);
	} # else: TODO: imap
}

# Worker-side setup, run in each forked child: lets the lei object do
# its own post-fork setup, makes the lms handle writable, attaches the
# store's search object as {lse} (used by eml_event) and finally chains
# to the parent class.
sub ipc_atfork_child {
	my ($self) = @_;
	my $lei = $self->{lei};
	$lei->_lei_atfork_child(1); # persistent, for a while
	$self->{lms}->lms_write_prepare;
	$self->{lse} = $lei->{sto}->search;
	$self->SUPER::ipc_atfork_child;
}

# EOF callback for main lei daemon: forwards to ->wq_eof on the first
# argument, naming the 'lne' slot (where lei_note_event keeps the pool).
sub _lei_wq_eof {
	my ($lei) = @_;
	$lei->wq_eof('lne');
}

1;

git clone https://public-inbox.org/public-inbox.git
git clone http://7fh6tueqddpjyxjmgtdiueylzoqt6pt7hec3pukyptlmohoowvhde4yd.onion/public-inbox.git