OPTIM: listeners: use tasklets for the multi-queue rings

Now that we can wake up a remote thread's tasklet, it's way more
interesting to use a tasklet than a task in the accept queue, as it
will avoid passing through all the scheduler. Just doing this increases
the accept rate by about 4%, overall recovering the slight loss
introduced by the tasklet change. In addition it makes sure that
even a heavily loaded scheduler (e.g. many very fast checks) will
not delay a connection accept.
This commit is contained in:
Willy Tarreau 2019-09-24 06:55:18 +02:00
parent 538aa7168f
commit 2bd65a781e
2 changed files with 9 additions and 9 deletions

View File

@ -292,7 +292,7 @@ struct accept_queue_entry {
struct accept_queue_ring {
unsigned int head;
unsigned int tail;
struct task *task; /* task of the thread owning this ring */
struct tasklet *tasklet; /* tasklet of the thread owning this ring */
struct accept_queue_entry entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
};

View File

@ -51,7 +51,7 @@ struct xfer_sock_list *xfer_sock_list = NULL;
/* there is one listener queue per thread so that a thread unblocking the
* global queue can wake up listeners bound only to foreing threads by
* moving them to the remote queues and waking up the associated task.
* moving them to the remote queues and waking up the associated tasklet.
*/
static struct work_list *local_listener_queue;
@ -194,27 +194,27 @@ static struct task *accept_queue_process(struct task *t, void *context, unsigned
/* ran out of budget ? Let's come here ASAP */
if (!max_accept)
task_wakeup(t, TASK_WOKEN_IO);
tasklet_wakeup(ring->tasklet);
return t;
return NULL;
}
/* Initializes the accept-queues. Returns 0 on success, otherwise ERR_* flags */
static int accept_queue_init()
{
struct task *t;
struct tasklet *t;
int i;
for (i = 0; i < global.nbthread; i++) {
t = task_new(1UL << i);
t = tasklet_new();
if (!t) {
ha_alert("Out of memory while initializing accept queue for thread %d\n", i);
return ERR_FATAL|ERR_ABORT;
}
t->nice = -1024;
t->tid = i;
t->process = accept_queue_process;
t->context = &accept_queue_rings[i];
accept_queue_rings[i].task = t;
accept_queue_rings[i].tasklet = t;
}
return 0;
}
@ -994,7 +994,7 @@ void listener_accept(int fd)
ring = &accept_queue_rings[t];
if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) {
_HA_ATOMIC_ADD(&activity[t].accq_pushed, 1);
task_wakeup(ring->task, TASK_WOKEN_IO);
tasklet_wakeup(ring->tasklet);
continue;
}
/* If the ring is full we do a synchronous accept on