Mailing List Archive

svn commit: r437468 - /spamassassin/branches/tvd-multi-mass-check/masses/mass-check
Author: felicity
Date: Sun Aug 27 15:02:27 2006
New Revision: 437468

URL: http://svn.apache.org/viewvc?rev=437468&view=rev
Log:
more work on client mode

Modified:
spamassassin/branches/tvd-multi-mass-check/masses/mass-check

Modified: spamassassin/branches/tvd-multi-mass-check/masses/mass-check
URL: http://svn.apache.org/viewvc/spamassassin/branches/tvd-multi-mass-check/masses/mass-check?rev=437468&r1=437467&r2=437468&view=diff
==============================================================================
--- spamassassin/branches/tvd-multi-mass-check/masses/mass-check (original)
+++ spamassassin/branches/tvd-multi-mass-check/masses/mass-check Sun Aug 27 15:02:27 2006
@@ -98,8 +98,9 @@
$opt_spamlog $opt_tail $opt_rules $opt_restart $opt_loguris
$opt_logmem $opt_after $opt_before $opt_rewrite $opt_deencap
$opt_learn $opt_reuse $opt_lint $opt_cache $opt_noisy
- $total_messages $statusevery $opt_cachedir $opt_server $opt_client
- $opt_server_max $opt_server_timeout
+ $total_messages $statusevery $opt_cachedir
+ $opt_client $opt_cs_max $opt_cs_timeout $opt_client_url
+ $opt_server
$tmpfd %reuse %orig_conf %reuse_conf $reuse_rules_loaded_p);

use FindBin;
@@ -127,8 +128,6 @@
$opt_spamlog = "spam.log";
$opt_learn = 0;
$reuse_rules_loaded_p = 0;
-$opt_server_max = 1000;
-$opt_server_timeout = 300;

my @ORIG_ARGV = @ARGV;
GetOptions("c=s", "p=s", "f=s", "j=i", "n", "o", "all", "bayes", "debug:s",
@@ -137,7 +136,7 @@
"rules=s", "restart=i", "after=s", "before=s", "loguris",
"deencap=s", "logmem", "learn=i", "reuse", "lint", "cache",
"cachedir=s", "noisy",
- "server", "server_max=i", "server_timeout=i",
+ "server", "cs_max=i", "cs_timeout=i",
"client=s",
"dir" => sub { $opt_format = "dir"; },
"file" => sub { $opt_format = "file"; },
@@ -335,11 +334,41 @@
status('starting scan stage');
}

