Mailing List Archive

r1620 - trunk/varnish-cache/bin/varnishreplay
Author: cecilihf
Date: 2007-07-03 10:07:09 +0200 (Tue, 03 Jul 2007)
New Revision: 1620

Modified:
trunk/varnish-cache/bin/varnishreplay/Makefile.am
trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
Log:
parellising varnishreplay. Work in progress.


Modified: trunk/varnish-cache/bin/varnishreplay/Makefile.am
===================================================================
--- trunk/varnish-cache/bin/varnishreplay/Makefile.am 2007-07-02 17:30:03 UTC (rev 1619)
+++ trunk/varnish-cache/bin/varnishreplay/Makefile.am 2007-07-03 08:07:09 UTC (rev 1620)
@@ -14,4 +14,6 @@
varnishreplay_LDADD = \
$(top_builddir)/lib/libvarnish/libvarnish.la \
$(top_builddir)/lib/libcompat/libcompat.a \
- $(top_builddir)/lib/libvarnishapi/libvarnishapi.la
+ $(top_builddir)/lib/libvarnishapi/libvarnishapi.la \
+ ${PTHREAD_LIBS}
+

Modified: trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
===================================================================
--- trunk/varnish-cache/bin/varnishreplay/varnishreplay.c 2007-07-02 17:30:03 UTC (rev 1619)
+++ trunk/varnish-cache/bin/varnishreplay/varnishreplay.c 2007-07-03 08:07:09 UTC (rev 1620)
@@ -30,32 +30,63 @@

#include <ctype.h>
#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-#include <fcntl.h>

#include "libvarnish.h"
+#include "queue.h"
#include "varnishapi.h"
#include "vss.h"

-static struct request {
- char *df_H; /* %H, Protocol version */
- char *df_Host; /* %{Host}i */
- char *df_Uq; /* %U%q, URL path and query string */
- char *df_m; /* %m, Request method*/
- char *df_c; /* Connection info (keep-alive, close) */
- int bogus; /* bogus request */
-} **req;
+static struct thread {
+ pthread_t thread_id;
+ struct mailbox *mbox;
+} **threads;
+
+struct mailbox {
+ pthread_mutex_t lock;
+ pthread_cond_t has_mail;
+ STAILQ_HEAD(msgq_head, message) messages;
+};

-static size_t nreq;
+struct message {
+ enum shmlogtag tag;
+ char *ptr;
+ unsigned len;
+ STAILQ_ENTRY(message) list;
+};

+static size_t nthreads;
+
static struct vss_addr *adr_info;
-static int sock;
-static int reopen;
static int debug;

