diff --git a/include/haproxy/task-t.h b/include/haproxy/task-t.h
index 1ba91409d..34727fa4e 100644
--- a/include/haproxy/task-t.h
+++ b/include/haproxy/task-t.h
@@ -81,6 +81,7 @@ struct task_per_thread {
 	int rqueue_size;            /* Number of elements in the per-thread run queue */
 	int current_queue;          /* points to current tasklet list being run, -1 if none */
 	struct task *current;       /* current task (not tasklet) */
+	unsigned int rq_total;      /* total size of the run queue, prio_tree + tasklets */
 	uint8_t tl_class_mask;      /* bit mask of non-empty tasklets classes */
 	__attribute__((aligned(64))) char end[0];
 };
diff --git a/include/haproxy/task.h b/include/haproxy/task.h
index f774ba224..58418189e 100644
--- a/include/haproxy/task.h
+++ b/include/haproxy/task.h
@@ -89,8 +89,7 @@
 /* a few exported variables */
 extern unsigned int nb_tasks;     /* total number of tasks */
 extern volatile unsigned long global_tasks_mask; /* Mask of threads with tasks in the global runqueue */
-extern unsigned int tasks_run_queue;    /* run queue size */
-extern unsigned int tasks_run_queue_cur;
+extern unsigned int grq_total;    /* total number of entries in the global run queue */
 extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool_head_task;
@@ -144,7 +143,22 @@ int next_timer_expiry();
  */
 void mworker_cleantasks();
 
+/* returns the number of running tasks+tasklets on the whole process. Note
+ * that this *is* racy since a task may move from the global to a local
+ * queue for example and be counted twice. This is only for statistics
+ * reporting.
+ */
+static inline int total_run_queues()
+{
+	int thr, ret = 0;
+#ifdef USE_THREAD
+	ret = _HA_ATOMIC_LOAD(&grq_total);
+#endif
+	for (thr = 0; thr < global.nbthread; thr++)
+		ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].rq_total);
+	return ret;
+}
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -294,21 +308,26 @@ static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
 }
 
 /*
- * Unlink the task from the run queue. The tasks_run_queue size and number of
- * niced tasks are updated too. A pointer to the task itself is returned. The
- * task *must* already be in the run queue before calling this function. If
- * unsure, use the safer task_unlink_rq() function. Note that the pointer to the
- * next run queue entry is neither checked nor updated.
+ * Unlink the task from the run queue. The run queue size and number of niced
+ * tasks are updated too. A pointer to the task itself is returned. The task
+ * *must* already be in the run queue before calling this function. If the task
+ * is in the global run queue, the global run queue's lock must already be held.
+ * If unsure, use the safer task_unlink_rq() function. Note that the pointer to
+ * the next run queue entry is neither checked nor updated.
  */
 static inline struct task *__task_unlink_rq(struct task *t)
 {
-	_HA_ATOMIC_SUB(&tasks_run_queue, 1);
 #ifdef USE_THREAD
-	if (t->state & TASK_GLOBAL)
+	if (t->state & TASK_GLOBAL) {
+		grq_total--;
 		_HA_ATOMIC_AND(&t->state, ~TASK_GLOBAL);
-	else
+	} else
 #endif
+	{
 		sched->rqueue_size--;
+		_HA_ATOMIC_SUB(&sched->rq_total, 1);
+	}
 	eb32sc_delete(&t->rq);
 	if (likely(t->nice))
 		_HA_ATOMIC_SUB(&niced_tasks, 1);
@@ -377,15 +396,16 @@ static inline void _tasklet_wakeup_on(struct tasklet *tl, int thr, const char *f
 			LIST_ADDQ(&sched->tasklets[sched->current_queue], &tl->list);
 			sched->tl_class_mask |= 1 << sched->current_queue;
 		}
+		_HA_ATOMIC_ADD(&sched->rq_total, 1);
 	} else {
 		/* this tasklet runs on a specific thread. */
 		MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
+		_HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1);
 		if (sleeping_thread_mask & (1UL << thr)) {
 			_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
 			wake_thread(thr);
 		}
 	}
-	_HA_ATOMIC_ADD(&tasks_run_queue, 1);
 }
 
 /* schedules tasklet to run onto the thread designated by tl->tid, which
@@ -419,7 +439,7 @@ static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
 {
 	if (MT_LIST_DEL((struct mt_list *)&t->list)) {
 		_HA_ATOMIC_AND(&t->state, ~TASK_IN_LIST);
-		_HA_ATOMIC_SUB(&tasks_run_queue, 1);
+		_HA_ATOMIC_SUB(&task_per_thread[t->tid >= 0 ? t->tid : tid].rq_total, 1);
 	}
 }
 
@@ -547,7 +567,7 @@ static inline void task_destroy(struct task *t)
 static inline void tasklet_free(struct tasklet *tl)
 {
 	if (MT_LIST_DEL((struct mt_list *)&tl->list))
-		_HA_ATOMIC_SUB(&tasks_run_queue, 1);
+		_HA_ATOMIC_SUB(&task_per_thread[tl->tid >= 0 ? tl->tid : tid].rq_total, 1);
 
 #ifdef DEBUG_TASK
 	if ((unsigned int)tl->debug.caller_idx > 1)
diff --git a/src/debug.c b/src/debug.c
index bf64f19f0..3162d3282 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -175,7 +175,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
 	               LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
 	               MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
 	              task_per_thread[thr].task_list_size,
-	              task_per_thread[thr].rqueue_size,
+	              task_per_thread[thr].rq_total,
 	              stuck,
 	              !!(task_profiling_mask & thr_bit));
diff --git a/src/stats.c b/src/stats.c
index a63178d5a..e124f28ba 100644
--- a/src/stats.c
+++ b/src/stats.c
@@ -3339,7 +3339,7 @@ static void stats_dump_html_info(struct stream_interface *si, struct uri_auth *u
 	              actconn, pipes_used, pipes_used+pipes_free, read_freq_ctr(&global.conn_per_sec),
 	              bps >= 1000000000UL ? (bps / 1000000000.0) : bps >= 1000000UL ? (bps / 1000000.0) : (bps / 1000.0),
 	              bps >= 1000000000UL ? 'G' : bps >= 1000000UL ? 'M' : 'k',
-	              tasks_run_queue_cur, nb_tasks_cur, ti->idle_pct
+	              total_run_queues(), nb_tasks_cur, ti->idle_pct
 	              );
 
 	/* scope_txt = search query, appctx->ctx.stats.scope_len is always <= STAT_SCOPE_TXT_MAXLEN */
@@ -4366,7 +4366,7 @@ int stats_fill_info(struct field *info, int len)
 	info[INF_MAX_ZLIB_MEM_USAGE] = mkf_u32(FO_CONFIG|FN_LIMIT, global.maxzlibmem);
 #endif
 	info[INF_TASKS]      = mkf_u32(0, nb_tasks_cur);
-	info[INF_RUN_QUEUE]  = mkf_u32(0, tasks_run_queue_cur);
+	info[INF_RUN_QUEUE]  = mkf_u32(0, total_run_queues());
 	info[INF_IDLE_PCT]   = mkf_u32(FN_AVG, ti->idle_pct);
 	info[INF_NODE]       = mkf_str(FO_CONFIG|FN_OUTPUT|FS_SERVICE, global.node);
 	if (global.desc)
diff --git a/src/task.c b/src/task.c
index 9c8312cbe..153f7d638 100644
--- a/src/task.c
+++ b/src/task.c
@@ -37,8 +37,6 @@ DECLARE_POOL(pool_head_notification, "notification", sizeof(struct notification)
 
 unsigned int nb_tasks = 0;
 volatile unsigned long global_tasks_mask = 0; /* Mask of threads with tasks in the global runqueue */
-unsigned int tasks_run_queue = 0;
-unsigned int tasks_run_queue_cur = 0;    /* copy of the run queue size */
 unsigned int nb_tasks_cur = 0;     /* copy of the tasks count */
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 
@@ -50,6 +48,7 @@ __decl_aligned_rwlock(wq_lock); /* RW lock related to the wait queue */
 #ifdef USE_THREAD
 struct eb_root timers;      /* sorted timers tree, global, accessed under wq_lock */
 struct eb_root rqueue;      /* tree constituting the global run queue, accessed under rq_lock */
+unsigned int grq_total;     /* total number of entries in the global run queue, use grq_lock */
 static unsigned int global_rqueue_ticks;  /* insertion count in the grq, use rq_lock */
 #endif
 
@@ -97,7 +96,7 @@ void task_kill(struct task *t)
 			/* Beware: tasks that have never run don't have their ->list empty yet! */
 			MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list,
 			             (struct mt_list *)&((struct tasklet *)t)->list);
-			_HA_ATOMIC_ADD(&tasks_run_queue, 1);
+			_HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1);
 			_HA_ATOMIC_ADD(&task_per_thread[thr].task_list_size, 1);
 			if (sleeping_thread_mask & (1UL << thr)) {
 				_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
@@ -122,19 +121,18 @@ void __task_wakeup(struct task *t, struct eb_root *root)
 	if (root == &rqueue) {
 		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 	}
-#endif
-	/* Make sure if the task isn't in the runqueue, nobody inserts it
-	 * in the meanwhile.
-	 */
-	_HA_ATOMIC_ADD(&tasks_run_queue, 1);
-#ifdef USE_THREAD
+
 	if (root == &rqueue) {
 		global_tasks_mask |= t->thread_mask;
+		grq_total++;
 		t->rq.key = ++global_rqueue_ticks;
 		__ha_barrier_store();
 	} else
 #endif
+	{
+		_HA_ATOMIC_ADD(&sched->rq_total, 1);
 		t->rq.key = ++sched->rqueue_ticks;
+	}
 
 	if (likely(t->nice)) {
 		int offset;
@@ -460,7 +458,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 		t->calls++;
 		sched->current = t;
 
-		_HA_ATOMIC_SUB(&tasks_run_queue, 1);
+		_HA_ATOMIC_SUB(&sched->rq_total, 1);
 
 		if (TASK_IS_TASKLET(t)) {
 			LIST_DEL_INIT(&((struct tasklet *)t)->list);
@@ -595,7 +593,6 @@ void process_runnable_tasks()
 		return;
 	}
 
-	tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
 	nb_tasks_cur = nb_tasks;
 	max_processed = global.tune.runqueue_depth;
 
@@ -702,7 +699,7 @@ void process_runnable_tasks()
 	if (picked) {
 		tt->tl_class_mask |= 1 << TL_NORMAL;
 		_HA_ATOMIC_ADD(&tt->task_list_size, picked);
-		_HA_ATOMIC_ADD(&tasks_run_queue, picked);
+		_HA_ATOMIC_ADD(&tt->rq_total, picked);
 		activity[tid].tasksw += picked;
 	}
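
Note on the counting pattern (illustration only, not part of the patch): the change replaces the single, heavily contended tasks_run_queue atomic with one counter per thread (task_per_thread[].rq_total) plus one for the global queue (grq_total), and only sums them when statistics are requested. The standalone sketch below shows the same idea with C11 atomics; the names NB_THREADS, shard, global_cnt, shard_inc() and total() are invented for the example and do not exist in HAProxy, which uses the _HA_ATOMIC_* macros instead.

#include <stdatomic.h>
#include <stdio.h>

#define NB_THREADS 4

/* one counter per thread plus one for the global queue; each thread
 * normally touches only its own slot, so updates do not all hammer
 * the same cache line
 */
static _Atomic unsigned int shard[NB_THREADS];
static _Atomic unsigned int global_cnt;

static inline void shard_inc(int thr) { atomic_fetch_add(&shard[thr], 1); }
static inline void shard_dec(int thr) { atomic_fetch_sub(&shard[thr], 1); }

/* summed only when statistics are requested; racy but acceptable for
 * reporting, exactly as noted for total_run_queues() in the patch
 */
static unsigned int total(void)
{
	unsigned int thr, ret = atomic_load(&global_cnt);

	for (thr = 0; thr < NB_THREADS; thr++)
		ret += atomic_load(&shard[thr]);
	return ret;
}

int main(void)
{
	shard_inc(0);
	shard_inc(2);
	atomic_fetch_add(&global_cnt, 1);
	printf("run queues: %u\n", total());   /* prints 3 */
	shard_dec(0);
	return 0;
}

As in total_run_queues(), the aggregate is racy by design: an entry migrating from the global to a local queue may briefly be counted twice or missed, which is fine for stats reporting and avoids any cross-thread synchronization on the fast path.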