MINOR: threads: Avoid using a thread group mask when stopping.

Remove the "stopped_tgroup_mask" variable, that indicated which thread
groups were stopping, and instead just use "stopped_tgroups", a counter
indicating how many thread groups are stopping. We want to remove all
thread group masks, so that we can increase the maximum number of thread
groups past 64.
This commit is contained in:
Olivier Houchard 2026-01-06 07:48:10 +01:00
parent 529a8dbfba
commit 853604f87a

View File

@ -151,7 +151,8 @@ int pidfd = -1; /* FD to keep PID */
int daemon_fd[2] = {-1, -1}; /* pipe to communicate with parent process */
int devnullfd = -1;
static unsigned long stopping_tgroup_mask; /* Thread groups acknowledging stopping */
static int stopped_tgroups;
static int stop_detected;
/* global options */
struct global global = {
@ -2922,14 +2923,24 @@ void run_poll_loop()
int i;
if (stopping) {
int old_detected;
/* stop muxes/quic-conns before acknowledging stopping */
if (!(tg_ctx->stopping_threads & ti->ltid_bit)) {
task_wakeup(mux_stopping_data[tid].task, TASK_WOKEN_OTHER);
wake = 1;
}
if (_HA_ATOMIC_OR_FETCH(&tg_ctx->stopping_threads, ti->ltid_bit) == ti->ltid_bit &&
_HA_ATOMIC_OR_FETCH(&stopping_tgroup_mask, tg->tgid_bit) == tg->tgid_bit) {
old_detected = stop_detected;
/*
* Check if ze're the first to detect the
* stop
*/
while (old_detected == 0 &&
!_HA_ATOMIC_CAS(&stop_detected, &old_detected, 1));
if (old_detected == 0) {
/* first one to detect it, notify all threads that stopping was just set */
for (i = 0; i < global.nbthread; i++) {
if (_HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) &
@ -2938,28 +2949,26 @@ void run_poll_loop()
wake_thread(i);
}
}
if (!(tg_ctx->stopping_threads & ti->ltid_bit) &&
_HA_ATOMIC_OR_FETCH(&tg_ctx->stopping_threads,
ti->ltid_bit) == tg->threads_enabled) {
/*
* All threads from the thread group
* are stopped, let it been known.
*/
_HA_ATOMIC_INC(&stopped_tgroups);
}
}
/* stop when there's nothing left to do */
if ((jobs - unstoppable_jobs) == 0 &&
(_HA_ATOMIC_LOAD(&stopping_tgroup_mask) & all_tgroups_mask) == all_tgroups_mask) {
/* check that all threads are aware of the stopping status */
for (i = 0; i < global.nbtgroups; i++)
if ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) &
_HA_ATOMIC_LOAD(&ha_tgroup_info[i].threads_enabled)) !=
_HA_ATOMIC_LOAD(&ha_tgroup_info[i].threads_enabled))
break;
(_HA_ATOMIC_LOAD(&stopped_tgroups) == global.nbtgroups)) {
#ifdef USE_THREAD
if (i == global.nbtgroups) {
/* all are OK, let's wake them all and stop */
for (i = 0; i < global.nbthread; i++)
if (i != tid && _HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) & ha_thread_info[i].ltid_bit)
wake_thread(i);
break;
}
#else
break;
for (i = 0; i < global.nbthread; i++)
if (i != tid && _HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) & ha_thread_info[i].ltid_bit)
wake_thread(i);
#endif
break;
}
}
@ -3120,8 +3129,7 @@ void *run_thread_poll_loop(void *data)
#ifdef USE_THREAD
if (!_HA_ATOMIC_AND_FETCH(&ha_tgroup_info[ti->tgid-1].threads_enabled, ~ti->ltid_bit))
_HA_ATOMIC_AND(&all_tgroups_mask, ~tg->tgid_bit);
if (!_HA_ATOMIC_AND_FETCH(&tg_ctx->stopping_threads, ~ti->ltid_bit))
_HA_ATOMIC_AND(&stopping_tgroup_mask, ~tg->tgid_bit);
_HA_ATOMIC_AND_FETCH(&tg_ctx->stopping_threads, ~ti->ltid_bit);
if (tid > 0)
pthread_exit(NULL);
#endif