MEDIUM: fd: rely more on fd_update_events() to detect changes

This function already performs a number of checks prior to calling the
IOCB, and detects the change of thread (FD migration). Half of the
controls are still in each poller, and these pollers also maintain
activity counters for various cases.

Note that the unreliable test on thread_mask was removed so that only
the one performed by fd_set_running() is now used, since this one is
reliable.

Let's centralize all that fd-specific logic into the function and make
it return a status among:

  FD_UPDT_DONE,        // update done, nothing else to be done
  FD_UPDT_DEAD,        // FD was already dead, ignore it
  FD_UPDT_CLOSED,      // FD was closed
  FD_UPDT_MIGRATED,    // FD was migrated, ignore it now

Some pollers already used to call it last and have nothing to do after
it, regardless of the result. epoll has to delete the FD in case a
migration is detected. Overall this removes more code than it adds.
This commit is contained in:
Willy Tarreau 2021-07-29 16:57:19 +02:00
parent 84c7922c52
commit 200bd50b73
8 changed files with 66 additions and 70 deletions

View File

@@ -110,6 +110,14 @@ enum {
#define FD_EXPORTED (1U << FD_EXPORTED_BIT)
#define FD_EXCL_SYSCALL (1U << FD_EXCL_SYSCALL_BIT)
/* FD update status after fd_update_events() */
enum {
FD_UPDT_DONE = 0, // update done, nothing else to be done
FD_UPDT_DEAD, // FD was already dead, ignore it
FD_UPDT_CLOSED, // FD was closed
FD_UPDT_MIGRATED, // FD was migrated, ignore it now
};
/* This is the value used to mark a file descriptor as dead. This value is
* negative, this is important so that tests on fd < 0 properly match. It
* also has the nice property of being highly negative but neither overflowing

View File

@@ -117,7 +117,7 @@ void run_poller();
void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
void updt_fd_polling(const int fd);
void fd_update_events(int fd, uint evts);
int fd_update_events(int fd, uint evts);
/* Called from the poller to acknowledge we read an entry from the global
* update list, to remove our bit from the update_mask, and remove it from

View File

@@ -218,6 +218,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (count = 0; count < status; count++) {
struct epoll_event ev;
unsigned int n, e;
int ret;
e = epoll_events[count].events;
fd = epoll_events[count].data.fd;
@@ -228,27 +229,20 @@ static void _do_poll(struct poller *p, int exp, int wake)
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
if (!fdtab[fd].owner) {
activity[tid].poll_dead_fd++;
continue;
}
if (!(fdtab[fd].thread_mask & tid_bit)) {
/* FD has been migrated */
activity[tid].poll_skip_fd++;
epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
continue;
}
n = ((e & EPOLLIN) ? FD_EV_READY_R : 0) |
((e & EPOLLOUT) ? FD_EV_READY_W : 0) |
((e & EPOLLRDHUP) ? FD_EV_SHUT_R : 0) |
((e & EPOLLHUP) ? FD_EV_SHUT_RW : 0) |
((e & EPOLLERR) ? FD_EV_ERR_RW : 0);
fd_update_events(fd, n);
ret = fd_update_events(fd, n);
if (ret == FD_UPDT_MIGRATED) {
/* FD has been migrated */
epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
}
}
/* the caller will take care of cached events */
}

View File