+static void
+mailbox_put(struct mailbox *mbox, struct message *msg)
+{
+ pthread_mutex_lock(&mbox->lock);
+ STAILQ_INSERT_TAIL(&mbox->messages, msg, list);
+ pthread_cond_signal(&mbox->has_mail);
+ pthread_mutex_unlock(&mbox->lock);
+}
+
+static struct message *
+mailbox_get(struct mailbox *mbox)
+{
+ struct message *msg;
+
+ pthread_mutex_lock(&mbox->lock);
+ while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL)
+ pthread_cond_wait(&mbox->has_mail, &mbox->lock);
+ STAILQ_REMOVE_HEAD(&mbox->messages, list);
+ pthread_mutex_unlock(&mbox->lock);
+ return msg;
+}
+
static int
isprefix(const char *str, const char *prefix, const char *end, const char **next)
{
@@ -153,7 +184,7 @@
* A line is terminated by \r\n
*/
static int
-read_line(char **line)
+read_line(char **line, int sock)
{
char *buf;
unsigned nbuf, lbuf;
@@ -191,7 +222,7 @@
* the number of bytes read.
*/
static int
-read_block(int length)
+read_block(int length, int sock)
{
char *buf;
int n, nbuf;
@@ -214,7 +245,7 @@
/* Receive the response after sending a request.
*/
static int
-receive_response(void)
+receive_response(int sock)
{
char *line, *end;
const char *next;
@@ -229,7 +260,7 @@

/* Read header */
while (1) {
- line_len = read_line(&line);
+ line_len = read_line(&line, sock);
end = line + line_len;

if (*line == '\r' && *(line + 1) == '\n') {
@@ -260,7 +291,7 @@
/* Fixed body size, read content_length bytes */
if (debug)
fprintf(stderr, "fixed length\n");
- n = read_block(content_length);
+ n = read_block(content_length, sock);
if (debug) {
fprintf(stderr, "size of body: %d\n", (int)content_length);
fprintf(stderr, "bytes read: %d\n", n);
@@ -270,22 +301,22 @@
if (debug)
fprintf(stderr, "chunked encoding\n");
while (1) {
- line_len = read_line(&line);
+ line_len = read_line(&line, sock);
end = line + line_len;
block_len = strtol(line, &end, 16);
if (block_len == 0) {
break;
}
- n = read_block(block_len);
+ n = read_block(block_len, sock);
if (debug) {
fprintf(stderr, "size of body: %d\n", (int)block_len);
fprintf(stderr, "bytes read: %d\n", n);
}
free(line);
- n = read_line(&line);
+ n = read_line(&line, sock);
free(line);
}
- n = read_line(&line);
+ n = read_line(&line, sock);
free(line);
} else if ((content_length <= 0 && !chunked) || req_failed) {
/* No body --> stop reading. */
@@ -302,155 +333,198 @@
return close_connection;
}

-
-static int
-gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
- unsigned len, unsigned spec, const char *ptr)
+static void *
+pthread_main(void *arg)
{
+ struct message *msg;
+ struct thread *th = (struct thread*)arg;
+ enum shmlogtag tag;
+ int len;
+ char *ptr;
const char *end, *next;
- FILE *fo;
- struct request *rp;
+
+ char *df_H = NULL; /* %H, Protocol version */
+ char *df_Host = NULL; /* %{Host}i */
+ char *df_Uq = NULL; /* %U%q, URL path and query string */
+ char *df_m = NULL; /* %m, Request method*/
+ char *df_c = NULL; /* Connection info (keep-alive, close) */
+ int bogus = 0; /* bogus request */

- end = ptr + len;
+ int sock, reopen = 1;
+
+ //fprintf(stderr, "thread started\n");
+
+ do {
+ msg = mailbox_get(th->mbox);
+ tag = msg->tag;
+ len = msg->len;
+ ptr = msg->ptr;
+ end = ptr + len;
+
+ //fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr);
+
+ switch (tag) {
+ case SLT_RxRequest:
+ if (df_m != NULL)
+ bogus = 1;
+ else
+ df_m = trimline(ptr, end);
+ break;

- if (!(spec & VSL_S_CLIENT))
- return (0);
+ case SLT_RxURL:
+ if (df_Uq != NULL)
+ bogus = 1;
+ else
+ df_Uq = trimline(ptr, end);
+ break;

- if (fd >= nreq) {
- struct request **newreq = req;
- size_t newnreq = nreq;
-
- while (fd >= newnreq)
- newnreq += newnreq + 1;
- newreq = realloc(newreq, newnreq * sizeof *newreq);
- assert(newreq != NULL);
- memset(newreq + nreq, 0, (newnreq - nreq) * sizeof *newreq);
- req = newreq;
- nreq = newnreq;
- }
- if (req[fd] == NULL) {
- req[fd] = calloc(sizeof *req[fd], 1);
- assert(req[fd] != NULL);
- }
- rp = req[fd];
-
- switch (tag) {
- case SLT_RxRequest:
- if (tag == SLT_RxRequest && (spec & VSL_S_BACKEND))
+ case SLT_RxProtocol:
+ if (df_H != NULL)
+ bogus = 1;
+ else
+ df_H = trimline(ptr, end);
break;

- if (rp->df_m != NULL)
- rp->bogus = 1;
- else
- rp->df_m = trimline(ptr, end);
- break;
-
- case SLT_RxURL:
- if (tag == SLT_RxURL && (spec & VSL_S_BACKEND))
+ case SLT_RxHeader:
+ if (isprefix(ptr, "host:", end, &next))
+ df_Host = trimline(next, end);
+ if (isprefix(ptr, "connection:", end, &next))
+ df_c = trimline(next, end);
break;

- if (rp->df_Uq != NULL)
- rp->bogus = 1;
- else
- rp->df_Uq = trimline(ptr, end);
- break;
-
- case SLT_RxProtocol:
- if (tag == SLT_RxProtocol && (spec & VSL_S_BACKEND))
+ default:
break;
+ }

- if (rp->df_H != NULL)
- rp->bogus = 1;
- else
- rp->df_H = trimline(ptr, end);
- break;
+ if (tag != SLT_ReqEnd)
+ continue;
+
+ //fprintf(stderr, "bogus: %d %s\n", bogus, df_m);

- case SLT_RxHeader:
- if (isprefix(ptr, "host:", end, &next))
- rp->df_Host = trimline(next, end);
- if (isprefix(ptr, "connection:", end, &next))
- rp->df_c = trimline(next, end);
- break;
+ if (!bogus) {
+ /* If the method is supported (GET or HEAD), send the request out
+ * on the socket. If the socket needs reopening, reopen it first.
+ * When the request is sent, call the function for receiving
+ * the answer.
+ */
+ if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) {
+ if (reopen)
+ sock = VSS_connect(adr_info);
+ reopen = 0;

- default:
- break;
- }
+ if (debug) {
+ fprintf(stderr, "%s ", df_m);
+ fprintf(stderr, "%s ", df_Uq);
+ fprintf(stderr, "%s ", df_H);
+ fprintf(stderr, "\n");
+ fprintf(stderr, "Host: ");
+ }
+ write(sock, df_m, strlen(df_m));
+ write(sock, " ", 1);
+ write(sock, df_Uq, strlen(df_Uq));
+ write(sock, " ", 1);
+ write(sock, df_H, strlen(df_H));
+ write(sock, " ", 1);
+ write(sock, "\r\n", 2);

- if ((spec & VSL_S_CLIENT) && tag != SLT_ReqEnd)
- return (0);
+ if (strncmp(df_H, "HTTP/1.0", 8))
+ reopen = 1;

- if (!rp->bogus) {
- fo = priv;
-
- /* If the method is supported (GET or HEAD), send the request out
- * on the socket. If the socket needs reopening, reopen it first.
- * When the request is sent, call the function for receiving
- * the answer.
- */
- if (!(strncmp(rp->df_m, "GET", 3) && strncmp(rp->df_m, "HEAD", 4))) {
- if (reopen)
- sock = VSS_connect(adr_info);
- reopen = 0;
-
- if (debug) {
- fprintf(fo, "%s ", rp->df_m);
- fprintf(fo, "%s ", rp->df_Uq);
- fprintf(fo, "%s ", rp->df_H);
- fprintf(fo, "\n");
- fprintf(fo, "Host: ");
- }
- write(sock, rp->df_m, strlen(rp->df_m));
- write(sock, " ", 1);
- write(sock, rp->df_Uq, strlen(rp->df_Uq));
- write(sock, " ", 1);
- write(sock, rp->df_H, strlen(rp->df_H));
- write(sock, " ", 1);
- write(sock, "\r\n", 2);
-
- if (strncmp(rp->df_H, "HTTP/1.0", 8))
- reopen = 1;
-
- write(sock, "Host: ", 6);
- if (rp->df_Host) {
+ write(sock, "Host: ", 6);
+ if (df_Host) {
+ if (debug)
+ fprintf(stderr, df_Host);
+ write(sock, df_Host, strlen(df_Host));
+ }
if (debug)
- fprintf(fo, rp->df_Host);
- write(sock, rp->df_Host, strlen(rp->df_Host));
- }
- if (debug)
- fprintf(fo, "\n");
- write(sock, "\r\n", 2);
- if (rp->df_c) {
+ fprintf(stderr, "\n");
+ write(sock, "\r\n", 2);
+ if (df_c) {
+ if (debug)
+ fprintf(stderr, "Connection: %s\n", df_c);
+ write(sock, "Connection: ", 12);
+ write(sock, df_c, strlen(df_c));
+ write(sock, "\r\n", 2);
+ if (isequal(df_c, "keep-alive", df_c + strlen(df_c)))
+ reopen = 0;
+ }
if (debug)
- fprintf(fo, "Connection: %s\n", rp->df_c);
- write(sock, "Connection: ", 12);
- write(sock, rp->df_c, strlen(rp->df_c));
+ fprintf(stderr, "\n");
write(sock, "\r\n", 2);
- if (isequal(rp->df_c, "keep-alive", rp->df_c + strlen(rp->df_c)))
- reopen = 0;
+ if (!reopen)
+ reopen = receive_response(sock);
+ if (reopen)
+ close(sock);
}
- if (debug)
- fprintf(fo, "\n");
- write(sock, "\r\n", 2);
- if (!reopen)
- reopen = receive_response();
- if (reopen)
- close(sock);
}
- }

- /* clean up */
+ /* clean up */
#define freez(x) do { if (x) free(x); x = NULL; } while (0);
- freez(rp->df_H);
- freez(rp->df_Host);
- freez(rp->df_Uq);
- freez(rp->df_m);
- freez(rp->df_c);
+ freez(df_H);
+ freez(df_Host);
+ freez(df_Uq);
+ freez(df_m);
+ freez(df_c);
#undef freez
- rp->bogus = 0;
+ bogus = 0;
+ } while (1);

return (0);
}

+
+static int
+gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
+ unsigned len, unsigned spec, const char *ptr)
+{
+ const char *end;
+ struct message *msg;
+ int err;
+
+ (void)priv;
+
+ end = ptr + len;
+
+ if (!(spec & VSL_S_CLIENT))
+ return (0);
+
+ //fprintf(stderr, "gen_traffic\n");
+
+ if (fd >= nthreads) {
+ struct thread **newthreads = threads;
+ size_t newnthreads = nthreads;
+
+ while (fd >= newnthreads)
+ newnthreads += newnthreads + 1;
+ newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
+ assert(newthreads != NULL);
+ memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads);
+ threads = newthreads;
+ nthreads = newnthreads;
+ }
+ if (threads[fd] == NULL) {
+ threads[fd] = malloc(sizeof *threads[fd]);
+ assert(threads[fd] != NULL);
+ threads[fd]->mbox = malloc(sizeof (struct mailbox));
+ STAILQ_INIT(&threads[fd]->mbox->messages);
+ pthread_mutex_init(&threads[fd]->mbox->lock, NULL);
+ pthread_cond_init(&threads[fd]->mbox->has_mail, NULL);
+ err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]);
+ if (err)
+ fprintf(stderr, "thread creation failed\n");
+ fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err);
+ }
+ msg = malloc(sizeof (struct message));
+ msg->tag = tag;
+ msg->ptr = strdup(ptr);
+ msg->len = len;
+ mailbox_put(threads[fd]->mbox, msg);
+ //fprintf(stderr, "message put\n");
+
+ return 0;
+}
+
+
/* This function is for testing only, and only sends
* the raw data from the file to the address.
* The receive function is called for each blank line.
@@ -461,6 +535,8 @@
int fd = open(file, O_RDONLY);
char buf[2];
char last = ' ';
+ int sock, reopen = 1;
+
adr_info = init_connection(address);
sock = VSS_connect(adr_info);
while (read(fd, buf, 1)) {
@@ -468,7 +544,7 @@
fprintf(stderr, "%s", buf);
if (*buf == '\n' && last == '\n'){
fprintf(stderr, "receive\n");
- reopen = receive_response();
+ reopen = receive_response(sock);
}
last = *buf;
}
@@ -491,15 +567,14 @@
{
int c;
struct VSL_data *vd;
- const char *ofn = NULL;
const char *address = NULL;
- FILE *of;

char *test_file = NULL;

vd = VSL_New();
debug = 0;

+ VSL_Arg(vd, 'c', NULL);
while ((c = getopt(argc, argv, "a:Dr:t:")) != -1) {
switch (c) {
case 'a':
@@ -534,18 +609,10 @@
if (VSL_OpenLog(vd, NULL))
exit(1);

- ofn = "stdout";
- of = stdout;
-
adr_info = init_connection(address);
- reopen = 1;

- while (VSL_Dispatch(vd, gen_traffic, of) == 0) {
- if (fflush(of) != 0) {
- perror(ofn);
- exit(1);
- }
- }
+ while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
+ /* nothing */ ;

exit(0);
}