diff --git a/src/task.c b/src/task.c
index aa980485b..7b063b1bc 100644
--- a/src/task.c
+++ b/src/task.c
@@ -310,103 +310,96 @@ int wake_expired_tasks()
  * other variables (eg: nice value) to set the final position in the tree. The
  * counter may wrap without a problem, of course. We then limit the number of
  * tasks processed to 200 in any case, so that general latency remains low and
- * so that task positions have a chance to be considered.
+ * so that task positions have a chance to be considered. The function scans
+ * both the global and local run queues and picks the most urgent task between
+ * the two. We need to grab the global runqueue lock to touch it so it's taken
+ * on the very first access to the global run queue and is released as soon as
+ * it reaches the end.
  *
  * The function adjusts <next> if a new event is closer.
  */
 void process_runnable_tasks()
 {
-	struct eb32sc_node *rq_next;
+	struct eb32sc_node *lrq = NULL; // next local run queue entry
+	struct eb32sc_node *grq = NULL; // next global run queue entry
 	struct task *t;
 	int max_processed;
 
+	if (!(active_tasks_mask & tid_bit)) {
+		activity[tid].empty_rq++;
+		return;
+	}
+
 	tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
 	nb_tasks_cur = nb_tasks;
 	max_processed = global.tune.runqueue_depth;
 
-	if (likely(global_tasks_mask & tid_bit)) {
-		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
-		if (!(active_tasks_mask & tid_bit)) {
-			HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-			activity[tid].empty_rq++;
-			return;
-		}
+	/* Note: the grq lock is always held when grq is not null */
 
-#ifdef USE_THREAD
-		/* Get some elements from the global run queue and put it in the
-		 * local run queue. To try to keep a bit of fairness, just get as
-		 * much elements from the global list as to have a bigger local queue
-		 * than the average.
-		 */
-		rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-		while ((task_per_thread[tid].task_list_size + task_per_thread[tid].rqueue_size) * global.nbthread <= tasks_run_queue + global.nbthread - 1) {
-			if (unlikely(!rq_next)) {
-				/* either we just started or we reached the end
-				 * of the tree, typically because <rqueue_ticks>
-				 * is in the first half and we're first scanning
-				 * the last half. Let's loop back to the beginning
-				 * of the tree now.
-				 */
-				rq_next = eb32sc_first(&rqueue, tid_bit);
-				if (!rq_next) {
+	while (task_per_thread[tid].task_list_size < max_processed) {
+		if ((global_tasks_mask & tid_bit) && !grq) {
+			HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+			grq = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+			if (unlikely(!grq)) {
+				grq = eb32sc_first(&rqueue, tid_bit);
+				if (!grq) {
+					HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 					_HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
-					break;
 				}
 			}
-
-			t = eb32sc_entry(rq_next, struct task, rq);
-			rq_next = eb32sc_next(rq_next, tid_bit);
-
-			/* detach the task from the queue */
-			__task_unlink_rq(t);
-			__task_wakeup(t, &task_per_thread[tid].rqueue);
 		}
-#endif
-		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-	} else {
-		if (!(active_tasks_mask & tid_bit)) {
-			activity[tid].empty_rq++;
-			return;
-		}
-	}
-	/* Get some tasks from the run queue, make sure we don't
-	 * get too much in the task list, but put a bit more than
-	 * the max that will be run, to give a bit more fairness
-	 */
-	rq_next = eb32sc_lookup_ge(&task_per_thread[tid].rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-	while (max_processed + (max_processed / 10) > task_per_thread[tid].task_list_size) {
-		/* Note: this loop is one of the fastest code path in
-		 * the whole program. It should not be re-arranged
-		 * without a good reason.
+		/* If a global task is available for this thread, it's in grq
+		 * now and the global RQ is locked.
 		 */
-		if (unlikely(!rq_next)) {
-			/* either we just started or we reached the end
-			 * of the tree, typically because <rqueue_ticks>
-			 * is in the first half and we're first scanning
-			 * the last half. Let's loop back to the beginning
-			 * of the tree now.
-			 */
-			rq_next = eb32sc_first(&task_per_thread[tid].rqueue, tid_bit);
-			if (!rq_next)
-				break;
+
+		if (!lrq) {
+			lrq = eb32sc_lookup_ge(&task_per_thread[tid].rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+			if (unlikely(!lrq))
+				lrq = eb32sc_first(&task_per_thread[tid].rqueue, tid_bit);
 		}
-		t = eb32sc_entry(rq_next, struct task, rq);
-		rq_next = eb32sc_next(rq_next, tid_bit);
+
+		if (!lrq && !grq)
+			break;
+
+		if (likely(!grq || (lrq && (int)(lrq->key - grq->key) <= 0))) {
+			t = eb32sc_entry(lrq, struct task, rq);
+			lrq = eb32sc_next(lrq, tid_bit);
+			__task_unlink_rq(t);
+		}
+		else {
+			t = eb32sc_entry(grq, struct task, rq);
+			grq = eb32sc_next(grq, tid_bit);
+			__task_unlink_rq(t);
+			if (unlikely(!grq)) {
+				grq = eb32sc_first(&rqueue, tid_bit);
+				if (!grq) {
+					HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+					_HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
+				}
+			}
+		}
+
 		/* Make sure nobody re-adds the task in the runqueue */
 		_HA_ATOMIC_OR(&t->state, TASK_RUNNING);
 
-		/* detach the task from the queue */
-		__task_unlink_rq(t);
 		/* And add it to the local task list */
 		task_insert_into_tasklet_list(t);
 	}
+
+	/* release the rqueue lock */
+	if (grq) {
+		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+		grq = NULL;
+	}
+
 	if (!(global_tasks_mask & tid_bit) && task_per_thread[tid].rqueue_size == 0) {
 		_HA_ATOMIC_AND(&active_tasks_mask, ~tid_bit);
 		__ha_barrier_atomic_load();
 		if (global_tasks_mask & tid_bit)
 			_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
 	}
+
 	while (max_processed > 0 && !LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
 		struct task *t;
 		unsigned short state;
@@ -463,11 +456,11 @@ void process_runnable_tasks()
 		}
 
 		max_processed--;
-		if (max_processed <= 0) {
-			_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
-			activity[tid].long_rq++;
-			break;
-		}
-	}
+	}
+
+	if (!LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
+		_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
+		activity[tid].long_rq++;
+	}
 }
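
Reviewer note, not part of the patch: the arbitration between the two queue heads
above relies on the wrap-safe comparison (int)(lrq->key - grq->key) <= 0 rather
than a plain unsigned "<", because the keys come from a free-running 32-bit tick
counter that is allowed to wrap. The standalone sketch below (hypothetical names,
assuming the usual two's-complement conversion from unsigned to signed) illustrates
why the signed difference keeps the ordering correct across a wrap, as long as the
two keys are less than 2^31 ticks apart.

#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper, not HAProxy code: returns nonzero if key <a> was
 * assigned no later than key <b>, even across a wrap of the 32-bit tick
 * counter. It mirrors the (int)(lrq->key - grq->key) <= 0 test used above
 * to pick the most urgent of the local and global queue heads.
 */
static int key_not_after(uint32_t a, uint32_t b)
{
	return (int32_t)(a - b) <= 0;
}

int main(void)
{
	/* 0xFFFFFFF0 was issued just before the counter wrapped past zero,
	 * so it must still rank as the older (more urgent) key.
	 */
	printf("%d\n", key_not_after(0xFFFFFFF0u, 0x00000010u)); /* prints 1 */
	printf("%d\n", key_not_after(0x00000010u, 0xFFFFFFF0u)); /* prints 0 */
	/* a plain unsigned comparison gets the wrapped case wrong */
	printf("%d\n", 0xFFFFFFF0u <= 0x00000010u);              /* prints 0 */
	return 0;
}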