diff --git a/include/haproxy/task.h b/include/haproxy/task.h index 5d6501344..552f08ad5 100644 --- a/include/haproxy/task.h +++ b/include/haproxy/task.h @@ -106,6 +106,7 @@ __decl_thread(extern HA_SPINLOCK_T rq_lock); /* spin lock related to run queue __decl_thread(extern HA_RWLOCK_T wq_lock); /* RW lock related to the wait queue */ void __tasklet_wakeup_on(struct tasklet *tl, int thr); +struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl); void task_kill(struct task *t); void tasklet_kill(struct tasklet *t); void __task_wakeup(struct task *t); @@ -458,6 +459,40 @@ static inline void _task_instant_wakeup(struct task *t, unsigned int f, const ch __tasklet_wakeup_on(tl, thr); } +/* schedules tasklet to run immediately after the current one is done + * will be queued after entry , or at the head of the task list. Return + * the new head to be used to queue future tasks. This is used to insert multiple entries + * at the head of the tasklet list, typically to transfer processing from a tasklet + * to another one or a set of other ones. If is NULL, the tasklet list of + * thread will be used. + * With DEBUG_TASK, the : from the call place are stored into the tasklet + * for tracing purposes. + */ +#define tasklet_wakeup_after(head, tl) _tasklet_wakeup_after(head, tl, __FILE__, __LINE__) +static inline struct list *_tasklet_wakeup_after(struct list *head, struct tasklet *tl, + const char *file, int line) +{ + unsigned int state = tl->state; + + do { + /* do nothing if someone else already added it */ + if (state & TASK_IN_LIST) + return head; + } while (!_HA_ATOMIC_CAS(&tl->state, &state, state | TASK_IN_LIST)); + + /* at this point we're the first one to add this task to the list */ +#ifdef DEBUG_TASK + if ((unsigned int)tl->debug.caller_idx > 1) + ABORT_NOW(); + tl->debug.caller_idx = !tl->debug.caller_idx; + tl->debug.caller_file[tl->debug.caller_idx] = file; + tl->debug.caller_line[tl->debug.caller_idx] = line; + if (th_ctx->flags & TH_FL_TASK_PROFILING) + tl->call_date = now_mono_time(); +#endif + return __tasklet_wakeup_after(head, tl); +} + /* This macro shows the current function name and the last known caller of the * task (or tasklet) wakeup. */ diff --git a/src/task.c b/src/task.c index 515a43d65..19df26391 100644 --- a/src/task.c +++ b/src/task.c @@ -181,6 +181,44 @@ void __tasklet_wakeup_on(struct tasklet *tl, int thr) } } +/* Do not call this one, please use tasklet_wakeup_after_on() instead, as this one is + * the slow path of tasklet_wakeup_after() which performs some preliminary checks + * and sets TASK_IN_LIST before calling this one. + */ +struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl) +{ + BUG_ON(tid != tl->tid); + /* this tasklet runs on the caller thread */ + if (!head) { + if (tl->state & TASK_HEAVY) { + LIST_INSERT(&th_ctx->tasklets[TL_HEAVY], &tl->list); + th_ctx->tl_class_mask |= 1 << TL_HEAVY; + } + else if (tl->state & TASK_SELF_WAKING) { + LIST_INSERT(&th_ctx->tasklets[TL_BULK], &tl->list); + th_ctx->tl_class_mask |= 1 << TL_BULK; + } + else if ((struct task *)tl == th_ctx->current) { + _HA_ATOMIC_OR(&tl->state, TASK_SELF_WAKING); + LIST_INSERT(&th_ctx->tasklets[TL_BULK], &tl->list); + th_ctx->tl_class_mask |= 1 << TL_BULK; + } + else if (th_ctx->current_queue < 0) { + LIST_INSERT(&th_ctx->tasklets[TL_URGENT], &tl->list); + th_ctx->tl_class_mask |= 1 << TL_URGENT; + } + else { + LIST_INSERT(&th_ctx->tasklets[th_ctx->current_queue], &tl->list); + th_ctx->tl_class_mask |= 1 << th_ctx->current_queue; + } + } + else { + LIST_APPEND(head, &tl->list); + } + _HA_ATOMIC_INC(&th_ctx->rq_total); + return &tl->list; +} + /* Puts the task in run queue at a position depending on t->nice. is * returned. The nice value assigns boosts in 32th of the run queue size. A * nice value of -1024 sets the task to -tasks_run_queue*32, while a nice value