diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h index 9fbc358e2..8c558189d 100644 --- a/include/haproxy/fd.h +++ b/include/haproxy/fd.h @@ -431,7 +431,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), int tgid BUG_ON(fdtab[fd].state != 0); BUG_ON(fdtab[fd].refc_tgid != 0); - thread_mask &= all_threads_mask; + thread_mask &= tg->threads_enabled; BUG_ON(thread_mask == 0); fdtab[fd].owner = owner; diff --git a/src/ev_epoll.c b/src/ev_epoll.c index 13ebc4f4d..77ac0c067 100644 --- a/src/ev_epoll.c +++ b/src/ev_epoll.c @@ -99,7 +99,7 @@ static void _update_fd(int fd) en |= FD_EV_ACTIVE_R; if ((ps | pr) & ti->ltid_bit) { - if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) { + if (!(fdtab[fd].thread_mask & ti->ltid_bit) || !(en & FD_EV_ACTIVE_RW)) { /* fd removed from poll list */ opcode = EPOLL_CTL_DEL; if (pr & ti->ltid_bit) @@ -129,7 +129,7 @@ static void _update_fd(int fd) opcode = EPOLL_CTL_MOD; } } - else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_ACTIVE_RW)) { + else if ((fdtab[fd].thread_mask & ti->ltid_bit) && (en & FD_EV_ACTIVE_RW)) { /* new fd in the poll list */ opcode = EPOLL_CTL_ADD; if (en & FD_EV_ACTIVE_R) diff --git a/src/ev_evports.c b/src/ev_evports.c index a8fbc13fc..06e7a01c6 100644 --- a/src/ev_evports.c +++ b/src/ev_evports.c @@ -71,7 +71,7 @@ static void _update_fd(int fd) pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv); ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send); - if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) { + if (!(fdtab[fd].thread_mask & ti->ltid_bit) || !(en & FD_EV_ACTIVE_RW)) { if (!((pr | ps) & ti->ltid_bit)) { /* fd was not watched, it's still not */ return; diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c index 70ee2ade1..c37d99b1a 100644 --- a/src/ev_kqueue.c +++ b/src/ev_kqueue.c @@ -42,7 +42,7 @@ static int _update_fd(int fd, int start) pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv); ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send); - if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) { + if (!(fdtab[fd].thread_mask & ti->ltid_bit) || !(en & FD_EV_ACTIVE_RW)) { if (!((pr | ps) & ti->ltid_bit)) { /* fd was not watched, it's still not */ return changes; diff --git a/src/ev_poll.c b/src/ev_poll.c index 214781370..790ea5bfa 100644 --- a/src/ev_poll.c +++ b/src/ev_poll.c @@ -192,7 +192,7 @@ static void _do_poll(struct poller *p, int exp, int wake) continue; } - if (!(fdtab[fd].thread_mask & tid_bit)) { + if (!(fdtab[fd].thread_mask & ti->ltid_bit)) { continue; } diff --git a/src/fd.c b/src/fd.c index 56a771175..d412c2858 100644 --- a/src/fd.c +++ b/src/fd.c @@ -440,7 +440,7 @@ int fd_takeover(int fd, void *expected_owner) } /* success, from now on it's ours */ - HA_ATOMIC_STORE(&fdtab[fd].thread_mask, tid_bit); + HA_ATOMIC_STORE(&fdtab[fd].thread_mask, ti->ltid_bit); /* Make sure the FD doesn't have the active bit. It is possible that * the fd is polled by the thread that used to own it, the new thread @@ -458,7 +458,7 @@ int fd_takeover(int fd, void *expected_owner) void updt_fd_polling(const int fd) { - if (all_threads_mask == 1UL || (fdtab[fd].thread_mask & all_threads_mask) == tid_bit) { + if (tg->threads_enabled == 1UL || (fdtab[fd].thread_mask & tg->threads_enabled) == ti->ltid_bit) { if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid)) return; @@ -472,12 +472,13 @@ void updt_fd_polling(const int fd) fd_add_to_fd_list(&update_list[tgid - 1], fd); - if (fd_active(fd) && !(fdtab[fd].thread_mask & tid_bit)) { + if (fd_active(fd) && !(fdtab[fd].thread_mask & ti->ltid_bit)) { /* we need to wake up another thread to handle it immediately, any will fit, * so let's pick a random one so that it doesn't always end up on the same. */ - int thr = one_among_mask(fdtab[fd].thread_mask & all_threads_mask, + int thr = one_among_mask(fdtab[fd].thread_mask & tg->threads_enabled, statistical_prng_range(MAX_THREADS)); + thr += ha_tgroup_info[tgid - 1].base; wake_thread(thr); } } @@ -520,7 +521,7 @@ int fd_update_events(int fd, uint evts) tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask); } while (rmask & ~tmask); - if (!(tmask & tid_bit)) { + if (!(tmask & ti->ltid_bit)) { /* a takeover has started */ activity[tid].poll_skip_fd++; @@ -536,7 +537,7 @@ int fd_update_events(int fd, uint evts) /* with running we're safe now, we can drop the reference */ fd_drop_tgid(fd); - locked = (tmask != tid_bit); + locked = (tmask != ti->ltid_bit); /* OK now we are guaranteed that our thread_mask was present and * that we're allowed to update the FD.