diff --git a/include/haproxy/task.h b/include/haproxy/task.h
index 39e5121f4..c73df7c92 100644
--- a/include/haproxy/task.h
+++ b/include/haproxy/task.h
@@ -220,6 +220,42 @@ static inline void _task_wakeup(struct task *t, unsigned int f, const char *file
 	}
 }
 
+/* Atomically drop the TASK_RUNNING bit while ensuring that any wakeup that
+ * happened since the flag was set will result in the task being queued (if
+ * it wasn't already). This is used to safely drop the flag from within the
+ * scheduler. The flag is combined with the existing flags before the test,
+ * so that it is possible to unconditionally wake the task up and drop the
+ * RUNNING flag if needed.
+ */
+#define task_drop_running(t, f) _task_drop_running(t, f, __FILE__, __LINE__)
+static inline void _task_drop_running(struct task *t, unsigned int f, const char *file, int line)
+{
+	unsigned int state, new_state;
+
+	state = _HA_ATOMIC_LOAD(&t->state);
+
+	while (1) {
+		new_state = state | f;
+		if (new_state & TASK_WOKEN_ANY)
+			new_state |= TASK_QUEUED;
+
+		if (_HA_ATOMIC_CAS(&t->state, &state, new_state & ~TASK_RUNNING))
+			break;
+		__ha_cpu_relax();
+	}
+
+	if ((new_state & ~state) & TASK_QUEUED) {
+#ifdef DEBUG_TASK
+		if ((unsigned int)t->debug.caller_idx > 1)
+			ABORT_NOW();
+		t->debug.caller_idx = !t->debug.caller_idx;
+		t->debug.caller_file[t->debug.caller_idx] = file;
+		t->debug.caller_line[t->debug.caller_idx] = line;
+#endif
+		__task_wakeup(t);
+	}
+}
+
 /*
  * Unlink the task from the wait queue, and possibly update the last_timer
  * pointer. A pointer to the task itself is returned. The task *must* already
diff --git a/src/task.c b/src/task.c
index ff983903c..05a5e3cd2 100644
--- a/src/task.c
+++ b/src/task.c
@@ -395,12 +395,29 @@ void wake_expired_tasks()
 		}
 
 		task = eb32_entry(eb, struct task, wq);
+
+		/* Check for any competing run of the task (quite rare but it
+		 * may involve a dangerous concurrent access to task->expire).
+		 * To protect against this, we take exclusive access via
+		 * TASK_RUNNING before checking or touching task->expire. If
+		 * the task is already RUNNING on another thread, that thread
+		 * will handle the requeuing by itself, so we must not do
+		 * anything and simply quit the loop for now: we cannot wait
+		 * with the WQ lock held, as this would prevent the running
+		 * thread from requeuing the task. One annoying effect of
+		 * holding RUNNING here is that a concurrent task_wakeup()
+		 * will refrain from waking the task up. This forces us to
+		 * check for a wakeup after releasing the flag.
+		 */
+		if (HA_ATOMIC_FETCH_OR(&task->state, TASK_RUNNING) & TASK_RUNNING)
+			break;
+
 		if (tick_is_expired(task->expire, now_ms)) {
 			/* expired task, wake it up */
 			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
 			__task_unlink_wq(task);
 			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
-			task_wakeup(task, TASK_WOKEN_TIMER);
+			task_drop_running(task, TASK_WOKEN_TIMER);
 		}
 		else if (task->expire != eb->key) {
 			/* task is not expired but its key doesn't match so let's
@@ -411,11 +428,13 @@
 			if (tick_isset(task->expire))
 				__task_queue(task, &timers);
 			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
+			task_drop_running(task, 0);
 			goto lookup_next;
 		}
 		else {
 			/* task not expired and correctly placed. It may not be eternal. */
 			BUG_ON(task->expire == TICK_ETERNITY);
+			task_drop_running(task, 0);
 			break;
 		}
 	}
@@ -582,9 +601,20 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 
 		LIST_DEL_INIT(&((struct tasklet *)t)->list);
 		__ha_barrier_store();
-		state = t->state;
-		while (!_HA_ATOMIC_CAS(&t->state, &state, (state & TASK_PERSISTENT) | TASK_RUNNING))
-			;
+		/* We must be the exclusive owner of the TASK_RUNNING bit,
+		 * and have to be careful that the task is not being
+		 * manipulated by another thread that found it expired in
+		 * wake_expired_tasks(). The TASK_RUNNING bit will be set
+		 * during these operations; they are extremely rare and do
+		 * not last long, so the best thing to do here is to wait.
+		 */
+		state = _HA_ATOMIC_LOAD(&t->state);
+		do {
+			while (unlikely(state & TASK_RUNNING)) {
+				__ha_cpu_relax();
+				state = _HA_ATOMIC_LOAD(&t->state);
+			}
+		} while (!_HA_ATOMIC_CAS(&t->state, &state, (state & TASK_PERSISTENT) | TASK_RUNNING));
 
 		__ha_barrier_atomic_store();
 
@@ -637,15 +667,15 @@
 				HA_ATOMIC_ADD(&profile_entry->cpu_time, cpu);
 			}
 
-			state = _HA_ATOMIC_AND_FETCH(&t->state, ~TASK_RUNNING);
+			state = _HA_ATOMIC_LOAD(&t->state);
 			if (unlikely(state & TASK_KILLED)) {
 				task_unlink_wq(t);
 				__task_free(t);
 			}
-			else if (state & TASK_WOKEN_ANY)
-				task_wakeup(t, 0);
-			else
+			else {
 				task_queue(t);
+				task_drop_running(t, 0);
+			}
 		}
 		done++;
 	}
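
As a companion to the helper added in include/haproxy/task.h above, here is a minimal,
self-contained C11 model of the drop-running step. Everything in it (mini_task, the
FLG_* bits, drop_running(), enqueue()) is a hypothetical stand-in, not HAProxy's real
API; the sketch only illustrates the single CAS that merges the caller's flags,
promotes a pending wakeup cause to QUEUED, and clears RUNNING, so that exactly one
thread ends up enqueuing the task:

/* mini_drop_running.c - build with: cc -std=c11 -o demo mini_drop_running.c */
#include <stdatomic.h>
#include <stdio.h>

#define FLG_RUNNING     0x0001u /* task is currently being executed */
#define FLG_QUEUED      0x0002u /* task sits in the run queue */
#define FLG_WOKEN_TIMER 0x0010u /* one wakeup cause among several */
#define FLG_WOKEN_ANY   0x00f0u /* mask covering all wakeup-cause bits */

struct mini_task {
	_Atomic unsigned int state;
};

/* stand-in for __task_wakeup(): a real scheduler would link the task
 * into its run queue here, we only log the event */
static void enqueue(struct mini_task *t)
{
	printf("task enqueued, state=%#x\n", atomic_load(&t->state));
}

/* model of task_drop_running(): in one atomic step, merge the extra
 * flags <f>, promote any pending wakeup cause to QUEUED, and clear
 * RUNNING. Only the thread whose CAS flips QUEUED from 0 to 1 performs
 * the enqueue, so the task can never be queued twice. */
static void drop_running(struct mini_task *t, unsigned int f)
{
	unsigned int state, new_state;

	state = atomic_load(&t->state);
	do {
		new_state = state | f;
		if (new_state & FLG_WOKEN_ANY)
			new_state |= FLG_QUEUED;
	} while (!atomic_compare_exchange_weak(&t->state, &state,
	                                       new_state & ~FLG_RUNNING));

	if ((new_state & ~state) & FLG_QUEUED)
		enqueue(t);
}

int main(void)
{
	struct mini_task t = { .state = FLG_RUNNING };

	/* a wakeup arriving while RUNNING is held only sets a cause bit:
	 * the waker leaves the queuing to whoever drops RUNNING */
	atomic_fetch_or(&t.state, FLG_WOKEN_TIMER);

	/* releasing RUNNING notices the pending cause and enqueues once */
	drop_running(&t, 0);
	return 0;
}

Because the QUEUED promotion and the RUNNING release happen in the same CAS, no
wakeup can slip between "drop the flag" and "check for pending wakeups", which is
precisely the window the real task_drop_running() closes.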
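
The claim side can be modeled the same way. The sketch below (again with hypothetical
names, standalone from the one above) shows the two patterns the patch uses in
src/task.c: the expiry scan claims RUNNING with a single fetch_or and backs off if it
loses, while the scheduler spins briefly until the bit is free, since a competing
owner only holds it for a few instructions:

/* mini_claim.c - build with: cc -std=c11 -o demo mini_claim.c */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define FLG_RUNNING    0x0001u /* task is currently being executed */
#define FLG_PERSISTENT 0x0e00u /* flags preserved across runs */

struct mini_task {
	_Atomic unsigned int state;
};

/* expiry-scan side (cf. wake_expired_tasks): claim RUNNING with a
 * single fetch_or. If the bit was already set, another thread owns the
 * task and will requeue it itself, so the scanner backs off instead of
 * waiting: it holds the wait-queue lock, and waiting there would stop
 * the owner from requeuing the task. */
static bool try_claim(struct mini_task *t)
{
	return !(atomic_fetch_or(&t->state, FLG_RUNNING) & FLG_RUNNING);
}

/* scheduler side (cf. run_tasks_from_lists): the runner must end up
 * the exclusive owner of RUNNING before executing the task. A
 * competing owner only holds the bit for a few instructions, so a
 * short spin is cheaper than any heavier synchronization. */
static void claim_running_wait(struct mini_task *t)
{
	unsigned int state = atomic_load(&t->state);

	do {
		while (state & FLG_RUNNING) /* rare and short-lived */
			state = atomic_load(&t->state);
	} while (!atomic_compare_exchange_weak(&t->state, &state,
	                 (state & FLG_PERSISTENT) | FLG_RUNNING));
}

int main(void)
{
	struct mini_task t = { .state = 0 };

	printf("first claim:  %s\n", try_claim(&t) ? "won" : "lost");
	printf("second claim: %s\n", try_claim(&t) ? "won" : "lost");

	atomic_fetch_and(&t.state, ~FLG_RUNNING); /* owner releases */
	claim_running_wait(&t);
	puts("runner now owns RUNNING");
	return 0;
}

Backing off in try_claim() rather than spinning mirrors the constraint spelled out in
the patch comment: the scanner holds the wait-queue lock, so waiting there would
prevent the current owner from requeuing the task.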