Mailing List Archive

r1623 - trunk/varnish-cache/bin/varnishreplay
Author: des
Date: 2007-07-03 11:09:55 +0200 (Tue, 03 Jul 2007)
New Revision: 1623

Modified:
trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
Log:
Reorganize the code a little, and add code to wait for all threads to finish
processing pending messages before we exit.

Note that VSL_Dispatch() will read in log data as fast as it can, so when
working from a log file, varnishreplay will usually read in the entire file
into memory within the first few seconds.


Modified: trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
===================================================================
--- trunk/varnish-cache/bin/varnishreplay/varnishreplay.c 2007-07-03 08:50:34 UTC (rev 1622)
+++ trunk/varnish-cache/bin/varnishreplay/varnishreplay.c 2007-07-03 09:09:55 UTC (rev 1623)
@@ -32,6 +32,7 @@
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
+#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -42,32 +43,60 @@
#include "varnishapi.h"
#include "vss.h"

-static struct thread {
- pthread_t thread_id;
- struct mailbox *mbox;
-} **threads;
-
-struct mailbox {
- pthread_mutex_t lock;
- pthread_cond_t has_mail;
- STAILQ_HEAD(msgq_head, message) messages;
-};
+#ifndef HAVE_STRNDUP
+#include "compat/strndup.h"
+#endif

+#define freez(x) do { if (x) free(x); x = NULL; } while (0);
+
+static struct vss_addr *addr_info;
+static int debug;
+
+/*
+ * mailbox toolkit
+ */
+
struct message {
enum shmlogtag tag;
+ size_t len;
char *ptr;
- unsigned len;
STAILQ_ENTRY(message) list;
};

-static size_t nthreads;
+struct mailbox {
+ pthread_mutex_t lock;
+ pthread_cond_t has_mail;
+ int open;
+ STAILQ_HEAD(msgq_head, message) messages;
+};

-static struct vss_addr *adr_info;
-static int debug;
+static void
+mailbox_create(struct mailbox *mbox)
+{

+ STAILQ_INIT(&mbox->messages);
+ pthread_mutex_init(&mbox->lock, NULL);
+ pthread_cond_init(&mbox->has_mail, NULL);
+ mbox->open = 1;
+}
+
static void
+mailbox_destroy(struct mailbox *mbox)
+{
+ struct message *msg;
+
+ while ((msg = STAILQ_FIRST(&mbox->messages))) {
+ STAILQ_REMOVE_HEAD(&mbox->messages, list);
+ free(msg);
+ }
+ pthread_cond_destroy(&mbox->has_mail);
+ pthread_mutex_destroy(&mbox->lock);
+}
+
+static void
mailbox_put(struct mailbox *mbox, struct message *msg)
{
+
pthread_mutex_lock(&mbox->lock);
STAILQ_INSERT_TAIL(&mbox->messages, msg, list);
pthread_cond_signal(&mbox->has_mail);
@@ -78,15 +107,113 @@
mailbox_get(struct mailbox *mbox)
{
struct message *msg;
-
+
pthread_mutex_lock(&mbox->lock);
- while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL)
+ while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL && mbox->open)
pthread_cond_wait(&mbox->has_mail, &mbox->lock);
- STAILQ_REMOVE_HEAD(&mbox->messages, list);
+ if (msg != NULL)
+ STAILQ_REMOVE_HEAD(&mbox->messages, list);
pthread_mutex_unlock(&mbox->lock);
return msg;
}

