diff --git a/include/haproxy/fd-t.h b/include/haproxy/fd-t.h
index 4759ef2c6..99231503a 100644
--- a/include/haproxy/fd-t.h
+++ b/include/haproxy/fd-t.h
@@ -110,6 +110,14 @@ enum {
 #define FD_EXPORTED (1U << FD_EXPORTED_BIT)
 #define FD_EXCL_SYSCALL (1U << FD_EXCL_SYSCALL_BIT)
 
+/* FD update status after fd_update_events() */
+enum {
+	FD_UPDT_DONE = 0,     // update done, nothing else to be done
+	FD_UPDT_DEAD,         // FD was already dead, ignore it
+	FD_UPDT_CLOSED,       // FD was closed
+	FD_UPDT_MIGRATED,     // FD was migrated, ignore it now
+};
+
 /* This is the value used to mark a file descriptor as dead. This value is
  * negative, this is important so that tests on fd < 0 properly match. It
  * also has the nice property of being highly negative but neither overflowing
diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h
index 7d9d0e649..8e1a442a3 100644
--- a/include/haproxy/fd.h
+++ b/include/haproxy/fd.h
@@ -117,7 +117,7 @@ void run_poller();
 void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
 void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
 void updt_fd_polling(const int fd);
-void fd_update_events(int fd, uint evts);
+int fd_update_events(int fd, uint evts);
 
 /* Called from the poller to acknowledge we read an entry from the global
  * update list, to remove our bit from the update_mask, and remove it from
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 8810b7785..1de2e0fc4 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -218,6 +218,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
 	for (count = 0; count < status; count++) {
 		struct epoll_event ev;
 		unsigned int n, e;
+		int ret;
 
 		e = epoll_events[count].events;
 		fd = epoll_events[count].data.fd;
@@ -228,27 +229,20 @@ static void _do_poll(struct poller *p, int exp, int wake)
 #ifdef DEBUG_FD
 		_HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
-		if (!fdtab[fd].owner) {
-			activity[tid].poll_dead_fd++;
-			continue;
-		}
-
-		if (!(fdtab[fd].thread_mask & tid_bit)) {
-			/* FD has been migrated */
-			activity[tid].poll_skip_fd++;
-			epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
-			_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
-			_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
-			continue;
-		}
-
 		n = ((e & EPOLLIN)    ? FD_EV_READY_R : 0) |
 		    ((e & EPOLLOUT)   ? FD_EV_READY_W : 0) |
 		    ((e & EPOLLRDHUP) ? FD_EV_SHUT_R  : 0) |
 		    ((e & EPOLLHUP)   ? FD_EV_SHUT_RW : 0) |
 		    ((e & EPOLLERR)   ? FD_EV_ERR_RW  : 0);
 
-		fd_update_events(fd, n);
+		ret = fd_update_events(fd, n);
+
+		if (ret == FD_UPDT_MIGRATED) {
+			/* FD has been migrated */
+			epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
+			_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+			_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+		}
 	}
 	/* the caller will take care of cached events */
 }
