diff --git a/include/haproxy/fd-t.h b/include/haproxy/fd-t.h
index b2ca87cb1..d0c3134cd 100644
--- a/include/haproxy/fd-t.h
+++ b/include/haproxy/fd-t.h
@@ -71,6 +71,7 @@ enum {
 #define FD_EXPORTED_BIT 20 /* FD is exported and must not be closed */
 #define FD_EXCL_SYSCALL_BIT 21 /* a syscall claims exclusivity on this FD */
 #define FD_DISOWN_BIT 22 /* this fd will be closed by some external code */
+#define FD_MUST_CLOSE_BIT 23 /* this fd must be closed by the last thread to release it */
 
 /* and flag values */
 
@@ -112,6 +113,7 @@ enum {
 #define FD_EXPORTED (1U << FD_EXPORTED_BIT)
 #define FD_EXCL_SYSCALL (1U << FD_EXCL_SYSCALL_BIT)
 #define FD_DISOWN (1U << FD_DISOWN_BIT)
+#define FD_MUST_CLOSE (1U << FD_MUST_CLOSE_BIT)
 
 /* This function is used to report flags in debugging tools. Please reflect
  * below any single-bit flag addition above in the same order via the
diff --git a/src/fd.c b/src/fd.c
index 889b31422..706aba829 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -419,12 +419,21 @@ void fd_delete(int fd)
	 * will not take new bits in its running_mask so we have the guarantee
	 * that the last thread eliminating running_mask is the one allowed to
	 * safely delete the FD. Most of the time it will be the current thread.
+	 * We still need to set and check the one-shot flag FD_MUST_CLOSE
+	 * to take care of the rare cases where a thread wakes up on late I/O
+	 * before the thread_mask is zero, and sets its bit in the running_mask
+	 * just after the current thread finishes clearing its own bit, hence
+	 * the two threads see themselves as the last ones (which they really are).
	 */
	HA_ATOMIC_OR(&fdtab[fd].running_mask, ti->ltid_bit);
+	HA_ATOMIC_OR(&fdtab[fd].state, FD_MUST_CLOSE);
	HA_ATOMIC_STORE(&fdtab[fd].thread_mask, 0);
-	if (fd_clr_running(fd) == ti->ltid_bit)
-		_fd_delete_orphan(fd);
+	if (fd_clr_running(fd) == ti->ltid_bit) {
+		if (HA_ATOMIC_BTR(&fdtab[fd].state, FD_MUST_CLOSE_BIT)) {
+			_fd_delete_orphan(fd);
+		}
+	}
 }
 
 /* makes the new fd non-blocking and clears all other O_* flags; this is meant
@@ -587,29 +596,47 @@ int fd_update_events(int fd, uint evts)
		return FD_UPDT_CLOSED;
	}
 
-	/* do nothing if the FD was taken over under us */
+	/* Do not take running_mask if not strictly needed (will trigger a
+	 * cosmetic BUG_ON() in fd_insert() anyway if done).
+	 */
+	tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
+	if (!(tmask & ti->ltid_bit))
+		goto do_update;
+
+	HA_ATOMIC_OR(&fdtab[fd].running_mask, ti->ltid_bit);
+
+	/* From this point, our bit may possibly be in thread_mask, but it may
+	 * still vanish, either because a takeover completed just before taking
+	 * the bit above with the new owner deleting the FD, or because a
+	 * takeover started just before taking the bit. In order to make sure a
+	 * started takeover is complete, we need to verify that all bits of
+	 * running_mask are present in thread_mask, since takeover first takes
+	 * running then atomically replaces thread_mask. Once it's stable, if
+	 * our bit remains there, no further takeover may happen because we
+	 * hold running, but if our bit is not there it means we've lost the
+	 * takeover race and have to decline touching the FD. Regarding the
+	 * risk of deletion, our bit in running_mask prevents fd_delete() from
+	 * finalizing the close, and the caller will leave the FD with a zero
+	 * thread_mask and the FD_MUST_CLOSE flag set. It will then be our
+	 * responsibility to close it.
+	 */
	do {
-		/* make sure we read a synchronous copy of rmask and tmask
-		 * (tmask is only up to date if it reflects all of rmask's
-		 * bits).
-		 */
-		do {
-			rmask = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask);
-			tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
-		} while (rmask & ~tmask);
+		rmask = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask);
+		tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
+		rmask &= ~ti->ltid_bit;
+	} while (rmask & ~tmask);
 
