diff --git a/include/proto/task.h b/include/proto/task.h index 6d18e38c6..f3cc3eb8c 100644 --- a/include/proto/task.h +++ b/include/proto/task.h @@ -152,7 +152,13 @@ static inline void task_wakeup(struct task *t, unsigned int f) } } -/* change the thread affinity of a task to */ +/* change the thread affinity of a task to . + * This may only be done from within the running task itself or during its + * initialization. It will unqueue and requeue the task from the wait queue + * if it was in it. This is safe against a concurrent task_queue() call because + * task_queue() itself will unlink again if needed after taking into account + * the new thread_mask. + */ static inline void task_set_affinity(struct task *t, unsigned long thread_mask) { if (unlikely(task_in_wq(t))) { @@ -177,15 +183,15 @@ static inline struct task *__task_unlink_wq(struct task *t) } /* remove a task from its wait queue. It may either be the local wait queue if - * the task is bound to a single thread (in which case there's no locking - * involved) or the global queue, with locking. + * the task is bound to a single thread or the global queue. If the task uses a + * shared wait queue, the global wait queue lock is used. */ static inline struct task *task_unlink_wq(struct task *t) { unsigned long locked; if (likely(task_in_wq(t))) { - locked = atleast2(t->thread_mask); + locked = t->state & TASK_SHARED_WQ; if (locked) HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock); __task_unlink_wq(t); @@ -285,7 +291,8 @@ static inline void tasklet_remove_from_tasklet_list(struct tasklet *t) /* * Initialize a new task. The bare minimum is performed (queue pointers and * state). The task is returned. This function should not be used outside of - * task_new(). + * task_new(). If the thread mask contains more than one thread, TASK_SHARED_WQ + * is set. */ static inline struct task *task_init(struct task *t, unsigned long thread_mask) { @@ -293,6 +300,8 @@ static inline struct task *task_init(struct task *t, unsigned long thread_mask) t->rq.node.leaf_p = NULL; t->state = TASK_SLEEPING; t->thread_mask = thread_mask; + if (atleast2(thread_mask)) + t->state |= TASK_SHARED_WQ; t->nice = 0; t->calls = 0; t->call_date = 0; @@ -407,9 +416,9 @@ void __task_queue(struct task *task, struct eb_root *wq); /* Place into the wait queue, where it may already be. If the expiration * timer is infinite, do nothing and rely on wake_expired_task to clean up. - * If the task is bound to a single thread, it's assumed to be bound to the - * current thread's queue and is queued without locking. Otherwise it's queued - * into the global wait queue, protected by locks. + * If the task uses a shared wait queue, it's queued into the global wait queue, + * protected by the global wq_lock, otherwise by it necessarily belongs to the + * current thread'sand is queued without locking. */ static inline void task_queue(struct task *task) { @@ -426,7 +435,7 @@ static inline void task_queue(struct task *task) return; #ifdef USE_THREAD - if (atleast2(task->thread_mask)) { + if (task->state & TASK_SHARED_WQ) { HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock); if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) __task_queue(task, &timers); @@ -434,6 +443,7 @@ static inline void task_queue(struct task *task) } else #endif { + BUG_ON((task->thread_mask & tid_bit) == 0); // should have TASK_SHARED_WQ if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) __task_queue(task, &sched->timers); } @@ -450,7 +460,7 @@ static inline void task_schedule(struct task *task, int when) return; #ifdef USE_THREAD - if (atleast2(task->thread_mask)) { + if (task->state & TASK_SHARED_WQ) { /* FIXME: is it really needed to lock the WQ during the check ? */ HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock); if (task_in_wq(task)) @@ -463,6 +473,7 @@ static inline void task_schedule(struct task *task, int when) } else #endif { + BUG_ON((task->thread_mask & tid_bit) == 0); // should have TASK_SHARED_WQ if (task_in_wq(task)) when = tick_first(when, task->expire); diff --git a/include/types/task.h b/include/types/task.h index f5823fe69..bc516f108 100644 --- a/include/types/task.h +++ b/include/types/task.h @@ -34,6 +34,8 @@ #define TASK_RUNNING 0x0001 /* the task is currently running */ #define TASK_GLOBAL 0x0002 /* The task is currently in the global runqueue */ #define TASK_QUEUED 0x0004 /* The task has been (re-)added to the run queue */ +#define TASK_SHARED_WQ 0x0008 /* The task's expiration may be updated by other + * threads, must be set before first queue/wakeup */ #define TASK_WOKEN_INIT 0x0100 /* woken up for initialisation purposes */ #define TASK_WOKEN_TIMER 0x0200 /* woken up because of expired timer */ diff --git a/src/task.c b/src/task.c index 7a8cf1639..0c26063bc 100644 --- a/src/task.c +++ b/src/task.c @@ -428,7 +428,8 @@ void process_runnable_tasks() struct task *(*process)(struct task *t, void *ctx, unsigned short state); t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list); - state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING); + state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING; + state = _HA_ATOMIC_XCHG(&t->state, state); __ha_barrier_atomic_store(); __tasklet_remove_from_tasklet_list((struct tasklet *)t);