@@ -213,24 +213,14 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (i = 0; i < nevlist; i++) {
unsigned int n = 0;
int events, rebind_events;
int ret;
fd = evports_evlist[i].portev_object;
events = evports_evlist[i].portev_events;
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
if (fdtab[fd].owner == NULL) {
activity[tid].poll_dead_fd++;
continue;
}
if (!(fdtab[fd].thread_mask & tid_bit)) {
activity[tid].poll_skip_fd++;
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
continue;
}
/*
* By virtue of receiving an event for this file descriptor, it
* is no longer associated with the port in question. Store
@@ -255,13 +245,24 @@ static void _do_poll(struct poller *p, int exp, int wake)
* entry. If it changes, the fd will be placed on the updated
* list for processing the next time we are called.
*/
fd_update_events(fd, n);
ret = fd_update_events(fd, n);
/* If the FD was already dead, skip it */
if (ret == FD_UPDT_DEAD)
continue;
/* disable polling on this instance if the FD was migrated */
if (ret == FD_UPDT_MIGRATED) {
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
continue;
}
/*
* This file descriptor was closed during the processing of
* polled events. No need to reassociate.
*/
if (fdtab[fd].owner == NULL)
if (ret == FD_UPDT_CLOSED)
continue;
/*

View File

@@ -181,23 +181,13 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (count = 0; count < status; count++) {
unsigned int n = 0;
int ret;
fd = kev[count].ident;
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
if (!fdtab[fd].owner) {
activity[tid].poll_dead_fd++;
continue;
}
if (!(fdtab[fd].thread_mask & tid_bit)) {
activity[tid].poll_skip_fd++;
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
continue;
}
if (kev[count].filter == EVFILT_READ) {
if (kev[count].data || !(kev[count].flags & EV_EOF))
n |= FD_EV_READY_R;
@@ -210,7 +200,13 @@ static void _do_poll(struct poller *p, int exp, int wake)
n |= FD_EV_ERR_RW;
}
fd_update_events(fd, n);
ret = fd_update_events(fd, n);
if (ret == FD_UPDT_MIGRATED) {
/* FD was migrated, let's stop polling it */
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
}
}
}

View File

@@ -215,6 +215,7 @@ static void _do_poll(struct poller *p, int exp, int wake)
for (count = 0; status > 0 && count < nbfd; count++) {
unsigned int n;
int ret;
int e = poll_events[count].revents;
fd = poll_events[count].fd;
@@ -230,27 +231,20 @@ static void _do_poll(struct poller *p, int exp, int wake)
/* ok, we found one active fd */
status--;
if (!fdtab[fd].owner) {
activity[tid].poll_dead_fd++;
continue;
}
if (!(fdtab[fd].thread_mask & tid_bit)) {
activity[tid].poll_skip_fd++;
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
continue;
}
n = ((e & POLLIN) ? FD_EV_READY_R : 0) |
((e & POLLOUT) ? FD_EV_READY_W : 0) |
((e & POLLRDHUP) ? FD_EV_SHUT_R : 0) |
((e & POLLHUP) ? FD_EV_SHUT_RW : 0) |
((e & POLLERR) ? FD_EV_ERR_RW : 0);
fd_update_events(fd, n);
}
ret = fd_update_events(fd, n);
if (ret == FD_UPDT_MIGRATED) {
/* FD was migrated, let's stop polling it */
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
}
}
}

View File

@@ -209,15 +209,6 @@ static void _do_poll(struct poller *p, int exp, int wake)
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
if (!fdtab[fd].owner) {
activity[tid].poll_dead_fd++;
continue;
}
if (!(fdtab[fd].thread_mask & tid_bit)) {
activity[tid].poll_skip_fd++;
continue;
}
fd_update_events(fd, n);
}

View File

@@ -448,9 +448,10 @@ void updt_fd_polling(const int fd)
/* Update events seen for FD <fd> and its state if needed. This should be
* called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_*
* doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are
* allowed to be reported regardless of R/W readiness.
* allowed to be reported regardless of R/W readiness. Returns one of
* FD_UPDT_*.
*/
void fd_update_events(int fd, uint evts)
int fd_update_events(int fd, uint evts)
{
unsigned long locked;
uint old, new;
@@ -458,9 +459,17 @@ void fd_update_events(int fd, uint evts)
ti->flags &= ~TI_FL_STUCK; // this thread is still running
/* do nothing on remains of an old dead FD */
if (!fdtab[fd].owner) {
activity[tid].poll_dead_fd++;
return FD_UPDT_DEAD;
}
/* do nothing if the FD was taken over under us */
if (fd_set_running(fd) == -1)
return;
if (fd_set_running(fd) == -1) {
activity[tid].poll_skip_fd++;
return FD_UPDT_MIGRATED;
}
locked = (fdtab[fd].thread_mask != tid_bit);
@@ -523,6 +532,7 @@ void fd_update_events(int fd, uint evts)
if ((fdtab[fd].running_mask & tid_bit) &&
fd_clr_running(fd) == 0 && !fdtab[fd].thread_mask) {
_fd_delete_orphan(fd);
return FD_UPDT_CLOSED;
}
/* we had to stop this FD and it still must be stopped after the I/O
@@ -534,6 +544,8 @@ void fd_update_events(int fd, uint evts)
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
}
return FD_UPDT_DONE;
}
/* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>