From b2f38c13d1b160ba53f3560459b38b0f2345073d Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Thu, 19 Jan 2023 19:14:18 +0100 Subject: [PATCH] BUG/MINOR: thread: always reload threads_enabled in loops A few loops waiting for threads to synchronize such as thread_isolate() rightfully filter the thread masks via the threads_enabled field that contains the list of enabled threads. However, it doesn't use an atomic load on it. Before 2.7, the equivalent variables were marked as volatile and were always reloaded. In 2.7 they're fields in ha_tgroup_ctx[], and the risk that the compiler keeps them in a register inside a loop is not null at all. In practice when ha_thread_relax() calls sched_yield() or an x86 PAUSE instruction, it could be verified that the variable is always reloaded. If these are avoided (e.g. architectures providing neither solution), it's visible in asm code that the variables are not reloaded. In this case, if a thread exits just between the moment the two values are read, the loop could spin forever. This patch adds the required _HA_ATOMIC_LOAD() on the relevant threads_enabled fields. It must be backported to 2.7. 
--- include/haproxy/fd.h | 4 ++-- src/fd.c | 3 ++- src/haproxy.c | 9 +++++---- src/listener.c | 2 +- src/proxy.c | 2 +- src/thread.c | 20 +++++++++++++++----- 6 files changed, 26 insertions(+), 14 deletions(-) diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h index 2a5bcad28..984b151e9 100644 --- a/include/haproxy/fd.h +++ b/include/haproxy/fd.h @@ -136,11 +136,11 @@ static inline void done_update_polling(int fd) unsigned long update_mask; update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit); - while ((update_mask & tg->threads_enabled) == 0) { + while ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) == 0) { /* If we were the last one that had to update that entry, remove it from the list */ fd_rm_from_fd_list(&update_list[tgid - 1], fd); update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask); - if ((update_mask & tg->threads_enabled) != 0) { + if ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) != 0) { /* Maybe it's been re-updated in the meanwhile, and we * wrongly removed it from the list, if so, re-add it */ diff --git a/src/fd.c b/src/fd.c index 7e56d8a1f..4d4700f8d 100644 --- a/src/fd.c +++ b/src/fd.c @@ -481,7 +481,8 @@ void updt_fd_polling(const int fd) unsigned long update_mask = fdtab[fd].update_mask; int thr; - while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, ha_tgroup_info[tgrp - 1].threads_enabled)) + while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, + _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp - 1].threads_enabled))) __ha_cpu_relax(); fd_add_to_fd_list(&update_list[tgrp - 1], fd); diff --git a/src/haproxy.c b/src/haproxy.c index 481fe5a0e..e0b48a752 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -2968,7 +2968,7 @@ void run_poll_loop() _HA_ATOMIC_OR_FETCH(&stopping_tgroup_mask, tg->tgid_bit) == tg->tgid_bit) { /* first one to detect it, notify all threads that stopping was just set */ for (i = 0; i < global.nbthread; i++) { - if (ha_thread_info[i].tg->threads_enabled & + if 
(_HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) & ha_thread_info[i].ltid_bit & ~_HA_ATOMIC_LOAD(&ha_thread_info[i].tg_ctx->stopping_threads)) wake_thread(i); @@ -2981,14 +2981,15 @@ void run_poll_loop() (_HA_ATOMIC_LOAD(&stopping_tgroup_mask) & all_tgroups_mask) == all_tgroups_mask) { /* check that all threads are aware of the stopping status */ for (i = 0; i < global.nbtgroups; i++) - if ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) & ha_tgroup_info[i].threads_enabled) != - ha_tgroup_info[i].threads_enabled) + if ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) & + _HA_ATOMIC_LOAD(&ha_tgroup_info[i].threads_enabled)) != + _HA_ATOMIC_LOAD(&ha_tgroup_info[i].threads_enabled)) break; #ifdef USE_THREAD if (i == global.nbtgroups) { /* all are OK, let's wake them all and stop */ for (i = 0; i < global.nbthread; i++) - if (i != tid && ha_thread_info[i].tg->threads_enabled & ha_thread_info[i].ltid_bit) + if (i != tid && _HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) & ha_thread_info[i].ltid_bit) wake_thread(i); break; } diff --git a/src/listener.c b/src/listener.c index aa466d05e..4867566a6 100644 --- a/src/listener.c +++ b/src/listener.c @@ -1021,7 +1021,7 @@ void listener_accept(struct listener *l) if (l->rx.flags & RX_F_LOCAL_ACCEPT) goto local_accept; - mask = l->rx.bind_thread & tg->threads_enabled; + mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled); if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) { struct accept_queue_ring *ring; unsigned int t, t0, t1, t2; diff --git a/src/proxy.c b/src/proxy.c index a0e8c1523..4696dec7c 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -2164,7 +2164,7 @@ struct task *hard_stop(struct task *t, void *context, unsigned int state) send_log(NULL, LOG_WARNING, "Some tasks resisted to hard-stop, exiting now.\n"); killed = 2; for (thr = 0; thr < global.nbthread; thr++) - if (ha_thread_info[thr].tg->threads_enabled & ha_thread_info[thr].ltid_bit) + if 
(_HA_ATOMIC_LOAD(&ha_thread_info[thr].tg->threads_enabled) & ha_thread_info[thr].ltid_bit) wake_thread(thr); t->expire = TICK_ETERNITY; return t; diff --git a/src/thread.c b/src/thread.c index 04b910b82..00d9f9fba 100644 --- a/src/thread.c +++ b/src/thread.c @@ -104,9 +104,14 @@ void thread_isolate() */ while (1) { for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) { - while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) & - ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled) + do { + ulong te = _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp].threads_enabled); + ulong th = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless); + + if ((th & te) == te) + break; ha_thread_relax(); + } while (1); } /* Now we've seen all threads marked harmless, we can try to run @@ -160,10 +165,15 @@ void thread_isolate_full() */ while (1) { for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) { - while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) & - _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_idle) & - ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled) + do { + ulong te = _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp].threads_enabled); + ulong th = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless); + ulong id = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_idle); + + if ((th & id & te) == te) + break; ha_thread_relax(); + } while (1); } /* Now we've seen all threads marked harmless and idle, we can