Mailing List Archive

r875 - trunk/varnish-cache/bin/varnishd
Author: phk
Date: 2006-08-21 20:55:24 +0200 (Mon, 21 Aug 2006)
New Revision: 875

Modified:
trunk/varnish-cache/bin/varnishd/cache.h
trunk/varnish-cache/bin/varnishd/cache_acceptor.c
trunk/varnish-cache/bin/varnishd/cache_acceptor.h
trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c
Log:
Overhaul kqueue acceptor in light of todays learnings.

Use the pipe trick to inject sessions into the system, as far as I
can tell it is cheaper because of the low rate it happens and the
high rate of mutex operations avoided.

Ignore the timer event, but purge the list every time we wake up
to reduce lumpyness of timeout'ing.

Centralize the polling of a session so we don't have the same two
messages spread out all over the place.

Centralize the acceptor thread and send things directly to the worker
thread, leaving only the session-herder in the split out files.

poll & epoll not yet updated accordingly.



Modified: trunk/varnish-cache/bin/varnishd/cache.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache.h 2006-08-21 17:49:39 UTC (rev 874)
+++ trunk/varnish-cache/bin/varnishd/cache.h 2006-08-21 18:55:24 UTC (rev 875)
@@ -272,7 +272,6 @@

struct workreq workreq;
struct acct acct;
- unsigned kqa;
};

struct backend {

Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.c 2006-08-21 17:49:39 UTC (rev 874)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.c 2006-08-21 18:55:24 UTC (rev 875)
@@ -40,6 +40,7 @@
};

static unsigned xids;
+static pthread_t vca_thread_acct;

struct sess *
vca_accept_sess(int fd)
@@ -128,6 +129,23 @@

/*--------------------------------------------------------------------*/

+int
+vca_pollsession(struct sess *sp)
+{
+ int i;
+
+ i = http_RecvSome(sp->fd, sp->http);
+ if (i < 1)
+ return (i);
+ if (i == 1)
+ vca_close_session(sp, "overflow");
+ else if (i == 2)
+ vca_close_session(sp, "no request");
+ return (1);
+}
+
+/*--------------------------------------------------------------------*/
+
void
vca_close_session(struct sess *sp, const char *why)
{
@@ -143,11 +161,35 @@
{

CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ if (sp->fd >= 0) {
+ VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+ (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+ if (http_RecvPrepAgain(sp->http))
+ vca_handover(sp, 0);
+ }
vca_acceptors[0]->recycle(sp);
}

/*--------------------------------------------------------------------*/

+static void *
+vca_acct(void *arg)
+{
+ struct sess *sp;
+
+ (void)arg;
+ while (1) {
+ sp = vca_accept_sess(heritage.socket);
+ if (sp == NULL)
+ continue;
+ http_RecvPrep(sp->http);
+ vca_handfirst(sp);
+ }
+}
+
+
+/*--------------------------------------------------------------------*/
+
void
VCA_Init(void)
{
@@ -161,4 +203,5 @@
exit (2);
}
vca_acceptors[0]->init();
+ AZ(pthread_create(&vca_thread_acct, NULL, vca_acct, NULL));
}

Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.h 2006-08-21 17:49:39 UTC (rev 874)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.h 2006-08-21 18:55:24 UTC (rev 875)
@@ -29,4 +29,5 @@
struct sess *vca_accept_sess(int fd);
void vca_handover(struct sess *sp, int bad);
void vca_handfirst(struct sess *sp);
+int vca_pollsession(struct sess *sp);


Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c 2006-08-21 17:49:39 UTC (rev 874)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c 2006-08-21 18:55:24 UTC (rev 875)
@@ -19,109 +19,82 @@
#include <sys/socket.h>
#include <sys/event.h>

-#ifndef HAVE_SRANDOMDEV
-#include "compat/srandomdev.h"
-#endif
-
#include "heritage.h"
#include "shmlog.h"
#include "cache.h"
#include "cache_acceptor.h"

-static pthread_t vca_kqueue_thread1;
-static pthread_t vca_kqueue_thread2;
+static pthread_t vca_kqueue_thread;
static int kq = -1;

+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+static int pipes[2];
+
#define NKEV 100

static void
vca_kq_sess(struct sess *sp, int arm)
{
- struct kevent ke[2];
- int i, j, arm2;
+ struct kevent ke;
+ int i;

CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- memset(ke, 0, sizeof ke);
- if (arm == EV_ADD || arm == EV_ENABLE) {
- assert(sp->kqa == 0);
- sp->kqa = 1;
- arm2 = EV_ADD;
- } else {
- assert(sp->kqa == 1);
- sp->kqa = 0;
- arm2 = EV_DELETE;
- }
- j = 0;
- EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
- 0, params->sess_timeout * 1000, sp);
- if (sp->fd >= 0)
- EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
-
- i = kevent(kq, ke, j, NULL, 0, NULL);
+ if (sp->fd < 0)
+ return;
+ EV_SET(&ke, sp->fd, EVFILT_READ, arm, 0, 0, sp);
+ i = kevent(kq, &ke, 1, NULL, 0, NULL);
assert(i == 0);
}

