Mailing List Archive

svn commit: r437520 - in /spamassassin/branches/tvd-multi-mass-check: lib/Mail/SpamAssassin/ArchiveIterator.pm masses/mass-check
Author: felicity
Date: Sun Aug 27 20:47:17 2006
New Revision: 437520

URL: http://svn.apache.org/viewvc?rev=437520&view=rev
Log:
More client/server work: the client now actually downloads the messages from the server and creates the temporary directory.

Modified:
spamassassin/branches/tvd-multi-mass-check/lib/Mail/SpamAssassin/ArchiveIterator.pm
spamassassin/branches/tvd-multi-mass-check/masses/mass-check

Modified: spamassassin/branches/tvd-multi-mass-check/lib/Mail/SpamAssassin/ArchiveIterator.pm
URL: http://svn.apache.org/viewvc/spamassassin/branches/tvd-multi-mass-check/lib/Mail/SpamAssassin/ArchiveIterator.pm?rev=437520&r1=437519&r2=437520&view=diff
==============================================================================
--- spamassassin/branches/tvd-multi-mass-check/lib/Mail/SpamAssassin/ArchiveIterator.pm (original)
+++ spamassassin/branches/tvd-multi-mass-check/lib/Mail/SpamAssassin/ArchiveIterator.pm Sun Aug 27 20:47:17 2006
@@ -414,44 +414,12 @@

############################################################################

-# four bytes in network/vax format (little endian) as length of message
-# the rest is the actual message
-
-sub read_line {
- my($self, $fd) = @_;
- my($length,$msg);
-
- # read in the 4 byte length and unpack
- sysread($fd, $length, 4);
- $length = unpack("V", $length);
-# warn "<< $$ $length\n";
- return unless $length;
-
- # read in the rest of the single message
- sysread($fd, $msg, $length);
-# warn "<< $$ $msg\n";
- return $msg;
-}
-
-sub send_line {
- my $self = shift;
- my $fd = shift;
-
- foreach ( @_ ) {
- my $length = pack("V", length $_);
-# warn ">> $$ ".length($_)." $_\n";
- syswrite($fd, $length . $_);
- }
-}
-
-############################################################################
-
## FUNCTIONS BELOW THIS POINT ARE FOR FINDING THE MESSAGES TO RUN AGAINST

############################################################################

