Mailing List Archive

r1031 - trunk/varnish-cache/bin/varnishd
Author: phk
Date: 2006-09-16 21:54:34 +0200 (Sat, 16 Sep 2006)
New Revision: 1031

Modified:
trunk/varnish-cache/bin/varnishd/cache_pool.c
trunk/varnish-cache/bin/varnishd/heritage.h
trunk/varnish-cache/bin/varnishd/mgt_param.c
Log:
Make it possible to have multiple worker pools.

The acceptor selects the pool based on filedescriptor modulus
number of pools.

This is an attempt to reduce lock contention.


Modified: trunk/varnish-cache/bin/varnishd/cache_pool.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_pool.c 2006-09-16 16:00:48 UTC (rev 1030)
+++ trunk/varnish-cache/bin/varnishd/cache_pool.c 2006-09-16 19:54:34 UTC (rev 1031)
@@ -29,17 +29,22 @@
#include "cli_priv.h"
#include "cache.h"

-static MTX wrk_mtx;
+TAILQ_HEAD(workerhead, worker);

/* Number of work requests queued in excess of worker threads available */
-static unsigned wrk_overflow;

-TAILQ_HEAD(workerhead, worker);
+struct wq {
+ MTX mtx;
+ struct workerhead idle;
+ TAILQ_HEAD(, workreq) req;
+ unsigned overflow;
+};

-static struct workerhead wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle);
-static struct workerhead wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy);
-static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
+static MTX tmtx;

+static struct wq **wq;
+static unsigned nwq;
+
/*--------------------------------------------------------------------
* Write data to fd
* We try to use writev() if possible in order to minimize number of
@@ -169,9 +174,10 @@
wrk_thread(void *priv)
{
struct worker *w, ww;
+ struct wq *qp;
char c;

- (void)priv;
+ qp = priv;
w = &ww;
memset(w, 0, sizeof *w);
w->magic = WORKER_MAGIC;
@@ -179,40 +185,38 @@
AZ(pipe(w->pipe));

VSL(SLT_WorkThread, 0, "%p start", w);
- LOCK(&wrk_mtx);
+ LOCK(&qp->mtx);
VSL_stats->n_wrk_create++;
- TAILQ_INSERT_HEAD(&wrk_busy, w, list);
VSL_stats->n_wrk_busy++;
while (1) {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);

/* Process overflow requests, if any */
- if (wrk_overflow > 0) {
- wrk_overflow--;
- w->wrq = TAILQ_FIRST(&wrk_reqhead);
+ if (qp->overflow > 0) {
+ qp->overflow--;
+ w->wrq = TAILQ_FIRST(&qp->req);
AN(w->wrq);
- TAILQ_REMOVE(&wrk_reqhead, w->wrq, list);
+ TAILQ_REMOVE(&qp->req, w->wrq, list);
VSL_stats->n_wrk_queue--;
- UNLOCK(&wrk_mtx);
+ UNLOCK(&qp->mtx);
wrk_do_one(w);
- LOCK(&wrk_mtx);
+ LOCK(&qp->mtx);
continue;
}

- TAILQ_REMOVE(&wrk_busy, w, list);
- TAILQ_INSERT_HEAD(&wrk_idle, w, list);
+ TAILQ_INSERT_HEAD(&qp->idle, w, list);
assert(w->idle != 0);
VSL_stats->n_wrk_busy--;
- UNLOCK(&wrk_mtx);
+ UNLOCK(&qp->mtx);
assert(1 == read(w->pipe[0], &c, 1));
if (w->idle == 0)
break;
wrk_do_one(w);
- LOCK(&wrk_mtx);
+ LOCK(&qp->mtx);
}
- LOCK(&wrk_mtx);
+ LOCK(&tmtx);
VSL_stats->n_wrk--;
- UNLOCK(&wrk_mtx);
+ UNLOCK(&tmtx);
VSL(SLT_WorkThread, 0, "%p end", w);
close(w->pipe[0]);
close(w->pipe[1]);
@@ -226,39 +230,42 @@
{
struct worker *w;
pthread_t tp;
+ struct wq *qp;

sp->workreq.sess = sp;
+ qp = wq[sp->fd % nwq];

- LOCK(&wrk_mtx);
+ LOCK(&qp->mtx);

/* If there are idle threads, we tickle the first one into action */
- w = TAILQ_FIRST(&wrk_idle);
+ w = TAILQ_FIRST(&qp->idle);
if (w != NULL) {
- TAILQ_REMOVE(&wrk_idle, w, list);
- TAILQ_INSERT_TAIL(&wrk_busy, w, list);
+ TAILQ_REMOVE(&qp->idle, w, list);
VSL_stats->n_wrk_busy++;
- UNLOCK(&wrk_mtx);
+ UNLOCK(&qp->mtx);
w->wrq = &sp->workreq;
assert(1 == write(w->pipe[1], w, 1));
return;
}

- TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
+ TAILQ_INSERT_TAIL(&qp->req, &sp->workreq, list);
VSL_stats->n_wrk_queue++;
- wrk_overflow++;
+ qp->overflow++;
+ UNLOCK(&qp->mtx);

+ LOCK(&tmtx);
/* Can we create more threads ? */
if (VSL_stats->n_wrk >= params->wthread_max) {
VSL_stats->n_wrk_max++;
- UNLOCK(&wrk_mtx);
+ UNLOCK(&tmtx);
return;
}

/* Try to create a thread */
VSL_stats->n_wrk++;
- UNLOCK(&wrk_mtx);
+ UNLOCK(&tmtx);

- if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
+ if (!pthread_create(&tp, NULL, wrk_thread, qp)) {
AZ(pthread_detach(tp));
return;
}
@@ -266,40 +273,75 @@
VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
errno, strerror(errno));

