From ebc78d78a27ac3de7308eeb499c51d638e79ed6b Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Sat, 20 Jan 2018 23:53:50 +0100 Subject: [PATCH] BUG/MEDIUM: fd: maintain a per-thread update mask Since the fd update tables are per-thread, we need to have a bit per thread to indicate whether an update exists, otherwise this can lead to lost update events every time multiple threads want to update the same FD. In practice *for now*, it only happens at start time when listeners are enabled and ask for polling after facing their first EAGAIN. But since the pollers are still shared, a lost event is still recovered by a neighbor thread. This will not reliably work anymore with per-thread pollers, where it has been observed a few times that a single-threaded listener would not always accept incoming connections upon startup. It's worth noting that during this code review it appeared that the "new" flag in the fdtab isn't used anymore. This fix should be backported to 1.8. --- include/proto/fd.h | 6 +++--- include/types/fd.h | 2 +- src/cli.c | 5 ++--- src/ev_epoll.c | 2 +- src/ev_kqueue.c | 2 +- src/ev_poll.c | 2 +- src/ev_select.c | 2 +- src/fd.c | 2 +- src/stream.c | 4 ++-- 9 files changed, 13 insertions(+), 14 deletions(-) diff --git a/include/proto/fd.h b/include/proto/fd.h index 44370e768..d6b591d19 100644 --- a/include/proto/fd.h +++ b/include/proto/fd.h @@ -99,10 +99,10 @@ void fd_process_cached_events(); */ static inline void updt_fd_polling(const int fd) { - if (fdtab[fd].updated) + if (fdtab[fd].update_mask & tid_bit) /* already scheduled for update */ return; - fdtab[fd].updated = 1; + fdtab[fd].update_mask |= tid_bit; fd_updt[fd_nbupdt++] = fd; } @@ -400,7 +400,7 @@ static inline void fd_insert(int fd, unsigned long thread_mask) HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock); fdtab[fd].ev = 0; fdtab[fd].new = 1; - fdtab[fd].updated = 0; + fdtab[fd].update_mask &= ~tid_bit; fdtab[fd].linger_risk = 0; fdtab[fd].cloned = 0; fdtab[fd].cache = 0; diff --git 
a/include/types/fd.h b/include/types/fd.h index 032bab967..54192e478 100644 --- a/include/types/fd.h +++ b/include/types/fd.h @@ -94,13 +94,13 @@ enum fd_states { struct fdtab { __decl_hathreads(HA_SPINLOCK_T lock); unsigned long thread_mask; /* mask of thread IDs authorized to process the task */ + unsigned long update_mask; /* mask of thread IDs having an update for fd */ void (*iocb)(int fd); /* I/O handler */ void *owner; /* the connection or listener associated with this fd, NULL if closed */ unsigned int cache; /* position+1 in the FD cache. 0=not in cache. */ unsigned char state; /* FD state for read and write directions (2*3 bits) */ unsigned char ev; /* event seen in return of poll() : FD_POLL_* */ unsigned char new:1; /* 1 if this fd has just been created */ - unsigned char updated:1; /* 1 if this fd is already in the update list */ unsigned char linger_risk:1; /* 1 if we must kill lingering before closing */ unsigned char cloned:1; /* 1 if a cloned socket, requires EPOLL_CTL_DEL on close */ }; diff --git a/src/cli.c b/src/cli.c index 3e62c311b..d5c615bb1 100644 --- a/src/cli.c +++ b/src/cli.c @@ -794,7 +794,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx) li = fdt.owner; chunk_printf(&trash, - " %5d : st=0x%02x(R:%c%c%c W:%c%c%c) ev=0x%02x(%c%c%c%c%c) [%c%c%c%c] cache=%u owner=%p iocb=%p(%s) tmask=0x%lx", + " %5d : st=0x%02x(R:%c%c%c W:%c%c%c) ev=0x%02x(%c%c%c%c%c) [%c%c%c] cache=%u owner=%p iocb=%p(%s) tmask=0x%lx umask=0x%lx", fd, fdt.state, (fdt.state & FD_EV_POLLED_R) ? 'P' : 'p', @@ -810,7 +810,6 @@ static int cli_io_handler_show_fd(struct appctx *appctx) (fdt.ev & FD_POLL_PRI) ? 'P' : 'p', (fdt.ev & FD_POLL_IN) ? 'I' : 'i', fdt.new ? 'N' : 'n', - fdt.updated ? 'U' : 'u', fdt.linger_risk ? 'L' : 'l', fdt.cloned ? 'C' : 'c', fdt.cache, @@ -820,7 +819,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx) (fdt.iocb == dgram_fd_handler) ? "dgram_fd_handler" : (fdt.iocb == listener_accept) ? 
"listener_accept" : "unknown", - fdt.thread_mask); + fdt.thread_mask, fdt.update_mask); if (fdt.iocb == conn_fd_handler) { chunk_appendf(&trash, " cflg=0x%08x", conn_flags); diff --git a/src/ev_epoll.c b/src/ev_epoll.c index 679dfee4d..f37455faf 100644 --- a/src/ev_epoll.c +++ b/src/ev_epoll.c @@ -74,7 +74,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) } HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock); - fdtab[fd].updated = 0; + fdtab[fd].update_mask &= ~tid_bit; fdtab[fd].new = 0; eo = fdtab[fd].state; diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c index 69d51b6b0..20fa29084 100644 --- a/src/ev_kqueue.c +++ b/src/ev_kqueue.c @@ -53,7 +53,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) } HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock); - fdtab[fd].updated = 0; + fdtab[fd].update_mask &= ~tid_bit; fdtab[fd].new = 0; eo = fdtab[fd].state; diff --git a/src/ev_poll.c b/src/ev_poll.c index efd56ee19..f24bf69a9 100644 --- a/src/ev_poll.c +++ b/src/ev_poll.c @@ -79,7 +79,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) } HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock); - fdtab[fd].updated = 0; + fdtab[fd].update_mask &= ~tid_bit; fdtab[fd].new = 0; eo = fdtab[fd].state; diff --git a/src/ev_select.c b/src/ev_select.c index 52c445473..19b13805c 100644 --- a/src/ev_select.c +++ b/src/ev_select.c @@ -61,7 +61,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) } HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock); - fdtab[fd].updated = 0; + fdtab[fd].update_mask &= ~tid_bit; fdtab[fd].new = 0; eo = fdtab[fd].state; diff --git a/src/fd.c b/src/fd.c index 8411bcfb9..112806bbb 100644 --- a/src/fd.c +++ b/src/fd.c @@ -199,7 +199,7 @@ static void fd_dodelete(int fd, int do_close) port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port); fdinfo[fd].port_range = NULL; fdtab[fd].owner = NULL; - fdtab[fd].updated = 0; + fdtab[fd].update_mask &= ~tid_bit; fdtab[fd].new = 0; fdtab[fd].thread_mask = 0; if (do_close) diff --git a/src/stream.c b/src/stream.c index 
ebe41be19..92f9c0a64 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2906,7 +2906,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st conn->handle.fd, conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0, conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0, - conn->handle.fd >= 0 ? fdtab[conn->handle.fd].updated : 0, + conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0, conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0); } else if ((tmpctx = objt_appctx(strm->si[0].end)) != NULL) { @@ -2939,7 +2939,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st conn->handle.fd, conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0, conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0, - conn->handle.fd >= 0 ? fdtab[conn->handle.fd].updated : 0, + conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0, conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0); } else if ((tmpctx = objt_appctx(strm->si[1].end)) != NULL) {