diff --git a/Makefile b/Makefile index 195e6a9fc..5e23a0593 100644 --- a/Makefile +++ b/Makefile @@ -182,7 +182,7 @@ OBJS = src/haproxy.o src/list.o src/chtbl.o src/hashpjw.o src/base64.o \ src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \ src/checks.o src/queue.o src/capture.o src/client.o src/proxy.o \ src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \ - src/session.o src/hdr_idx.o + src/session.o src/hdr_idx.o src/rbtree.o haproxy: $(OBJS) $(LD) $(LDFLAGS) -o $@ $^ $(LIBS) diff --git a/Makefile.bsd b/Makefile.bsd index 7cde1949c..40229fab1 100644 --- a/Makefile.bsd +++ b/Makefile.bsd @@ -87,7 +87,7 @@ OBJS = src/haproxy.o src/list.o src/chtbl.o src/hashpjw.o src/base64.o \ src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \ src/checks.o src/queue.o src/capture.o src/client.o src/proxy.o \ src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \ - src/session.o src/hdr_idx.o + src/session.o src/hdr_idx.o src/rbtree.o all: haproxy diff --git a/include/proto/task.h b/include/proto/task.h index 70abb8296..8bd973a51 100644 --- a/include/proto/task.h +++ b/include/proto/task.h @@ -61,8 +61,8 @@ static inline struct task *task_sleep(struct task **q, struct task *t) */ static inline struct task *task_delete(struct task *t) { - t->prev->next = t->next; - t->next->prev = t->prev; + rb_erase(&t->rb_node, t->wq); + t->wq = NULL; return t; } diff --git a/include/types/task.h b/include/types/task.h index 6b1df226c..d09efae2c 100644 --- a/include/types/task.h +++ b/include/types/task.h @@ -25,6 +25,7 @@ #include #include +#include /* values for task->state */ #define TASK_IDLE 0 @@ -32,9 +33,9 @@ /* The base for all tasks */ struct task { - struct task *next, *prev; /* chaining ... */ + struct rb_node rb_node; + struct rb_root *wq; struct task *rqnext; /* chaining in run queue ... 
*/ - struct task *wq; /* the wait queue this task is in */ int state; /* task state : IDLE or RUNNING */ struct timeval expire; /* next expiration time for this task, use only for fast sorting */ int (*process)(struct task *t); /* the function which processes the task */ @@ -44,7 +45,7 @@ struct task { #define sizeof_task sizeof(struct task) extern void **pool_task; -extern struct task wait_queue[2]; +extern struct rb_root wait_queue[2]; extern struct task *rq; diff --git a/src/appsession.c b/src/appsession.c index a63116d9c..62b096ff4 100644 --- a/src/appsession.c +++ b/src/appsession.c @@ -113,8 +113,8 @@ int appsession_task_init(void) if (!initialized) { if ((t = pool_alloc(task)) == NULL) return -1; - t->next = t->prev = t->rqnext = NULL; - t->wq = LIST_HEAD(wait_queue[0]); + t->wq = NULL; + t->rqnext = NULL; t->state = TASK_IDLE; t->context = NULL; tv_delayfrom(&t->expire, &now, TBLCHKINT); diff --git a/src/cfgparse.c b/src/cfgparse.c index 5017d50d0..7b517d771 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -2293,8 +2293,8 @@ int readcfgfile(const char *file) return -1; } - t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */ - t->wq = LIST_HEAD(wait_queue[1]); /* already assigned to the eternity queue */ + t->rqnext = NULL; + t->wq = NULL; t->state = TASK_IDLE; t->process = process_srv_queue; t->context = newsrv; @@ -2340,8 +2340,8 @@ int readcfgfile(const char *file) return -1; } - t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */ - t->wq = LIST_HEAD(wait_queue[0]); /* but already has a wait queue assigned */ + t->wq = NULL; + t->rqnext = NULL; t->state = TASK_IDLE; t->process = process_chk; t->context = newsrv; diff --git a/src/client.c b/src/client.c index f0e698d4c..a8aad8e3c 100644 --- a/src/client.c +++ b/src/client.c @@ -150,8 +150,8 @@ int event_accept(int fd) { if (p->options & PR_O_TCP_CLI_KA) setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)); - t->next = t->prev = t->rqnext = NULL; /* 
task not in run queue yet */ - t->wq = LIST_HEAD(wait_queue[0]); /* but already has a wait queue assigned */ + t->wq = NULL; + t->rqnext = NULL; t->state = TASK_IDLE; t->process = process_session; t->context = s; diff --git a/src/haproxy.c b/src/haproxy.c index 047136422..4a3035730 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -255,12 +255,13 @@ void sig_dump_state(int sig) void dump(int sig) { - struct task *t, *tnext; + struct task *t; struct session *s; + struct rb_node *node; - tnext = ((struct task *)LIST_HEAD(wait_queue[0]))->next; - while ((t = tnext) != LIST_HEAD(wait_queue[0])) { /* we haven't looped ? */ - tnext = t->next; + for(node = rb_first(&wait_queue[0]); + node != NULL; node = rb_next(node)) { + t = rb_entry(node, struct task, rb_node); s = t->context; qfprintf(stderr,"[dump] wq: task %p, still %ld ms, " "cli=%d, srv=%d, cr=%d, cw=%d, sr=%d, sw=%d, " diff --git a/src/task.c b/src/task.c index 1f567b924..beddb27e3 100644 --- a/src/task.c +++ b/src/task.c @@ -22,112 +22,86 @@ extern int maintain_proxies(void); void **pool_task= NULL; struct task *rq = NULL; /* global run queue */ -struct task wait_queue[2] = { /* global wait queue */ - { - prev:LIST_HEAD(wait_queue[0]), /* expirable tasks */ - next:LIST_HEAD(wait_queue[0]), - }, - { - prev:LIST_HEAD(wait_queue[1]), /* non-expirable tasks */ - next:LIST_HEAD(wait_queue[1]), - }, + +struct rb_root wait_queue[2] = { + RB_ROOT, + RB_ROOT, }; -/* inserts into its assigned wait queue, where it may already be. In this case, it - * may be only moved or left where it was, depending on its timing requirements. - * is returned. 
- */ +static inline void __rb_insert_task_queue(struct task *newtask) +{ + struct rb_node **p = &newtask->wq->rb_node; + struct rb_node *parent = NULL; + struct task * task; + + while (*p) + { + parent = *p; + task = rb_entry(parent, struct task, rb_node); + if (tv_cmp2(&task->expire, &newtask->expire) >= 0) + p = &(*p)->rb_left; + else + p = &(*p)->rb_right; + } + rb_link_node(&newtask->rb_node, parent, p); +} + +static inline void rb_insert_task_queue(struct task *newtask) +{ + __rb_insert_task_queue(newtask); + rb_insert_color(&newtask->rb_node, newtask->wq); +} + + struct task *task_queue(struct task *task) { - struct task *list = task->wq; - struct task *start_from; + struct rb_node *node; + struct task *next, *prev; - /* This is a very dirty hack to queue non-expirable tasks in another queue - * in order to avoid pulluting the tail of the standard queue. This will go - * away with the new O(log(n)) scheduler anyway. - */ if (tv_iseternity(&task->expire)) { - /* if the task was queued in the standard wait queue, we must dequeue it */ - if (task->prev) { - if (task->wq == LIST_HEAD(wait_queue[1])) + if (task->wq) { + if (task->wq == &wait_queue[1]) return task; - else { + else task_delete(task); - task->prev = NULL; + } + task->wq = &wait_queue[1]; + rb_insert_task_queue(task); + return task; + } else { + if (task->wq != &wait_queue[0]) { + if (task->wq) + task_delete(task); + task->wq = &wait_queue[0]; + rb_insert_task_queue(task); + return task; + } + + // check whether the task needs to be re-inserted + node = rb_prev(&task->rb_node); + if (node) { + prev = rb_entry(node, struct task, rb_node); + if (tv_cmp2(&prev->expire, &task->expire) >= 0) { + task_delete(task); + task->wq = &wait_queue[0]; + rb_insert_task_queue(task); + return task; } } - list = task->wq = LIST_HEAD(wait_queue[1]); - } else { - /* if the task was queued in the eternity queue, we must dequeue it */ - if (task->prev && (task->wq == LIST_HEAD(wait_queue[1]))) { - task_delete(task); - task->prev = 
NULL; - list = task->wq = LIST_HEAD(wait_queue[0]); - } - } - /* next, test if the task was already in a list */ - if (task->prev == NULL) { - // start_from = list; - start_from = list->prev; - /* insert the unlinked into the list, searching back from the last entry */ - while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) { - start_from = start_from->prev; + node = rb_next(&task->rb_node); + if (node) { + next = rb_entry(node, struct task, rb_node); + if (tv_cmp2(&task->expire, &next->expire) > 0) { + task_delete(task); + task->wq = &wait_queue[0]; + rb_insert_task_queue(task); + return task; + } } - - // while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) { - // start_from = start_from->next; - // stats_tsk_nsrch++; - // } - } - else if (task->prev == list || - tv_cmp2(&task->expire, &task->prev->expire) >= 0) { /* walk right */ - start_from = task->next; - if (start_from == list || tv_cmp2(&task->expire, &start_from->expire) <= 0) { - return task; /* it's already in the right place */ - } - - /* if the task is not at the right place, there's little chance that - * it has only shifted a bit, and it will nearly always be queued - * at the end of the list because of constant timeouts - * (observed in real case). - */ -#ifndef WE_REALLY_THINK_THAT_THIS_TASK_MAY_HAVE_SHIFTED - start_from = list->prev; /* assume we'll queue to the end of the list */ - while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) { - start_from = start_from->prev; - } -#else /* WE_REALLY_... */ - /* insert the unlinked into the list, searching after position */ - while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) { - start_from = start_from->next; - } -#endif /* WE_REALLY_... */ - - /* we need to unlink it now */ - task_delete(task); + return task; } - else { /* walk left. 
*/ -#ifdef LEFT_TO_TOP /* not very good */ - start_from = list; - while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) { - start_from = start_from->next; - } -#else - start_from = task->prev->prev; /* valid because of the previous test above */ - while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) { - start_from = start_from->prev; - } -#endif - /* we need to unlink it now */ - task_delete(task); - } - task->prev = start_from; - task->next = start_from->next; - task->next->prev = task; - start_from->next = task; - return task; } /* @@ -136,37 +110,26 @@ struct task *task_queue(struct task *task) * - call all runnable tasks * - call maintain_proxies() to enable/disable the listeners * - return the delay till next event in ms, -1 = wait indefinitely - * Note: this part should be rewritten with the O(ln(n)) scheduler. * */ - int process_runnable_tasks() { int next_time; int time2; - struct task *t, *tnext; + struct task *t; + struct rb_node *node; - next_time = TIME_ETERNITY; /* set the timer to wait eternally first */ - - /* look for expired tasks and add them to the run queue. - */ - tnext = ((struct task *)LIST_HEAD(wait_queue[0]))->next; - while ((t = tnext) != LIST_HEAD(wait_queue[0])) { /* we haven't looped ? */ - tnext = t->next; + next_time = TIME_ETERNITY; + for (node = rb_first(&wait_queue[0]); + node != NULL; node = rb_next(node)) { + t = rb_entry(node, struct task, rb_node); if (t->state & TASK_RUNNING) continue; - if (tv_iseternity(&t->expire)) continue; - - /* wakeup expired entries. It doesn't matter if they are - * already running because of a previous event - */ if (tv_cmp_ms(&t->expire, &now) <= 0) { task_wakeup(&rq, t); - } - else { - /* first non-runnable task. 
Use its expiration date as an upper bound */ + } else { int temp_time = tv_remain(&now, &t->expire); if (temp_time) next_time = temp_time; @@ -177,7 +140,7 @@ int process_runnable_tasks() /* process each task in the run queue now. Each task may be deleted * since we only use the run queue's head. Note that any task can be * woken up by any other task and it will be processed immediately - * after as it will be queued on the run queue's head. + * after as it will be queued on the run queue's head ! */ while ((t = rq) != NULL) { int temp_time; @@ -186,13 +149,14 @@ int process_runnable_tasks() temp_time = t->process(t); next_time = MINTIME(temp_time, next_time); } - - /* maintain all proxies in a consistent state. This should quickly become a task */ + + /* maintain all proxies in a consistent state. This should quickly + * become a task because it becomes expensive when there are huge + * numbers of proxies. */ time2 = maintain_proxies(); return MINTIME(time2, next_time); } - /* * Local variables: * c-indent-level: 8