MEDIUM: tasklets: Make the tasklet list a struct mt_list.

Change the tasklet code so that the tasklet list is now a mt_list.
That means that tasklet now do have an associated tid, for the thread it
is expected to run on, and any thread can now call tasklet_wakeup() for
that tasklet.
One can change the associated tid with tasklet_set_tid().
This commit is contained in:
Olivier Houchard 2019-09-20 17:18:35 +02:00 committed by Olivier Houchard
parent 0cd6a976ff
commit ff1e9f39b9
5 changed files with 36 additions and 24 deletions

View File

@ -226,10 +226,8 @@ static inline struct task *task_unlink_rq(struct task *t)
static inline void tasklet_wakeup(struct tasklet *tl) static inline void tasklet_wakeup(struct tasklet *tl)
{ {
if (!LIST_ISEMPTY(&tl->list)) if (MT_LIST_ADDQ(&task_per_thread[tl->tid].task_list, &tl->list) == 1)
return; _HA_ATOMIC_ADD(&tasks_run_queue, 1);
LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
} }
@ -238,8 +236,8 @@ static inline void tasklet_wakeup(struct tasklet *tl)
*/ */
static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl) static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl)
{ {
_HA_ATOMIC_ADD(&tasks_run_queue, 1); if (MT_LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list) == 1)
LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list); _HA_ATOMIC_ADD(&tasks_run_queue, 1);
} }
/* Remove the tasklet from the tasklet list. The tasklet MUST already be there. /* Remove the tasklet from the tasklet list. The tasklet MUST already be there.
@ -248,13 +246,13 @@ static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl)
*/ */
static inline void __tasklet_remove_from_tasklet_list(struct tasklet *t) static inline void __tasklet_remove_from_tasklet_list(struct tasklet *t)
{ {
LIST_DEL_INIT(&t->list); if (MT_LIST_DEL(&t->list) == 1)
_HA_ATOMIC_SUB(&tasks_run_queue, 1); _HA_ATOMIC_SUB(&tasks_run_queue, 1);
} }
static inline void tasklet_remove_from_tasklet_list(struct tasklet *t) static inline void tasklet_remove_from_tasklet_list(struct tasklet *t)
{ {
if (likely(!LIST_ISEMPTY(&t->list))) if (likely(!MT_LIST_ISEMPTY(&t->list)))
__tasklet_remove_from_tasklet_list(t); __tasklet_remove_from_tasklet_list(t);
} }
@ -284,7 +282,8 @@ static inline void tasklet_init(struct tasklet *t)
t->calls = 0; t->calls = 0;
t->state = 0; t->state = 0;
t->process = NULL; t->process = NULL;
LIST_INIT(&t->list); t->tid = tid;
MT_LIST_INIT(&t->list);
} }
static inline struct tasklet *tasklet_new(void) static inline struct tasklet *tasklet_new(void)
@ -355,9 +354,9 @@ static inline void task_destroy(struct task *t)
static inline void tasklet_free(struct tasklet *tl) static inline void tasklet_free(struct tasklet *tl)
{ {
if (!LIST_ISEMPTY(&tl->list)) { if (!MT_LIST_ISEMPTY(&tl->list)) {
LIST_DEL(&tl->list); if(MT_LIST_DEL(&tl->list) == 1)
_HA_ATOMIC_SUB(&tasks_run_queue, 1); _HA_ATOMIC_SUB(&tasks_run_queue, 1);
} }
if ((struct task *)tl == curr_task) { if ((struct task *)tl == curr_task) {
@ -369,6 +368,11 @@ static inline void tasklet_free(struct tasklet *tl)
pool_flush(pool_head_tasklet); pool_flush(pool_head_tasklet);
} }
static inline void tasklet_set_tid(struct tasklet *tl, int tid)
{
tl->tid = tid;
}
void __task_queue(struct task *task, struct eb_root *wq); void __task_queue(struct task *task, struct eb_root *wq);
/* Place <task> into the wait queue, where it may already be. If the expiration /* Place <task> into the wait queue, where it may already be. If the expiration
@ -538,7 +542,7 @@ static inline int thread_has_tasks(void)
{ {
return (!!(global_tasks_mask & tid_bit) | return (!!(global_tasks_mask & tid_bit) |
(task_per_thread[tid].rqueue_size > 0) | (task_per_thread[tid].rqueue_size > 0) |
!LIST_ISEMPTY(&task_per_thread[tid].task_list)); !MT_LIST_ISEMPTY(&task_per_thread[tid].task_list));
} }
/* adds list item <item> to work list <work> and wake up the associated task */ /* adds list item <item> to work list <work> and wake up the associated task */

View File

@ -61,7 +61,7 @@ struct notification {
struct task_per_thread { struct task_per_thread {
struct eb_root timers; /* tree constituting the per-thread wait queue */ struct eb_root timers; /* tree constituting the per-thread wait queue */
struct eb_root rqueue; /* tree constituting the per-thread run queue */ struct eb_root rqueue; /* tree constituting the per-thread run queue */
struct list task_list; /* List of tasks to be run, mixing tasks and tasklets */ struct mt_list task_list; /* List of tasks to be run, mixing tasks and tasklets */
int task_list_size; /* Number of tasks in the task_list */ int task_list_size; /* Number of tasks in the task_list */
int rqueue_size; /* Number of elements in the per-thread run queue */ int rqueue_size; /* Number of elements in the per-thread run queue */
__attribute__((aligned(64))) char end[0]; __attribute__((aligned(64))) char end[0];
@ -94,7 +94,8 @@ struct task {
/* lightweight tasks, without priority, mainly used for I/Os */ /* lightweight tasks, without priority, mainly used for I/Os */
struct tasklet { struct tasklet {
TASK_COMMON; /* must be at the beginning! */ TASK_COMMON; /* must be at the beginning! */
struct list list; struct mt_list list;
int tid; /* TID of the tasklet owner */
}; };
#define TASK_IS_TASKLET(t) ((t)->nice == -32768) #define TASK_IS_TASKLET(t) ((t)->nice == -32768)

View File

@ -1638,6 +1638,8 @@ static int connect_conn_chk(struct task *t)
conn = cs->conn; conn = cs->conn;
/* Maybe there were an older connection we were waiting on */ /* Maybe there were an older connection we were waiting on */
check->wait_list.events = 0; check->wait_list.events = 0;
tasklet_set_tid(check->wait_list.tasklet, tid);
if (!sockaddr_alloc(&conn->dst)) if (!sockaddr_alloc(&conn->dst))
return SF_ERR_RESOURCE; return SF_ERR_RESOURCE;
@ -2891,6 +2893,8 @@ static int tcpcheck_main(struct check *check)
cs_destroy(check->cs); cs_destroy(check->cs);
} }
tasklet_set_tid(check->wait_list.tasklet, tid);
check->cs = cs; check->cs = cs;
conn = cs->conn; conn = cs->conn;
/* Maybe there were an older connection we were waiting on */ /* Maybe there were an older connection we were waiting on */

View File

@ -57,7 +57,7 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
!!(global_tasks_mask & thr_bit), !!(global_tasks_mask & thr_bit),
!eb_is_empty(&task_per_thread[thr].timers), !eb_is_empty(&task_per_thread[thr].timers),
!eb_is_empty(&task_per_thread[thr].rqueue), !eb_is_empty(&task_per_thread[thr].rqueue),
!LIST_ISEMPTY(&task_per_thread[thr].task_list), !MT_LIST_ISEMPTY(&task_per_thread[thr].task_list),
task_per_thread[thr].task_list_size, task_per_thread[thr].task_list_size,
task_per_thread[thr].rqueue_size, task_per_thread[thr].rqueue_size,
stuck, stuck,

View File

@ -368,9 +368,11 @@ void process_runnable_tasks()
} }
#endif #endif
/* Make sure the entry doesn't appear to be in a list */
MT_LIST_INIT(&((struct tasklet *)t)->list);
/* And add it to the local task list */ /* And add it to the local task list */
tasklet_insert_into_tasklet_list((struct tasklet *)t); tasklet_insert_into_tasklet_list((struct tasklet *)t);
task_per_thread[tid].task_list_size++; HA_ATOMIC_ADD(&task_per_thread[tid].task_list_size, 1);
activity[tid].tasksw++; activity[tid].tasksw++;
} }
@ -380,18 +382,19 @@ void process_runnable_tasks()
grq = NULL; grq = NULL;
} }
while (max_processed > 0 && !LIST_ISEMPTY(&task_per_thread[tid].task_list)) { while (max_processed > 0 && !MT_LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
struct task *t; struct task *t;
unsigned short state; unsigned short state;
void *ctx; void *ctx;
struct task *(*process)(struct task *t, void *ctx, unsigned short state); struct task *(*process)(struct task *t, void *ctx, unsigned short state);
t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list); t = (struct task *)MT_LIST_POP(&task_per_thread[tid].task_list, struct tasklet *, list);
if (!t)
break;
state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING); state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
__ha_barrier_atomic_store(); __ha_barrier_atomic_store();
__tasklet_remove_from_tasklet_list((struct tasklet *)t);
if (!TASK_IS_TASKLET(t)) if (!TASK_IS_TASKLET(t))
task_per_thread[tid].task_list_size--; _HA_ATOMIC_SUB(&task_per_thread[tid].task_list_size, 1);
ti->flags &= ~TI_FL_STUCK; // this thread is still running ti->flags &= ~TI_FL_STUCK; // this thread is still running
activity[tid].ctxsw++; activity[tid].ctxsw++;
@ -443,7 +446,7 @@ void process_runnable_tasks()
max_processed--; max_processed--;
} }
if (!LIST_ISEMPTY(&task_per_thread[tid].task_list)) if (!MT_LIST_ISEMPTY(&task_per_thread[tid].task_list))
activity[tid].long_rq++; activity[tid].long_rq++;
} }
@ -547,7 +550,7 @@ static void init_task()
#endif #endif
memset(&task_per_thread, 0, sizeof(task_per_thread)); memset(&task_per_thread, 0, sizeof(task_per_thread));
for (i = 0; i < MAX_THREADS; i++) { for (i = 0; i < MAX_THREADS; i++) {
LIST_INIT(&task_per_thread[i].task_list); MT_LIST_INIT(&task_per_thread[i].task_list);
} }
} }