diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 694b538a5..1f99e7ceb 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -315,7 +315,15 @@ static void _do_poll(struct poller *p, int exp, int wake)
 				COUNT_IF(1, "epoll report of event on a just closed fd (harmless)");
 			}
 			continue;
-		}
+		} else if (fd_tgid(fd) != tgid) {
+			struct epoll_event ev;
+			/*
+			 * We've been taken over by another thread from
+			 * another thread group, give up.
+			 */
+			epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
+			continue;
+		}
 
 		if ((e & EPOLLRDHUP) && !(cur_poller.flags & HAP_POLL_F_RDHUP))
 			_HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
diff --git a/src/ev_evports.c b/src/ev_evports.c
index 645f3edb1..ddbd6acbb 100644
--- a/src/ev_evports.c
+++ b/src/ev_evports.c
@@ -58,7 +58,7 @@ static inline void evports_resync_fd(int fd, int events)
 	if (events == 0)
 		port_dissociate(evports_fd[tid], PORT_SOURCE_FD, fd);
 	else
-		port_associate(evports_fd[tid], PORT_SOURCE_FD, fd, events, NULL);
+		port_associate(evports_fd[tid], PORT_SOURCE_FD, fd, events, (void *)(uintptr_t)fdtab[fd].generation);
 }
 
 static void _update_fd(int fd)
@@ -251,10 +251,21 @@ static void _do_poll(struct poller *p, int exp, int wake)
 
 	for (i = 0; i < nevlist; i++) {
 		unsigned int n = 0;
+		unsigned int generation = (unsigned int)(uintptr_t)evports_evlist[i].portev_user;
 		int events, rebind_events;
 		int ret;
 
 		fd = evports_evlist[i].portev_object;
+
+		if (generation == fdtab[fd].generation && fd_tgid(fd) != tgid) {
+			/*
+			 * The FD was taken over by another tgid, forget about
+			 * it.
+			 */
+			port_dissociate(evports_fd[tid], PORT_SOURCE_FD, fd);
+			continue;
+		}
+
 		events = evports_evlist[i].portev_events;
 
 #ifdef DEBUG_FD
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 125f3cd52..bf84fa61d 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -60,23 +60,23 @@ static int _update_fd(int fd, int start)
 
 	if (en & FD_EV_ACTIVE_R) {
 		if (!(pr & ti->ltid_bit)) {
-			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
 			_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
 		}
 	}
 	else if (pr & ti->ltid_bit) {
-		EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+		EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
 		HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
 	}
 
 	if (en & FD_EV_ACTIVE_W) {
 		if (!(ps & ti->ltid_bit)) {
-			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
 			_HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
 		}
 	}
 	else if (ps & ti->ltid_bit) {
-		EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+		EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
 		_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
 	}
 
@@ -208,12 +208,26 @@ static void _do_poll(struct poller *p, int exp, int wake)
 
 	for (count = 0; count < status; count++) {
 		unsigned int n = 0;
+		unsigned int generation = (unsigned int)(uintptr_t)kev[count].udata;
 
 		fd = kev[count].ident;
 #ifdef DEBUG_FD
 		_HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
 
+		if (generation == fdtab[fd].generation && fd_tgid(fd) != tgid) {
+			struct kevent tmpkev[2];
+
+			/*
+			 * The FD was taken over by another tgid, forget about
+			 * it.
+			 */
+			EV_SET(&tmpkev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
+			EV_SET(&tmpkev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void *)(uintptr_t)fdtab[fd].generation);
+			kevent(kqueue_fd[tid], tmpkev, 2, NULL, 0, &timeout_ts);
+			continue;
+		}
+
 		if (kev[count].filter == EVFILT_READ) {
 			if (kev[count].data || !(kev[count].flags & EV_EOF))
 				n |= FD_EV_READY_R;