+static void
+mailbox_close(struct mailbox *mbox)
+{
+ pthread_mutex_lock(&mbox->lock);
+ mbox->open = 0;
+ pthread_cond_signal(&mbox->has_mail);
+ pthread_mutex_unlock(&mbox->lock);
+}
+
+/*
+ * thread toolkit
+ */
+
+struct thread {
+ pthread_t thread_id;
+ struct mailbox mbox;
+};
+
+static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void
+thread_log(int lvl, const char *fmt, ...)
+{
+ va_list ap;
+
+ if (lvl > debug)
+ return;
+ pthread_mutex_lock(&log_mutex);
+ fprintf(stderr, "%08x ", (unsigned int)pthread_self());
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ pthread_mutex_unlock(&log_mutex);
+}
+
+static struct thread **threads;
+static size_t nthreads;
+
+static struct thread *
+thread_get(int fd, void *(*thread_main)(void *))
+{
+
+ assert(fd != 0);
+ if (fd >= nthreads) {
+ struct thread **newthreads = threads;
+ size_t newnthreads = nthreads;
+
+ while (fd >= newnthreads)
+ newnthreads += newnthreads + 1;
+ newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
+ assert(newthreads != NULL);
+ memset(newthreads + nthreads, 0,
+ (newnthreads - nthreads) * sizeof *newthreads);
+ threads = newthreads;
+ nthreads = newnthreads;
+ }
+ if (threads[fd] == NULL) {
+ threads[fd] = malloc(sizeof *threads[fd]);
+ assert(threads[fd] != NULL);
+ mailbox_create(&threads[fd]->mbox);
+ if (pthread_create(&threads[fd]->thread_id, NULL,
+ thread_main, threads[fd]) != 0) {
+ thread_log(0, "thread creation failed\n");
+ mailbox_destroy(&threads[fd]->mbox);
+ freez(threads[fd]);
+ }
+ thread_log(1, "thread %08x started\n",
+ (unsigned int)threads[fd]->thread_id);
+ }
+ return (threads[fd]);
+}
+
+static void
+thread_close(int fd)
+{
+
+ assert(fd < nthreads);
+ if (fd == 0) {
+ for (fd = 1; fd < nthreads; ++fd)
+ thread_close(fd);
+ return;
+ }
+
+ if (threads[fd] == NULL)
+ return;
+ mailbox_close(&threads[fd]->mbox);
+ pthread_join(threads[fd]->thread_id, NULL);
+ thread_log(1, "thread %08x stopped\n",
+ (unsigned int)threads[fd]->thread_id);
+ mailbox_destroy(&threads[fd]->mbox);
+ freez(threads[fd]);
+}
+
+/*
+ * ...
+ */
+
static int
isprefix(const char *str, const char *prefix, const char *end, const char **next)
{
@@ -159,14 +286,14 @@
int i, n;

if (VSS_parse(address, &addr, &port) != 0) {
- fprintf(stderr, "Invalid address\n");
+ thread_log(0, "Invalid address\n");
exit(2);
}
n = VSS_resolve(addr, port, &ta);
free(addr);
free(port);
if (n == 0) {
- fprintf(stderr, "Could not connect to server\n");
+ thread_log(0, "Could not connect to server\n");
exit(2);
}
for (i = 1; i < n; ++i) {
@@ -200,12 +327,11 @@
buf = realloc(buf, lbuf);
XXXAN(buf);
}
- //fprintf(stderr, "start reading\n");
i = read(sock, buf + nbuf, 1);
if (i <= 0) {
- perror("error in reading\n");
+ thread_log(0, "read(): %s\n", strerror(errno));
free(buf);
- exit(1);
+ return (-1);
}
nbuf += i;
if (nbuf >= 2 && buf[nbuf-2] == '\r' && buf[nbuf-1] == '\n')
@@ -233,8 +359,8 @@
n = read(sock, buf + nbuf,
(2048 < length - nbuf ? 2048 : length - nbuf));
if (n <= 0) {
- perror("failed reading the block\n");
- break;
+ thread_log(0, "failed reading the block\n");
+ return (-1);
}
nbuf += n;
}
@@ -282,24 +408,20 @@
free(line);
}

- if (debug)
- fprintf(stderr, "status: %d\n", status);
+ thread_log(1, "status: %d\n", status);


