[OPTIM] task: don't unlink a task from a wait queue when waking it up

In many situations, we wake a task on an I/O event, then queue it
exactly where it was. This is a real waste because we delete/insert
tasks into the wait queue for nothing. The only reason for this is
that there was only one tree node in the task struct.

By adding another tree node, we can have one tree for the timers
(wait queue) and one tree for the priority (run queue). That way,
we can have a task both in the run queue and wait queue at the
same time. The wait queue now really holds timers, which is what
it was designed for.

The net gain is at least 1 delete/insert cycle per session, and up
to 2-3 depending on the workload, since we save one cycle each time
the expiration date is not changed during a wake up.
This commit is contained in:
Willy Tarreau 2009-03-07 17:25:21 +01:00
parent 1b8ca663a4
commit 4726f53794
4 changed files with 117 additions and 87 deletions

View File

@ -2,7 +2,7 @@
include/proto/task.h include/proto/task.h
Functions for task management. Functions for task management.
Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu Copyright (C) 2000-2009 Willy Tarreau - w@1wt.eu
This library is free software; you can redistribute it and/or This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public modify it under the terms of the GNU Lesser General Public
@ -40,44 +40,68 @@ extern struct task *last_timer; /* optimization: last queued timer */
/* perform minimal initializations, report 0 in case of error, 1 if OK. */ /* perform minimal initializations, report 0 in case of error, 1 if OK. */
int init_task(); int init_task();
/* return non-zero if the task is in the run queue, otherwise 0 */
static inline int task_in_rq(struct task *t)
{
return t->rq.node.leaf_p != NULL;
}
/* return non-zero if the task is in the wait queue, otherwise 0 */
static inline int task_in_wq(struct task *t)
{
return t->wq.node.leaf_p != NULL;
}
/* puts the task <t> in run queue with reason flags <f>, and returns <t> */ /* puts the task <t> in run queue with reason flags <f>, and returns <t> */
struct task *__task_wakeup(struct task *t); struct task *__task_wakeup(struct task *t);
static inline struct task *task_wakeup(struct task *t, unsigned int f) static inline struct task *task_wakeup(struct task *t, unsigned int f)
{ {
if (likely(!(t->state & TASK_IN_RUNQUEUE))) if (likely(!task_in_rq(t)))
__task_wakeup(t); __task_wakeup(t);
t->state |= f; t->state |= f;
return t; return t;
} }
/* removes the task <t> from the run queue if it was in it. /*
* returns <t>. * Unlink the task from the wait queue, and possibly update the last_timer
* pointer. A pointer to the task itself is returned. The task *must* already
* be in the wait queue before calling this function. If unsure, use the safer
* task_unlink_wq() function.
*/ */
static inline struct task *task_sleep(struct task *t) static inline struct task *__task_unlink_wq(struct task *t)
{ {
if (t->state & TASK_IN_RUNQUEUE) { eb32_delete(&t->wq);
t->state = TASK_SLEEPING; if (last_timer == t)
eb32_delete(&t->eb); last_timer = NULL;
run_queue--; return t;
if (likely(t->nice)) }
niced_tasks--;
} static inline struct task *task_unlink_wq(struct task *t)
{
if (likely(task_in_wq(t)))
__task_unlink_wq(t);
return t; return t;
} }
/* /*
* unlinks the task from wherever it is queued : * Unlink the task from the run queue. The run_queue size and number of niced
* - run_queue * tasks are updated too. A pointer to the task itself is returned. The task
* - wait queue * *must* already be in the run queue before calling this function. If unsure,
* A pointer to the task itself is returned. * use the safer task_unlink_rq() function.
*/ */
static inline struct task *task_dequeue(struct task *t) static inline struct task *__task_unlink_rq(struct task *t)
{ {
if (likely(t->eb.node.leaf_p)) { eb32_delete(&t->rq);
if (last_timer == t) run_queue--;
last_timer = NULL; if (likely(t->nice))
eb32_delete(&t->eb); niced_tasks--;
} return t;
}
static inline struct task *task_unlink_rq(struct task *t)
{
if (likely(task_in_rq(t)))
__task_unlink_rq(t);
return t; return t;
} }
@ -87,12 +111,8 @@ static inline struct task *task_dequeue(struct task *t)
*/ */
static inline struct task *task_delete(struct task *t) static inline struct task *task_delete(struct task *t)
{ {
task_dequeue(t); task_unlink_wq(t);
if (t->state & TASK_IN_RUNQUEUE) { task_unlink_rq(t);
run_queue--;
if (likely(t->nice))
niced_tasks--;
}
return t; return t;
} }
@ -102,7 +122,8 @@ static inline struct task *task_delete(struct task *t)
*/ */
static inline struct task *task_init(struct task *t) static inline struct task *task_init(struct task *t)
{ {
t->eb.node.leaf_p = NULL; t->wq.node.leaf_p = NULL;
t->rq.node.leaf_p = NULL;
t->state = TASK_SLEEPING; t->state = TASK_SLEEPING;
t->nice = 0; t->nice = 0;
return t; return t;
@ -116,11 +137,10 @@ static inline void task_free(struct task *t)
pool_free2(pool2_task, t); pool_free2(pool2_task, t);
} }
/* inserts <task> into its assigned wait queue, where it may already be. In this case, it /* Place <task> into the wait queue, where it may already be. If the expiration
* may be only moved or left where it was, depending on its timing requirements. * timer is infinite, the task is dequeued.
* <task> is returned.
*/ */
struct task *task_queue(struct task *task); void task_queue(struct task *task);
/* /*
* This does 4 things : * This does 4 things :

View File

@ -2,7 +2,7 @@
include/types/task.h include/types/task.h
Macros, variables and structures for task management. Macros, variables and structures for task management.
Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu Copyright (C) 2000-2009 Willy Tarreau - w@1wt.eu
This library is free software; you can redistribute it and/or This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public modify it under the terms of the GNU Lesser General Public
@ -30,7 +30,7 @@
/* values for task->state */ /* values for task->state */
#define TASK_SLEEPING 0x00 /* task sleeping */ #define TASK_SLEEPING 0x00 /* task sleeping */
#define TASK_IN_RUNQUEUE 0x01 /* the task is in the run queue */ #define TASK_RUNNING 0x01 /* the task is currently running */
#define TASK_WOKEN_INIT 0x02 /* woken up for initialisation purposes */ #define TASK_WOKEN_INIT 0x02 /* woken up for initialisation purposes */
#define TASK_WOKEN_TIMER 0x04 /* woken up because of expired timer */ #define TASK_WOKEN_TIMER 0x04 /* woken up because of expired timer */
#define TASK_WOKEN_IO 0x08 /* woken up because of completed I/O */ #define TASK_WOKEN_IO 0x08 /* woken up because of completed I/O */
@ -46,7 +46,8 @@
/* The base for all tasks */ /* The base for all tasks */
struct task { struct task {
struct eb32_node eb; /* ebtree node used to hold the task in the wait queue */ struct eb32_node wq; /* ebtree node used to hold the task in the wait queue */
struct eb32_node rq; /* ebtree node used to hold the task in the run queue */
int state; /* task state : bit field of TASK_* */ int state; /* task state : bit field of TASK_* */
unsigned int expire; /* next expiration time for this task */ unsigned int expire; /* next expiration time for this task */
void (*process)(struct task *t, int *next); /* the function which processes the task */ void (*process)(struct task *t, int *next); /* the function which processes the task */

View File

@ -47,6 +47,7 @@
#include <proto/session.h> #include <proto/session.h>
#include <proto/server.h> #include <proto/server.h>
#include <proto/stream_interface.h> #include <proto/stream_interface.h>
#include <proto/task.h>
/* This function parses a "stats" statement in the "global" section. It returns /* This function parses a "stats" statement in the "global" section. It returns
* -1 if there is any error, otherwise zero. If it returns -1, it may write an * -1 if there is any error, otherwise zero. If it returns -1, it may write an
@ -1208,7 +1209,7 @@ void stats_dump_sess_to_buffer(struct session *s, struct buffer *rep)
curr_sess->ana_state, curr_sess->task->state, curr_sess->ana_state, curr_sess->task->state,
human_time(now.tv_sec - curr_sess->logs.tv_accept.tv_sec, 1)); human_time(now.tv_sec - curr_sess->logs.tv_accept.tv_sec, 1));
if (curr_sess->task->state & TASK_IN_RUNQUEUE) if (task_in_rq(curr_sess->task))
chunk_printf(&msg, sizeof(trash), " run(nice=%d)\n", curr_sess->task->nice); chunk_printf(&msg, sizeof(trash), " run(nice=%d)\n", curr_sess->task->nice);
else else
chunk_printf(&msg, sizeof(trash), chunk_printf(&msg, sizeof(trash),

View File

@ -1,7 +1,7 @@
/* /*
* Task management functions. * Task management functions.
* *
* Copyright 2000-2008 Willy Tarreau <w@1wt.eu> * Copyright 2000-2009 Willy Tarreau <w@1wt.eu>
* *
* This program is free software; you can redistribute it and/or * This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License * modify it under the terms of the GNU General Public License
@ -117,26 +117,18 @@ static inline unsigned int timeval_to_tree(const struct timeval *t)
return ticks_to_tree(timeval_to_ticks(t)); return ticks_to_tree(timeval_to_ticks(t));
} }
/* perform minimal initializations, report 0 in case of error, 1 if OK. */ /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
int init_task() * returned. The nice value assigns boosts in 32th of the run queue size. A
{ * nice value of -1024 sets the task to -run_queue*32, while a nice value of
memset(&timers, 0, sizeof(timers)); * 1024 sets the task to run_queue*32. The state flags are cleared, so the
memset(&rqueue, 0, sizeof(rqueue)); * caller will have to set its flags after this call.
pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED); * The task must not already be in the run queue. If unsure, use the safer
return pool2_task != NULL; * task_wakeup() function.
}
/* Puts the task <t> in run queue at a position depending on t->nice.
* <t> is returned. The nice value assigns boosts in 32th of the run queue
* size. A nice value of -1024 sets the task to -run_queue*32, while a nice
* value of 1024 sets the task to run_queue*32.
*/ */
struct task *__task_wakeup(struct task *t) struct task *__task_wakeup(struct task *t)
{ {
task_dequeue(t);
run_queue++; run_queue++;
t->eb.key = ++rqueue_ticks; t->rq.key = ++rqueue_ticks;
if (likely(t->nice)) { if (likely(t->nice)) {
int offset; int offset;
@ -146,13 +138,13 @@ struct task *__task_wakeup(struct task *t)
offset = (unsigned)((run_queue * (unsigned int)t->nice) / 32U); offset = (unsigned)((run_queue * (unsigned int)t->nice) / 32U);
else else
offset = -(unsigned)((run_queue * (unsigned int)-t->nice) / 32U); offset = -(unsigned)((run_queue * (unsigned int)-t->nice) / 32U);
t->eb.key += offset; t->rq.key += offset;
} }
/* clear state flags at the same time */ /* clear state flags at the same time */
t->state = TASK_IN_RUNQUEUE; t->state &= ~TASK_WOKEN_ANY;
eb32_insert(&rqueue[ticks_to_tree(t->eb.key)], &t->eb); eb32_insert(&rqueue[ticks_to_tree(t->rq.key)], &t->rq);
return t; return t;
} }
@ -160,42 +152,52 @@ struct task *__task_wakeup(struct task *t)
* task_queue() * task_queue()
* *
* Inserts a task into the wait queue at the position given by its expiration * Inserts a task into the wait queue at the position given by its expiration
* date. Note that the task must *not* already be in the wait queue nor in the * date. It does not matter if the task was already in the wait queue or not,
* run queue, otherwise unpredictable results may happen. Tasks queued with an * and it may even help if its position has not changed because we'll be able
* eternity expiration date are simply returned. Last, tasks must not be queued * to return without doing anything. Tasks queued with an eternity expiration
* further than the end of the next tree, which is between <now_ms> and * are just unlinked from the WQ. Last, tasks must not be queued further than
* <now_ms> + TIMER_SIGN_BIT ms (now+12days..24days in 32bit). * the end of the next tree, which is between <now_ms> and <now_ms> +
* TIMER_SIGN_BIT ms (now+12days..24days in 32bit).
*/ */
struct task *task_queue(struct task *task) void task_queue(struct task *task)
{ {
if (unlikely(!task->expire)) /* if the task is already in the wait queue, we may reuse its position
return task; * or we will at least have to unlink it first.
*/
if (task_in_wq(task)) {
if (task->wq.key == task->expire)
return;
__task_unlink_wq(task);
}
task->eb.key = task->expire; /* the task is not in the queue now */
if (unlikely(!task->expire))
return;
task->wq.key = task->expire;
#ifdef DEBUG_CHECK_INVALID_EXPIRATION_DATES #ifdef DEBUG_CHECK_INVALID_EXPIRATION_DATES
if ((task->eb.key - now_ms) & TIMER_SIGN_BIT) if ((task->wq.key - now_ms) & TIMER_SIGN_BIT)
/* we're queuing too far away or in the past (most likely) */ /* we're queuing too far away or in the past (most likely) */
return task; return;
#endif #endif
if (likely(last_timer && if (likely(last_timer &&
last_timer->eb.key == task->eb.key && last_timer->wq.key == task->wq.key &&
last_timer->eb.node.node_p && last_timer->wq.node.node_p &&
last_timer->eb.node.bit == -1)) { last_timer->wq.node.bit == -1)) {
/* Most often, last queued timer has the same expiration date, so /* Most often, last queued timer has the same expiration date, so
* if it's not queued at the root, let's queue a dup directly there. * if it's not queued at the root, let's queue a dup directly there.
* Note that we can only use dups at the dup tree's root (bit==-1). * Note that we can only use dups at the dup tree's root (bit==-1).
*/ */
eb_insert_dup(&last_timer->eb.node, &task->eb.node); eb_insert_dup(&last_timer->wq.node, &task->wq.node);
return task; return;
} }
eb32_insert(&timers[ticks_to_tree(task->eb.key)], &task->eb); eb32_insert(&timers[ticks_to_tree(task->wq.key)], &task->wq);
if (task->eb.node.bit == -1) if (task->wq.node.bit == -1)
last_timer = task; /* we only want dup a tree's root */ last_timer = task; /* we only want dup a tree's root */
return task; return;
} }
/* /*
* Extract all expired timers from the timer queue, and wakes up all * Extract all expired timers from the timer queue, and wakes up all
* associated tasks. Returns the date of next event (or eternity). * associated tasks. Returns the date of next event (or eternity).
@ -221,7 +223,7 @@ void wake_expired_tasks(int *next)
do { do {
eb = eb32_first(&timers[tree]); eb = eb32_first(&timers[tree]);
while (eb) { while (eb) {
task = eb32_entry(eb, struct task, eb); task = eb32_entry(eb, struct task, wq);
if ((now_ms - eb->key) & TIMER_SIGN_BIT) { if ((now_ms - eb->key) & TIMER_SIGN_BIT) {
/* note that we don't need this check for the <previous> /* note that we don't need this check for the <previous>
* tree, but it's cheaper than duplicating the code. * tree, but it's cheaper than duplicating the code.
@ -232,8 +234,8 @@ void wake_expired_tasks(int *next)
/* detach the task from the queue and add the task to the run queue */ /* detach the task from the queue and add the task to the run queue */
eb = eb32_next(eb); eb = eb32_next(eb);
__task_wakeup(task); __task_unlink_wq(task);
task->state |= TASK_WOKEN_TIMER; task_wakeup(task, TASK_WOKEN_TIMER);
} }
tree = (tree + 1) & TIMER_TREE_MASK; tree = (tree + 1) & TIMER_TREE_MASK;
} while (((tree - now_tree) & TIMER_TREE_MASK) < TIMER_TREES/2); } while (((tree - now_tree) & TIMER_TREE_MASK) < TIMER_TREES/2);
@ -283,18 +285,15 @@ void process_runnable_tasks(int *next)
do { do {
eb = eb32_first(&rqueue[tree]); eb = eb32_first(&rqueue[tree]);
while (eb) { while (eb) {
t = eb32_entry(eb, struct task, eb); t = eb32_entry(eb, struct task, rq);
/* detach the task from the queue and add the task to the run queue */ /* detach the task from the queue and add the task to the run queue */
eb = eb32_next(eb); eb = eb32_next(eb);
__task_unlink_rq(t);
run_queue--; t->state |= TASK_RUNNING;
if (likely(t->nice))
niced_tasks--;
t->state &= ~TASK_IN_RUNQUEUE;
task_dequeue(t);
t->process(t, &temp); t->process(t, &temp);
t->state &= ~TASK_RUNNING;
*next = tick_first(*next, temp); *next = tick_first(*next, temp);
if (!--max_processed) if (!--max_processed)
@ -304,6 +303,15 @@ void process_runnable_tasks(int *next)
} while (tree != stop); } while (tree != stop);
} }
/* perform minimal initializations, report 0 in case of error, 1 if OK. */
int init_task()
{
memset(&timers, 0, sizeof(timers));
memset(&rqueue, 0, sizeof(rqueue));
pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
return pool2_task != NULL;
}
/* /*
* Local variables: * Local variables:
* c-indent-level: 8 * c-indent-level: 8