+ my $tmpf;
+ ($tmpf, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
+ die 'archive-iterator: failed to create temp file' unless $tmpf;
+
+ unlink $tmpf or die "archive-iterator: unlink '$tmpf': $!";
+ undef $tmpf;
+
if ($opt_server || $opt_j) {
- fork_scan_targets(\@targets, $iter);
+ # forked child process scans messages
+ if ($tmpf = fork()) {
+ # parent
+ waitpid($tmpf, 0);
+ }
+ elsif (defined $tmpf) {
+ # child
+ $iter->message_array(\@targets, $tmpfd);
+ exit;
+ }
+ else {
+ die "archive-iterator: cannot fork: $!";
+ }
}
else {
- ($total_messages, $messages) = $iter->message_array(\@targets);
+ $iter->message_array(\@targets, $tmpfd);
+ }
+
+ # we now have a temporary file with the messages to process
+ # in theory, our file pointer is at the start of the file, but make sure.
+ # NOTE: do this here, not in message_array, since that will only affect
+ # the child.
+ seek($tmpfd, 0, 0);
+ $total_messages = $iter->read_line($tmpfd);
+
+ if (!$total_messages) {
+ die "archive-iterator: no messages to process\n";
}

if ($opt_progress) {
@@ -353,6 +382,9 @@
}

if ($opt_server) {
+ $opt_cs_max ||= 1000;
+ $opt_cs_timeout ||= 60 * 5;
+
my $serv_socket = IO::Socket::INET->new(LocalPort => 8080,
Proto => 'tcp',
Listen => 5,
@@ -379,14 +411,10 @@
my($type, $URI, $headers, $postdata) = handle_http_request($socket);

if ($type eq 'GET') {
- print $socket join("\r\n",
- "HTTP/1.0 200 OK",
- "Content-type: text/plain",
- "Pragma: no-cache",
- "Server: mass-check/0.0",
- "\r\n"),join("\n",
- "Your GET request came from IP Address: ".$socket->peerhost,
- "");
+ http_response($socket, "200 OK", {
+ 'Content-type' => 'text/plain',
+ },
+ "Your GET request came from IP Address: ".$socket->peerhost."\n");
}
elsif ($type eq 'POST') {
handle_post_results($postdata, $timestamps, $msgsout);
@@ -394,7 +422,7 @@
my $messages = '';
if ($postdata->{'max_messages'}) {
my $msgnum = $postdata->{'max_messages'};
- $msgnum = $opt_server_max if ($msgnum > $opt_server_max);
+ $msgnum = $opt_cs_max if ($msgnum > $opt_cs_max);
$messages = generate_messages($msgnum, $timestamps, $msgsout);
}

@@ -403,34 +431,25 @@
local $/ = undef;

# Content-Encoding: gzip
- print $socket join("\r\n",
- "HTTP/1.0 200 OK",
- "Content-type: application/octet-stream",
- "Pragma: no-cache",
- "Server: mass-check/0.0",
- "Content-Length: ".(-s $messages),
- "\r\n"),
- <MSG>;
+ http_response($socket, "200 OK", {
+ "Content-type" => "application/octet-stream",
+ "Content-Length" => (-s $messages),
+ },
+ scalar <MSG>);

close(MSG);
unlink $messages;
}
else {
- print $socket join("\r\n",
- "HTTP/1.0 200 OK",
- "Content-type: text/plain",
- "Pragma: no-cache",
- "Server: mass-check/0.0",
- "\r\n"),join("\n",
- "Your POST request (sans max_messages) came from IP Address: ".$socket->peerhost,
- "");
+ http_response($socket, "200 OK", {
+ "Content-type" => "text/plain",
+ },
+ "Your POST request (sans max_messages) came from IP Address: ".$socket->peerhost."\n");
}
}
else {
# for error, "501 Not Implemented"
- print $socket join("\r\n",
- "HTTP/1.0 501 Not Implemented",
- "");
+ http_response($socket, '501 Not Implemented', {}, '');
}

$select->remove($socket);
@@ -446,119 +465,107 @@
exit;
}

-if ($opt_progress) {
- status('starting run stage');
-}
-
-# If opt_j isn't given, just use the normal AI code ...
-if ($opt_j == 0) {
- $iter->_run($messages);
-}
-else {
- # only do 1 process, message list in a temp file, no restarting
- if ($opt_j == 1 && !defined $opt_restart) {
- my $message;
- my $messages;
- my $total_count = 0;
-
- while (($total_messages > $total_count) && ($message = $iter->read_line($tmpfd))) {
- my($class, undef, $date, undef, $result) = $iter->run_message($message);
- result($class, $result, $date) if $result;
- $total_count++;
+if ($opt_client) {
+ $opt_cs_max ||= 1000;
+ $opt_cs_timeout ||= 60 * 2;
+
+ die "Need client_url for client mode!" unless $opt_client_url;
+
+ $opt_client_url =~ /^http:\/\/([^\/]+)(\/.*)?/;
+ my($host, $uri) = ($1,$2);
+ my ($http_host) = split(/:/, $host);
+
+ die "No host found in client_url" unless $host;
+ $uri ||= "/";
+
+ # figure out max messages
+ my $msgnum = 100;
+
+ my %postdata = ();
+
+ my $tmpdir = Mail::SpamAssassin::Util::secure_tmpdir();
+ die "Can't create tempdir" unless $tmpdir;
+
+ while (1) {
+ # if the number of messages to request is too much, bring it down
+ $msgnum = $opt_cs_max if ($msgnum > $opt_cs_max);
+
+ # prep the POST request
+ $postdata{'max_messages'} = $msgnum;
+
+ my $POSTDATA = join('&', map { post_encode($_) . '=' . post_encode($postdata{$_}) } keys %postdata);
+
+ # connect to server
+ my $socket = IO::Socket::INET->new($host);
+
+ # last if connection fails
+ last unless ($socket);
+
+ # make request, include and drop results if there are any
+ my $result = http_make_request($socket, 'POST', $uri, {
+ 'Host' => $http_host,
+ 'Content-Type' => 'Content-Type: application/x-www-form-urlencoded',
+ 'Content-Length' => length($POSTDATA),
+ },
+ $POSTDATA
+ );
+ %postdata = ();
+ undef $POSTDATA;
+
+ # If we got messages to run through, go ahead and do it.
+ # otherwise, just sleep for the timeout length and try again
+ if (!defined $result) {
+ # we got an error?!? abort!
+ last;
}
- }
- # more than one process or one process with restarts
- else {
- my $select = IO::Select->new();
+ elsif (!$result) {
+ # sleep for client_timeout seconds and try the request again
+ sleep $opt_cs_timeout;
+ }
+ else {
+ # we got a result, so do things with it!
+ my $gzfd = new IO::Zlib->new($result, "rb");
+ die "Can't open temp result file: $!" unless $gzfd;

- my $total_count = 0;
- my $needs_restart = 0;
- my @child = ();
- my @pid = ();
- my $messages;
+ # used for the temp queue file
+ my $tmppath;
+ ($tmppath, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
+ die "Can't make tempfile, exiting" unless $tmppath;

- # start children processes
- start_children($opt_j, \@child, \@pid, $select);
+ # Archive format, gzip compressed file w/ 3 parts per message:
+ # 1- server message number in text format
+ # 2- server index string, binary packed format
+ # 3- message content

- # feed childen, make them work for it, repeat
- while ($select->count()) {
- foreach my $socket ($select->can_read()) {
- my $line = $iter->read_line($socket);
+ my $time_start = time;
+ # generate temp file w/ messages to run through
+ # create temp directory with file format messages

- # some error happened during the read!
- if (!defined $line) {
- $needs_restart = 1;
- warn "archive-iterator: readline failed, attempting to recover\n";
- $select->remove($socket);
- }
- elsif ($line =~ /^([^\0]+)\0RESULT (.+)$/s) {
- my $result = $1;
- my ($date,$class,$type) = Mail::SpamAssassin::ArchiveIterator::index_unpack($2);
- #warn ">> RESULT: $class, $type, $date\n";
+ run_through_messages();
+ # marshall up results

- if (defined $opt_restart && ($total_count % $opt_restart) == 0) {
- $needs_restart = 1;
- }
-
- # if messages remain, and we don't need to restart, send message
- if (($total_messages > $total_count) && !$needs_restart) {
- $iter->send_line($socket, $iter->read_line($tmpfd));
- $total_count++;
- #warn ">> recv: $total_messages $total_count\n";
- }
- else {
- # stop listening on this child since we're done with it
- #warn ">> removeresult: $needs_restart $total_messages $total_count\n";
- $select->remove($socket);
- }
+ my $time_end = time;

- # deal with the result we received
- if ($result) {
- if ($opt_client) {
- result_client($class, $result, $date);
- }
- else {
- result($class, $result, $date);
- }
- }
- }
- elsif ($line eq "START") {
- if ($total_messages > $total_count) {
- # we still have messages, send one to child
- $iter->send_line($socket, $iter->read_line($tmpfd));
- $total_count++;
- #warn ">> new: $total_messages $total_count\n";
- }
- else {
- # no more messages, so stop listening on this child
- #warn ">> removestart: $needs_restart $total_messages $total_count\n";
- $select->remove($socket);
- }
- }
- }
+ # figure out new max messages, try keeping ~cs_timeout between runs
+ $msgnum = int($msgnum * ($time_end-$time_start) / $opt_cs_timeout);
+ }
+ }

- #warn ">> out of loop, $total_messages $total_count $needs_restart ".$select->count()."\n";
+ close $tmpfd;
+ exit;
+}

- # If there are still messages to process, and we need to restart
- # the children, and all of the children are idle, let's go ahead.
- if ($needs_restart && $select->count == 0 && $total_messages > $total_count) {
- $needs_restart = 0;
+## OK, at this point we're not in client/server mode, so just do a single run
+## and be done.

- #warn "debug: needs restart, $total_messages total, $total_count done\n";
- reap_children($opt_j, \@child, \@pid);
- @child=();
- @pid=();
- start_children($opt_j, \@child, \@pid, $select);
- }
- }
+if ($opt_progress) {
+ status('starting run stage');
+}

- # reap children
- reap_children($opt_j, \@child, \@pid);
- }
+run_through_messages();

- # close tempfile so it will be unlinked
- close($tmpfd);
-}
+# close tempfile so it will be unlinked
+close($tmpfd);

if ($opt_progress) {
status('completed run stage');
@@ -1084,43 +1091,6 @@
}
}

-sub fork_scan_targets {
- my $targets = shift;
- my $iter = shift;
-
- my $tmpf;
- ($tmpf, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
- die 'archive-iterator: failed to create temp file' unless $tmpf;
-
- unlink $tmpf or die "archive-iterator: unlink '$tmpf': $!";
- undef $tmpf;
-
- # forked child process scans messages
- if ($tmpf = fork()) {
- # parent
- waitpid($tmpf, 0);
- }
- elsif (defined $tmpf) {
- # child
- $iter->message_array($targets, $tmpfd);
- exit;
- }
- else {
- die "archive-iterator: cannot fork: $!";
- }
-
- # we now have a temporary file with the messages to process
- # in theory, our file pointer is at the start of the file, but make sure.
- # NOTE: do this here, not in message_array, since that will only affect
- # the child.
- seek($tmpfd, 0, 0);
- $total_messages = $iter->read_line($tmpfd);
-
- if (!$total_messages) {
- die "archive-iterator: no messages to process\n";
- }
-}
-
sub handle_http_request {
my $socket = shift;

@@ -1176,7 +1146,7 @@

# Find out if any of the messages we sent out before need to be sent out
# again because we haven't seen a response yet.
- my $tooold = time - $opt_server_timeout;
+ my $tooold = time - $opt_cs_timeout;
foreach (keys %{$timestamps}) {
next if ($_ > $tooold);
my $wanted = $msgs - @tosend;
@@ -1292,4 +1262,168 @@
delete $timestamps->{$k};
}
}
+}
+
+sub run_through_messages {
+ # only do 1 process, message list in a temp file, no restarting
+ if ($opt_j <= 1 && !defined $opt_restart) {
+ my $message;
+ my $messages;
+ my $total_count = 0;
+
+ while (($total_messages > $total_count) && ($message = $iter->read_line($tmpfd))) {
+ my($class, undef, $date, undef, $result) = $iter->run_message($message);
+ result($class, $result, $date) if $result;
+ $total_count++;
+ }
+ }
+ # more than one process or one process with restarts
+ else {
+ my $select = IO::Select->new();
+
+ my $total_count = 0;
+ my $needs_restart = 0;
+ my @child = ();
+ my @pid = ();
+ my $messages;
+
+ # start children processes
+ start_children($opt_j, \@child, \@pid, $select);
+
+ # feed childen, make them work for it, repeat
+ while ($select->count()) {
+ foreach my $socket ($select->can_read()) {
+ my $line = $iter->read_line($socket);
+
+ # some error happened during the read!
+ if (!defined $line) {
+ $needs_restart = 1;
+ warn "archive-iterator: readline failed, attempting to recover\n";
+ $select->remove($socket);
+ }
+ elsif ($line =~ /^([^\0]+)\0RESULT (.+)$/s) {
+ my $result = $1;
+ my ($date,$class,$type) = Mail::SpamAssassin::ArchiveIterator::index_unpack($2);
+ #warn ">> RESULT: $class, $type, $date\n";
+
+ if (defined $opt_restart && ($total_count % $opt_restart) == 0) {
+ $needs_restart = 1;
+ }
+
+ # if messages remain, and we don't need to restart, send message
+ if (($total_messages > $total_count) && !$needs_restart) {
+ $iter->send_line($socket, $iter->read_line($tmpfd));
+ $total_count++;
+ #warn ">> recv: $total_messages $total_count\n";
+ }
+ else {
+ # stop listening on this child since we're done with it
+ #warn ">> removeresult: $needs_restart $total_messages $total_count\n";
+ $select->remove($socket);
+ }
+
+ # deal with the result we received
+ if ($result) {
+ if ($opt_client) {
+ result_client($class, $result, $date);
+ }
+ else {
+ result($class, $result, $date);
+ }
+ }
+ }
+ elsif ($line eq "START") {
+ if ($total_messages > $total_count) {
+ # we still have messages, send one to child
+ $iter->send_line($socket, $iter->read_line($tmpfd));
+ $total_count++;
+ #warn ">> new: $total_messages $total_count\n";
+ }
+ else {
+ # no more messages, so stop listening on this child
+ #warn ">> removestart: $needs_restart $total_messages $total_count\n";
+ $select->remove($socket);
+ }
+ }
+ }
+
+ #warn ">> out of loop, $total_messages $total_count $needs_restart ".$select->count()."\n";
+
+ # If there are still messages to process, and we need to restart
+ # the children, and all of the children are idle, let's go ahead.
+ if ($needs_restart && $select->count == 0 && $total_messages > $total_count) {
+ $needs_restart = 0;
+
+ #warn "debug: needs restart, $total_messages total, $total_count done\n";
+ reap_children($opt_j, \@child, \@pid);
+ @child=();
+ @pid=();
+ start_children($opt_j, \@child, \@pid, $select);
+ }
+ }
+
+ # reap children
+ reap_children($opt_j, \@child, \@pid);
+ }
+}
+
+sub http_response {
+ my($socket, $result, $headers, $data) = @_;
+
+ print $socket
+ "HTTP/1.0 $result\r\n",
+ "Pragma: no-cache\r\n",
+ "Server: mass-check/0.0\r\n",
+ map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers};
+ print "\r\n";
+ print $data;
+}
+
+sub http_make_request {
+ my($socket, $type, $uri, $headers, $data) = @_;
+
+ print $socket
+ "$type $uri HTTP/1.0\r\n",
+ "User-Agent: mass-check/0.0\r\n",
+ map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers};
+ print "\r\n";
+ print $data;
+
+ my $line = $socket->getline();
+ my(undef, $code, $string) = split(/\s+/, $line, 3);
+ return unless $code == 200;
+
+ my %headers = ();
+ do {
+ $line = $socket->getline();
+ last unless defined $line;
+ $line =~ s/\r\n$//;
+
+ if ($line) {
+ my ($k,$v) = split(/:\s*/, $line, 2);
+ $headers->{lc $k} = $v;
+ }
+ } while ($line !~ /^$/);
+
+ my $gzpath = '';
+ if ($headers{'content-length'}) {
+ my $gzfd;
+ ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
+ die "Can't make tempfile, exiting" unless $gzpath;
+
+ my $rd;
+ $socket->read($rd, $headers->{'content-length'});
+ print $gzfd $rd;
+ close $gzfd;
+ }
+
+ $socket->close();
+ return $gzpath;
+}
+
+# Be conservative -- anything that's non-alphanumeric, encode!
+sub post_encode {
+ my $string = shift;
+ $string =~ s/(\W)/sprintf "%%%02x",unpack("C",$1)/egx;
+ return $string;
}