/* Read body */
if (content_length > 0 && !chunked) {
/* Fixed body size, read content_length bytes */
- if (debug)
- fprintf(stderr, "fixed length\n");
- n = read_block(content_length, sock);
- if (debug) {
- fprintf(stderr, "size of body: %d\n", (int)content_length);
- fprintf(stderr, "bytes read: %d\n", n);
- }
+ thread_log(1, "fixed length\n");
+ thread_log(1, "size of body: %ld\n", content_length);
+ if ((n = read_block(content_length, sock)) < 0)
+ return (1);
+ thread_log(1, "bytes read: %d\n", n);
} else if (chunked) {
/* Chunked encoding, read size and bytes until no more */
- if (debug)
- fprintf(stderr, "chunked encoding\n");
+ thread_log(1, "chunked encoding\n");
while (1) {
line_len = read_line(&line, sock);
end = line + line_len;
@@ -308,10 +430,8 @@
break;
}
n = read_block(block_len, sock);
- if (debug) {
- fprintf(stderr, "size of body: %d\n", (int)block_len);
- fprintf(stderr, "bytes read: %d\n", n);
- }
+ thread_log(1, "size of body: %d\n", (int)block_len);
+ thread_log(1, "bytes read: %d\n", n);
free(line);
n = read_line(&line, sock);
free(line);
@@ -320,29 +440,27 @@
free(line);
} else if ((content_length <= 0 && !chunked) || req_failed) {
/* No body --> stop reading. */
- if (debug)
- fprintf(stderr, "no body\n");
+ thread_log(1, "no body\n");
+ return (1);
} else {
/* Unhandled case. */
- fprintf(stderr, "An error occured\n");
- exit(1);
+ thread_log(0, "An error occured\n");
+ return (1);
}
- if (debug)
- fprintf(stderr, "\n");

return close_connection;
}

