From 9c7b8085f4cad284642e7f67d2274f2fb568f243 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Wed, 24 Feb 2021 15:10:07 +0100 Subject: [PATCH] MEDIUM: task: remove the tasks_run_queue counter and have one per thread This counter is solely used for reporting in the stats and is the hottest thread contention point to date. Moving it to the scheduler and having a separate one for the global run queue dramatically improves the performance, showing a 12% boost on the request rate on 16 threads! In addition, the thread debugging output which used to rely on rqueue_size was not totally accurate as it would only report task counts. Now we can return the exact thread's run queue length. It is also interesting to note that there are still a few other task/tasklet counters in the scheduler that are not efficiently updated because some cover a single area and others cover multiple areas. It looks like having a distinct counter for each of the following entries would help and would keep the code a bit cleaner: - global run queue (tree) - per-thread run queue (tree) - per-thread shared tasklets list - per-thread local lists Maybe even splitting the shared tasklets lists between pure tasklets and tasks instead of having the whole and tasks would simplify the code because there remain a number of places where several counters have to be updated. --- include/haproxy/task-t.h | 1 + include/haproxy/task.h | 44 +++++++++++++++++++++++++++++----------- src/debug.c | 2 +- src/stats.c | 4 ++-- src/task.c | 21 ++++++++----------- 5 files changed, 45 insertions(+), 27 deletions(-) diff --git a/include/haproxy/task-t.h b/include/haproxy/task-t.h index 1ba91409d..34727fa4e 100644 --- a/include/haproxy/task-t.h +++ b/include/haproxy/task-t.h @@ -81,6 +81,7 @@ struct task_per_thread { int rqueue_size; /* Number of elements in the per-thread run queue */ int current_queue; /* points to current tasklet list being run, -1 if none */ struct task *current; /* current task (not tasklet) */ + unsigned int rq_total; /* total size of the run queue, prio_tree + tasklets */ uint8_t tl_class_mask; /* bit mask of non-empty tasklets classes */ __attribute__((aligned(64))) char end[0]; }; diff --git a/include/haproxy/task.h b/include/haproxy/task.h index f774ba224..58418189e 100644 --- a/include/haproxy/task.h +++ b/include/haproxy/task.h @@ -89,8 +89,7 @@ /* a few exported variables */ extern unsigned int nb_tasks; /* total number of tasks */ extern volatile unsigned long global_tasks_mask; /* Mask of threads with tasks in the global runqueue */ -extern unsigned int tasks_run_queue; /* run queue size */ -extern unsigned int tasks_run_queue_cur; +extern unsigned int grq_total; /* total number of entries in the global run queue */ extern unsigned int nb_tasks_cur; extern unsigned int niced_tasks; /* number of niced tasks in the run queue */ extern struct pool_head *pool_head_task; @@ -144,7 +143,22 @@ int next_timer_expiry(); */ void mworker_cleantasks(); +/* returns the number of running tasks+tasklets on the whole process. Note + * that this *is* racy since a task may move from the global to a local + * queue for example and be counted twice. This is only for statistics + * reporting. + */ +static inline int total_run_queues() +{ + int thr, ret = 0; +#ifdef USE_THREAD + ret = _HA_ATOMIC_LOAD(&grq_total); +#endif + for (thr = 0; thr < global.nbthread; thr++) + ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].rq_total); + return ret; +} /* return 0 if task is in run queue, otherwise non-zero */ static inline int task_in_rq(struct task *t) @@ -294,21 +308,26 @@ static inline void task_set_affinity(struct task *t, unsigned long thread_mask) } /* - * Unlink the task from the run queue. The tasks_run_queue size and number of - * niced tasks are updated too. A pointer to the task itself is returned. The - * task *must* already be in the run queue before calling this function. If - * unsure, use the safer task_unlink_rq() function. Note that the pointer to the - * next run queue entry is neither checked nor updated. + * Unlink the task from the run queue. The run queue size and number of niced + * tasks are updated too. A pointer to the task itself is returned. The task + * *must* already be in the run queue before calling this function. If the task + * is in the global run queue, the global run queue's lock must already be held. + * If unsure, use the safer task_unlink_rq() function. Note that the pointer to + * the next run queue entry is neither checked nor updated. */ static inline struct task *__task_unlink_rq(struct task *t) { - _HA_ATOMIC_SUB(&tasks_run_queue, 1); #ifdef USE_THREAD - if (t->state & TASK_GLOBAL) + if (t->state & TASK_GLOBAL) { + grq_total--; _HA_ATOMIC_AND(&t->state, ~TASK_GLOBAL); + } else #endif + { sched->rqueue_size--; + _HA_ATOMIC_SUB(&sched->rq_total, 1); + } eb32sc_delete(&t->rq); if (likely(t->nice)) _HA_ATOMIC_SUB(&niced_tasks, 1); @@ -377,15 +396,16 @@ static inline void _tasklet_wakeup_on(struct tasklet *tl, int thr, const char *f LIST_ADDQ(&sched->tasklets[sched->current_queue], &tl->list); sched->tl_class_mask |= 1 << sched->current_queue; } + _HA_ATOMIC_ADD(&sched->rq_total, 1); } else { /* this tasklet runs on a specific thread. */ MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&tl->list); + _HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1); if (sleeping_thread_mask & (1UL << thr)) { _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr)); wake_thread(thr); } } - _HA_ATOMIC_ADD(&tasks_run_queue, 1); } /* schedules tasklet to run onto the thread designated by tl->tid, which @@ -419,7 +439,7 @@ static inline void tasklet_remove_from_tasklet_list(struct tasklet *t) { if (MT_LIST_DEL((struct mt_list *)&t->list)) { _HA_ATOMIC_AND(&t->state, ~TASK_IN_LIST); - _HA_ATOMIC_SUB(&tasks_run_queue, 1); + _HA_ATOMIC_SUB(&task_per_thread[t->tid >= 0 ? t->tid : tid].rq_total, 1); } } @@ -547,7 +567,7 @@ static inline void task_destroy(struct task *t) static inline void tasklet_free(struct tasklet *tl) { if (MT_LIST_DEL((struct mt_list *)&tl->list)) - _HA_ATOMIC_SUB(&tasks_run_queue, 1); + _HA_ATOMIC_SUB(&task_per_thread[tl->tid >= 0 ? tl->tid : tid].rq_total, 1); #ifdef DEBUG_TASK if ((unsigned int)tl->debug.caller_idx > 1) diff --git a/src/debug.c b/src/debug.c index bf64f19f0..3162d3282 100644 --- a/src/debug.c +++ b/src/debug.c @@ -175,7 +175,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid) LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) && MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)), task_per_thread[thr].task_list_size, - task_per_thread[thr].rqueue_size, + task_per_thread[thr].rq_total, stuck, !!(task_profiling_mask & thr_bit)); diff --git a/src/stats.c b/src/stats.c index a63178d5a..e124f28ba 100644 --- a/src/stats.c +++ b/src/stats.c @@ -3339,7 +3339,7 @@ static void stats_dump_html_info(struct stream_interface *si, struct uri_auth *u actconn, pipes_used, pipes_used+pipes_free, read_freq_ctr(&global.conn_per_sec), bps >= 1000000000UL ? (bps / 1000000000.0) : bps >= 1000000UL ? (bps / 1000000.0) : (bps / 1000.0), bps >= 1000000000UL ? 'G' : bps >= 1000000UL ? 'M' : 'k', - tasks_run_queue_cur, nb_tasks_cur, ti->idle_pct + total_run_queues(), nb_tasks_cur, ti->idle_pct ); /* scope_txt = search query, appctx->ctx.stats.scope_len is always <= STAT_SCOPE_TXT_MAXLEN */ @@ -4366,7 +4366,7 @@ int stats_fill_info(struct field *info, int len) info[INF_MAX_ZLIB_MEM_USAGE] = mkf_u32(FO_CONFIG|FN_LIMIT, global.maxzlibmem); #endif info[INF_TASKS] = mkf_u32(0, nb_tasks_cur); - info[INF_RUN_QUEUE] = mkf_u32(0, tasks_run_queue_cur); + info[INF_RUN_QUEUE] = mkf_u32(0, total_run_queues()); info[INF_IDLE_PCT] = mkf_u32(FN_AVG, ti->idle_pct); info[INF_NODE] = mkf_str(FO_CONFIG|FN_OUTPUT|FS_SERVICE, global.node); if (global.desc) diff --git a/src/task.c b/src/task.c index 9c8312cbe..153f7d638 100644 --- a/src/task.c +++ b/src/task.c @@ -37,8 +37,6 @@ DECLARE_POOL(pool_head_notification, "notification", sizeof(struct notification) unsigned int nb_tasks = 0; volatile unsigned long global_tasks_mask = 0; /* Mask of threads with tasks in the global runqueue */ -unsigned int tasks_run_queue = 0; -unsigned int tasks_run_queue_cur = 0; /* copy of the run queue size */ unsigned int nb_tasks_cur = 0; /* copy of the tasks count */ unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */ @@ -50,6 +48,7 @@ __decl_aligned_rwlock(wq_lock); /* RW lock related to the wait queue */ #ifdef USE_THREAD struct eb_root timers; /* sorted timers tree, global, accessed under wq_lock */ struct eb_root rqueue; /* tree constituting the global run queue, accessed under rq_lock */ +unsigned int grq_total; /* total number of entries in the global run queue, use grq_lock */ static unsigned int global_rqueue_ticks; /* insertion count in the grq, use rq_lock */ #endif @@ -97,7 +96,7 @@ void task_kill(struct task *t) /* Beware: tasks that have never run don't have their ->list empty yet! */ MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&((struct tasklet *)t)->list); - _HA_ATOMIC_ADD(&tasks_run_queue, 1); + _HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1); _HA_ATOMIC_ADD(&task_per_thread[thr].task_list_size, 1); if (sleeping_thread_mask & (1UL << thr)) { _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr)); @@ -122,19 +121,18 @@ void __task_wakeup(struct task *t, struct eb_root *root) if (root == &rqueue) { HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); } -#endif - /* Make sure if the task isn't in the runqueue, nobody inserts it - * in the meanwhile. - */ - _HA_ATOMIC_ADD(&tasks_run_queue, 1); -#ifdef USE_THREAD + if (root == &rqueue) { global_tasks_mask |= t->thread_mask; + grq_total++; t->rq.key = ++global_rqueue_ticks; __ha_barrier_store(); } else #endif + { + _HA_ATOMIC_ADD(&sched->rq_total, 1); t->rq.key = ++sched->rqueue_ticks; + } if (likely(t->nice)) { int offset; @@ -460,7 +458,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[]) t->calls++; sched->current = t; - _HA_ATOMIC_SUB(&tasks_run_queue, 1); + _HA_ATOMIC_SUB(&sched->rq_total, 1); if (TASK_IS_TASKLET(t)) { LIST_DEL_INIT(&((struct tasklet *)t)->list); @@ -595,7 +593,6 @@ void process_runnable_tasks() return; } - tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */ nb_tasks_cur = nb_tasks; max_processed = global.tune.runqueue_depth; @@ -702,7 +699,7 @@ void process_runnable_tasks() if (picked) { tt->tl_class_mask |= 1 << TL_NORMAL; _HA_ATOMIC_ADD(&tt->task_list_size, picked); - _HA_ATOMIC_ADD(&tasks_run_queue, picked); + _HA_ATOMIC_ADD(&tt->rq_total, picked); activity[tid].tasksw += picked; }