mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-05-04 20:46:11 +02:00
[MAJOR] use an ebtree instead of a list for the run queue
We now insert tasks in a certain sequence in the run queue. The sorting key currently is the arrival order. It will now be possible to apply a "nice" value to any task so that it goes forwards or backwards in the run queue. The calls to wake_expired_tasks() and maintain_proxies() have been moved to the main run_poll_loop(), because they had nothing to do in process_runnable_tasks(). The task_wakeup() function is not inlined anymore, as it was only used at one place. The qlist member of the task structure has been removed now. The run_queue list has been replaced for an integer indicating the number of tasks in the run queue.
This commit is contained in:
parent
af754fc88f
commit
58b458d8ba
@ -32,28 +32,14 @@
|
||||
|
||||
#include <types/task.h>
|
||||
|
||||
extern void *run_queue;
|
||||
extern unsigned int run_queue; /* run queue size */
|
||||
extern struct pool_head *pool2_task;
|
||||
|
||||
/* perform minimal initializations, report 0 in case of error, 1 if OK. */
|
||||
int init_task();
|
||||
|
||||
/* puts the task <t> in run queue <q>, and returns <t> */
|
||||
#define task_wakeup _task_wakeup
|
||||
struct task *_task_wakeup(struct task *t);
|
||||
static inline struct task *__task_wakeup(struct task *t)
|
||||
{
|
||||
if (t->state == TASK_RUNNING)
|
||||
return t;
|
||||
|
||||
if (likely(t->eb.node.leaf_p))
|
||||
eb32_delete(&t->eb);
|
||||
|
||||
DLIST_ADD(run_queue, &t->qlist);
|
||||
t->state = TASK_RUNNING;
|
||||
|
||||
return t;
|
||||
}
|
||||
struct task *task_wakeup(struct task *t);
|
||||
|
||||
/* removes the task <t> from the run queue if it was in it.
|
||||
* returns <t>.
|
||||
@ -61,29 +47,27 @@ static inline struct task *__task_wakeup(struct task *t)
|
||||
static inline struct task *task_sleep(struct task *t)
|
||||
{
|
||||
if (t->state == TASK_RUNNING) {
|
||||
DLIST_DEL(&t->qlist);
|
||||
t->qlist.p = NULL;
|
||||
t->state = TASK_IDLE;
|
||||
eb32_delete(&t->eb);
|
||||
run_queue--;
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
/*
|
||||
* unlinks the task from wherever it is queued :
|
||||
* - eternity_queue, run_queue
|
||||
* - run_queue
|
||||
* - wait queue
|
||||
* A pointer to the task itself is returned.
|
||||
*/
|
||||
static inline struct task *task_delete(struct task *t)
|
||||
{
|
||||
if (t->qlist.p != NULL) {
|
||||
DLIST_DEL(&t->qlist);
|
||||
t->qlist.p = NULL;
|
||||
}
|
||||
|
||||
if (t->eb.node.leaf_p)
|
||||
eb32_delete(&t->eb);
|
||||
|
||||
if (t->state == TASK_RUNNING)
|
||||
run_queue--;
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
@ -93,7 +77,6 @@ static inline struct task *task_delete(struct task *t)
|
||||
*/
|
||||
static inline struct task *task_init(struct task *t)
|
||||
{
|
||||
t->qlist.p = NULL;
|
||||
t->eb.node.leaf_p = NULL;
|
||||
t->state = TASK_IDLE;
|
||||
return t;
|
||||
@ -123,6 +106,12 @@ struct task *task_queue(struct task *task);
|
||||
|
||||
void process_runnable_tasks(struct timeval *next);
|
||||
|
||||
/*
|
||||
* Extract all expired timers from the timer queue, and wakes up all
|
||||
* associated tasks. Returns the date of next event (or eternity).
|
||||
*/
|
||||
void wake_expired_tasks(struct timeval *next);
|
||||
|
||||
|
||||
#endif /* _PROTO_TASK_H */
|
||||
|
||||
|
||||
@ -34,7 +34,6 @@
|
||||
|
||||
/* The base for all tasks */
|
||||
struct task {
|
||||
struct list qlist; /* chaining in the same queue; bidirectionnal but not circular */
|
||||
struct eb32_node eb; /* ebtree node used to hold the task in the wait queue */
|
||||
int state; /* task state : IDLE or RUNNING */
|
||||
struct timeval expire; /* next expiration time for this task, use only for fast sorting */
|
||||
|
||||
@ -898,12 +898,22 @@ void run_poll_loop()
|
||||
|
||||
tv_update_date(0,1);
|
||||
while (1) {
|
||||
/* Check if we can expire some tasks */
|
||||
wake_expired_tasks(&next);
|
||||
|
||||
/* Process a few tasks */
|
||||
process_runnable_tasks(&next);
|
||||
|
||||
/* maintain all proxies in a consistent state. This should quickly
|
||||
* become a task because it becomes expensive when there are huge
|
||||
* numbers of proxies. */
|
||||
maintain_proxies(&next);
|
||||
|
||||
/* stop when there's no connection left and we don't allow them anymore */
|
||||
if (!actconn && listeners == 0)
|
||||
break;
|
||||
|
||||
/* The poller will ensure it returns around <next> */
|
||||
cur_poller.poll(&cur_poller, &next);
|
||||
}
|
||||
}
|
||||
|
||||
@ -300,8 +300,8 @@ int start_proxies(int verbose)
|
||||
/*
|
||||
* this function enables proxies when there are enough free sessions,
|
||||
* or stops them when the table is full. It is designed to be called from the
|
||||
* select_loop(). It returns the date of next expiration event during stop
|
||||
* time, ETERNITY otherwise.
|
||||
* select_loop(). It adjusts the date of next expiration event during stop
|
||||
* time if appropriate.
|
||||
*/
|
||||
void maintain_proxies(struct timeval *next)
|
||||
{
|
||||
@ -309,7 +309,6 @@ void maintain_proxies(struct timeval *next)
|
||||
struct listener *l;
|
||||
|
||||
p = proxy;
|
||||
tv_eternity(next);
|
||||
|
||||
/* if there are enough free sessions, we'll activate proxies */
|
||||
if (actconn < global.maxconn) {
|
||||
|
||||
93
src/task.c
93
src/task.c
@ -23,7 +23,7 @@
|
||||
|
||||
struct pool_head *pool2_task;
|
||||
|
||||
void *run_queue = NULL;
|
||||
unsigned int run_queue = 0;
|
||||
|
||||
/* Principle of the wait queue.
|
||||
*
|
||||
@ -91,6 +91,8 @@ void *run_queue = NULL;
|
||||
#define TIMER_SIGN_BIT (1 << (TIMER_TICK_BITS - 1))
|
||||
|
||||
static struct eb_root timers[TIMER_TREES]; /* trees with MSB 00, 01, 10 and 11 */
|
||||
static struct eb_root rqueue[TIMER_TREES]; /* trees constituting the run queue */
|
||||
static unsigned int rqueue_ticks; /* insertion count */
|
||||
|
||||
/* returns an ordered key based on an expiration date. */
|
||||
static inline unsigned int timeval_to_ticks(const struct timeval *t)
|
||||
@ -118,13 +120,26 @@ static inline unsigned int timeval_to_tree(const struct timeval *t)
|
||||
int init_task()
|
||||
{
|
||||
memset(&timers, 0, sizeof(timers));
|
||||
memset(&rqueue, 0, sizeof(rqueue));
|
||||
pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
|
||||
return pool2_task != NULL;
|
||||
}
|
||||
|
||||
struct task *_task_wakeup(struct task *t)
|
||||
/* puts the task <t> in run queue <q>, and returns <t> */
|
||||
struct task *task_wakeup(struct task *t)
|
||||
{
|
||||
return __task_wakeup(t);
|
||||
if (t->state == TASK_RUNNING)
|
||||
return t;
|
||||
|
||||
if (likely(t->eb.node.leaf_p))
|
||||
eb32_delete(&t->eb);
|
||||
|
||||
run_queue++;
|
||||
t->eb.key = ++rqueue_ticks;
|
||||
t->state = TASK_RUNNING;
|
||||
|
||||
eb32_insert(&rqueue[ticks_to_tree(t->eb.key)], &t->eb);
|
||||
return t;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -156,7 +171,6 @@ struct task *task_queue(struct task *task)
|
||||
/*
|
||||
* Extract all expired timers from the timer queue, and wakes up all
|
||||
* associated tasks. Returns the date of next event (or eternity).
|
||||
*
|
||||
*/
|
||||
void wake_expired_tasks(struct timeval *next)
|
||||
{
|
||||
@ -190,7 +204,7 @@ void wake_expired_tasks(struct timeval *next)
|
||||
|
||||
/* detach the task from the queue and add the task to the run queue */
|
||||
eb = eb32_next(eb);
|
||||
_task_wakeup(task);
|
||||
task_wakeup(task);
|
||||
}
|
||||
tree = (tree + 1) & TIMER_TREE_MASK;
|
||||
} while (((tree - now_tree) & TIMER_TREE_MASK) < TIMER_TREES/2);
|
||||
@ -200,43 +214,58 @@ void wake_expired_tasks(struct timeval *next)
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* This does 4 things :
|
||||
* - wake up all expired tasks
|
||||
* - call all runnable tasks
|
||||
* - call maintain_proxies() to enable/disable the listeners
|
||||
* - return the date of next event in <next> or eternity.
|
||||
/* The run queue is chronologically sorted in a tree. An insertion counter is
|
||||
* used to assign a position to each task. This counter may be combined with
|
||||
* other variables (eg: nice value) to set the final position in the tree. The
|
||||
* counter may wrap without a problem, of course. We then limit the number of
|
||||
* tasks processed at once to 1/8 of the number of tasks in the queue, so that
|
||||
* general latency remains low and so that task positions have a chance to be
|
||||
* considered. It also reduces the number of trees to be evaluated when no task
|
||||
* remains.
|
||||
*
|
||||
* Just like with timers, we start with tree[(current - 1)], which holds past
|
||||
* values, and stop when we reach the middle of the list. In practise, we visit
|
||||
* 3 out of 4 trees.
|
||||
*
|
||||
* The function adjusts <next> if a new event is closer.
|
||||
*/
|
||||
void process_runnable_tasks(struct timeval *next)
|
||||
{
|
||||
struct timeval temp;
|
||||
struct task *t;
|
||||
void *queue;
|
||||
struct eb32_node *eb;
|
||||
unsigned int tree, stop;
|
||||
unsigned int max_processed;
|
||||
|
||||
wake_expired_tasks(next);
|
||||
/* process each task in the run queue now. Each task may be deleted
|
||||
* since we only use the run queue's head. Note that any task can be
|
||||
* woken up by any other task and it will be processed immediately
|
||||
* after as it will be queued on the run queue's head !
|
||||
*/
|
||||
if (!run_queue)
|
||||
return;
|
||||
|
||||
queue = run_queue;
|
||||
foreach_dlist_item(t, queue, struct task *, qlist) {
|
||||
DLIST_DEL(&t->qlist);
|
||||
t->qlist.p = NULL;
|
||||
max_processed = (run_queue + 7) / 8;
|
||||
|
||||
t->state = TASK_IDLE;
|
||||
t->process(t, &temp);
|
||||
tv_bound(next, &temp);
|
||||
}
|
||||
tree = ticks_to_tree(rqueue_ticks);
|
||||
stop = (tree + TIMER_TREES / 2) & TIMER_TREE_MASK;
|
||||
tree = (tree - 1) & TIMER_TREE_MASK;
|
||||
|
||||
/* maintain all proxies in a consistent state. This should quickly
|
||||
* become a task because it becomes expensive when there are huge
|
||||
* numbers of proxies. */
|
||||
maintain_proxies(&temp);
|
||||
tv_bound(next, &temp);
|
||||
return;
|
||||
do {
|
||||
eb = eb32_first(&rqueue[tree]);
|
||||
while (eb) {
|
||||
t = eb32_entry(eb, struct task, eb);
|
||||
|
||||
/* detach the task from the queue and add the task to the run queue */
|
||||
eb = eb32_next(eb);
|
||||
|
||||
run_queue--;
|
||||
t->state = TASK_IDLE;
|
||||
eb32_delete(&t->eb);
|
||||
|
||||
t->process(t, &temp);
|
||||
tv_bound(next, &temp);
|
||||
|
||||
if (!--max_processed)
|
||||
return;
|
||||
}
|
||||
tree = (tree + 1) & TIMER_TREE_MASK;
|
||||
} while (tree != stop);
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user