Mailing List Archive

svn commit: r168094 - /spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm
Author: jm
Date: Wed May 4 00:00:16 2005
New Revision: 168094

URL: http://svn.apache.org/viewcvs?rev=168094&view=rev
Log:
bug 4258: fix spamd lockup on FreeBSD by moving to a fixed-buffer-size protocol between spamd parent and children. This avoids a hang when two messages arrive in the same read(2) call, which gets buffered on FreeBSD

Modified:
spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm
URL: http://svn.apache.org/viewcvs/spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm?rev=168094&r1=168093&r2=168094&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm Wed May 4 00:00:16 2005
@@ -169,12 +169,12 @@
return;
}

+ # otherwise it's a status report from a child.
foreach my $fh ($self->{backchannel}->select_vec_to_fh_list($rout))
{
- # otherwise it's a status report from a child.
# just read one line. if there's more lines, we'll get them
# when we re-enter the can_read() select call above...
- if ($self->read_one_line_from_child_socket($fh) == PFSTATE_IDLE)
+ if ($self->read_one_message_from_child_socket($fh) == PFSTATE_IDLE)
{
dbg("prefork: child reports idle");
if ($self->{overloaded}) {
@@ -199,11 +199,13 @@
$self->adapt_num_children();
}

-sub read_one_line_from_child_socket {
+sub read_one_message_from_child_socket {
my ($self, $sock) = @_;

- my $line = $sock->getline();
- if (!defined $line) {
+ # "I b1 b2 b3 b4 \n " or "B b1 b2 b3 b4 \n "
+ my $line;
+ my $nbytes = $sock->sysread($line, 6);
+ if (!defined $nbytes || $nbytes == 0) {
dbg ("prefork: child closed connection");

# stop it being select'd
@@ -215,14 +217,20 @@

return PFSTATE_ERROR;
}
+ if ($nbytes < 6) {
+ warn ("prefork: child gave short message: len=$nbytes bytes=".
+ join(" ", unpack "C*", $line));
+ }

chomp $line;
- if ($line =~ /^I(\d+)/) {
- $self->set_child_state ($1, PFSTATE_IDLE);
+ if ($line =~ s/^I//) {
+ my $pid = unpack("N1", $line);
+ $self->set_child_state ($pid, PFSTATE_IDLE);
return PFSTATE_IDLE;
}
- elsif ($line =~ /^B(\d+)/) {
- $self->set_child_state ($1, PFSTATE_BUSY);
+ elsif ($line =~ s/^B//) {
+ my $pid = unpack("N1", $line);
+ $self->set_child_state ($pid, PFSTATE_BUSY);
return PFSTATE_BUSY;
}
else {
@@ -235,8 +243,11 @@

# we use the following protocol between the master and child processes to
# control when they accept/who accepts: server tells a child to accept with a
-# "A\n", child responds with "B$pid\n" when it's busy, and "I$pid\n" once it's
-# idle again. Very simple, line-based protocol.
+# "A....\n", child responds with "B$pid\n" when it's busy, and "I$pid\n" once
+# it's idle again. Very simple protocol. Note that the $pid values are packed
+# into 4 bytes so that the buffers are always of a known length; if you need to
+# transfer longer data, assign a new protocol verb (the first char) and use the
+# length of the following data buffer as the packed value.

sub order_idle_child_to_accept {
my ($self) = @_;
@@ -245,7 +256,7 @@
if (defined $kid)
{
my $sock = $self->{backchannel}->get_socket_for_child($kid);
- if (!$sock->syswrite ("A\n"))
+ if (!$sock->syswrite ("A....\n"))
{
# failure to write to the child; bad news. call it dead
warn "prefork: killing rogue child $kid, failed to write: $!\n";
@@ -272,7 +283,7 @@
my ($self, $sock) = @_;

while (1) {
- my $state = $self->read_one_line_from_child_socket($sock);
+ my $state = $self->read_one_message_from_child_socket($sock);
if ($state == PFSTATE_BUSY) {
return 1; # 1 == success
}
@@ -289,7 +300,7 @@
my ($self, $kid) = @_;
if ($self->{waiting_for_idle_child}) {
my $sock = $self->{backchannel}->get_socket_for_child($kid);
- $sock->syswrite ("A\n")
+ $sock->syswrite ("A....\n")
or die "prefork: $kid claimed it was ready, but write failed: $!";
$self->{waiting_for_idle_child} = 0;
}
@@ -305,12 +316,14 @@

sub update_child_status_idle {
my ($self) = @_;
- $self->report_backchannel_socket("I".$self->{pid}."\n");
+ # "I b1 b2 b3 b4 \n "
+ $self->report_backchannel_socket("I".pack("N",$self->{pid})."\n");
}

sub update_child_status_busy {
my ($self) = @_;
- $self->report_backchannel_socket("B".$self->{pid}."\n");
+ # "B b1 b2 b3 b4 \n "
+ $self->report_backchannel_socket("B".pack("N",$self->{pid})."\n");
}

sub report_backchannel_socket {
@@ -325,10 +338,17 @@

my $sock = $self->{backchannel}->get_parent_socket();
while (1) {
- my $line = $sock->getline();
- if (!defined($line)) {
+ # "A . . . . \n "
+ my $line;
+ my $nbytes = $sock->sysread($line, 6);
+ if (!defined $nbytes || $nbytes == 0) {
die "prefork: empty order from parent";
}
+ if ($nbytes < 6) {
+ warn ("prefork: parent gave short message: len=$nbytes bytes=".
+ join(" ", unpack "C*", $line));
+ }
+
chomp $line;
if (index ($line, "A") == 0) { # string starts with "A" = accept
return PFORDER_ACCEPT;