diff --git a/include/proto/task.h b/include/proto/task.h index 5445c9906..6bc4f43cf 100644 --- a/include/proto/task.h +++ b/include/proto/task.h @@ -94,10 +94,12 @@ extern struct pool_head *pool_head_notification; extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */ extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */ #ifdef USE_THREAD +extern struct eb_root timers; /* sorted timers tree, global */ extern struct eb_root rqueue; /* tree constituting the run queue */ extern int global_rqueue_size; /* Number of element sin the global runqueue */ #endif +extern struct eb_root timers_local[MAX_THREADS]; /* tree constituting the per-thread run queue */ extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */ extern int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */ extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */ @@ -167,12 +169,19 @@ static inline struct task *__task_unlink_wq(struct task *t) return t; } +/* remove a task from its wait queue. It may either be the local wait queue if + * the task is bound to a single thread (in which case there's no locking + * involved) or the global queue, with locking. + */ static inline struct task *task_unlink_wq(struct task *t) { - HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); - if (likely(task_in_wq(t))) + if (likely(task_in_wq(t))) { + if (atleast2(t->thread_mask)) + HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); __task_unlink_wq(t); - HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock); + if (atleast2(t->thread_mask)) + HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock); + } return t; } @@ -356,10 +365,14 @@ static inline void tasklet_free(struct tasklet *tl) pool_flush(pool_head_tasklet); } +void __task_queue(struct task *task, struct eb_root *wq); + /* Place into the wait queue, where it may already be. If the expiration * timer is infinite, do nothing and rely on wake_expired_task to clean up. + * If the task is bound to a single thread, it's assumed to be bound to the + * current thread's queue and is queued without locking. Otherwise it's queued + * into the global wait queue, protected by locks. */ -void __task_queue(struct task *task); static inline void task_queue(struct task *task) { /* If we already have a place in the wait queue no later than the @@ -374,10 +387,18 @@ static inline void task_queue(struct task *task) if (!tick_isset(task->expire)) return; - HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); - if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) - __task_queue(task); - HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock); +#ifdef USE_THREAD + if (atleast2(task->thread_mask)) { + HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); + if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) + __task_queue(task, &timers); + HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock); + } else +#endif + { + if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) + __task_queue(task, &timers_local[tid]); + } } /* Ensure will be woken up at most at . If the task is already in @@ -390,14 +411,26 @@ static inline void task_schedule(struct task *task, int when) if (task_in_rq(task)) return; - HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); - if (task_in_wq(task)) - when = tick_first(when, task->expire); +#ifdef USE_THREAD + if (atleast2(task->thread_mask)) { + HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); + if (task_in_wq(task)) + when = tick_first(when, task->expire); - task->expire = when; - if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) - __task_queue(task); - HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock); + task->expire = when; + if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) + __task_queue(task, &timers); + HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock); + } else +#endif + { + if (task_in_wq(task)) + when = tick_first(when, task->expire); + + task->expire = when; + if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) + __task_queue(task, &timers_local[tid]); + } } /* This function register a new signal. "lua" is the current lua diff --git a/src/task.c b/src/task.c index 3f193f2d0..27408b103 100644 --- a/src/task.c +++ b/src/task.c @@ -50,14 +50,16 @@ int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */ __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */ __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */ -static struct eb_root timers; /* sorted timers tree */ #ifdef USE_THREAD +struct eb_root timers; /* sorted timers tree, global */ struct eb_root rqueue; /* tree constituting the run queue */ int global_rqueue_size; /* Number of element sin the global runqueue */ #endif + struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */ int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */ static unsigned int rqueue_ticks; /* insertion count */ +struct eb_root timers_local[MAX_THREADS]; /* sorted timers tree, per thread */ /* Puts the task in run queue at a position depending on t->nice. is * returned. The nice value assigns boosts in 32th of the run queue size. A @@ -170,7 +172,7 @@ void __task_wakeup(struct task *t, struct eb_root *root) /* * __task_queue() * - * Inserts a task into the wait queue at the position given by its expiration + * Inserts a task into wait queue at the position given by its expiration * date. It does not matter if the task was already in the wait queue or not, * as it will be unlinked. The task must not have an infinite expiration timer. * Last, tasks must not be queued further than the end of the tree, which is @@ -178,9 +180,11 @@ void __task_wakeup(struct task *t, struct eb_root *root) * * This function should not be used directly, it is meant to be called by the * inline version of task_queue() which performs a few cheap preliminary tests - * before deciding to call __task_queue(). + * before deciding to call __task_queue(). Moreover this function doesn't care + * at all about locking so the caller must be careful when deciding whether to + * lock or not around this call. */ -void __task_queue(struct task *task) +void __task_queue(struct task *task, struct eb_root *wq) { if (likely(task_in_wq(task))) __task_unlink_wq(task); @@ -193,9 +197,7 @@ void __task_queue(struct task *task) return; #endif - eb32_insert(&timers, &task->wq); - - return; + eb32_insert(wq, &task->wq); } /* @@ -209,15 +211,14 @@ int wake_expired_tasks() int ret = TICK_ETERNITY; while (1) { - HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); - lookup_next: - eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK); + lookup_next_local: + eb = eb32_lookup_ge(&timers_local[tid], now_ms - TIMER_LOOK_BACK); if (!eb) { /* we might have reached the end of the tree, typically because * is in the first half and we're first scanning the last * half. Let's loop back to the beginning of the tree now. */ - eb = eb32_first(&timers); + eb = eb32_first(&timers_local[tid]); if (likely(!eb)) break; } @@ -247,7 +248,53 @@ int wake_expired_tasks() */ if (!tick_is_expired(task->expire, now_ms)) { if (tick_isset(task->expire)) - __task_queue(task); + __task_queue(task, &timers_local[tid]); + goto lookup_next_local; + } + task_wakeup(task, TASK_WOKEN_TIMER); + } + +#ifdef USE_THREAD + while (1) { + HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock); + lookup_next: + eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK); + if (!eb) { + /* we might have reached the end of the tree, typically because + * is in the first half and we're first scanning the last + * half. Let's loop back to the beginning of the tree now. + */ + eb = eb32_first(&timers); + if (likely(!eb)) + break; + } + + if (tick_is_lt(now_ms, eb->key)) { + /* timer not expired yet, revisit it later */ + ret = tick_first(ret, eb->key); + break; + } + + /* timer looks expired, detach it from the queue */ + task = eb32_entry(eb, struct task, wq); + __task_unlink_wq(task); + + /* It is possible that this task was left at an earlier place in the + * tree because a recent call to task_queue() has not moved it. This + * happens when the new expiration date is later than the old one. + * Since it is very unlikely that we reach a timeout anyway, it's a + * lot cheaper to proceed like this because we almost never update + * the tree. We may also find disabled expiration dates there. Since + * we have detached the task from the tree, we simply call task_queue + * to take care of this. Note that we might occasionally requeue it at + * the same place, before , so we have to check if this happens, + * and adjust , otherwise we may skip it which is not what we want. + * We may also not requeue the task (and not point eb at it) if its + * expiration time is not set. + */ + if (!tick_is_expired(task->expire, now_ms)) { + if (tick_isset(task->expire)) + __task_queue(task, &timers); goto lookup_next; } task_wakeup(task, TASK_WOKEN_TIMER); @@ -255,6 +302,7 @@ int wake_expired_tasks() } HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock); +#endif return ret; } @@ -415,13 +463,14 @@ int init_task() { int i; - memset(&timers, 0, sizeof(timers)); #ifdef USE_THREAD + memset(&timers, 0, sizeof(timers)); memset(&rqueue, 0, sizeof(rqueue)); #endif HA_SPIN_INIT(&wq_lock); HA_SPIN_INIT(&rq_lock); for (i = 0; i < MAX_THREADS; i++) { + memset(&timers_local[i], 0, sizeof(timers_local[i])); memset(&rqueue_local[i], 0, sizeof(rqueue_local[i])); LIST_INIT(&task_list[i]); task_list_size[i] = 0;