-		if (!(tmask & ti->ltid_bit)) {
-			/* a takeover has started */
-			activity[tid].poll_skip_fd++;
+	/* Now tmask is stable. Do nothing if the FD was taken over under us */
 
-			/* Let the poller know this FD was lost */
-			if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
-				fd_updt[fd_nbupdt++] = fd;
+	if (!(tmask & ti->ltid_bit)) {
+		/* a takeover has started */
+		activity[tid].poll_skip_fd++;
 
-			fd_drop_tgid(fd);
-			return FD_UPDT_MIGRATED;
-		}
-	} while (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &rmask, rmask | ti->ltid_bit));
+		if (fd_clr_running(fd) == ti->ltid_bit)
+			goto closed_or_migrated;
+
+		goto do_update;
+	}
 
	/* with running we're safe now, we can drop the reference */
	fd_drop_tgid(fd);
@@ -706,16 +733,16 @@ int fd_update_events(int fd, uint evts)
 
	/* another thread might have attempted to close this FD in the mean
	 * time (e.g. timeout task) striking on a previous thread and closing.
-	 * This is detected by both thread_mask and running_mask being 0 after
+	 * This is detected by us being the last owners of a running_mask bit,
+	 * and the thread_mask being zero. At the moment we release the running
+	 * bit, a takeover may also happen, so in practice we check for our loss
+	 * of the thread_mask bit rather than both masks being zero after
	 * we remove ourselves last. There is no risk the FD gets reassigned
	 * to a different group since it's not released until the real close()
	 * in _fd_delete_orphan().
	 */
-	if (fd_clr_running(fd) == ti->ltid_bit && !tmask) {
-		fd_drop_tgid(fd);
-		_fd_delete_orphan(fd);
-		return FD_UPDT_CLOSED;
-	}
+	if (fd_clr_running(fd) == ti->ltid_bit && !(tmask & ti->ltid_bit))
+		goto closed_or_migrated;
 
	/* we had to stop this FD and it still must be stopped after the I/O
	 * cb's changes, so let's program an update for this.
@@ -729,6 +756,35 @@ int fd_update_events(int fd, uint evts)
 
	fd_drop_tgid(fd);
	return FD_UPDT_DONE;
+
+ closed_or_migrated:
+	/* We only come here once we've last dropped running and the FD is
+	 * not for us as per !(tmask & ti->ltid_bit). It may imply we're
+	 * responsible for closing it. Otherwise it's just a migration.
+	 */
+	if (HA_ATOMIC_BTR(&fdtab[fd].state, FD_MUST_CLOSE_BIT)) {
+		fd_drop_tgid(fd);
+		_fd_delete_orphan(fd);
+		return FD_UPDT_CLOSED;
+	}
+
+	/* So we were alone, no close bit, at best the FD was migrated, at
+	 * worst it's in the process of being closed by another thread. We must
+	 * be ultra-careful as it can be re-inserted by yet another thread as
+	 * the result of socket() or accept(). Let's just tell the poller the
+	 * FD was lost. If it was closed it was already removed and this will
+	 * only cost an update for nothing.
+	 */
+
+ do_update:
+	/* The FD is not closed but we don't want the poller to wake up for
+	 * it anymore.
+	 */
+	if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
+		fd_updt[fd_nbupdt++] = fd;
+
+	fd_drop_tgid(fd);
+	return FD_UPDT_MIGRATED;
 }
 
 /* This is used by pollers at boot time to re-register desired events for