diff --git a/src/ev_evports.c b/src/ev_evports.c
index 109e59c61..c7bf4f6f7 100644
--- a/src/ev_evports.c
+++ b/src/ev_evports.c
@@ -213,24 +213,14 @@ static void _do_poll(struct poller *p, int exp, int wake)
 	for (i = 0; i < nevlist; i++) {
 		unsigned int n = 0;
 		int events, rebind_events;
+		int ret;
+
 		fd = evports_evlist[i].portev_object;
 		events = evports_evlist[i].portev_events;
 
 #ifdef DEBUG_FD
 		_HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
-		if (fdtab[fd].owner == NULL) {
-			activity[tid].poll_dead_fd++;
-			continue;
-		}
-
-		if (!(fdtab[fd].thread_mask & tid_bit)) {
-			activity[tid].poll_skip_fd++;
-			if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-				fd_updt[fd_nbupdt++] = fd;
-			continue;
-		}
-
 		/*
 		 * By virtue of receiving an event for this file descriptor, it
 		 * is no longer associated with the port in question. Store
@@ -255,13 +245,24 @@ static void _do_poll(struct poller *p, int exp, int wake)
 		 * entry. If it changes, the fd will be placed on the updated
 		 * list for processing the next time we are called.
 		 */
-		fd_update_events(fd, n);
+		ret = fd_update_events(fd, n);
+
+		/* If the FD was already dead, skip it */
+		if (ret == FD_UPDT_DEAD)
+			continue;
+
+		/* disable polling on this instance if the FD was migrated */
+		if (ret == FD_UPDT_MIGRATED) {
+			if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+				fd_updt[fd_nbupdt++] = fd;
+			continue;
+		}
 
 		/*
 		 * This file descriptor was closed during the processing of
 		 * polled events. No need to reassociate.
 		 */
-		if (fdtab[fd].owner == NULL)
+		if (ret == FD_UPDT_CLOSED)
 			continue;
 
 		/*
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index d51a833ed..ce71484de 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -181,23 +181,13 @@ static void _do_poll(struct poller *p, int exp, int wake)
 	for (count = 0; count < status; count++) {
 		unsigned int n = 0;
+		int ret;
+
 		fd = kev[count].ident;
 #ifdef DEBUG_FD
 		_HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
-		if (!fdtab[fd].owner) {
-			activity[tid].poll_dead_fd++;
-			continue;
-		}
-
-		if (!(fdtab[fd].thread_mask & tid_bit)) {
-			activity[tid].poll_skip_fd++;
-			if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-				fd_updt[fd_nbupdt++] = fd;
-			continue;
-		}
-
 		if (kev[count].filter == EVFILT_READ) {
 			if (kev[count].data || !(kev[count].flags & EV_EOF))
 				n |= FD_EV_READY_R;
@@ -210,7 +200,13 @@ static void _do_poll(struct poller *p, int exp, int wake)
 				n |= FD_EV_ERR_RW;
 		}
 
-		fd_update_events(fd, n);
+		ret = fd_update_events(fd, n);
+
+		if (ret == FD_UPDT_MIGRATED) {
+			/* FD was migrated, let's stop polling it */
+			if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+				fd_updt[fd_nbupdt++] = fd;
+		}
 	}
 }
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 6bd0cf473..bb9d8f87a 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -215,6 +215,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
 	for (count = 0; status > 0 && count < nbfd; count++) {
 		unsigned int n;
+		int ret;
 		int e = poll_events[count].revents;
 		fd = poll_events[count].fd;
 
@@ -230,27 +231,20 @@ static void _do_poll(struct poller *p, int exp, int wake)
 		/* ok, we found one active fd */
 		status--;
 
-		if (!fdtab[fd].owner) {
-			activity[tid].poll_dead_fd++;
-			continue;
-		}
-
-		if (!(fdtab[fd].thread_mask & tid_bit)) {
-			activity[tid].poll_skip_fd++;
-			if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-				fd_updt[fd_nbupdt++] = fd;
-			continue;
-		}
-
 		n = ((e & POLLIN)    ? FD_EV_READY_R : 0) |
 		    ((e & POLLOUT)   ? FD_EV_READY_W : 0) |
 		    ((e & POLLRDHUP) ? FD_EV_SHUT_R  : 0) |
 		    ((e & POLLHUP)   ? FD_EV_SHUT_RW : 0) |
 		    ((e & POLLERR)   ? FD_EV_ERR_RW  : 0);
 
-		fd_update_events(fd, n);
-	}
+		ret = fd_update_events(fd, n);
 
+		if (ret == FD_UPDT_MIGRATED) {
+			/* FD was migrated, let's stop polling it */
+			if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+				fd_updt[fd_nbupdt++] = fd;
+		}
+	}
 }
diff --git a/src/ev_select.c b/src/ev_select.c
index 3e5ee5a25..c143bd916 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -209,15 +209,6 @@ static void _do_poll(struct poller *p, int exp, int wake)
 #ifdef DEBUG_FD
 			_HA_ATOMIC_INC(&fdtab[fd].event_count);
 #endif
-			if (!fdtab[fd].owner) {
-				activity[tid].poll_dead_fd++;
-				continue;
-			}
-
-			if (!(fdtab[fd].thread_mask & tid_bit)) {
-				activity[tid].poll_skip_fd++;
-				continue;
-			}
 
 			fd_update_events(fd, n);
 		}
diff --git a/src/fd.c b/src/fd.c
index febe45161..df9b5deb2 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -448,9 +448,10 @@ void updt_fd_polling(const int fd)
 /* Update events seen for FD <fd> and its state if needed. This should be
  * called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_*
  * doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are
- * allowed to be reported regardless of R/W readiness.
+ * allowed to be reported regardless of R/W readiness. Returns one of
+ * FD_UPDT_*.
  */
-void fd_update_events(int fd, uint evts)
+int fd_update_events(int fd, uint evts)
 {
 	unsigned long locked;
 	uint old, new;
@@ -458,9 +459,17 @@ void fd_update_events(int fd, uint evts)
 
 	ti->flags &= ~TI_FL_STUCK; // this thread is still running
 
+	/* do nothing on remains of an old dead FD */
+	if (!fdtab[fd].owner) {
+		activity[tid].poll_dead_fd++;
+		return FD_UPDT_DEAD;
+	}
+
 	/* do nothing if the FD was taken over under us */
-	if (fd_set_running(fd) == -1)
-		return;
+	if (fd_set_running(fd) == -1) {
+		activity[tid].poll_skip_fd++;
+		return FD_UPDT_MIGRATED;
+	}
 
 	locked = (fdtab[fd].thread_mask != tid_bit);
 
@@ -523,6 +532,7 @@ void fd_update_events(int fd, uint evts)
 	if ((fdtab[fd].running_mask & tid_bit) &&
 	    fd_clr_running(fd) == 0 && !fdtab[fd].thread_mask) {
 		_fd_delete_orphan(fd);
+		return FD_UPDT_CLOSED;
 	}
 
 	/* we had to stop this FD and it still must be stopped after the I/O
@@ -534,6 +544,8 @@ void fd_update_events(int fd, uint evts)
 		if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
 			fd_updt[fd_nbupdt++] = fd;
 	}
+
+	return FD_UPDT_DONE;
 }
 
 /* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from