+ LOCK(&tmtx);
/* Register overflow */
- LOCK(&wrk_mtx);
VSL_stats->n_wrk--;
VSL_stats->n_wrk_failed++;
- UNLOCK(&wrk_mtx);
+ UNLOCK(&tmtx);
}

/*--------------------------------------------------------------------*/
+
+static void
+wrk_addpools(unsigned t)
+{
+ struct wq **pwq, **owq;
+ unsigned u;
+
+ if (t <= nwq)
+ return;
+
+ pwq = calloc(sizeof *pwq, params->wthread_pools);
+ if (pwq == NULL)
+ return;
+ if (wq != NULL)
+ memcpy(pwq, wq, sizeof *pwq * nwq);
+ owq = wq;
+ wq = pwq;
+ for (u = nwq; u < t; u++) {
+ wq[u] = calloc(sizeof *wq[u], 1);
+ XXXAN(wq[u]);
+ MTX_INIT(&wq[u]->mtx);
+ TAILQ_INIT(&wq[u]->idle);
+ TAILQ_INIT(&wq[u]->req);
+ }
+ free(owq);
+ nwq = t;
+}
+
+/*--------------------------------------------------------------------*/

static void *
wrk_reaperthread(void *priv)
{
time_t now;
struct worker *w;
+ struct wq *qp;
+ unsigned u;

(void)priv;
while (1) {
+ wrk_addpools(params->wthread_pools);
sleep(1);
if (VSL_stats->n_wrk <= params->wthread_min)
continue;
now = time(NULL);
- LOCK(&wrk_mtx);
- w = TAILQ_LAST(&wrk_idle, workerhead);
- if (w != NULL &&
- (w->idle + params->wthread_timeout < now ||
- VSL_stats->n_wrk <= params->wthread_max))
- TAILQ_REMOVE(&wrk_idle, w, list);
- else
- w = NULL;
- UNLOCK(&wrk_mtx);
- if (w == NULL)
- continue;
- w->idle = 0;
- assert(1 == write(w->pipe[1], w, 1));
+ for (u = 0; u < nwq; u++) {
+ qp = wq[u];
+ LOCK(&qp->mtx);
+ w = TAILQ_LAST(&qp->idle, workerhead);
+ if (w != NULL &&
+ (w->idle + params->wthread_timeout < now ||
+ VSL_stats->n_wrk <= params->wthread_max))
+ TAILQ_REMOVE(&qp->idle, w, list);
+ else
+ w = NULL;
+ UNLOCK(&qp->mtx);
+ if (w == NULL)
+ continue;
+ w->idle = 0;
+ assert(1 == write(w->pipe[1], w, 1));
+ }
}
INCOMPL();
}
@@ -310,53 +352,20 @@
WRK_Init(void)
{
pthread_t tp;
- int i;

- MTX_INIT(&wrk_mtx);
-
+ wrk_addpools(params->wthread_pools);
+ MTX_INIT(&tmtx);
AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL));
AZ(pthread_detach(tp));
-
- VSL(SLT_Debug, 0, "Starting %u worker threads", params->wthread_min);
- for (i = 0; i < params->wthread_min; i++) {
- VSL_stats->n_wrk++;
- AZ(pthread_create(&tp, NULL, wrk_thread, NULL));
- AZ(pthread_detach(tp));
- }
}