static void *
-pthread_main(void *arg)
+replay_thread(void *arg)
{
+ struct thread *thr = arg;
struct message *msg;
- struct thread *th = (struct thread*)arg;
enum shmlogtag tag;
- int len;
+ size_t len;
char *ptr;
const char *end, *next;
-
+
char *df_H = NULL; /* %H, Protocol version */
char *df_Host = NULL; /* %{Host}i */
char *df_Uq = NULL; /* %U%q, URL path and query string */
@@ -351,18 +469,15 @@
int bogus = 0; /* bogus request */

int sock, reopen = 1;
-
- //fprintf(stderr, "thread started\n");
-
- do {
- msg = mailbox_get(th->mbox);
+
+ while ((msg = mailbox_get(&thr->mbox)) != NULL) {
tag = msg->tag;
len = msg->len;
ptr = msg->ptr;
end = ptr + len;
-
- //fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr);
-
+
+ thread_log(2, "%s(%s)\n", VSL_tags[tag], msg->ptr);
+
switch (tag) {
case SLT_RxRequest:
if (df_m != NULL)
@@ -398,10 +513,10 @@

if (tag != SLT_ReqEnd)
continue;
-
- //fprintf(stderr, "bogus: %d %s\n", bogus, df_m);

- if (!bogus) {
+ if (bogus) {
+ thread_log(1, "bogus\n");
+ } else {
/* If the method is supported (GET or HEAD), send the request out
* on the socket. If the socket needs reopening, reopen it first.
* When the request is sent, call the function for receiving
@@ -409,16 +524,11 @@
*/
if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) {
if (reopen)
- sock = VSS_connect(adr_info);
+ sock = VSS_connect(addr_info);
reopen = 0;

- if (debug) {
- fprintf(stderr, "%s ", df_m);
- fprintf(stderr, "%s ", df_Uq);
- fprintf(stderr, "%s ", df_H);
- fprintf(stderr, "\n");
- fprintf(stderr, "Host: ");
- }
+ thread_log(1, "%s %s %s\n", df_m, df_Uq, df_H);
+
write(sock, df_m, strlen(df_m));
write(sock, " ", 1);
write(sock, df_Uq, strlen(df_Uq));
@@ -432,16 +542,12 @@

write(sock, "Host: ", 6);
if (df_Host) {
- if (debug)
- fprintf(stderr, df_Host);
+ thread_log(1, "Host: %s\n", df_Host);
write(sock, df_Host, strlen(df_Host));
}
- if (debug)
- fprintf(stderr, "\n");
write(sock, "\r\n", 2);
if (df_c) {
- if (debug)
- fprintf(stderr, "Connection: %s\n", df_c);
+ thread_log(1, "Connection: %s\n", df_c);
write(sock, "Connection: ", 12);
write(sock, df_c, strlen(df_c));
write(sock, "\r\n", 2);
@@ -449,7 +555,7 @@
reopen = 0;
}
if (debug)
- fprintf(stderr, "\n");
+ thread_log(0, "\n");
write(sock, "\r\n", 2);
if (!reopen)
reopen = receive_response(sock);
@@ -459,16 +565,15 @@
}

/* clean up */
-#define freez(x) do { if (x) free(x); x = NULL; } while (0);
+ freez(msg->ptr);
+ freez(msg);
freez(df_H);
freez(df_Host);
freez(df_Uq);
freez(df_m);
freez(df_c);
-#undef freez
bogus = 0;
- } while (1);
-
+ }
return (0);
}

@@ -477,50 +582,25 @@
gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
unsigned len, unsigned spec, const char *ptr)
{
+ struct thread *thr;
const char *end;
struct message *msg;
- int err;
-
+
(void)priv;

end = ptr + len;

- if (!(spec & VSL_S_CLIENT))
+ if (fd == 0 || !(spec & VSL_S_CLIENT))
return (0);

- //fprintf(stderr, "gen_traffic\n");
-
- if (fd >= nthreads) {
- struct thread **newthreads = threads;
- size_t newnthreads = nthreads;
-
- while (fd >= newnthreads)
- newnthreads += newnthreads + 1;
- newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
- assert(newthreads != NULL);
- memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads);
- threads = newthreads;
- nthreads = newnthreads;
- }
- if (threads[fd] == NULL) {
- threads[fd] = malloc(sizeof *threads[fd]);
- assert(threads[fd] != NULL);
- threads[fd]->mbox = malloc(sizeof (struct mailbox));
- STAILQ_INIT(&threads[fd]->mbox->messages);
- pthread_mutex_init(&threads[fd]->mbox->lock, NULL);
- pthread_cond_init(&threads[fd]->mbox->has_mail, NULL);
- err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]);
- if (err)
- fprintf(stderr, "thread creation failed\n");
- fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err);
- }
+ thread_log(2, "%d %s\n", fd, VSL_tags[tag]);
+ thr = thread_get(fd, replay_thread);
msg = malloc(sizeof (struct message));
msg->tag = tag;
- msg->ptr = strdup(ptr);
msg->len = len;
- mailbox_put(threads[fd]->mbox, msg);
- //fprintf(stderr, "message put\n");
-
+ msg->ptr = strndup(ptr, len);
+ mailbox_put(&thr->mbox, msg);
+
return 0;
}

@@ -536,14 +616,14 @@
char buf[2];
char last = ' ';
int sock, reopen = 1;
-
- adr_info = init_connection(address);
- sock = VSS_connect(adr_info);
+
+ addr_info = init_connection(address);
+ sock = VSS_connect(addr_info);
while (read(fd, buf, 1)) {
write(sock, buf, 1);
- fprintf(stderr, "%s", buf);
+ thread_log(0, "%s", buf);
if (*buf == '\n' && last == '\n'){
- fprintf(stderr, "receive\n");
+ thread_log(0, "receive\n");
reopen = receive_response(sock);
}
last = *buf;
@@ -581,7 +661,7 @@
address = optarg;
break;
case 'D':
- debug = 1;
+ ++debug;
break;
case 't':
/* This option is for testing only. The test file must contain
@@ -609,10 +689,10 @@
if (VSL_OpenLog(vd, NULL))
exit(1);

- adr_info = init_connection(address);
+ addr_info = init_connection(address);

while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
/* nothing */ ;
-
+ thread_close(0);
exit(0);
}