diff --git a/include/haproxy/task.h b/include/haproxy/task.h index 58910fe71..6c4a5f7fb 100644 --- a/include/haproxy/task.h +++ b/include/haproxy/task.h @@ -99,7 +99,6 @@ extern struct pool_head *pool_head_notification; #ifdef USE_THREAD extern struct eb_root timers; /* sorted timers tree, global */ -extern struct eb_root rqueue; /* tree constituting the run queue */ #endif __decl_thread(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue */ diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h index df5ac7ab3..94d08d4f7 100644 --- a/include/haproxy/tinfo-t.h +++ b/include/haproxy/tinfo-t.h @@ -102,6 +102,9 @@ struct thread_ctx { uint64_t prev_cpu_time; /* previous per thread CPU time */ uint64_t prev_mono_time; /* previous system wide monotonic time */ + + struct eb_root rqueue_shared; /* run queue fed by other threads */ + ALWAYS_ALIGN(128); }; diff --git a/src/activity.c b/src/activity.c index eb49169b5..c86d4d7de 100644 --- a/src/activity.c +++ b/src/activity.c @@ -873,17 +873,20 @@ static int cli_io_handler_show_tasks(struct appctx *appctx) /* 1. global run queue */ #ifdef USE_THREAD - rqnode = eb32sc_first(&rqueue, ~0UL); - while (rqnode) { - t = eb32sc_entry(rqnode, struct task, rq); - entry = sched_activity_entry(tmp_activity, t->process); - if (t->call_date) { - lat = now_ns - t->call_date; - if ((int64_t)lat > 0) - entry->lat_time += lat; + for (thr = 0; thr < global.nbthread; thr++) { + /* task run queue */ + rqnode = eb32sc_first(&ha_thread_ctx[thr].rqueue_shared, ~0UL); + while (rqnode) { + t = eb32sc_entry(rqnode, struct task, rq); + entry = sched_activity_entry(tmp_activity, t->process); + if (t->call_date) { + lat = now_ns - t->call_date; + if ((int64_t)lat > 0) + entry->lat_time += lat; + } + entry->calls++; + rqnode = eb32sc_next(rqnode, ~0UL); } - entry->calls++; - rqnode = eb32sc_next(rqnode, ~0UL); } #endif /* 2. all threads's local run queues */ diff --git a/src/task.c b/src/task.c index 247ed807a..f97214224 100644 --- a/src/task.c +++ b/src/task.c @@ -43,9 +43,7 @@ __decl_aligned_rwlock(wq_lock); /* RW lock related to the wait queue */ #ifdef USE_THREAD struct eb_root timers; /* sorted timers tree, global, accessed under wq_lock */ -struct eb_root rqueue; /* tree constituting the global run queue, accessed under rq_lock */ unsigned int grq_total; /* total number of entries in the global run queue, atomic */ -static unsigned int global_rqueue_ticks; /* insertion count in the grq, use rq_lock */ #endif @@ -234,7 +232,7 @@ void __task_wakeup(struct task *t) #ifdef USE_THREAD if (thr != tid) { - root = &rqueue; + root = &ha_thread_ctx[thr].rqueue_shared; _HA_ATOMIC_INC(&grq_total); HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); @@ -243,7 +241,7 @@ void __task_wakeup(struct task *t) global_tasks_mask = all_threads_mask; else global_tasks_mask |= 1UL << thr; - t->rq.key = ++global_rqueue_ticks; + t->rq.key = _HA_ATOMIC_ADD_FETCH(&ha_thread_ctx[thr].rqueue_ticks, 1); __ha_barrier_store(); } else #endif @@ -838,9 +836,9 @@ void process_runnable_tasks() if ((global_tasks_mask & tid_bit) && !grq) { #ifdef USE_THREAD HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock); - grq = eb32sc_lookup_ge(&rqueue, global_rqueue_ticks - TIMER_LOOK_BACK, tid_bit); + grq = eb32sc_lookup_ge(&th_ctx->rqueue_shared, _HA_ATOMIC_LOAD(&tt->rqueue_ticks) - TIMER_LOOK_BACK, tid_bit); if (unlikely(!grq)) { - grq = eb32sc_first(&rqueue, tid_bit); + grq = eb32sc_first(&th_ctx->rqueue_shared, tid_bit); if (!grq) { global_tasks_mask &= ~tid_bit; HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); @@ -875,7 +873,7 @@ void process_runnable_tasks() eb32sc_delete(&t->rq); if (unlikely(!grq)) { - grq = eb32sc_first(&rqueue, tid_bit); + grq = eb32sc_first(&th_ctx->rqueue_shared, tid_bit); if (!grq) { global_tasks_mask &= ~tid_bit; HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock); @@ -942,7 +940,7 @@ void mworker_cleantasks() #ifdef USE_THREAD /* cleanup the global run queue */ - tmp_rq = eb32sc_first(&rqueue, ~0UL); + tmp_rq = eb32sc_first(&th_ctx->rqueue_shared, ~0UL); while (tmp_rq) { t = eb32sc_entry(tmp_rq, struct task, rq); tmp_rq = eb32sc_next(tmp_rq, ~0UL); @@ -981,7 +979,6 @@ static void init_task() #ifdef USE_THREAD memset(&timers, 0, sizeof(timers)); - memset(&rqueue, 0, sizeof(rqueue)); #endif for (i = 0; i < MAX_THREADS; i++) { for (q = 0; q < TL_CLASSES; q++)