mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-11-29 06:40:59 +01:00
MEDIUM: task: Use the new _HA_ATOMIC_* macros.
Use the new _HA_ATOMIC_* macros and add barriers where needed.
This commit is contained in:
parent
aa4d71a7fe
commit
4c28328572
@ -154,7 +154,7 @@ static inline void task_wakeup(struct task *t, unsigned int f)
|
|||||||
|
|
||||||
f |= TASK_QUEUED;
|
f |= TASK_QUEUED;
|
||||||
state = t->state;
|
state = t->state;
|
||||||
while (!HA_ATOMIC_CAS(&t->state, &state, state | f))
|
while (!_HA_ATOMIC_CAS(&t->state, &state, state | f))
|
||||||
;
|
;
|
||||||
if (!(state & TASK_QUEUED))
|
if (!(state & TASK_QUEUED))
|
||||||
__task_wakeup(t, root);
|
__task_wakeup(t, root);
|
||||||
@ -206,17 +206,17 @@ static inline struct task *task_unlink_wq(struct task *t)
|
|||||||
*/
|
*/
|
||||||
static inline struct task *__task_unlink_rq(struct task *t)
|
static inline struct task *__task_unlink_rq(struct task *t)
|
||||||
{
|
{
|
||||||
HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
_HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
||||||
#ifdef USE_THREAD
|
#ifdef USE_THREAD
|
||||||
if (t->state & TASK_GLOBAL) {
|
if (t->state & TASK_GLOBAL) {
|
||||||
HA_ATOMIC_AND(&t->state, ~TASK_GLOBAL);
|
_HA_ATOMIC_AND(&t->state, ~TASK_GLOBAL);
|
||||||
global_rqueue_size--;
|
global_rqueue_size--;
|
||||||
} else
|
} else
|
||||||
#endif
|
#endif
|
||||||
task_per_thread[tid].rqueue_size--;
|
task_per_thread[tid].rqueue_size--;
|
||||||
eb32sc_delete(&t->rq);
|
eb32sc_delete(&t->rq);
|
||||||
if (likely(t->nice))
|
if (likely(t->nice))
|
||||||
HA_ATOMIC_SUB(&niced_tasks, 1);
|
_HA_ATOMIC_SUB(&niced_tasks, 1);
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,8 +247,8 @@ static inline void tasklet_wakeup(struct tasklet *tl)
|
|||||||
return;
|
return;
|
||||||
LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
|
LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
|
||||||
task_per_thread[tid].task_list_size++;
|
task_per_thread[tid].task_list_size++;
|
||||||
HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
||||||
HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -261,9 +261,9 @@ static inline void task_insert_into_tasklet_list(struct task *t)
|
|||||||
* another runqueue. We set leaf_p to 0x1 to indicate that the node is
|
* another runqueue. We set leaf_p to 0x1 to indicate that the node is
|
||||||
* not in a tree but that it's in the tasklet list. See task_in_rq().
|
* not in a tree but that it's in the tasklet list. See task_in_rq().
|
||||||
*/
|
*/
|
||||||
if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, (void *)0x1)))
|
if (unlikely(!_HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, (void *)0x1)))
|
||||||
return;
|
return;
|
||||||
HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
||||||
task_per_thread[tid].task_list_size++;
|
task_per_thread[tid].task_list_size++;
|
||||||
tl = (struct tasklet *)t;
|
tl = (struct tasklet *)t;
|
||||||
LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
|
LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
|
||||||
@ -274,8 +274,8 @@ static inline void task_remove_from_task_list(struct task *t)
|
|||||||
LIST_DEL_INIT(&((struct tasklet *)t)->list);
|
LIST_DEL_INIT(&((struct tasklet *)t)->list);
|
||||||
task_per_thread[tid].task_list_size--;
|
task_per_thread[tid].task_list_size--;
|
||||||
if (!TASK_IS_TASKLET(t))
|
if (!TASK_IS_TASKLET(t))
|
||||||
HA_ATOMIC_STORE(&t->rq.node.leaf_p, NULL); // was 0x1
|
_HA_ATOMIC_STORE(&t->rq.node.leaf_p, NULL); // was 0x1
|
||||||
HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
_HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -337,7 +337,7 @@ static inline struct task *task_new(unsigned long thread_mask)
|
|||||||
{
|
{
|
||||||
struct task *t = pool_alloc(pool_head_task);
|
struct task *t = pool_alloc(pool_head_task);
|
||||||
if (t) {
|
if (t) {
|
||||||
HA_ATOMIC_ADD(&nb_tasks, 1);
|
_HA_ATOMIC_ADD(&nb_tasks, 1);
|
||||||
task_init(t, thread_mask);
|
task_init(t, thread_mask);
|
||||||
}
|
}
|
||||||
return t;
|
return t;
|
||||||
@ -352,7 +352,7 @@ static inline void __task_free(struct task *t)
|
|||||||
pool_free(pool_head_task, t);
|
pool_free(pool_head_task, t);
|
||||||
if (unlikely(stopping))
|
if (unlikely(stopping))
|
||||||
pool_flush(pool_head_task);
|
pool_flush(pool_head_task);
|
||||||
HA_ATOMIC_SUB(&nb_tasks, 1);
|
_HA_ATOMIC_SUB(&nb_tasks, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void task_free(struct task *t)
|
static inline void task_free(struct task *t)
|
||||||
@ -370,7 +370,7 @@ static inline void tasklet_free(struct tasklet *tl)
|
|||||||
{
|
{
|
||||||
if (!LIST_ISEMPTY(&tl->list)) {
|
if (!LIST_ISEMPTY(&tl->list)) {
|
||||||
task_per_thread[tid].task_list_size--;
|
task_per_thread[tid].task_list_size--;
|
||||||
HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
_HA_ATOMIC_SUB(&tasks_run_queue, 1);
|
||||||
}
|
}
|
||||||
LIST_DEL(&tl->list);
|
LIST_DEL(&tl->list);
|
||||||
|
|
||||||
|
|||||||
28
src/task.c
28
src/task.c
@ -85,7 +85,7 @@ void __task_wakeup(struct task *t, struct eb_root *root)
|
|||||||
* in the meanwhile.
|
* in the meanwhile.
|
||||||
*/
|
*/
|
||||||
redo:
|
redo:
|
||||||
if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, (void *)0x1))) {
|
if (unlikely(!_HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, (void *)0x1))) {
|
||||||
#ifdef USE_THREAD
|
#ifdef USE_THREAD
|
||||||
if (root == &rqueue)
|
if (root == &rqueue)
|
||||||
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
||||||
@ -118,21 +118,21 @@ redo:
|
|||||||
#endif
|
#endif
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
||||||
#ifdef USE_THREAD
|
#ifdef USE_THREAD
|
||||||
if (root == &rqueue) {
|
if (root == &rqueue) {
|
||||||
HA_ATOMIC_OR(&global_tasks_mask, t->thread_mask);
|
_HA_ATOMIC_OR(&global_tasks_mask, t->thread_mask);
|
||||||
__ha_barrier_atomic_store();
|
__ha_barrier_atomic_store();
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
old_active_mask = active_tasks_mask;
|
old_active_mask = active_tasks_mask;
|
||||||
HA_ATOMIC_OR(&active_tasks_mask, t->thread_mask);
|
_HA_ATOMIC_OR(&active_tasks_mask, t->thread_mask);
|
||||||
t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);
|
t->rq.key = _HA_ATOMIC_ADD(&rqueue_ticks, 1);
|
||||||
|
|
||||||
if (likely(t->nice)) {
|
if (likely(t->nice)) {
|
||||||
int offset;
|
int offset;
|
||||||
|
|
||||||
HA_ATOMIC_ADD(&niced_tasks, 1);
|
_HA_ATOMIC_ADD(&niced_tasks, 1);
|
||||||
if (likely(t->nice > 0))
|
if (likely(t->nice > 0))
|
||||||
offset = (unsigned)((*rq_size * (unsigned int)t->nice) / 32U);
|
offset = (unsigned)((*rq_size * (unsigned int)t->nice) / 32U);
|
||||||
else
|
else
|
||||||
@ -147,7 +147,7 @@ redo:
|
|||||||
#ifdef USE_THREAD
|
#ifdef USE_THREAD
|
||||||
if (root == &rqueue) {
|
if (root == &rqueue) {
|
||||||
global_rqueue_size++;
|
global_rqueue_size++;
|
||||||
HA_ATOMIC_OR(&t->state, TASK_GLOBAL);
|
_HA_ATOMIC_OR(&t->state, TASK_GLOBAL);
|
||||||
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
|
||||||
} else
|
} else
|
||||||
#endif
|
#endif
|
||||||
@ -347,7 +347,7 @@ void process_runnable_tasks()
|
|||||||
*/
|
*/
|
||||||
rq_next = eb32sc_first(&rqueue, tid_bit);
|
rq_next = eb32sc_first(&rqueue, tid_bit);
|
||||||
if (!rq_next) {
|
if (!rq_next) {
|
||||||
HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
|
_HA_ATOMIC_AND(&global_tasks_mask, ~tid_bit);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -392,7 +392,7 @@ void process_runnable_tasks()
|
|||||||
t = eb32sc_entry(rq_next, struct task, rq);
|
t = eb32sc_entry(rq_next, struct task, rq);
|
||||||
rq_next = eb32sc_next(rq_next, tid_bit);
|
rq_next = eb32sc_next(rq_next, tid_bit);
|
||||||
/* Make sure nobody re-adds the task in the runqueue */
|
/* Make sure nobody re-adds the task in the runqueue */
|
||||||
HA_ATOMIC_OR(&t->state, TASK_RUNNING);
|
_HA_ATOMIC_OR(&t->state, TASK_RUNNING);
|
||||||
|
|
||||||
/* detach the task from the queue */
|
/* detach the task from the queue */
|
||||||
__task_unlink_rq(t);
|
__task_unlink_rq(t);
|
||||||
@ -400,10 +400,10 @@ void process_runnable_tasks()
|
|||||||
task_insert_into_tasklet_list(t);
|
task_insert_into_tasklet_list(t);
|
||||||
}
|
}
|
||||||
if (!(global_tasks_mask & tid_bit) && task_per_thread[tid].rqueue_size == 0) {
|
if (!(global_tasks_mask & tid_bit) && task_per_thread[tid].rqueue_size == 0) {
|
||||||
HA_ATOMIC_AND(&active_tasks_mask, ~tid_bit);
|
_HA_ATOMIC_AND(&active_tasks_mask, ~tid_bit);
|
||||||
__ha_barrier_atomic_load();
|
__ha_barrier_atomic_load();
|
||||||
if (global_tasks_mask & tid_bit)
|
if (global_tasks_mask & tid_bit)
|
||||||
HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
||||||
}
|
}
|
||||||
while (max_processed > 0 && !LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
|
while (max_processed > 0 && !LIST_ISEMPTY(&task_per_thread[tid].task_list)) {
|
||||||
struct task *t;
|
struct task *t;
|
||||||
@ -412,7 +412,7 @@ void process_runnable_tasks()
|
|||||||
struct task *(*process)(struct task *t, void *ctx, unsigned short state);
|
struct task *(*process)(struct task *t, void *ctx, unsigned short state);
|
||||||
|
|
||||||
t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
|
t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
|
||||||
state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
|
state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
|
||||||
__ha_barrier_atomic_store();
|
__ha_barrier_atomic_store();
|
||||||
task_remove_from_task_list(t);
|
task_remove_from_task_list(t);
|
||||||
|
|
||||||
@ -448,7 +448,7 @@ void process_runnable_tasks()
|
|||||||
t->call_date = 0;
|
t->call_date = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
state = HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
|
state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
|
||||||
if (state)
|
if (state)
|
||||||
#ifdef USE_THREAD
|
#ifdef USE_THREAD
|
||||||
__task_wakeup(t, ((t->thread_mask & all_threads_mask) == tid_bit) ?
|
__task_wakeup(t, ((t->thread_mask & all_threads_mask) == tid_bit) ?
|
||||||
@ -462,7 +462,7 @@ void process_runnable_tasks()
|
|||||||
|
|
||||||
max_processed--;
|
max_processed--;
|
||||||
if (max_processed <= 0) {
|
if (max_processed <= 0) {
|
||||||
HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
_HA_ATOMIC_OR(&active_tasks_mask, tid_bit);
|
||||||
activity[tid].long_rq++;
|
activity[tid].long_rq++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user