-static struct sess *
+static void
vca_kev(struct kevent *kp)
{
int i;
struct sess *sp;

- if (kp->udata == NULL) {
- VSL(SLT_Debug, 0,
- "KQ RACE %s flags %x fflags %x data %x",
- kp->filter == EVFILT_READ ? "R" : "T",
- kp->flags, kp->fflags, kp->data);
- return (NULL);
+ assert(kp->udata != NULL);
+ if (kp->udata == pipes) {
+ while (kp->data > 0) {
+ i = read(pipes[0], &sp, sizeof sp);
+ assert(i == sizeof sp);
+ kp->data -= i;
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ TAILQ_INSERT_TAIL(&sesshead, sp, list);
+ vca_kq_sess(sp, EV_ADD);
+ }
+ return;
}
CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
- if (sp->kqa == 0) {
- VSL(SLT_Debug, sp->id,
- "KQ %s flags %x fflags %x data %x",
- kp->filter == EVFILT_READ ? "R" : "T",
- kp->flags, kp->fflags, kp->data);
- return (NULL);
- }
- if (kp->filter == EVFILT_READ) {
- if (kp->data > 0) {
- i = http_RecvSome(sp->fd, sp->http);
- switch (i) {
- case -1:
- return (NULL);
- case 0:
- vca_kq_sess(sp, EV_DISABLE);
- vca_handover(sp, i);
- return (NULL); /* ?? */
- case 1:
- vca_close_session(sp, "overflow");
- break;
- case 2:
- vca_close_session(sp, "no request");
- break;
- default:
- INCOMPL();
- }
- return (sp);
+ if (kp->data > 0) {
+ i = vca_pollsession(sp);
+ if (i == -1)
+ return;
+ TAILQ_REMOVE(&sesshead, sp, list);
+ if (i == 0) {
+ vca_kq_sess(sp, EV_DELETE);
+ vca_handover(sp, i);
+ } else {
+ SES_Delete(sp);
}
- if (kp->flags == EV_EOF) {
- vca_close_session(sp, "EOF");
- return (sp);
- }
- INCOMPL();
+ return;
}
- if (kp->filter == EVFILT_TIMER) {
- vca_close_session(sp, "timeout");
- return (sp);
+ if (kp->flags == EV_EOF) {
+ TAILQ_REMOVE(&sesshead, sp, list);
+ vca_close_session(sp, "EOF");
+ SES_Delete(sp);
+ return;
}
INCOMPL();
}

+/*--------------------------------------------------------------------*/

static void *
vca_kqueue_main(void *arg)
{
struct kevent ke[NKEV], *kp;
int i, j, n;
+ struct timespec ts;
struct sess *sp;

(void)arg;
@@ -129,62 +102,57 @@
kq = kqueue();
assert(kq >= 0);

+ j = 0;
+ EV_SET(&ke[j++], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
+ EV_SET(&ke[j++], pipes[0], EVFILT_READ, EV_ADD, 0, 0, pipes);
+ i = kevent(kq, ke, j, NULL, 0, NULL);
+ assert(i == 0);
+
while (1) {
n = kevent(kq, NULL, 0, ke, NKEV, NULL);
assert(n >= 1 && n <= NKEV);
for (kp = ke, j = 0; j < n; j++, kp++) {
- sp = vca_kev(kp);
- if (sp != NULL) {
- vca_kq_sess(sp, EV_DELETE);
- SES_Delete(sp);
- for (i = j; i < n; i++)
- if (ke[i].udata == sp)
- ke[i].udata = NULL;
- }
+ if (kp->filter == EVFILT_TIMER)
+ continue;
+ assert(kp->filter == EVFILT_READ);
+ vca_kev(kp);
}
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts.tv_sec -= params->sess_timeout;
+ for (;;) {
+ sp = TAILQ_FIRST(&sesshead);
+ if (sp == NULL)
+ break;
+ if (sp->t_open.tv_sec > ts.tv_sec)
+ break;
+ if (sp->t_open.tv_sec == ts.tv_sec &&
+ sp->t_open.tv_nsec > ts.tv_nsec)
+ break;
+ TAILQ_REMOVE(&sesshead, sp, list);
+ vca_close_session(sp, "timeout");
+ SES_Delete(sp);
+ }
}
- INCOMPL();
}

-static void *
-vca_kqueue_acct(void *arg)
-{
- struct sess *sp;
-
- (void)arg;
- while (1) {
- sp = vca_accept_sess(heritage.socket);
- if (sp == NULL)
- continue;
- clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
- http_RecvPrep(sp->http);
- vca_kq_sess(sp, EV_ADD);
- }
-}
-
/*--------------------------------------------------------------------*/

static void
vca_kqueue_recycle(struct sess *sp)
{

- if (sp->fd < 0) {
+ if (sp->fd < 0)
SES_Delete(sp);
- return;
- }
- (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
- VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
- if (http_RecvPrepAgain(sp->http))
- vca_handover(sp, 0);
- else
- vca_kq_sess(sp, EV_ENABLE);
+ else
+ assert(write(pipes[1], &sp, sizeof sp) == sizeof sp);
}

static void
vca_kqueue_init(void)
{
- AZ(pthread_create(&vca_kqueue_thread1, NULL, vca_kqueue_main, NULL));
- AZ(pthread_create(&vca_kqueue_thread2, NULL, vca_kqueue_acct, NULL));
+
+ AZ(pipe(pipes));
+ AZ(pthread_create(&vca_kqueue_thread, NULL, vca_kqueue_main, NULL));
}

struct acceptor acceptor_kqueue = {