BUG/MINOR: thread: always reload threads_enabled in loops

A few loops waiting for threads to synchronize such as thread_isolate()
rightfully filter the thread masks via the threads_enabled field that
contains the list of enabled threads. However, they don't use an atomic
load on it. Before 2.7, the equivalent variables were marked as volatile
and were always reloaded. In 2.7 they're fields in ha_tgroup_info[] and
ha_tgroup_ctx[], and the risk that the compiler keeps them in a register
inside a loop is far from zero. In practice, when ha_thread_relax() calls
sched_yield() or an x86 PAUSE instruction, it could be verified that the
variable is always reloaded. When these are avoided (e.g. on an
architecture providing neither solution), it's visible in the asm code
that the variables are not reloaded. In this case, if a thread exits
just between the moment the two values are read, the loop could spin
forever.

This patch adds the required _HA_ATOMIC_LOAD() on the relevant
threads_enabled fields. It must be backported to 2.7.
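
As a minimal sketch of the failure mode (the struct and function names
below are hypothetical, not taken from the patch): with modern GCC or
clang builtins, _HA_ATOMIC_LOAD() roughly expands to __atomic_load_n()
with __ATOMIC_RELAXED, which is enough to force a reload on each
iteration:

	struct tgroup {
		unsigned long threads_enabled;
		unsigned long threads_harmless;
	};
	static struct tgroup grp;

	/* Broken: nothing tells the compiler that other threads may
	 * change these fields, so both loads may be hoisted out of the
	 * loop and the test then spins on stale register copies. If a
	 * thread exits after the first read, the condition may never
	 * become true.
	 */
	static void wait_all_harmless_broken(void)
	{
		while ((grp.threads_harmless & grp.threads_enabled) !=
		       grp.threads_enabled)
			; /* spin: no barrier, no forced reload */
	}

	/* Fixed: re-read both masks on every iteration using relaxed
	 * atomic loads, which is what the patch does via
	 * _HA_ATOMIC_LOAD().
	 */
	static void wait_all_harmless_fixed(void)
	{
		for (;;) {
			unsigned long te = __atomic_load_n(&grp.threads_enabled,
			                                   __ATOMIC_RELAXED);
			unsigned long th = __atomic_load_n(&grp.threads_harmless,
			                                   __ATOMIC_RELAXED);
			if ((th & te) == te)
				break;
		}
	}

A volatile qualifier on the fields would also prevent the caching, but
the explicit atomic load documents the cross-thread access and matches
the rest of the 2.7 code base.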
Author: Willy Tarreau
Date:   2023-01-19 19:14:18 +01:00
Parent: ad90110338
Commit: b2f38c13d1

6 changed files with 26 additions and 14 deletions

--- a/include/haproxy/fd.h
+++ b/include/haproxy/fd.h

@@ -136,11 +136,11 @@ static inline void done_update_polling(int fd)
 	unsigned long update_mask;
 
 	update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit);
-	while ((update_mask & tg->threads_enabled) == 0) {
+	while ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) == 0) {
 		/* If we were the last one that had to update that entry, remove it from the list */
 		fd_rm_from_fd_list(&update_list[tgid - 1], fd);
 		update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
-		if ((update_mask & tg->threads_enabled) != 0) {
+		if ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) != 0) {
 			/* Maybe it's been re-updated in the meanwhile, and we
 			 * wrongly removed it from the list, if so, re-add it
 			 */

--- a/src/fd.c
+++ b/src/fd.c

@@ -481,7 +481,8 @@ void updt_fd_polling(const int fd)
 		unsigned long update_mask = fdtab[fd].update_mask;
 		int thr;
 
-		while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, ha_tgroup_info[tgrp - 1].threads_enabled))
+		while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask,
+		                       _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp - 1].threads_enabled)))
 			__ha_cpu_relax();
 
 		fd_add_to_fd_list(&update_list[tgrp - 1], fd);

--- a/src/haproxy.c
+++ b/src/haproxy.c

@@ -2968,7 +2968,7 @@ void run_poll_loop()
 		    _HA_ATOMIC_OR_FETCH(&stopping_tgroup_mask, tg->tgid_bit) == tg->tgid_bit) {
 			/* first one to detect it, notify all threads that stopping was just set */
 			for (i = 0; i < global.nbthread; i++) {
-				if (ha_thread_info[i].tg->threads_enabled &
+				if (_HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) &
 				    ha_thread_info[i].ltid_bit &
 				    ~_HA_ATOMIC_LOAD(&ha_thread_info[i].tg_ctx->stopping_threads))
 					wake_thread(i);
@@ -2981,14 +2981,15 @@ void run_poll_loop()
 		    (_HA_ATOMIC_LOAD(&stopping_tgroup_mask) & all_tgroups_mask) == all_tgroups_mask) {
 			/* check that all threads are aware of the stopping status */
 			for (i = 0; i < global.nbtgroups; i++)
-				if ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) & ha_tgroup_info[i].threads_enabled) !=
-				    ha_tgroup_info[i].threads_enabled)
+				if ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) &
+				     _HA_ATOMIC_LOAD(&ha_tgroup_info[i].threads_enabled)) !=
+				    _HA_ATOMIC_LOAD(&ha_tgroup_info[i].threads_enabled))
 					break;
 #ifdef USE_THREAD
 			if (i == global.nbtgroups) {
 				/* all are OK, let's wake them all and stop */
 				for (i = 0; i < global.nbthread; i++)
-					if (i != tid && ha_thread_info[i].tg->threads_enabled & ha_thread_info[i].ltid_bit)
+					if (i != tid && _HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) & ha_thread_info[i].ltid_bit)
 						wake_thread(i);
 				break;
 			}

--- a/src/listener.c
+++ b/src/listener.c

@@ -1021,7 +1021,7 @@ void listener_accept(struct listener *l)
 	if (l->rx.flags & RX_F_LOCAL_ACCEPT)
 		goto local_accept;
 
-	mask = l->rx.bind_thread & tg->threads_enabled;
+	mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
 	if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) {
 		struct accept_queue_ring *ring;
 		unsigned int t, t0, t1, t2;

--- a/src/proxy.c
+++ b/src/proxy.c

@@ -2164,7 +2164,7 @@ struct task *hard_stop(struct task *t, void *context, unsigned int state)
 		send_log(NULL, LOG_WARNING, "Some tasks resisted to hard-stop, exiting now.\n");
 		killed = 2;
 		for (thr = 0; thr < global.nbthread; thr++)
-			if (ha_thread_info[thr].tg->threads_enabled & ha_thread_info[thr].ltid_bit)
+			if (_HA_ATOMIC_LOAD(&ha_thread_info[thr].tg->threads_enabled) & ha_thread_info[thr].ltid_bit)
 				wake_thread(thr);
 		t->expire = TICK_ETERNITY;
 		return t;

--- a/src/thread.c
+++ b/src/thread.c

@@ -104,9 +104,14 @@ void thread_isolate()
 	 */
 	while (1) {
 		for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) {
-			while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) &
-				ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled)
+			do {
+				ulong te = _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp].threads_enabled);
+				ulong th = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless);
+
+				if ((th & te) == te)
+					break;
 				ha_thread_relax();
+			} while (1);
 		}
 
 		/* Now we've seen all threads marked harmless, we can try to run
@@ -160,10 +165,15 @@ void thread_isolate_full()
 	 */
 	while (1) {
 		for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) {
-			while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) &
-				_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_idle) &
-				ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled)
+			do {
+				ulong te = _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp].threads_enabled);
+				ulong th = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless);
+				ulong id = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_idle);
+
+				if ((th & id & te) == te)
+					break;
 				ha_thread_relax();
+			} while (1);
 		}
 
 		/* Now we've seen all threads marked harmless and idle, we can