Mailing List Archive

svn commit: r438344 - /spamassassin/branches/tvd-multi-mass-check/masses/mass-check
Author: felicity
Date: Tue Aug 29 21:10:28 2006
New Revision: 438344

URL: http://svn.apache.org/viewvc?rev=438344&view=rev
Log:
start cleaning up the code and adding in comments, etc.

Modified:
spamassassin/branches/tvd-multi-mass-check/masses/mass-check

Modified: spamassassin/branches/tvd-multi-mass-check/masses/mass-check
URL: http://svn.apache.org/viewvc/spamassassin/branches/tvd-multi-mass-check/masses/mass-check?rev=438344&r1=438343&r2=438344&view=diff
==============================================================================
--- spamassassin/branches/tvd-multi-mass-check/masses/mass-check (original)
+++ spamassassin/branches/tvd-multi-mass-check/masses/mass-check Tue Aug 29 21:10:28 2006
@@ -332,29 +332,28 @@
$iter->set_functions(\&wanted, \&result);
}

+# normal mode as well as a server do scan mode and get a temp file
if (!$opt_client) {
- if ($opt_progress) {
- status('starting scan stage');
- }
+ status('starting scan stage') if ($opt_progress);

+ # Make a temp file and delete it
my $tmpf;
($tmpf, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
die 'archive-iterator: failed to create temp file' unless $tmpf;
-
unlink $tmpf or die "archive-iterator: unlink '$tmpf': $!";
- undef $tmpf;

+ # having opt_j or server mode means do scan in a separate process
if ($opt_server || $opt_j) {
- # forked child process scans messages
if ($tmpf = fork()) {
# parent
waitpid($tmpf, 0);
}
elsif (defined $tmpf) {
- # child
+ # child -- process using message_array
my($num, $messages) = $iter->message_array(\@targets);

- # Dump out the messages to the temp file if we're using one
+ # Dump out the number of messages and the message index info to
+ # the temp file
send_line($tmpfd, $num, @{$messages});

exit;
@@ -364,480 +363,206 @@
}
}
else {
- $iter->message_array(\@targets, $tmpfd);
+ # we get here if opt_j == 0, so scan in this process
+ my($num, $messages) = $iter->message_array(\@targets);
+
+ # Dump out the number of messages and the message index info to
+ # the temp file
+ send_line($tmpfd, $num, @{$messages});
}

# we now have a temporary file with the messages to process
- # in theory, our file pointer is at the start of the file, but make sure.
- # NOTE: do this here, not in message_array, since that will only affect
- # the child.
seek($tmpfd, 0, 0);
+ # the first line is the number of messages
$total_messages = read_line($tmpfd);

if (!$total_messages) {
die "archive-iterator: no messages to process\n";
}

- if ($opt_progress) {
- status("completed scan stage, $total_messages messages");
- }
+ status("completed scan stage, $total_messages messages") if ($opt_progress);
}

-sub wanted_server {
- my ($class, $id, $time, $dataref, $format) = @_;
- return $dataref;
+if ($opt_client) {
+ client_mode();
}
+else {
+ status('starting run stage') if ($opt_progress);

-sub result_client {
- my ($class, $result, $time) = @_;
-
- if ($class eq "s") {
- $spam_count++;
+ if ($opt_server) {
+ server_mode();
}
- elsif ($class eq "h") {
- $ham_count++;
+ else {
+ run_through_messages();
}

- $total_count++;
+ status('completed run stage') if ($opt_progress);
+}

- if ($opt_progress) {
- progress($time);
- }
+# Even though we're about to exit, let's clean up after ourselves
+close($tmpfd);
+showdots_finish();

-# print ">>>> $result\n";
- $result =~ s/^(\d+)\s+//m;
- $postdata{$1} = $result;
+if (defined $opt_rewrite) {
+ close(REWRITE);
}

-if ($opt_server) {
- $opt_cs_max ||= 1000;
- $opt_cs_timeout ||= 60 * 5;
+$spamtest->finish();

- my $serv_socket = IO::Socket::INET->new(LocalPort => 8080,
- Proto => 'tcp',
- Listen => 5,
- Reuse => 1);
+# exit status: did we check at least one message correctly?
+exit(!($ham_count || $spam_count));

- die "Could not create socket: $!\n" unless $serv_socket;
+###########################################################################

- if ($opt_progress) {
- status('server ready for connections');
+sub target {
+ my ($target) = @_;
+ if (!defined($opt_format)) {
+ push(@targets, $target);
+ }
+ else {
+ $opt_o = 1;
+ push(@targets, "spam:$opt_format:$target");
}
+}

- my $timestamps = {};
- my $msgsout = { 'curnum' => 0 };
+###########################################################################

- my $select = IO::Select->new( $serv_socket );
+sub init_results {
+ showdots_finish();

- my $sent_messages = 1;
- while ($select->count()) {
- foreach my $socket ($select->can_read()) {
- if ($socket == $serv_socket) {
- $select->add($serv_socket->accept);
- }
- else {
- my($type, $URI, $headers, $postdata) = handle_http_request($socket);
+ # now, showdots only happens if --showdots was used
+ $showdots_active = $opt_showdots;

- if ($type eq 'GET') {
- http_response($socket, "200 OK", {
- 'Content-type' => 'text/plain',
- },
- "Your GET request came from IP Address: ".$socket->peerhost."\n");
- }
- elsif ($type eq 'POST') {
- handle_post_results($postdata, $timestamps, $msgsout);
+ if ($opt_progress) {
+ # round up since 100% will be caught at end already
+ $statusevery = int($total_messages / $updates + 1);

- my $messages = '';
- if ($postdata->{'max_messages'}) {
- my $msgnum = $postdata->{'max_messages'};
- $msgnum = $opt_cs_max if ($msgnum > $opt_cs_max);
- $messages = generate_messages($msgnum, $timestamps, $msgsout, $postdata->{'paths_only'});
- }
+ # if $messages < $updates, just give a status line per msg.
+ $statusevery ||= 1;
+ }

-#print ">> sending messages\n";
+ if ($opt_o) {
+ autoflush STDOUT 1;
+ print STDOUT $log_header;
+ }
+ else {
+ open(HAM, "> $opt_hamlog") || die "open of $opt_hamlog failed: $!";
+ open(SPAM, "> $opt_spamlog") || die "open of $opt_spamlog failed: $!";
+ autoflush HAM 1;
+ autoflush SPAM 1;
+ print HAM $log_header;
+ print SPAM $log_header;
+ }
+ $init_results = 1;
+}

- if ($messages && open(MSG, $messages)) {
- binmode(MSG);
- local $/ = undef;
+sub result {
+ my ($class, $result, $time) = @_;

- # Content-Encoding: gzip
- http_response($socket, "200 OK", {
- "Content-type" => "application/octet-stream",
- "Content-Length" => (-s $messages),
- },
- scalar <MSG>);
+ # don't open results files until we get here to avoid overwriting files
+ &init_results if !$init_results;

- close(MSG);
- unlink $messages;
- }
- elsif (!keys %{$msgsout} && !defined $tmpfd) {
- http_response($socket, "200 OK", {
- "Content-type" => "text/plain",
- "Finished" => 1,
- },
- 'We are all done');
- }
- else {
- http_response($socket, "200 OK", {
- "Content-type" => "text/plain",
- },
- "Your POST request (sans max_messages) came from IP Address: ".$socket->peerhost."\n");
- }
- }
- else {
- # for error, "501 Not Implemented"
- http_response($socket, '501 Not Implemented', {}, '');
- }
-
- $select->remove($socket);
- $socket->close;
- }
- }
+ if ($class eq "s") {
+ if ($opt_o) { print STDOUT $result; } else { print SPAM $result; }
+ $spam_count++;
+ }
+ elsif ($class eq "h") {
+ if ($opt_o) { print STDOUT $result; } else { print HAM $result; }
+ $ham_count++;
+ }

-#print "msgs waiting: ".join(" ", keys %{$msgsout})."\n";
-#print "tmpfd defined? ".(defined $tmpfd ? "yes" : "no")."\n";
+ $total_count++;
+#warn ">> result: $total_count $class $time\n";

- # drop the listener when ready
- # we're not awaiting responses and we've exhausted the input file
- $select->remove($serv_socket) if (!keys %{$msgsout} && !defined $tmpfd);
+ if ($opt_progress) {
+ progress($time);
}
-
- exit;
}

-if ($opt_client) {
- $opt_cs_max ||= 1000;
- $opt_cs_timeout ||= 60 * 2;
+sub wanted {
+ my ($class, $id, $time, $dataref, $format) = @_;
+ my $out = '';

- $opt_client =~ /^http:\/\/([^\/]+)(\/.*)?/;
- my($host, $uri) = ($1,$2);
- my ($http_host) = split(/:/, $host);
+ # if origid is defined, it'll be the message number from server mode
+ my $origid;

- die "No host found in opt_client" unless $host;
- $uri ||= "/";
+ # client mode is a little crazy because we need to kluge around the fact
+ # that the information needed to do the run is different than the
+ # information that goes into the results.
+ if ($opt_client) {
+ if ($opt_cs_paths_only) {
+ $origid = $real{$id};
+ }
+ else {
+ # if we're a non-paths_only client, change the format and id to the real
+ # version, make sure to remember the server's message number
+ $origid=$id;
+ $origid =~ s/^.+?(\d+)$/$1/;
+ $format = $real{$id}->[2];
+ $id = $real{$id}->[3];
+ }
+ }

- # figure out max messages
- my $msgnum = 100;
+ memory_track_start() if ($opt_logmem);

- my $tmpdir;
+ # parse the message, and force it to complete
+ my $ma = $spamtest->parse($dataref, 1);

- if (!$opt_cs_paths_only) {
- $tmpdir = Mail::SpamAssassin::Util::secure_tmpdir();
- die "Can't create tempdir" unless $tmpdir;
+ # remove SpamAssassin markup, if present and the mail was spam
+ my $header = $ma->get_header("Received");
+ my $x_spam_status;
+ if ($opt_reuse) {
+ # get X-Spam-Status: header for rule hit resue
+ $x_spam_status = $ma->get_header("X-Spam-Status");
}
+ # previous hits
+ my @previous;
+ if ($x_spam_status) {
+ $x_spam_status =~ s/,\s+/,/gs;
+ if ($x_spam_status =~ m/tests=(\S*)/
+ && $x_spam_status !~ /\bshortcircuit=(?:ham|spam|default)\b/)
+ {
+ push @previous, split(/,/, $1);

- while (1) {
- # if the number of messages to request is too much, bring it down
- $msgnum = $opt_cs_max if ($msgnum > $opt_cs_max);
+ # we found previous tests, so move the reuse config into place
+ unless ($reuse_rules_loaded_p) {
+ $spamtest->copy_config(\%reuse_conf, undef);
+ $reuse_rules_loaded_p = 1;
+ }
+ }
+ }
+ elsif ($opt_reuse) {
+ if ($reuse_rules_loaded_p) {
+ $spamtest->copy_config(\%orig_conf, undef);
+ $reuse_rules_loaded_p = 0;
+ }
+ }

- # prep the POST request
- $postdata{'max_messages'} = $msgnum;
- $postdata{'paths_only'} = 1 if ($opt_cs_paths_only);
+ if ($header && $header =~ /\bwith SpamAssassin\b/) {
+ if (!$opt_deencap || message_should_be_deencapped($ma)) {
+ my $new_ma = $spamtest->parse($spamtest->remove_spamassassin_markup($ma), 1);
+ $ma->finish();
+ $ma = $new_ma;
+ }
+ }

- my $POSTDATA = join('&', map { post_encode($_) . '=' . post_encode($postdata{$_}) } keys %postdata);
+ # log-uris support
+ my $status;
+ my @uris;
+ my $before;
+ my $after;
+ if ($opt_loguris) {
+ my $pms = Mail::SpamAssassin::PerMsgStatus->new($spamtest, $ma);
+ @uris = $pms->get_uri_list();
+ $pms->finish();

- # connect to server
- my $socket = IO::Socket::INET->new($host);
+ } else {
+ $before = time;
+ $status = $spamtest->check($ma);
+ $after = time;
+ }

- # last if connection fails
- last unless ($socket);
-
- # make request, include and drop results if there are any
- my $result = http_make_request($socket, 'POST', $uri, {
- 'Host' => $http_host,
- 'Content-Type' => 'application/x-www-form-urlencoded',
- 'Content-Length' => length($POSTDATA),
- },
- $POSTDATA
- );
- %postdata = ();
- undef $POSTDATA;
-
- # If we got messages to run through, go ahead and do it.
- # otherwise, just sleep for the timeout length and try again
- if (!defined $result) {
- # we got an error?!? abort!
- last;
- }
- elsif (!$result) {
- # sleep for client_timeout seconds and try the request again
-print "Got no response, waiting $opt_cs_timeout seconds\n";
- sleep $opt_cs_timeout;
- }
- else {
- my $time_start = time;
-
-print "Got response: $result\n";
-
- %postdata = ();
- %real = ();
- $spam_count = $ham_count = 0;
-
- # we got a result, so do things with it!
- my $gzfd = IO::Zlib->new($result, "rb");
- die "Can't open temp result file: $!" unless $gzfd;
-
- # used for the temp queue file
- my $tmppath;
- ($tmppath, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
-print ">> $tmppath\n";
- die "Can't make tempfile, exiting" unless $tmppath;
-
-print ">> cleandir\n";
- clean_dir($tmpdir);
-
- # Archive format, gzip compressed file w/ 3 parts per message:
- # 1- server message number in text format
- # 2- server index string, binary packed format
- # 3- message content
-print ">> writing out files\n";
-
- # number of messages
- $msgnum = $total_messages = read_line($gzfd);
-
-print ">> total of $total_messages messages\n";
-
- for(my $i = 0 ; $i < $total_messages; $i++ ) {
- my $num = read_line($gzfd);
- last unless defined $num;
-#print "read in message $num\n";
- my $index = read_line($gzfd);
-#print "read in index $index\n";
- last unless defined $index;
-#print "output message $num\n";
- if (!$opt_cs_paths_only) {
- my $msg = read_line($gzfd);
- last unless defined $msg;
-
- if (open(OUT, ">$tmpdir/$num")) {
- print OUT $msg;
- close(OUT);
-
- my @d = Mail::SpamAssassin::ArchiveIterator::index_unpack($index);
- $real{"$tmpdir/$num"} = \@d;
- send_line($tmpfd,
- Mail::SpamAssassin::ArchiveIterator::index_pack($d[0], $d[1], 'f', "$tmpdir/$num"));
- }
- else {
- warn "Can't create/write $tmpdir/$num: $!";
- }
- }
- else {
- # need to relate message number and path
- my @d = Mail::SpamAssassin::ArchiveIterator::index_unpack($index);
- $real{$d[3]} = $num;
- send_line($tmpfd, $index);
- }
-#print "wrote mess $num\n";
- }
-
-print "exited loop\n";
-
- $gzfd->close;
- unlink $result;
-
-print "beginning run\n";
-
- # we're about to start running, so go back to the start of the file
- seek $tmpfd, 0, 0;
-
- run_through_messages();
-
- unlink $tmppath;
-
-print "ended run\n";
- # figure out new max messages, try keeping ~cs_timeout between runs
- my $time_end = time;
- if ($time_end == $time_start) {
- $time_end++;
- }
-print "ran $msgnum messages in ".($time_end-$time_start)." seconds\n";
- $msgnum = int($msgnum * $opt_cs_timeout / ($time_end-$time_start)) || 1;
-print "now requesting $msgnum messages\n";
- }
- }
-
- close $tmpfd;
-
- if ($tmpdir) {
- clean_dir($tmpdir);
- rmdir $tmpdir;
- }
- exit;
-}
-
-## OK, at this point we're not in client/server mode, so just do a single run
-## and be done.
-
-if ($opt_progress) {
- status('starting run stage');
-}
-
-run_through_messages();
-
-# close tempfile so it will be unlinked
-close($tmpfd);
-
-if ($opt_progress) {
- status('completed run stage');
-}
-
-showdots_finish();
-
-if (defined $opt_rewrite) {
- close(REWRITE);
-}
-
-$spamtest->finish();
-
-# exit status: did we check at least one message correctly?
-exit(!($ham_count || $spam_count));
-
-###########################################################################
-
-sub target {
- my ($target) = @_;
- if (!defined($opt_format)) {
- push(@targets, $target);
- }
- else {
- $opt_o = 1;
- push(@targets, "spam:$opt_format:$target");
- }
-}
-
-###########################################################################
-
-sub init_results {
- showdots_finish();
-
- # now, showdots only happens if --showdots was used
- $showdots_active = $opt_showdots;
-
- if ($opt_progress) {
- # round up since 100% will be caught at end already
- $statusevery = int($total_messages / $updates + 1);
-
- # if $messages < $updates, just give a status line per msg.
- $statusevery ||= 1;
- }
-
- if ($opt_o) {
- autoflush STDOUT 1;
- print STDOUT $log_header;
- }
- else {
- open(HAM, "> $opt_hamlog") || die "open of $opt_hamlog failed: $!";
- open(SPAM, "> $opt_spamlog") || die "open of $opt_spamlog failed: $!";
- autoflush HAM 1;
- autoflush SPAM 1;
- print HAM $log_header;
- print SPAM $log_header;
- }
- $init_results = 1;
-}
-
-sub result {
- my ($class, $result, $time) = @_;
-
- # don't open results files until we get here to avoid overwriting files
- &init_results if !$init_results;
-
- if ($class eq "s") {
- if ($opt_o) { print STDOUT $result; } else { print SPAM $result; }
- $spam_count++;
- }
- elsif ($class eq "h") {
- if ($opt_o) { print STDOUT $result; } else { print HAM $result; }
- $ham_count++;
- }
-
- $total_count++;
-#warn ">> result: $total_count $class $time\n";
-
- if ($opt_progress) {
- progress($time);
- }
-}
-
-sub wanted {
- my ($class, $id, $time, $dataref, $format) = @_;
- my $out = '';
-
- my $origid;
-
- if ($opt_client) {
-# warn ">>> $id\n";
- if ($opt_cs_paths_only) {
- $origid = $real{$id};
- }
- else {
- $origid=$id;
- $origid =~ s/^.+?(\d+)$/$1/;
- $format = $real{$id}->[2];
- $id = $real{$id}->[3];
- }
- }
-
- memory_track_start() if ($opt_logmem);
-
- my $ma = $spamtest->parse($dataref, 1);
-
- # remove SpamAssassin markup, if present and the mail was spam
- my $header = $ma->get_header("Received");
- my $x_spam_status;
- if ($opt_reuse) {
- # get X-Spam-Status: header for rule hit resue
- $x_spam_status = $ma->get_header("X-Spam-Status");
- }
- # previous hits
- my @previous;
- if ($x_spam_status) {
- $x_spam_status =~ s/,\s+/,/gs;
- if ($x_spam_status =~ m/tests=(\S*)/
- && $x_spam_status !~ /\bshortcircuit=(?:ham|spam|default)\b/)
- {
- push @previous, split(/,/, $1);
-
- # we found previous tests, so move the reuse config into place
- unless ($reuse_rules_loaded_p) {
- $spamtest->copy_config(\%reuse_conf, undef);
- $reuse_rules_loaded_p = 1;
- }
- }
- }
- elsif ($opt_reuse) {
- if ($reuse_rules_loaded_p) {
- $spamtest->copy_config(\%orig_conf, undef);
- $reuse_rules_loaded_p = 0;
- }
- }
-
- if ($header && $header =~ /\bwith SpamAssassin\b/) {
- if (!$opt_deencap || message_should_be_deencapped($ma)) {
- my $new_ma = $spamtest->parse($spamtest->remove_spamassassin_markup($ma), 1);
- $ma->finish();
- $ma = $new_ma;
- }
- }
-
- # log-uris support
- my $status;
- my @uris;
- my $before;
- my $after;
- if ($opt_loguris) {
- my $pms = Mail::SpamAssassin::PerMsgStatus->new($spamtest, $ma);
- @uris = $pms->get_uri_list();
- $pms->finish();
-
- } else {
- $before = time;
- $status = $spamtest->check($ma);
- $after = time;
- }
-
- my @extra;
+ my @extra;

# sample-based learning
if ($opt_learn > 0) {
@@ -1511,124 +1236,407 @@
}
}

- # reap children
- reap_children($opt_j, \@child, \@pid);
+ # reap children
+ reap_children($opt_j, \@child, \@pid);
+ }
+}
+
+sub http_response {
+ my($socket, $result, $headers, $data) = @_;
+
+ print $socket
+ "HTTP/1.0 $result\r\n",
+ "Pragma: no-cache\r\n",
+ "Server: mass-check/0.0\r\n",
+ map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers};
+ print $socket "\r\n";
+ print $socket $data;
+}
+
+sub http_make_request {
+ my($socket, $type, $uri, $headers, $data) = @_;
+
+ print $socket
+ "$type $uri HTTP/1.0\r\n",
+ "User-Agent: mass-check/0.0\r\n",
+ map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers};
+ print $socket "\r\n";
+ print $socket $data;
+
+ my $line = $socket->getline();
+ my(undef, $code, $string) = split(/\s+/, $line, 3);
+ return unless $code == 200;
+
+ my %headers = ();
+ do {
+ $line = $socket->getline();
+ last unless defined $line;
+ $line =~ s/\r\n$//;
+
+ if ($line) {
+ my ($k,$v) = split(/:\s*/, $line, 2);
+ $headers{lc $k} = $v;
+ }
+ } while ($line !~ /^$/);
+
+ return if ($headers{'finished'});
+
+ my $gzpath = '';
+ if ($headers{'content-length'}) {
+ my $gzfd;
+ ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
+ die "Can't make tempfile, exiting" unless $gzpath;
+
+ my $rd;
+ $socket->read($rd, $headers{'content-length'});
+ print $gzfd $rd;
+ close $gzfd;
+ }
+
+ $socket->close();
+ return $gzpath;
+}
+
+# Be conservative -- anything that's non-alphanumeric, encode!
+sub post_encode {
+ my $string = shift;
+ $string =~ s/(\W)/sprintf "%%%02x",unpack("C",$1)/egx;
+ return $string;
+}
+
+sub clean_dir {
+ my $dir = shift;
+
+ unless (opendir(DIR, $dir)) {
+ warn "error: can't opendir $dir: $!\n";
+ return;
+ }
+ while(my $file = readdir(DIR)) {
+ $file =~ /^(.+)$/; # untaint
+ $file = $1;
+
+ my $path = File::Spec->catfile($dir, $file);
+ next unless (-f $path);
+
+ if (!unlink $path) {
+ warn "error: can't remove file $path: $!\n";
+ closedir(DIR);
+ return;
+ }
+ }
+ closedir(DIR);
+ return 1;
+}
+
+############################################################################
+
+# four bytes in network/vax format (little endian) as length of message
+# the rest is the actual message
+
+sub read_line {
+ my $fd = shift;
+ my($length,$msg);
+
+ # read in the 4 byte length and unpack
+ $fd->read($length, 4) || return;
+
+ $length = unpack("V", $length);
+ return unless $length;
+
+ # read in the rest of the single message
+ $fd->read($msg, $length) || return;
+
+ return $msg;
+}
+
+sub send_line {
+ my $fd = shift;
+
+ foreach ( @_ ) {
+ my $length = pack("V", length $_);
+ $fd->print($length.$_);
+ }
+}
+
+
+sub server_mode {
+ $opt_cs_max ||= 1000;
+ $opt_cs_timeout ||= 60 * 5;
+
+ my $serv_socket = IO::Socket::INET->new(LocalPort => 8080,
+ Proto => 'tcp',
+ Listen => 5,
+ Reuse => 1);
+
+ die "Could not create socket: $!\n" unless $serv_socket;
+
+ if ($opt_progress) {
+ status('server ready for connections');
+ }
+
+ my $timestamps = {};
+ my $msgsout = { 'curnum' => 0 };
+
+ my $select = IO::Select->new( $serv_socket );
+
+ my $sent_messages = 1;
+ while ($select->count()) {
+ foreach my $socket ($select->can_read()) {
+ if ($socket == $serv_socket) {
+ $select->add($serv_socket->accept);
+ }
+ else {
+ my($type, $URI, $headers, $postdata) = handle_http_request($socket);
+
+ if ($type eq 'GET') {
+ http_response($socket, "200 OK", {
+ 'Content-type' => 'text/plain',
+ },
+ "Your GET request came from IP Address: ".$socket->peerhost."\n");
+ }
+ elsif ($type eq 'POST') {
+ handle_post_results($postdata, $timestamps, $msgsout);
+
+ my $messages = '';
+ if ($postdata->{'max_messages'}) {
+ my $msgnum = $postdata->{'max_messages'};
+ $msgnum = $opt_cs_max if ($msgnum > $opt_cs_max);
+ $messages = generate_messages($msgnum, $timestamps, $msgsout, $postdata->{'paths_only'});
+ }
+
+#print ">> sending messages\n";
+
+ if ($messages && open(MSG, $messages)) {
+ binmode(MSG);
+ local $/ = undef;
+
+ # Content-Encoding: gzip
+ http_response($socket, "200 OK", {
+ "Content-type" => "application/octet-stream",
+ "Content-Length" => (-s $messages),
+ },
+ scalar <MSG>);
+
+ close(MSG);
+ unlink $messages;
+ }
+ elsif (!keys %{$msgsout} && !defined $tmpfd) {
+ http_response($socket, "200 OK", {
+ "Content-type" => "text/plain",
+ "Finished" => 1,
+ },
+ 'We are all done');
+ }
+ else {
+ http_response($socket, "200 OK", {
+ "Content-type" => "text/plain",
+ },
+ "Your POST request (sans max_messages) came from IP Address: ".$socket->peerhost."\n");
+ }
+ }
+ else {
+ # for error, "501 Not Implemented"
+ http_response($socket, '501 Not Implemented', {}, '');
+ }
+
+ $select->remove($socket);
+ $socket->close;
+ }
+ }
+
+#print "msgs waiting: ".join(" ", keys %{$msgsout})."\n";
+#print "tmpfd defined? ".(defined $tmpfd ? "yes" : "no")."\n";
+
+ # drop the listener when ready
+ # we're not awaiting responses and we've exhausted the input file
+ $select->remove($serv_socket) if (!keys %{$msgsout} && !defined $tmpfd);
}
}

-sub http_response {
- my($socket, $result, $headers, $data) = @_;
+sub client_mode {
+ $opt_cs_max ||= 1000;
+ $opt_cs_timeout ||= 60 * 2;

- print $socket
- "HTTP/1.0 $result\r\n",
- "Pragma: no-cache\r\n",
- "Server: mass-check/0.0\r\n",
- map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers};
- print $socket "\r\n";
- print $socket $data;
-}
+ $opt_client =~ /^http:\/\/([^\/]+)(\/.*)?/;
+ my($host, $uri) = ($1,$2);
+ my ($http_host) = split(/:/, $host);

-sub http_make_request {
- my($socket, $type, $uri, $headers, $data) = @_;
+ die "No host found in opt_client" unless $host;
+ $uri ||= "/";

- print $socket
- "$type $uri HTTP/1.0\r\n",
- "User-Agent: mass-check/0.0\r\n",
- map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers};
- print $socket "\r\n";
- print $socket $data;
+ # figure out max messages
+ my $msgnum = 100;

- my $line = $socket->getline();
- my(undef, $code, $string) = split(/\s+/, $line, 3);
- return unless $code == 200;
+ my $tmpdir;

- my %headers = ();
- do {
- $line = $socket->getline();
- last unless defined $line;
- $line =~ s/\r\n$//;
+ if (!$opt_cs_paths_only) {
+ $tmpdir = Mail::SpamAssassin::Util::secure_tmpdir();
+ die "Can't create tempdir" unless $tmpdir;
+ }

- if ($line) {
- my ($k,$v) = split(/:\s*/, $line, 2);
- $headers{lc $k} = $v;
+ while (1) {
+ # if the number of messages to request is too much, bring it down
+ $msgnum = $opt_cs_max if ($msgnum > $opt_cs_max);
+
+ # prep the POST request
+ $postdata{'max_messages'} = $msgnum;
+ $postdata{'paths_only'} = 1 if ($opt_cs_paths_only);
+
+ my $POSTDATA = join('&', map { post_encode($_) . '=' . post_encode($postdata{$_}) } keys %postdata);
+
+ # connect to server
+ my $socket = IO::Socket::INET->new($host);
+
+ # last if connection fails
+ last unless ($socket);
+
+ # make request, include and drop results if there are any
+ my $result = http_make_request($socket, 'POST', $uri, {
+ 'Host' => $http_host,
+ 'Content-Type' => 'application/x-www-form-urlencoded',
+ 'Content-Length' => length($POSTDATA),
+ },
+ $POSTDATA
+ );
+ %postdata = ();
+ undef $POSTDATA;
+
+ # If we got messages to run through, go ahead and do it.
+ # otherwise, just sleep for the timeout length and try again
+ if (!defined $result) {
+ # we got an error?!? abort!
+ last;
}
- } while ($line !~ /^$/);
+ elsif (!$result) {
+ # sleep for client_timeout seconds and try the request again
+print "Got no response, waiting $opt_cs_timeout seconds\n";
+ sleep $opt_cs_timeout;
+ }
+ else {
+ my $time_start = time;

- return if ($headers{'finished'});
+print "Got response: $result\n";

- my $gzpath = '';
- if ($headers{'content-length'}) {
- my $gzfd;
- ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
- die "Can't make tempfile, exiting" unless $gzpath;
+ %postdata = ();
+ %real = ();
+ $total_count = $spam_count = $ham_count = 0;

- my $rd;
- $socket->read($rd, $headers{'content-length'});
- print $gzfd $rd;
- close $gzfd;
- }
+ # we got a result, so do things with it!
+ my $gzfd = IO::Zlib->new($result, "rb");
+ die "Can't open temp result file: $!" unless $gzfd;

- $socket->close();
- return $gzpath;
-}
+ # used for the temp queue file
+ my $tmppath;
+ ($tmppath, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
+print ">> $tmppath\n";
+ die "Can't make tempfile, exiting" unless $tmppath;

-# Be conservative -- anything that's non-alphanumeric, encode!
-sub post_encode {
- my $string = shift;
- $string =~ s/(\W)/sprintf "%%%02x",unpack("C",$1)/egx;
- return $string;
-}
+ clean_dir($tmpdir) if ($tmpdir);

-sub clean_dir {
- my $dir = shift;
+ # Archive format, gzip compressed file w/ 3 parts per message:
+ # 1- server message number in text format
+ # 2- server index string, binary packed format
+ # 3- message content
+print ">> writing out files\n";

- unless (opendir(DIR, $dir)) {
- warn "error: can't opendir $dir: $!\n";
- return;
- }
- while(my $file = readdir(DIR)) {
- $file =~ /^(.+)$/; # untaint
- $file = $1;
+ # number of messages
+ $msgnum = $total_messages = read_line($gzfd);

- my $path = File::Spec->catfile($dir, $file);
- next unless (-f $path);
+print ">> total of $total_messages messages\n";

- if (!unlink $path) {
- warn "error: can't remove file $path: $!\n";
- closedir(DIR);
- return;
- }
- }
- closedir(DIR);
- return 1;
-}
+ for(my $i = 0 ; $i < $total_messages; $i++ ) {
+ my $num = read_line($gzfd);
+ last unless defined $num;
+#print "read in message $num\n";
+ my $index = read_line($gzfd);
+#print "read in index $index\n";
+ last unless defined $index;
+#print "output message $num\n";
+ if (!$opt_cs_paths_only) {
+ my $msg = read_line($gzfd);
+ last unless defined $msg;

-############################################################################
+ if (open(OUT, ">$tmpdir/$num")) {
+ print OUT $msg;
+ close(OUT);

-# four bytes in network/vax format (little endian) as length of message
-# the rest is the actual message
+ my @d = Mail::SpamAssassin::ArchiveIterator::index_unpack($index);
+ $real{"$tmpdir/$num"} = \@d;
+ send_line($tmpfd,
+ Mail::SpamAssassin::ArchiveIterator::index_pack($d[0], $d[1], 'f', "$tmpdir/$num"));
+ }
+ else {
+ warn "Can't create/write $tmpdir/$num: $!";
+ }
+ }
+ else {
+ # need to relate message number and path
+ my @d = Mail::SpamAssassin::ArchiveIterator::index_unpack($index);
+ $real{$d[3]} = $num;
+ send_line($tmpfd, $index);
+ }
+#print "wrote mess $num\n";
+ }

-sub read_line {
- my $fd = shift;
- my($length,$msg);
+print "exited loop\n";

- # read in the 4 byte length and unpack
- $fd->read($length, 4) || return;
+ $gzfd->close;
+ unlink $result;

- $length = unpack("V", $length);
- return unless $length;
+print "beginning run\n";

- # read in the rest of the single message
- $fd->read($msg, $length) || return;
+ # we're about to start running, so go back to the start of the file
+ seek $tmpfd, 0, 0;

- return $msg;
+ run_through_messages();
+
+ unlink $tmppath;
+
+print "ended run\n";
+ # figure out new max messages, try keeping ~cs_timeout between runs
+ my $time_end = time;
+ if ($time_end == $time_start) {
+ $time_end++;
+ }
+print "ran $msgnum messages in ".($time_end-$time_start)." seconds\n";
+ $msgnum = int($msgnum * $opt_cs_timeout / ($time_end-$time_start)) || 1;
+print "now requesting $msgnum messages\n";
+ }
+ }
+
+ close $tmpfd;
+
+ if ($tmpdir) {
+ clean_dir($tmpdir);
+ rmdir $tmpdir;
+ }
}

-sub send_line {
- my $fd = shift;
+sub wanted_server {
+ my ($class, $id, $time, $dataref, $format) = @_;
+ return $dataref;
+}

- foreach ( @_ ) {
- my $length = pack("V", length $_);
- $fd->print($length.$_);
+sub result_client {
+ my ($class, $result, $time) = @_;
+
+ if ($class eq "s") {
+ $spam_count++;
+ }
+ elsif ($class eq "h") {
+ $ham_count++;
+ }
+
+ $total_count++;
+
+ if ($opt_progress) {
+ progress($time);
+ }
+
+ if ($result =~ s/^(\d+)\s+//m) {
+ $postdata{$1} = $result;
+ }
+ else {
+ warn ">> result is not in the correct format: $result\n";
}
}