sub message_array {
- my ($self, $targets, $fh) = @_;
+ my ($self, $targets) = @_;

foreach my $target (@${targets}) {
if (!defined $target) {
@@ -573,12 +541,6 @@
}
if ($self->{opt_head} < 0) {
splice(@messages, -$self->{opt_head});
- }
-
- # Dump out the messages to the temp file if we're using one
- if (defined $fh) {
- $self->send_line($fh, scalar(@messages), @messages);
- return;
}

return scalar(@messages), \@messages;

Modified: spamassassin/branches/tvd-multi-mass-check/masses/mass-check
URL: http://svn.apache.org/viewvc/spamassassin/branches/tvd-multi-mass-check/masses/mass-check?rev=437520&r1=437519&r2=437520&view=diff
==============================================================================
--- spamassassin/branches/tvd-multi-mass-check/masses/mass-check (original)
+++ spamassassin/branches/tvd-multi-mass-check/masses/mass-check Sun Aug 27 20:47:17 2006
@@ -99,7 +99,7 @@
$opt_logmem $opt_after $opt_before $opt_rewrite $opt_deencap
$opt_learn $opt_reuse $opt_lint $opt_cache $opt_noisy
$total_messages $statusevery $opt_cachedir
- $opt_client $opt_cs_max $opt_cs_timeout $opt_client_url
+ $opt_client $opt_cs_max $opt_cs_timeout
$opt_server
$tmpfd %reuse %orig_conf %reuse_conf $reuse_rules_loaded_p);

@@ -349,7 +349,11 @@
}
elsif (defined $tmpf) {
# child
- $iter->message_array(\@targets, $tmpfd);
+ my($num, $messages) = $iter->message_array(\@targets);
+
+ # Dump out the messages to the temp file if we're using one
+ send_line($tmpfd, $num, @{$messages});
+
exit;
}
else {
@@ -365,7 +369,7 @@
# NOTE: do this here, not in message_array, since that will only affect
# the child.
seek($tmpfd, 0, 0);
- $total_messages = $iter->read_line($tmpfd);
+ $total_messages = read_line($tmpfd);

if (!$total_messages) {
die "archive-iterator: no messages to process\n";
@@ -426,6 +430,8 @@
$messages = generate_messages($msgnum, $timestamps, $msgsout);
}

+print ">> sending messages\n";
+
if ($messages && open(MSG, $messages)) {
binmode(MSG);
local $/ = undef;
@@ -469,13 +475,11 @@
$opt_cs_max ||= 1000;
$opt_cs_timeout ||= 60 * 2;

- die "Need client_url for client mode!" unless $opt_client_url;
-
- $opt_client_url =~ /^http:\/\/([^\/]+)(\/.*)?/;
+ $opt_client =~ /^http:\/\/([^\/]+)(\/.*)?/;
my($host, $uri) = ($1,$2);
my ($http_host) = split(/:/, $host);

- die "No host found in client_url" unless $host;
+ die "No host found in opt_client" unless $host;
$uri ||= "/";

# figure out max messages
@@ -504,7 +508,7 @@
# make request, include and drop results if there are any
my $result = http_make_request($socket, 'POST', $uri, {
'Host' => $http_host,
- 'Content-Type' => 'Content-Type: application/x-www-form-urlencoded',
+ 'Content-Type' => 'application/x-www-form-urlencoded',
'Content-Length' => length($POSTDATA),
},
$POSTDATA
@@ -520,11 +524,13 @@
}
elsif (!$result) {
# sleep for client_timeout seconds and try the request again
+print "Got no response, waiting $opt_cs_timeout seconds\n";
sleep $opt_cs_timeout;
}
else {
+print "Got response: $result\n";
# we got a result, so do things with it!
- my $gzfd = new IO::Zlib->new($result, "rb");
+ my $gzfd = IO::Zlib->new($result, "rb");
die "Can't open temp result file: $!" unless $gzfd;

# used for the temp queue file
@@ -532,10 +538,40 @@
($tmppath, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
die "Can't make tempfile, exiting" unless $tmppath;

+print ">> cleandir\n";
+ clean_dir($tmpdir);
+
# Archive format, gzip compressed file w/ 3 parts per message:
# 1- server message number in text format
# 2- server index string, binary packed format
# 3- message content
+ my %real = ();
+print ">> writing out files\n";
+ do {
+ my $num = read_line($gzfd);
+ last unless defined $num;
+print "read in message $num\n";
+ my $index = read_line($gzfd);
+ last unless defined $index;
+print "output message $num\n";
+ if (open(OUT, ">$tmpdir/$num")) {
+ print OUT read_line($gzfd);
+ close(OUT);
+
+ my @d = Mail::SpamAssassin::ArchiveIterator::index_unpack($index);
+ $real{$num} = \@d;
+ send_line($tmpfd,
+ Mail::SpamAssassin::ArchiveIterator::index_unpack($d[0], $d[1], 'f', "$tmpdir/$num"));
+ }
+ else {
+ warn "Can't create/write $tmpdir/$num: $!";
+ }
+print "wrote mess $num\n";
+ } while (!$gzfd->eof);
+
+ $gzfd->close;
+ exit;
+ unlink $result;

my $time_start = time;
# generate temp file w/ messages to run through
@@ -1044,8 +1080,8 @@
close $child->[$i];
select($parent);
$| = 1; # print to parent by default, turn off buffering
- $iter->send_line($parent,"START");
- while ($line = $iter->read_line($parent)) {
+ send_line($parent,"START");
+ while ($line = read_line($parent)) {
if ($line eq "exit") {
close $parent;
exit;
@@ -1062,7 +1098,7 @@
$line = Mail::SpamAssassin::ArchiveIterator::index_pack($date, $class, $format, $where);
}

- $iter->send_line($parent,"$result\0RESULT $line");
+ send_line($parent,"$result\0RESULT $line");
}
exit;
}
@@ -1085,7 +1121,7 @@

for (my $i = 0; $i < $count; $i++) {
#warn "debug: killing child $i (pid ",$pid->[$i],")\n";
- $iter->send_line($socket->[$i],"exit"); # tell the child to die.
+ send_line($socket->[$i],"exit"); # tell the child to die.
close $socket->[$i];
waitpid($pid->[$i], 0); # wait for the signal ...
}
@@ -1100,7 +1136,9 @@
# read in the request
# read in headers, "key: value"
my $line = $socket->getline();
+ $line ||= '';
$line =~ s/\r\n$//;
+
my ($type, $URI, $VERS) = $line =~ /^([a-zA-Z]+)\s+(\S+)(?:\s*(\S+))/;
unless ($type && $URI && $VERS) {
$type ||= '';
@@ -1166,7 +1204,7 @@

if ($tmpfd) {
while (@tosend < $msgs) {
- my $msg = $iter->read_line($tmpfd);
+ my $msg = read_line($tmpfd);

# no more messages from the temp file, close it out
unless ($msg) {
@@ -1201,9 +1239,9 @@
# 1- server message number in text format
# 2- server index string, binary packed format
# 3- message content
- $iter->send_line($gzfd, $num);
- $iter->send_line($gzfd, $data);
- $iter->send_line($gzfd, join('', @{$msg}));
+ send_line($gzfd, $num);
+ send_line($gzfd, $data);
+ send_line($gzfd, join('', @{$msg}));
}

$gzfd->close;
@@ -1271,7 +1309,7 @@
my $messages;
my $total_count = 0;

- while (($total_messages > $total_count) && ($message = $iter->read_line($tmpfd))) {
+ while (($total_messages > $total_count) && ($message = read_line($tmpfd))) {
my($class, undef, $date, undef, $result) = $iter->run_message($message);
result($class, $result, $date) if $result;
$total_count++;
@@ -1293,7 +1331,7 @@
# feed childen, make them work for it, repeat
while ($select->count()) {
foreach my $socket ($select->can_read()) {
- my $line = $iter->read_line($socket);
+ my $line = read_line($socket);

# some error happened during the read!
if (!defined $line) {
@@ -1312,7 +1350,7 @@

# if messages remain, and we don't need to restart, send message
if (($total_messages > $total_count) && !$needs_restart) {
- $iter->send_line($socket, $iter->read_line($tmpfd));
+ send_line($socket, read_line($tmpfd));
$total_count++;
#warn ">> recv: $total_messages $total_count\n";
}
@@ -1335,7 +1373,7 @@
elsif ($line eq "START") {
if ($total_messages > $total_count) {
# we still have messages, send one to child
- $iter->send_line($socket, $iter->read_line($tmpfd));
+ send_line($socket, read_line($tmpfd));
$total_count++;
#warn ">> new: $total_messages $total_count\n";
}
@@ -1375,8 +1413,8 @@
"Pragma: no-cache\r\n",
"Server: mass-check/0.0\r\n",
map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers};
- print "\r\n";
- print $data;
+ print $socket "\r\n";
+ print $socket $data;
}

sub http_make_request {
@@ -1386,8 +1424,8 @@
"$type $uri HTTP/1.0\r\n",
"User-Agent: mass-check/0.0\r\n",
map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers};
- print "\r\n";
- print $data;
+ print $socket "\r\n";
+ print $socket $data;

my $line = $socket->getline();
my(undef, $code, $string) = split(/\s+/, $line, 3);
@@ -1401,7 +1439,7 @@

if ($line) {
my ($k,$v) = split(/:\s*/, $line, 2);
- $headers->{lc $k} = $v;
+ $headers{lc $k} = $v;
}
} while ($line !~ /^$/);

@@ -1412,7 +1450,7 @@
die "Can't make tempfile, exiting" unless $gzpath;

my $rd;
- $socket->read($rd, $headers->{'content-length'});
+ $socket->read($rd, $headers{'content-length'});
print $gzfd $rd;
close $gzfd;
}
@@ -1426,4 +1464,76 @@
my $string = shift;
$string =~ s/(\W)/sprintf "%%%02x",unpack("C",$1)/egx;
return $string;
+}
+
+sub clean_dir {
+ my $dir = shift;
+
+ unless (opendir(DIR, $dir)) {
+ warn "error: can't opendir $dir: $!\n";
+ return;
+ }
+ while(my $file = readdir(DIR)) {
+ $file =~ /^(.+)$/; # untaint
+ $file = $1;
+
+ my $path = File::Spec->catfile($dir, $file);
+ next unless (-f $path);
+
+ if (!unlink $path) {
+ warn "error: can't remove file $path: $!\n";
+ closedir(DIR);
+ return;
+ }
+ }
+ closedir(DIR);
+ return 1;
+}
+
+############################################################################
+
+# four bytes in network/vax format (little endian) as length of message
+# the rest is the actual message
+
+sub read_line {
+ my $fd = shift;
+ my($length,$msg);
+
+ # read in the 4 byte length and unpack
+# if (ref $fd eq 'GLOB') {
+# sysread($fd, $length, 4) || return;
+# }
+# elsif (ref $fd ne '') {
+ $fd->read($length, 4) || return;
+# }
+
+ $length = unpack("V", $length);
+# warn "<< $$ $length\n";
+ return unless $length;
+
+ # read in the rest of the single message
+# if (ref $fd eq 'GLOB') {
+# sysread($fd, $msg, $length) || return;
+# }
+# elsif (ref $fd ne '') {
+ $fd->read($msg, $length) || return;
+# }
+
+# warn "<< $$ $msg\n";
+ return $msg;
+}
+
+sub send_line {
+ my $fd = shift;
+
+ foreach ( @_ ) {
+ my $length = pack("V", length $_);
+# warn ">> $$ ".length($_)." $_\n";
+# if (ref $fd eq 'GLOB') {
+# syswrite($fd, $length . $_);
+# }
+# elsif (ref $fd ne '') {
+ $fd->print($length.$_);
+# }
+ }
}