[MAJOR] replaced rbtree with ul2tree.

The rbtree-based wait queue consumes a lot of CPU. Use the ul2tree
instead. Lots of cleanups and code reorganizations made it possible
to reduce the task struct and simplify the code a bit.
author Willy Tarreau 2007-04-29 10:41:56 +02:00
parent 3fa095d542
commit 96bcfd75aa
12 changed files with 175 additions and 143 deletions

include/proto/task.h

@@ -2,7 +2,7 @@
   include/proto/task.h
   Functions for task management.
 
-  Copyright (C) 2000-2006 Willy Tarreau - w@1wt.eu
+  Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
 
   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
@@ -27,42 +27,66 @@
 #include <common/config.h>
 #include <common/memory.h>
 #include <common/mini-clist.h>
+#include <common/standard.h>
 #include <types/task.h>
 
+extern void *run_queue;
+
+/* needed later */
+void *tree_delete(void *node);
+
 /* puts the task <t> in run queue <q>, and returns <t> */
-static inline struct task *task_wakeup(struct task **q, struct task *t)
+static inline struct task *task_wakeup(struct task *t)
 {
 	if (t->state == TASK_RUNNING)
 		return t;
-	else {
-		t->rqnext = *q;
-		t->state = TASK_RUNNING;
-		return *q = t;
-	}
+
+	if (t->qlist.p != NULL)
+		DLIST_DEL(&t->qlist);
+
+	DLIST_ADD(run_queue, &t->qlist);
+	t->state = TASK_RUNNING;
+
+	if (likely(t->wq)) {
+		tree_delete(t->wq);
+		t->wq = NULL;
+	}
+	return t;
 }
 
-/* removes the task <t> from the queue <q>
- * <s> MUST be <q>'s first task.
- * set the run queue to point to the next one, and return it
+/* removes the task <t> from the run queue if it was in it.
+ * returns <t>.
  */
-static inline struct task *task_sleep(struct task **q, struct task *t)
+static inline struct task *task_sleep(struct task *t)
 {
 	if (t->state == TASK_RUNNING) {
-		*q = t->rqnext;
-		t->state = TASK_IDLE; /* tell that s has left the run queue */
+		DLIST_DEL(&t->qlist);
+		t->qlist.p = NULL;
+		t->state = TASK_IDLE;
 	}
-	return *q; /* return next running task */
+	return t;
 }
 
 /*
- * removes the task <t> from its wait queue. It must have already been removed
- * from the run queue. A pointer to the task itself is returned.
+ * unlinks the task from wherever it is queued :
+ *  - eternity_queue, run_queue
+ *  - wait queue : wq not null => remove carrier node too
+ * A pointer to the task itself is returned.
  */
 static inline struct task *task_delete(struct task *t)
 {
-	rb_erase(&t->rb_node, t->wq);
-	t->wq = NULL;
+	if (t->qlist.p != NULL) {
+		DLIST_DEL(&t->qlist);
+		t->qlist.p = NULL;
+	}
+
+	if (t->wq) {
+		tree_delete(t->wq);
+		t->wq = NULL;
+	}
 	return t;
 }
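
The header above is the whole new contract: a task embeds a list cell (qlist) for whatever queue it sits on, plus a back reference (wq) to the tree node that carries it, so waking a task is two pointer splices and one node deletion instead of an rbtree walk and rebalance. Below is a minimal self-contained model of task_wakeup()/task_sleep(), with a sentinel-based circular list standing in for HAProxy's mini-clist and the tree deletion reduced to clearing the back reference; all names are illustrative, not the project's code:

#include <assert.h>
#include <stdio.h>

enum { TASK_IDLE, TASK_RUNNING };

struct cell { struct cell *n, *p; };            /* intrusive list cell          */
static struct cell runq = { &runq, &runq };     /* sentinel-based toy run queue */

struct dtask {
	struct cell qlist;   /* qlist.p == NULL means "linked nowhere"  */
	void *wq;            /* back ref to the wait-queue carrier node */
	int state;
};

static void cell_del(struct cell *c)            /* roughly DLIST_DEL */
{
	c->p->n = c->n;
	c->n->p = c->p;
}

static void cell_add(struct cell *head, struct cell *c)   /* roughly DLIST_ADD */
{
	c->n = head->n;
	c->p = head;
	head->n->p = c;
	head->n = c;
}

/* models the new task_wakeup(): unlink the task from wherever it sits,
 * link it at the head of the run queue, drop the wait-queue reference. */
static void dtask_wakeup(struct dtask *t)
{
	if (t->state == TASK_RUNNING)
		return;
	if (t->qlist.p)
		cell_del(&t->qlist);
	cell_add(&runq, &t->qlist);
	t->state = TASK_RUNNING;
	t->wq = NULL;        /* the real code also deletes the carrier node */
}

/* models the new task_sleep(): leaving the run queue is O(1). */
static void dtask_sleep(struct dtask *t)
{
	if (t->state == TASK_RUNNING) {
		cell_del(&t->qlist);
		t->qlist.p = NULL;
		t->state = TASK_IDLE;
	}
}

int main(void)
{
	struct dtask t = { { 0, 0 }, 0, TASK_IDLE };

	dtask_wakeup(&t);
	assert(t.state == TASK_RUNNING && runq.n == &t.qlist);
	dtask_sleep(&t);
	assert(t.state == TASK_IDLE && runq.n == &runq);
	puts("wakeup/sleep ok");
	return 0;
}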

include/types/task.h

@@ -25,7 +25,8 @@
 #include <sys/time.h>
 
 #include <common/config.h>
-#include <common/rbtree.h>
+#include <common/mini-clist.h>
+#include <import/tree.h>
 
 /* values for task->state */
 #define TASK_IDLE	0
@@ -33,10 +34,9 @@
 /* The base for all tasks */
 struct task {
-	struct rb_node rb_node;
-	struct rb_root *wq;
-	struct task *rqnext;		/* chaining in run queue ... */
-	int state;			/* task state : IDLE or RUNNING */
+	struct list qlist;		/* chaining in the same queue; bidirectional but not circular */
+	struct ultree *wq;		/* NULL if unqueued, or back ref to the carrier node in the WQ */
+	int state;			/* task state : IDLE or RUNNING */
 	struct timeval expire;		/* next expiration time for this task, use only for fast sorting */
 	int (*process)(struct task *t);	/* the function which processes the task */
 	void *context;			/* the task's context */
@@ -45,10 +45,6 @@ struct task {
 #define sizeof_task	sizeof(struct task)
 
 extern void **pool_task;
-extern struct rb_root wait_queue[2];
-extern struct task *rq;
 
 #endif /* _TYPES_TASK_H */
 
 /*
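
A reading aid for the two new fields, not part of the patch: together with state they encode where a task currently lives, and every hunk below preserves a small set of legal combinations. A hypothetical checker over stand-in types (struct toy_task and toy_task_consistent are inventions for illustration):

#include <stdbool.h>
#include <stddef.h>

#define TASK_IDLE	0
#define TASK_RUNNING	1

struct list { struct list *n, *p; };    /* stand-in for mini-clist's struct list */
struct ultree;                          /* opaque carrier node, as in the patch  */

struct toy_task {
	struct list qlist;              /* qlist.p == NULL  <=>  linked nowhere   */
	struct ultree *wq;              /* non-NULL  <=>  waiting on a timer node */
	int state;
};

/* The states the diff maintains: fully unqueued and idle; queued under a
 * timer node (wq set, idle); queued on the run or eternity list (wq NULL).
 * Anything else would leak a carrier node or a dangling list link. */
static bool toy_task_consistent(const struct toy_task *t)
{
	if (t->qlist.p == NULL)
		return t->wq == NULL && t->state == TASK_IDLE;
	if (t->wq != NULL)
		return t->state == TASK_IDLE;
	return true;
}

int main(void)
{
	struct toy_task t = { { NULL, NULL }, NULL, TASK_IDLE };
	return toy_task_consistent(&t) ? 0 : 1;
}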

src/appsession.c

@@ -114,7 +114,7 @@ int appsession_task_init(void)
 	if ((t = pool_alloc(task)) == NULL)
 		return -1;
 	t->wq = NULL;
-	t->rqnext = NULL;
+	t->qlist.p = NULL;
 	t->state = TASK_IDLE;
 	t->context = NULL;
 	tv_delayfrom(&t->expire, &now, TBLCHKINT);

src/backend.c

@@ -572,7 +572,7 @@ int srv_count_retry_down(struct session *t, int conn_err)
 		 * we have to inform the server that it may be used by another session.
 		 */
 		if (may_dequeue_tasks(t->srv, t->be))
-			task_wakeup(&rq, t->srv->queue_mgt);
+			task_wakeup(t->srv->queue_mgt);
 		return 1;
 	}
 	return 0;
@@ -611,7 +611,7 @@ int srv_retryable_connect(struct session *t)
 			t->be->failed_conns++;
 			/* release other sessions waiting for this server */
 			if (may_dequeue_tasks(t->srv, t->be))
-				task_wakeup(&rq, t->srv->queue_mgt);
+				task_wakeup(t->srv->queue_mgt);
 			return 1;
 		}
 		/* ensure that we have enough retries left */
@@ -625,7 +625,7 @@ int srv_retryable_connect(struct session *t)
 			 */
 			/* let's try to offer this slot to anybody */
 			if (may_dequeue_tasks(t->srv, t->be))
-				task_wakeup(&rq, t->srv->queue_mgt);
+				task_wakeup(t->srv->queue_mgt);
 
 			if (t->srv)
 				t->srv->failed_conns++;
@@ -691,7 +691,7 @@ int srv_redispatch_connect(struct session *t)
 		/* release other sessions waiting for this server */
 		if (may_dequeue_tasks(t->srv, t->be))
-			task_wakeup(&rq, t->srv->queue_mgt);
+			task_wakeup(t->srv->queue_mgt);
 
 		return 1;
 	}
 	/* if we get here, it's because we got SRV_STATUS_OK, which also

src/cfgparse.c

@@ -2415,7 +2415,7 @@ int readcfgfile(const char *file)
 			return -1;
 		}
 
-		t->rqnext = NULL;
+		t->qlist.p = NULL;
 		t->wq = NULL;
 		t->state = TASK_IDLE;
 		t->process = process_srv_queue;
@@ -2463,7 +2463,7 @@ int readcfgfile(const char *file)
 			}
 
 			t->wq = NULL;
-			t->rqnext = NULL;
+			t->qlist.p = NULL;
 			t->state = TASK_IDLE;
 			t->process = process_chk;
 			t->context = newsrv;

src/checks.c

@@ -75,7 +75,7 @@ static void set_server_down(struct server *s)
 			sess->srv = NULL; /* it's left to the dispatcher to choose a server */
 			http_flush_cookie_flags(&sess->txn);
 			pendconn_free(pc);
-			task_wakeup(&rq, sess->task);
+			task_wakeup(sess->task);
 			xferred++;
 		}
 	}
@@ -167,7 +167,7 @@ static int event_srv_chk_w(int fd)
 		}
 	}
  out_wakeup:
-	task_wakeup(&rq, t);
+	task_wakeup(t);
  out_nowake:
 	EV_FD_CLR(fd, DIR_WR);   /* nothing more to write */
 	fdtab[fd].ev &= ~FD_POLL_WR;
@@ -237,7 +237,7 @@ static int event_srv_chk_r(int fd)
 
  out_wakeup:
 	EV_FD_CLR(fd, DIR_RD);
-	task_wakeup(&rq, t);
+	task_wakeup(t);
 	fdtab[fd].ev &= ~FD_POLL_RD;
 	return 1;
 }
@@ -436,7 +436,7 @@ int process_chk(struct task *t)
 				p->sess->srv = s;
 			sess = p->sess;
 			pendconn_free(p);
-			task_wakeup(&rq, sess->task);
+			task_wakeup(sess->task);
 		}
 
 		sprintf(trash,

src/client.c

@@ -152,7 +152,7 @@ int event_accept(int fd) {
 		setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));
 
 		t->wq = NULL;
-		t->rqnext = NULL;
+		t->qlist.p = NULL;
 		t->state = TASK_IDLE;
 		t->process = process_session;
 		t->context = s;
@@ -422,7 +422,7 @@ int event_accept(int fd) {
 		task_queue(t);
 
 		if (p->mode != PR_MODE_HEALTH)
-			task_wakeup(&rq, t);
+			task_wakeup(t);
 
 		p->feconn++;  /* beconn will be increased later */
 		if (p->feconn > p->feconn_max)
src/haproxy.c

@@ -269,6 +269,7 @@ void sig_dump_state(int sig)
 void dump(int sig)
 {
+#if 0
 	struct task *t;
 	struct session *s;
 	struct rb_node *node;
@@ -290,6 +291,7 @@ void dump(int sig)
 			     s->req->l, s->rep?s->rep->l:0, s->cli_fd
 			     );
 		}
+#endif
 }
 
 #ifdef DEBUG_MEMORY

src/proto_http.c

@@ -2353,7 +2353,7 @@ int process_srv(struct session *t)
 	 */
 	/* let's try to offer this slot to anybody */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	if (t->srv)
 		t->srv->failed_conns++;
@@ -2539,7 +2539,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (t->srv && may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }
@@ -2581,7 +2581,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (t->srv && may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }
@@ -2753,7 +2753,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (t->srv && may_dequeue_tasks(t->srv, cur_proxy))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 	return 1;
 }
 }
@@ -2988,7 +2988,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }
@@ -3101,7 +3101,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }
@@ -3117,7 +3117,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }
@@ -3137,7 +3137,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }
@@ -3182,7 +3182,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }
@@ -3198,7 +3198,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }
@@ -3218,7 +3218,7 @@ int process_srv(struct session *t)
 	 * we have to inform the server that it may be used by another session.
 	 */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(&rq, t->srv->queue_mgt);
+		task_wakeup(t->srv->queue_mgt);
 
 	return 1;
 }

src/queue.c

@@ -62,7 +62,7 @@ int process_srv_queue(struct task *t)
 		sess = pendconn_get_next_sess(s, p);
 		if (sess == NULL)
 			break;
-		task_wakeup(&rq, sess->task);
+		task_wakeup(sess->task);
 	}
 
 	return TIME_ETERNITY;

src/stream_sock.c

@@ -156,7 +156,7 @@ int stream_sock_read(int fd) {
 		else
 			tv_eternity(&b->rex);
 
-		task_wakeup(&rq, fdtab[fd].owner);
+		task_wakeup(fdtab[fd].owner);
 	}
 
 	fdtab[fd].ev &= ~FD_POLL_RD;
@@ -291,7 +291,7 @@ int stream_sock_write(int fd) {
 		}
 	}
 
-	task_wakeup(&rq, fdtab[fd].owner);
+	task_wakeup(fdtab[fd].owner);
 	fdtab[fd].ev &= ~FD_POLL_WR;
 	return retval;
 }

src/task.c

@@ -1,7 +1,7 @@
 /*
  * Task management functions.
  *
- * Copyright 2000-2006 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -13,95 +13,117 @@
 #include <common/config.h>
+#include <common/mini-clist.h>
 #include <common/time.h>
+#include <common/standard.h>
 
 #include <proto/task.h>
 #include <types/task.h>
 
+// FIXME: check 8bitops.c for faster FLS
+#include <import/bitops.h>
+#include <import/tree.h>
+
 /* FIXME : this should be removed very quickly ! */
 extern int maintain_proxies(void);
 
 void **pool_task= NULL;
-struct task *rq = NULL;		/* global run queue */
+void **pool_tree64 = NULL;
+static struct ultree *stack[LLONGBITS];
 
-struct rb_root wait_queue[2] = {
-	RB_ROOT,
-	RB_ROOT,
-};
+UL2TREE_HEAD(timer_wq);
+void *eternity_queue = NULL;
+void *run_queue = NULL;
 
-static inline void __rb_insert_task_queue(struct task *newtask)
+struct ultree *ul2tree_insert(struct ultree *root, unsigned long h, unsigned long l)
 {
-	struct rb_node **p = &newtask->wq->rb_node;
-	struct rb_node *parent = NULL;
-	struct task * task;
-
-	while (*p)
-	{
-		parent = *p;
-		task = rb_entry(parent, struct task, rb_node);
-		if (tv_cmp_ge2(&task->expire, &newtask->expire))
-			p = &(*p)->rb_left;
-		else
-			p = &(*p)->rb_right;
-	}
-
-	rb_link_node(&newtask->rb_node, parent, p);
+	return __ul2tree_insert(root, h, l);
 }
 
-static void rb_insert_task_queue(struct task *newtask)
-{
-	__rb_insert_task_queue(newtask);
-	rb_insert_color(&newtask->rb_node, newtask->wq);
+void *tree_delete(void *node) {
+	return __tree_delete(node);
 }
 
 /*
  * task_queue()
  *
  * Inserts a task into the wait queue at the position given by its expiration
  * date.
  *
  */
 struct task *task_queue(struct task *task)
 {
-	struct rb_node *node;
-	struct task *next, *prev;
+	if (unlikely(task->qlist.p != NULL)) {
+		DLIST_DEL(&task->qlist);
+		task->qlist.p = NULL;
+	}
 
-	if (tv_iseternity(&task->expire)) {
-		if (task->wq) {
-			if (task->wq == &wait_queue[1])
-				return task;
-			else
-				task_delete(task);
-		}
-		task->wq = &wait_queue[1];
-		rb_insert_task_queue(task);
-		return task;
-	} else {
-		if (task->wq != &wait_queue[0]) {
-			if (task->wq)
-				task_delete(task);
-			task->wq = &wait_queue[0];
-			rb_insert_task_queue(task);
-			return task;
-		}
+	if (unlikely(task->wq)) {
+		tree_delete(task->wq);
+		task->wq = NULL;
+	}
 
-		// check whether task should be re insert
-		node = rb_prev(&task->rb_node);
-		if (node) {
-			prev = rb_entry(node, struct task, rb_node);
-			if (tv_cmp_ge(&prev->expire, &task->expire)) {
-				task_delete(task);
-				task->wq = &wait_queue[0];
-				rb_insert_task_queue(task);
-				return task;
-			}
-		}
-
-		node = rb_next(&task->rb_node);
-		if (node) {
-			next = rb_entry(node, struct task, rb_node);
-			if (tv_cmp_ge(&task->expire, &next->expire)) {
-				task_delete(task);
-				task->wq = &wait_queue[0];
-				rb_insert_task_queue(task);
-				return task;
-			}
-		}
-		return task;
-	}
+	if (unlikely(tv_iseternity(&task->expire))) {
+		task->wq = NULL;
+		DLIST_ADD(eternity_queue, &task->qlist);
+		return task;
+	}
+
+	task->wq = ul2tree_insert(&timer_wq, task->expire.tv_sec, task->expire.tv_usec);
+	DLIST_ADD(task->wq->data, &task->qlist);
+	return task;
 }
 
+/*
+ * Extract all expired timers from the wait queue, and wake up all
+ * associated tasks.
+ * Returns the time to wait for next task (next_time).
+ *
+ * FIXME: Use an alternative queue for ETERNITY tasks.
+ *
+ */
+int wake_expired_tasks()
+{
+	int slen;
+	struct task *task;
+	void *data;
+	int next_time;
+
+	/*
+	 * Hint: tasks are *rarely* expired. So we can try to optimize
+	 * by not scanning the tree at all in most cases.
+	 */
+	if (likely(timer_wq.data != NULL)) {
+		task = LIST_ELEM(timer_wq.data, struct task *, qlist);
+		if (likely(tv_cmp_ge(&task->expire, &now) > 0))
+			return tv_remain(&now, &task->expire);
+	}
+
+	/* OK we lose. Let's scan the tree then. */
+	next_time = TIME_ETERNITY;
+
+	tree64_foreach(&timer_wq, data, stack, slen) {
+		task = LIST_ELEM(data, struct task *, qlist);
+
+		if (unlikely(tv_cmp_ge(&task->expire, &now) > 0)) {
+			next_time = tv_remain(&now, &task->expire);
+			break;
+		}
+
+		/*
+		 * OK, all tasks linked to this node will be unlinked, as well
+		 * as the node itself, so we do not need to care about correct
+		 * unlinking.
+		 */
+		foreach_dlist_item(task, data, struct task *, qlist) {
+			DLIST_DEL(&task->qlist);
+			task->wq = NULL;
+			DLIST_ADD(run_queue, &task->qlist);
+			task->state = TASK_RUNNING;
+		}
+	}
+	return next_time;
+}
+
 /*
@@ -117,35 +139,23 @@
 	int next_time;
 	int time2;
 	struct task *t;
-	struct rb_node *node;
-
-	next_time = TIME_ETERNITY;
-	for (node = rb_first(&wait_queue[0]);
-	     node != NULL; node = rb_next(node)) {
-		t = rb_entry(node, struct task, rb_node);
-		if (t->state & TASK_RUNNING)
-			continue;
-		if (tv_iseternity(&t->expire))
-			continue;
-		if (tv_cmp_ms(&t->expire, &now) <= 0) {
-			task_wakeup(&rq, t);
-		} else {
-			int temp_time = tv_remain(&now, &t->expire);
-			if (temp_time)
-				next_time = temp_time;
-			break;
-		}
-	}
+	void *queue;
+
+	next_time = wake_expired_tasks();
 
 	/* process each task in the run queue now. Each task may be deleted
 	 * since we only use the run queue's head. Note that any task can be
 	 * woken up by any other task and it will be processed immediately
 	 * after as it will be queued on the run queue's head !
 	 */
-	while ((t = rq) != NULL) {
+	queue = run_queue;
+	foreach_dlist_item(t, queue, struct task *, qlist) {
 		int temp_time;
 
-		task_sleep(&rq, t);
+		DLIST_DEL(&t->qlist);
+		t->qlist.p = NULL;
+		t->state = TASK_IDLE;
+
 		temp_time = t->process(t);
 		next_time = MINTIME(temp_time, next_time);
 	}
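
Taken together, the new scheduler runs in three phases: task_queue() files a task under the carrier node matching its expiration date, wake_expired_tasks() migrates whole nodes of expired tasks to the run queue, and process_runnable_tasks() drains that queue. Below is a compact, self-contained model of this cycle, with a sorted list of carrier nodes standing in for the ul2tree and bare pointers for the mini-clist; ttask, carrier and every function name are illustrative only, not HAProxy's:

#include <stdio.h>
#include <stdlib.h>

struct carrier;

struct ttask {                        /* toy stand-in for struct task */
	struct ttask *qnext;          /* chaining in run queue or under a carrier */
	struct carrier *wq;           /* back ref to carrier node, as in the patch */
	unsigned long exp;            /* expiration date, here a single integer */
	int (*process)(struct ttask *t);
};

struct carrier {                      /* one node per distinct expiration date */
	unsigned long exp;
	struct ttask *tasks;
	struct carrier *next;         /* kept sorted; the real tree is O(log n) */
};

static struct carrier *timer_wq;      /* toy wait queue */
static struct ttask *run_queue;       /* toy run queue  */

/* like task_queue(): find or create the carrier for t->exp; tasks that
 * share an expiration date share one node, so requeueing stays cheap. */
static void ttask_queue(struct ttask *t)
{
	struct carrier **c = &timer_wq;

	while (*c && (*c)->exp < t->exp)
		c = &(*c)->next;
	if (!*c || (*c)->exp != t->exp) {
		struct carrier *n = calloc(1, sizeof(*n));
		n->exp = t->exp;
		n->next = *c;
		*c = n;
	}
	t->qnext = (*c)->tasks;
	(*c)->tasks = t;
	t->wq = *c;
}

/* like wake_expired_tasks(): peek the earliest carrier first (tasks are
 * rarely expired, says the patch), then move whole nodes to the run queue. */
static long ttask_wake_expired(unsigned long now)
{
	while (timer_wq) {
		struct carrier *c = timer_wq;

		if (c->exp > now)                    /* fast path: nothing expired */
			return (long)(c->exp - now); /* time to next event */

		timer_wq = c->next;                  /* unlink the whole node */
		while (c->tasks) {                   /* wake every task under it */
			struct ttask *t = c->tasks;
			c->tasks = t->qnext;
			t->wq = NULL;
			t->qnext = run_queue;
			run_queue = t;
		}
		free(c);
	}
	return -1;                                   /* queue empty: wait forever */
}

/* like process_runnable_tasks(): pop from the head; a task woken while
 * we run would land back on the head and be handled in the same pass. */
static void ttask_run(void)
{
	while (run_queue) {
		struct ttask *t = run_queue;
		run_queue = t->qnext;                /* task_sleep() equivalent */
		t->qnext = NULL;
		t->process(t);                       /* may requeue or rewake t */
	}
}

static int say(struct ttask *t)
{
	printf("task with expiry %lu ran\n", t->exp);
	return 0;
}

int main(void)
{
	struct ttask a = { 0, 0, 5, say }, b = { 0, 0, 5, say }, c = { 0, 0, 9, say };

	ttask_queue(&a);
	ttask_queue(&b);                  /* shares a's carrier node */
	ttask_queue(&c);
	printf("next expiry in %ld\n", ttask_wake_expired(3));  /* nothing expired */
	ttask_wake_expired(7);            /* expires a and b together */
	ttask_run();                      /* runs both, then the queue is empty */
	return 0;
}

The point of the carrier indirection is visible in ttask_queue(): all tasks sharing an expiration date hang off one node, so the common case of re-arming a timer costs a couple of pointer moves rather than a per-task tree rebalance, which is what the commit message means by the rbtree wait queue consuming CPU.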