mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-11-29 14:50:59 +01:00
MEDIUM: task: Split the tasklet list into two lists.
As using an mt_list for the tasklet list is costly, instead use a regular list, but add an mt_list for tasklet woken up by other threads, to be run on the current thread. At the beginning of process_runnable_tasks(), we just take the new list, and merge it into the task_list. This should give us performances comparable to before we started using a mt_list, but allow us to use tasklet_wakeup() from other threads.
This commit is contained in:
parent
d7f2bbcbe3
commit
06910464dd
@ -228,11 +228,18 @@ static inline struct task *task_unlink_rq(struct task *t)
|
|||||||
|
|
||||||
static inline void tasklet_wakeup(struct tasklet *tl)
|
static inline void tasklet_wakeup(struct tasklet *tl)
|
||||||
{
|
{
|
||||||
if (MT_LIST_ADDQ(&task_per_thread[tl->tid].task_list, &tl->list) == 1) {
|
if (tl->tid == tid) {
|
||||||
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
if (LIST_ISEMPTY(&tl->list)) {
|
||||||
if (sleeping_thread_mask & (1 << tl->tid)) {
|
LIST_ADDQ(&task_per_thread[tl->tid].task_list, &tl->list);
|
||||||
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1 << tl->tid));
|
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
||||||
wake_thread(tl->tid);
|
}
|
||||||
|
} else {
|
||||||
|
if (MT_LIST_ADDQ(&task_per_thread[tl->tid].shared_tasklet_list, (struct mt_list *)&tl->list) == 1) {
|
||||||
|
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
||||||
|
if (sleeping_thread_mask & (1 << tl->tid)) {
|
||||||
|
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1 << tl->tid));
|
||||||
|
wake_thread(tl->tid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,23 +250,25 @@ static inline void tasklet_wakeup(struct tasklet *tl)
|
|||||||
*/
|
*/
|
||||||
static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl)
|
static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl)
|
||||||
{
|
{
|
||||||
if (MT_LIST_ADDQ(&sched->task_list, &tl->list) == 1)
|
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
||||||
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
LIST_ADDQ(&sched->task_list, &tl->list);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remove the tasklet from the tasklet list. The tasklet MUST already be there.
|
/* Remove the tasklet from the tasklet list. The tasklet MUST already be there.
|
||||||
* If unsure, use tasklet_remove_from_tasklet_list() instead. If used with a
|
* If unsure, use tasklet_remove_from_tasklet_list() instead. If used with a
|
||||||
* plain task, the caller must update the task_list_size.
|
* plain task, the caller must update the task_list_size.
|
||||||
|
* This should only be used by the thread that owns the tasklet, any other
|
||||||
|
* thread should use tasklet_cancel().
|
||||||
*/
|
*/
|
||||||
static inline void __tasklet_remove_from_tasklet_list(struct tasklet *t)
|
static inline void __tasklet_remove_from_tasklet_list(struct tasklet *t)
|
||||||
{
|
{
|
||||||
if (MT_LIST_DEL(&t->list) == 1)
|
LIST_DEL_INIT(&t->list);
|
||||||
_HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
_HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
|
static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
|
||||||
{
|
{
|
||||||
if (likely(!MT_LIST_ISEMPTY(&t->list)))
|
if (likely(!LIST_ISEMPTY(&t->list)))
|
||||||
__tasklet_remove_from_tasklet_list(t);
|
__tasklet_remove_from_tasklet_list(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -290,7 +299,7 @@ static inline void tasklet_init(struct tasklet *t)
|
|||||||
t->state = 0;
|
t->state = 0;
|
||||||
t->process = NULL;
|
t->process = NULL;
|
||||||
t->tid = tid;
|
t->tid = tid;
|
||||||
MT_LIST_INIT(&t->list);
|
LIST_INIT(&t->list);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline struct tasklet *tasklet_new(void)
|
static inline struct tasklet *tasklet_new(void)
|
||||||
@ -359,11 +368,12 @@ static inline void task_destroy(struct task *t)
|
|||||||
t->process = NULL;
|
t->process = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Should only be called by the thread responsible for the tasklet */
|
||||||
static inline void tasklet_free(struct tasklet *tl)
|
static inline void tasklet_free(struct tasklet *tl)
|
||||||
{
|
{
|
||||||
if (!MT_LIST_ISEMPTY(&tl->list)) {
|
if (!LIST_ISEMPTY(&tl->list)) {
|
||||||
if(MT_LIST_DEL(&tl->list) == 1)
|
LIST_DEL(&tl->list);
|
||||||
_HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
_HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pool_free(pool_head_tasklet, tl);
|
pool_free(pool_head_tasklet, tl);
|
||||||
@ -545,7 +555,7 @@ static inline int thread_has_tasks(void)
|
|||||||
{
|
{
|
||||||
return (!!(global_tasks_mask & tid_bit) |
|
return (!!(global_tasks_mask & tid_bit) |
|
||||||
(sched->rqueue_size > 0) |
|
(sched->rqueue_size > 0) |
|
||||||
!MT_LIST_ISEMPTY(&sched->task_list));
|
!LIST_ISEMPTY(&sched->task_list) | !MT_LIST_ISEMPTY(&sched->shared_tasklet_list));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* adds list item <item> to work list <work> and wake up the associated task */
|
/* adds list item <item> to work list <work> and wake up the associated task */
|
||||||
|
|||||||
@ -61,7 +61,8 @@ struct notification {
|
|||||||
struct task_per_thread {
|
struct task_per_thread {
|
||||||
struct eb_root timers; /* tree constituting the per-thread wait queue */
|
struct eb_root timers; /* tree constituting the per-thread wait queue */
|
||||||
struct eb_root rqueue; /* tree constituting the per-thread run queue */
|
struct eb_root rqueue; /* tree constituting the per-thread run queue */
|
||||||
struct mt_list task_list; /* List of tasks to be run, mixing tasks and tasklets */
|
struct list task_list; /* List of tasks to be run, mixing tasks and tasklets */
|
||||||
|
struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */
|
||||||
int task_list_size; /* Number of tasks in the task_list */
|
int task_list_size; /* Number of tasks in the task_list */
|
||||||
int rqueue_size; /* Number of elements in the per-thread run queue */
|
int rqueue_size; /* Number of elements in the per-thread run queue */
|
||||||
struct task *current; /* current task (not tasklet) */
|
struct task *current; /* current task (not tasklet) */
|
||||||
@ -95,7 +96,7 @@ struct task {
|
|||||||
/* lightweight tasks, without priority, mainly used for I/Os */
|
/* lightweight tasks, without priority, mainly used for I/Os */
|
||||||
struct tasklet {
|
struct tasklet {
|
||||||
TASK_COMMON; /* must be at the beginning! */
|
TASK_COMMON; /* must be at the beginning! */
|
||||||
struct mt_list list;
|
struct list list;
|
||||||
int tid; /* TID of the tasklet owner */
|
int tid; /* TID of the tasklet owner */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -57,7 +57,8 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
|
|||||||
!!(global_tasks_mask & thr_bit),
|
!!(global_tasks_mask & thr_bit),
|
||||||
!eb_is_empty(&task_per_thread[thr].timers),
|
!eb_is_empty(&task_per_thread[thr].timers),
|
||||||
!eb_is_empty(&task_per_thread[thr].rqueue),
|
!eb_is_empty(&task_per_thread[thr].rqueue),
|
||||||
!MT_LIST_ISEMPTY(&task_per_thread[thr].task_list),
|
!(LIST_ISEMPTY(&task_per_thread[thr].task_list) |
|
||||||
|
MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
|
||||||
task_per_thread[thr].task_list_size,
|
task_per_thread[thr].task_list_size,
|
||||||
task_per_thread[thr].rqueue_size,
|
task_per_thread[thr].rqueue_size,
|
||||||
stuck,
|
stuck,
|
||||||
|
|||||||
26
src/task.c
26
src/task.c
@ -305,6 +305,7 @@ void process_runnable_tasks()
|
|||||||
struct eb32sc_node *grq = NULL; // next global run queue entry
|
struct eb32sc_node *grq = NULL; // next global run queue entry
|
||||||
struct task *t;
|
struct task *t;
|
||||||
int max_processed;
|
int max_processed;
|
||||||
|
struct mt_list *tmp_list;
|
||||||
|
|
||||||
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
||||||
|
|
||||||
@ -312,6 +313,12 @@ void process_runnable_tasks()
|
|||||||
activity[tid].empty_rq++;
|
activity[tid].empty_rq++;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
/* Merge the list of tasklets waken up by other threads to the
|
||||||
|
* main list.
|
||||||
|
*/
|
||||||
|
tmp_list = MT_LIST_BEHEAD(&sched->shared_tasklet_list);
|
||||||
|
if (tmp_list)
|
||||||
|
LIST_SPLICE_END_DETACHED(&sched->task_list, (struct list *)tmp_list);
|
||||||
|
|
||||||
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
|
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
|
||||||
nb_tasks_cur = nb_tasks;
|
nb_tasks_cur = nb_tasks;
|
||||||
@ -371,10 +378,10 @@ void process_runnable_tasks()
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Make sure the entry doesn't appear to be in a list */
|
/* Make sure the entry doesn't appear to be in a list */
|
||||||
MT_LIST_INIT(&((struct tasklet *)t)->list);
|
LIST_INIT(&((struct tasklet *)t)->list);
|
||||||
/* And add it to the local task list */
|
/* And add it to the local task list */
|
||||||
tasklet_insert_into_tasklet_list((struct tasklet *)t);
|
tasklet_insert_into_tasklet_list((struct tasklet *)t);
|
||||||
HA_ATOMIC_ADD(&tt->task_list_size, 1);
|
tt->task_list_size++;
|
||||||
activity[tid].tasksw++;
|
activity[tid].tasksw++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -384,18 +391,16 @@ void process_runnable_tasks()
|
|||||||
grq = NULL;
|
grq = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (max_processed > 0 && !MT_LIST_ISEMPTY(&tt->task_list)) {
|
while (max_processed > 0 && !LIST_ISEMPTY(&tt->task_list)) {
|
||||||
struct task *t;
|
struct task *t;
|
||||||
unsigned short state;
|
unsigned short state;
|
||||||
void *ctx;
|
void *ctx;
|
||||||
struct task *(*process)(struct task *t, void *ctx, unsigned short state);
|
struct task *(*process)(struct task *t, void *ctx, unsigned short state);
|
||||||
|
|
||||||
t = (struct task *)MT_LIST_POP(&tt->task_list, struct tasklet *, list);
|
t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
|
||||||
if (!t)
|
|
||||||
break;
|
|
||||||
_HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
|
||||||
state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
|
state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
|
||||||
__ha_barrier_atomic_store();
|
__ha_barrier_atomic_store();
|
||||||
|
__tasklet_remove_from_tasklet_list((struct tasklet *)t);
|
||||||
|
|
||||||
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
||||||
activity[tid].ctxsw++;
|
activity[tid].ctxsw++;
|
||||||
@ -411,7 +416,7 @@ void process_runnable_tasks()
|
|||||||
|
|
||||||
/* OK then this is a regular task */
|
/* OK then this is a regular task */
|
||||||
|
|
||||||
_HA_ATOMIC_SUB(&tt->task_list_size, 1);
|
tt->task_list_size--;
|
||||||
if (unlikely(t->call_date)) {
|
if (unlikely(t->call_date)) {
|
||||||
uint64_t now_ns = now_mono_time();
|
uint64_t now_ns = now_mono_time();
|
||||||
|
|
||||||
@ -456,7 +461,7 @@ void process_runnable_tasks()
|
|||||||
max_processed--;
|
max_processed--;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!MT_LIST_ISEMPTY(&tt->task_list))
|
if (!LIST_ISEMPTY(&tt->task_list))
|
||||||
activity[tid].long_rq++;
|
activity[tid].long_rq++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -560,7 +565,8 @@ static void init_task()
|
|||||||
#endif
|
#endif
|
||||||
memset(&task_per_thread, 0, sizeof(task_per_thread));
|
memset(&task_per_thread, 0, sizeof(task_per_thread));
|
||||||
for (i = 0; i < MAX_THREADS; i++) {
|
for (i = 0; i < MAX_THREADS; i++) {
|
||||||
MT_LIST_INIT(&task_per_thread[i].task_list);
|
LIST_INIT(&task_per_thread[i].task_list);
|
||||||
|
MT_LIST_INIT(&task_per_thread[i].shared_tasklet_list);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user