MEDIUM: pollers: Drop fd events after a takeover to another tgid.

In pollers that support it, provide the generation number in addition to
the fd. When an event happens, if the generation number is the same but
the tgid has changed, then assume the fd was taken over by a thread from
another thread group, and just delete the event from the current thread's
poller, as we no longer want to hear about it.
This commit is contained in:
Olivier Houchard 2025-02-25 18:28:45 +01:00 committed by Olivier Houchard
parent c36aae2af1
commit d31b1650ae
3 changed files with 39 additions and 6 deletions

View File

@ -315,7 +315,15 @@ static void _do_poll(struct poller *p, int exp, int wake)
COUNT_IF(1, "epoll report of event on a just closed fd (harmless)");
}
continue;
}
} else if (fd_tgid(fd) != tgid) {
struct epoll_event ev;
/*
* We've been taken over by another thread from
* another thread group, give up.
*/
epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
continue;
}
if ((e & EPOLLRDHUP) && !(cur_poller.flags & HAP_POLL_F_RDHUP))
_HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);

View File

@ -58,7 +58,7 @@ static inline void evports_resync_fd(int fd, int events)
if (events == 0)
port_dissociate(evports_fd[tid], PORT_SOURCE_FD, fd);
else
port_associate(evports_fd[tid], PORT_SOURCE_FD, fd, events, NULL);
port_associate(evports_fd[tid], PORT_SOURCE_FD, fd, events, (void *)(uintptr_t)fdtab[fd].generation);
}
static void _update_fd(int fd)
@ -251,10 +251,21 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (i = 0; i < nevlist; i++) {
unsigned int n = 0;
unsigned int generation = (unsigned int)(uintptr_t)evports_evlist[i].portev_user;
int events, rebind_events;
int ret;
fd = evports_evlist[i].portev_object;
if (generation == fdtab[fd].generation && fd_tgid(fd) != tgid) {
/*
* The FD was taken over by another tgid, forget about
* it.
*/
port_dissociate(evports_fd[tid], PORT_SOURCE_FD, fd);
continue;
}
events = evports_evlist[i].portev_events;
#ifdef DEBUG_FD

View File

@ -60,23 +60,23 @@ static int _update_fd(int fd, int start)
if (en & FD_EV_ACTIVE_R) {
if (!(pr & ti->ltid_bit)) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
}
else if (pr & ti->ltid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
if (!(ps & ti->ltid_bit)) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
}
else if (ps & ti->ltid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
@ -208,12 +208,26 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (count = 0; count < status; count++) {
unsigned int n = 0;
unsigned int generation = (unsigned int)(uintptr_t)kev[count].udata;
fd = kev[count].ident;
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
if (generation == fdtab[fd].generation && fd_tgid(fd) != tgid) {
struct kevent tmpkev[2];
/*
* The FD was taken over by another tgid, forget about
* it.
*/
EV_SET(&tmpkev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
EV_SET(&tmpkev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
kevent(kqueue_fd[tid], tmpkev, 2, NULL, 0, &timeout_ts);
continue;
}
if (kev[count].filter == EVFILT_READ) {
if (kev[count].data || !(kev[count].flags & EV_EOF))
n |= FD_EV_READY_R;