diff --git a/include/proto/task.h b/include/proto/task.h
index c1c4c07ec..59ac38258 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -92,6 +92,8 @@ extern struct pool_head *pool_head_task;
 extern struct pool_head *pool_head_notification;
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
+extern struct eb_root rqueue; /* tree constituting the run queue */
+extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */

 __decl_hathreads(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */
 __decl_hathreads(extern HA_SPINLOCK_T wq_lock); /* spin lock related to wait queue */

@@ -109,25 +111,28 @@ static inline int task_in_wq(struct task *t)
 }

 /* puts the task <t> in run queue with reason flags <f>, and returns <t> */
-struct task *__task_wakeup(struct task *t);
-static inline struct task *task_wakeup(struct task *t, unsigned int f)
+/* This will put the task in the local runqueue if the task is only runnable
+ * by the current thread, in the global runqueue otherwise.
+ */
+void __task_wakeup(struct task *t, struct eb_root *);
+static inline void task_wakeup(struct task *t, unsigned int f)
 {
-	HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	unsigned short state;

-	/* If task is running, we postpone the call
-	 * and backup the state.
-	 */
-	if (unlikely(t->state & TASK_RUNNING)) {
-		t->pending_state |= f;
-		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-		return t;
-	}
-	if (likely(!task_in_rq(t)))
-		__task_wakeup(t);
-	t->state |= f;
-	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+#ifdef USE_THREAD
+	struct eb_root *root;

-	return t;
+	if (t->thread_mask == tid_bit && global.nbthread > 1)
+		root = &rqueue_local[tid];
+	else
+		root = &rqueue;
+#else
+	struct eb_root *root = &rqueue;
+#endif
+
+	state = HA_ATOMIC_OR(&t->state, f);
+	if (!(state & TASK_RUNNING))
+		__task_wakeup(t, root);
 }

 /* change the thread affinity of a task to <thread_mask> */
@@ -167,9 +172,9 @@ static inline struct task *task_unlink_wq(struct task *t)
 static inline struct task *__task_unlink_rq(struct task *t)
 {
 	eb32sc_delete(&t->rq);
-	tasks_run_queue--;
+	HA_ATOMIC_SUB(&tasks_run_queue, 1);
 	if (likely(t->nice))
-		niced_tasks--;
+		HA_ATOMIC_SUB(&niced_tasks, 1);
 	return t;
 }

@@ -178,13 +183,15 @@ static inline struct task *__task_unlink_rq(struct task *t)
  */
 static inline struct task *task_unlink_rq(struct task *t)
 {
-	HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	if (t->thread_mask != tid_bit)
+		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 	if (likely(task_in_rq(t))) {
 		if (&t->rq == rq_next)
 			rq_next = eb32sc_next(rq_next, tid_bit);
 		__task_unlink_rq(t);
 	}
-	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+	if (t->thread_mask != tid_bit)
+		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 	return t;
 }

@@ -208,7 +215,7 @@ static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 {
 	t->wq.node.leaf_p = NULL;
 	t->rq.node.leaf_p = NULL;
-	t->pending_state = t->state = TASK_SLEEPING;
+	t->state = TASK_SLEEPING;
 	t->thread_mask = thread_mask;
 	t->nice = 0;
 	t->calls = 0;
diff --git a/src/task.c b/src/task.c
index 23e310b09..876b837e8 100644
--- a/src/task.c
+++ b/src/task.c
@@ -45,7 +45,10 @@ __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lo
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */

 static struct eb_root timers; /* sorted timers tree */
-static struct eb_root rqueue; /* tree constituting the run queue */
+struct eb_root rqueue; /* tree constituting the run queue */
+struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
+static int global_rqueue_size; /* Number of elements in the global runqueue */
+static int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 static unsigned int rqueue_ticks; /* insertion count */

 /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
@@ -56,30 +59,76 @@ static unsigned int rqueue_ticks; /* insertion count */
  * The task must not already be in the run queue. If unsure, use the safer
  * task_wakeup() function.
  */
-struct task *__task_wakeup(struct task *t)
+void __task_wakeup(struct task *t, struct eb_root *root)
 {
-	tasks_run_queue++;
+	void *expected = NULL;
+	int *rq_size;
+
+	if (root == &rqueue) {
+		rq_size = &global_rqueue_size;
+		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	} else {
+		int nb = root - &rqueue_local[0];
+		rq_size = &rqueue_size[nb];
+	}
+	/* Make sure that if the task isn't in the runqueue, nobody inserts it
+	 * in the meantime.
+	 */
+redo:
+	if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1))) {
+		if (root == &rqueue)
+			HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+		return;
+	}
+	/* There's a small race condition: when running a task, the thread
+	 * first sets TASK_RUNNING, and then unlinks the task.
+	 * If another thread calls task_wakeup() for the same task,
+	 * it may set t->state before TASK_RUNNING was set, and then try
+	 * to set t->rq.node.leaf_p after it was unlinked.
+	 * To make sure it is not a problem, we check if TASK_RUNNING is set
+	 * again. If it is, we unset t->rq.node.leaf_p.
+	 * We then check for TASK_RUNNING a third time. If it is still there,
+	 * then we can give up, the task will be re-queued later if it needs
+	 * to be. If it's not there, and there is still something in t->state,
+	 * then we have to requeue.
+	 */
+	if (((volatile unsigned short)(t->state)) & TASK_RUNNING) {
+		unsigned short state;
+		t->rq.node.leaf_p = NULL;
+		__ha_barrier_store();
+
+		state = (volatile unsigned short)(t->state);
+		if (unlikely(state != 0 && !(state & TASK_RUNNING)))
+			goto redo;
+		if (root == &rqueue)
+			HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+		return;
+	}
+	HA_ATOMIC_ADD(&tasks_run_queue, 1);
 	active_tasks_mask |= t->thread_mask;
-	t->rq.key = ++rqueue_ticks;
+	t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);

 	if (likely(t->nice)) {
 		int offset;

-		niced_tasks++;
+		HA_ATOMIC_ADD(&niced_tasks, 1);
 		if (likely(t->nice > 0))
-			offset = (unsigned)((tasks_run_queue * (unsigned int)t->nice) / 32U);
+			offset = (unsigned)((*rq_size * (unsigned int)t->nice) / 32U);
 		else
-			offset = -(unsigned)((tasks_run_queue * (unsigned int)-t->nice) / 32U);
+			offset = -(unsigned)((*rq_size * (unsigned int)-t->nice) / 32U);
 		t->rq.key += offset;
 	}

-	/* reset flag to pending ones
-	 * Note: __task_wakeup must not be called
-	 * if task is running
-	 */
-	t->state = t->pending_state;
-	eb32sc_insert(&rqueue, &t->rq, t->thread_mask);
-	return t;
+	eb32sc_insert(root, &t->rq, t->thread_mask);
+	if (root == &rqueue) {
+		global_rqueue_size++;
+		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+	} else {
+		int nb = root - &rqueue_local[0];
+
+		rqueue_size[nb]++;
+	}
+	return;
 }

 /*
@@ -185,11 +234,8 @@ int wake_expired_tasks()
 void process_runnable_tasks()
 {
 	struct task *t;
-	int i;
 	int max_processed;
-	struct task *local_tasks[16];
-	int local_tasks_count;
-	int final_tasks_count;
+	uint64_t average = 0;

 	tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
 	nb_tasks_cur = nb_tasks;
@@ -216,9 +262,11 @@ void process_runnable_tasks()

 			t = eb32sc_entry(rq_next, struct task, rq);
 			rq_next = eb32sc_next(rq_next, tid_bit);
+			global_rqueue_size--;
+
+			/* detach the task from the queue */
 			__task_unlink_rq(t);
 			t->state |= TASK_RUNNING;
-			t->pending_state = 0;

 			t->calls++;
 			curr_task = t;
@@ -244,8 +292,8 @@ void process_runnable_tasks()
 			 * immediatly, else we defer
 			 * it into wait queue
 			 */
-			if (t->pending_state)
-				__task_wakeup(t);
+			if (t->state)
+				__task_wakeup(t, &rqueue);
 			else
 				task_queue(t);
 		}
@@ -267,104 +315,105 @@ void process_runnable_tasks()
 		return;
 	}

+	average = tasks_run_queue / global.nbthread;
+
+	/* Get some elements from the global run queue and put them in the
+	 * local run queue. To try to keep a bit of fairness, just get as
+	 * many elements from the global list as needed to have a bigger
+	 * local queue than the average.
+	 */
+	while (rqueue_size[tid] <= average) {
+
+		/* we have to restart looking up after every batch */
+		rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+		if (unlikely(!rq_next)) {
+			/* either we just started or we reached the end
+			 * of the tree, typically because <rqueue_ticks>
+			 * is in the first half and we're first scanning
+			 * the last half. Let's loop back to the beginning
+			 * of the tree now.
+			 */
+			rq_next = eb32sc_first(&rqueue, tid_bit);
+			if (!rq_next)
+				break;
+		}
+
+		t = eb32sc_entry(rq_next, struct task, rq);
+		rq_next = eb32sc_next(rq_next, tid_bit);
+
+		/* detach the task from the queue */
+		__task_unlink_rq(t);
+		__task_wakeup(t, &rqueue_local[tid]);
+	}
+
+	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);

 	active_tasks_mask &= ~tid_bit;
 	while (1) {
+		unsigned short state;
 		/* Note: this loop is one of the fastest code path in
 		 * the whole program. It should not be re-arranged
 		 * without a good reason.
 		 */
 		/* we have to restart looking up after every batch */
-		rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-		for (local_tasks_count = 0; local_tasks_count < 16; local_tasks_count++) {
-			if (unlikely(!rq_next)) {
-				/* either we just started or we reached the end
-				 * of the tree, typically because <rqueue_ticks>
-				 * is in the first half and we're first scanning
-				 * the last half. Let's loop back to the beginning
-				 * of the tree now.
-				 */
-				rq_next = eb32sc_first(&rqueue, tid_bit);
-				if (!rq_next)
-					break;
-			}
-
-			t = eb32sc_entry(rq_next, struct task, rq);
-			rq_next = eb32sc_next(rq_next, tid_bit);
-
-			/* detach the task from the queue */
-			__task_unlink_rq(t);
-			local_tasks[local_tasks_count] = t;
-			t->state |= TASK_RUNNING;
-			t->pending_state = 0;
-			t->calls++;
-			max_processed--;
-		}
-
-		if (!local_tasks_count)
-			break;
-
-		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-
-		final_tasks_count = 0;
-		for (i = 0; i < local_tasks_count ; i++) {
-			t = local_tasks[i];
-			/* This is an optimisation to help the processor's branch
-			 * predictor take this most common call.
+		rq_next = eb32sc_lookup_ge(&rqueue_local[tid], rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+		if (unlikely(!rq_next)) {
+			/* either we just started or we reached the end
+			 * of the tree, typically because <rqueue_ticks>
+			 * is in the first half and we're first scanning
+			 * the last half. Let's loop back to the beginning
+			 * of the tree now.
 			 */
-			curr_task = t;
-			if (likely(t->process == process_stream))
-				t = process_stream(t, t->context, t->state);
-			else {
-				if (t->process != NULL)
-					t = t->process(t, t->context, t->state);
-				else {
-					__task_free(t);
-					t = NULL;
-				}
-			}
-			curr_task = NULL;
-			if (t)
-				local_tasks[final_tasks_count++] = t;
+			rq_next = eb32sc_first(&rqueue_local[tid], tid_bit);
+			if (!rq_next)
+				break;
 		}
+		t = eb32sc_entry(rq_next, struct task, rq);
+		rq_next = eb32sc_next(rq_next, tid_bit);

-		for (i = 0; i < final_tasks_count ; i++) {
-			t = local_tasks[i];
-			/* If there is a pending state
-			 * we have to wake up the task
-			 * immediatly, else we defer
-			 * it into wait queue
-			 */
-			HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
-			t->state &= ~TASK_RUNNING;
-			if (t->pending_state) {
-				__task_wakeup(t);
-				HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-			}
-			else {
-				/* we must never hold the RQ lock before the WQ lock */
-				HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+		state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
+		/* detach the task from the queue */
+		__task_unlink_rq(t);
+		t->calls++;
+		max_processed--;
+		rqueue_size[tid]--;
+		curr_task = t;
+		if (likely(t->process == process_stream))
+			t = process_stream(t, t->context, state);
+		else
+			t = t->process(t, t->context, state);
+		curr_task = NULL;
+		/* If there is a pending state we have to wake up the task
+		 * immediately, else we defer it into wait queue
+		 */
+		if (t != NULL) {
+			state = HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
+			if (state)
+				__task_wakeup(t, (t->thread_mask == tid_bit) ?
+					&rqueue_local[tid] : &rqueue);
+			else
 				task_queue(t);
-			}
 		}

-		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 		if (max_processed <= 0) {
 			active_tasks_mask |= tid_bit;
 			activity[tid].long_rq++;
 			break;
 		}
 	}
-	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 }

 /* perform minimal intializations, report 0 in case of error, 1 if OK.
  */
 int init_task()
 {
+	int i;
+
 	memset(&timers, 0, sizeof(timers));
 	memset(&rqueue, 0, sizeof(rqueue));
 	HA_SPIN_INIT(&wq_lock);
 	HA_SPIN_INIT(&rq_lock);
+	for (i = 0; i < MAX_THREADS; i++)
+		memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
 	pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
 	if (!pool_head_task)
 		return 0;
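Note on the wakeup model introduced above: __task_wakeup() now "claims" a task before inserting it, by atomically swapping t->rq.node.leaf_p from NULL to a dummy non-NULL value, so two concurrent wakers can never insert the same task twice; the global rqueue is still protected by rq_lock, while each rqueue_local[tid] is only ever touched by its owning thread. The snippet below is a standalone sketch of only that claim-before-insert step, using C11 atomics instead of HAProxy's HA_ATOMIC_CAS(); the names fake_task, leaf_p and try_claim_for_insert are invented for the example and are not part of HAProxy.

/* Standalone sketch (not HAProxy code): the claim-before-insert pattern
 * used by __task_wakeup(), expressed with C11 atomics.
 */
#include <stdatomic.h>
#include <stdio.h>

struct fake_task {
	/* NULL means "not in a run queue"; any other value means the task
	 * is already queued or has been claimed by a concurrent waker.
	 */
	_Atomic(void *) leaf_p;
};

/* Returns 1 if the caller won the right to insert the task, 0 otherwise. */
static int try_claim_for_insert(struct fake_task *t)
{
	void *expected = NULL;

	/* Mirrors HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1):
	 * succeed only if nobody queued the task in the meantime.
	 */
	return atomic_compare_exchange_strong(&t->leaf_p, &expected, (void *)0x1);
}

int main(void)
{
	struct fake_task t = { .leaf_p = NULL };

	printf("first waker claims the task:  %d\n", try_claim_for_insert(&t)); /* 1 */
	printf("second waker claims the task: %d\n", try_claim_for_insert(&t)); /* 0 */
	return 0;
}

In the patch itself, the winning waker then inserts the ebtree node into either the locked global rqueue or the lock-free rqueue_local[tid]; if the task turns out to be TASK_RUNNING it undoes the claim by resetting leaf_p to NULL, which is why the extra state re-checks follow the CAS.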