-
/*--------------------------------------------------------------------*/

void
cli_func_dump_pool(struct cli *cli, char **av, void *priv)
{
- unsigned u;
- struct sess *s;
- time_t t;

+ (void)cli;
(void)av;
(void)priv;
- struct worker *w;
- LOCK(&wrk_mtx);
- t = time(NULL);
- TAILQ_FOREACH(w, &wrk_busy, list) {
- cli_out(cli, "\n");
- cli_out(cli, "W %p", w);
- if (w->wrq == NULL)
- continue;
- s = w->wrq->sess;
- if (s == NULL)
- continue;
- cli_out(cli, "sess %p fd %d xid %u step %d handling %d age %d",
- s, s->fd, s->xid, s->step, s->handling,
- t - s->t_req.tv_sec);
- }
- cli_out(cli, "\n");
-
- u = 0;
- TAILQ_FOREACH(w, &wrk_idle, list)
- u++;
- cli_out(cli, "%u idle workers\n", u);
- UNLOCK(&wrk_mtx);
}

Modified: trunk/varnish-cache/bin/varnishd/heritage.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/heritage.h 2006-09-16 16:00:48 UTC (rev 1030)
+++ trunk/varnish-cache/bin/varnishd/heritage.h 2006-09-16 19:54:34 UTC (rev 1031)
@@ -36,6 +36,7 @@
unsigned wthread_min;
unsigned wthread_max;
unsigned wthread_timeout;
+ unsigned wthread_pools;

/* Memory allocation hints */
unsigned mem_workspace;

Modified: trunk/varnish-cache/bin/varnishd/mgt_param.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/mgt_param.c 2006-09-16 16:00:48 UTC (rev 1030)
+++ trunk/varnish-cache/bin/varnishd/mgt_param.c 2006-09-16 19:54:34 UTC (rev 1031)
@@ -118,6 +118,18 @@
/*--------------------------------------------------------------------*/

static void
+tweak_thread_pools(struct cli *cli, struct parspec *par, const char *arg)
+{
+
+ (void)par;
+ tweak_generic_uint(cli, &params->wthread_pools, arg,
+ 1, UINT_MAX);
+}
+
+
+/*--------------------------------------------------------------------*/
+
+static void
tweak_thread_pool_min(struct cli *cli, struct parspec *par, const char *arg)
{

@@ -296,6 +308,9 @@
"To force an immediate effect at the expense of a total "
"flush of the cache use \"url.purge .\"",
"120", "seconds" },
+ { "thread_pools", tweak_thread_pools,
+ "Number of thread pools.\n",
+ "1", "pools" },
{ "thread_pool_max", tweak_thread_pool_max,
"The maximum number of threads in the worker pool.\n"
"-1 is unlimited.\n"