From 9464bb1f05b5e0046716b4573a567d3450ac7604 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Tue, 5 Jul 2022 05:16:13 +0200 Subject: [PATCH] MEDIUM: fd: add the tgid to the fd and pass it to fd_insert() The file descriptors will need to know the thread group ID in addition to the mask. This extends fd_insert() to take the tgid, and will store it into the FD. In the FD, the tgid is stored as a combination of tgid on the lower 16 bits and a refcount on the higher 16 bits. This makes it possible to know when it's really possible to trust the tgid and the running mask. If a refcount is higher than 1 it indeed indicates another thread might be in the process of updating these values. Since a closed FD must necessarily have a zero refcount, a test was added to fd_insert() to make sure that it is the case. --- include/haproxy/fd-t.h | 6 ++++++ include/haproxy/fd.h | 14 +++++++++++--- src/dns.c | 2 +- src/fd.c | 5 +++-- src/proto_sockpair.c | 2 +- src/sock.c | 2 +- src/sock_inet.c | 2 +- src/sock_unix.c | 2 +- src/ssl_sock.c | 2 +- 9 files changed, 26 insertions(+), 11 deletions(-) diff --git a/include/haproxy/fd-t.h b/include/haproxy/fd-t.h index de91f1776..cecf797fe 100644 --- a/include/haproxy/fd-t.h +++ b/include/haproxy/fd-t.h @@ -153,6 +153,11 @@ struct fdlist { /* info about one given fd. Note: only align on cache lines when using threads; * 32-bit small archs can put everything in 32-bytes when threads are disabled. + * refc_tgid is an atomic 32-bit composite value made of 16 higher bits + * containing a refcount on tgid and the running_mask, and 16 lower bits + * containing a thread group ID. The tgid may only be changed when refc is zero + * and running may only be checked/changed when refc is held and shows the + * reader is alone. An FD with tgid zero belongs to nobody. 
*/ struct fdtab { unsigned long running_mask; /* mask of thread IDs currently using the fd */ @@ -162,6 +167,7 @@ struct fdtab { void (*iocb)(int fd); /* I/O handler */ void *owner; /* the connection or listener associated with this fd, NULL if closed */ unsigned int state; /* FD state for read and write directions (FD_EV_*) + FD_POLL_* */ + unsigned int refc_tgid; /* refcounted tgid, updated atomically */ #ifdef DEBUG_FD unsigned int event_count; /* number of events reported */ #endif diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h index b9f3803b4..ac1a41a48 100644 --- a/include/haproxy/fd.h +++ b/include/haproxy/fd.h @@ -314,6 +314,12 @@ static inline void fd_want_send(int fd) updt_fd_polling(fd); } +/* returns the tgid from an fd (masks the refcount) */ +static forceinline int fd_tgid(int fd) +{ + return _HA_ATOMIC_LOAD(&fdtab[fd].refc_tgid) & 0xFFFF; +} + /* remove tid_bit from the fd's running mask and returns the bits that remain * after the atomic operation. */ @@ -322,10 +328,10 @@ static inline long fd_clr_running(int fd) return _HA_ATOMIC_AND_FETCH(&fdtab[fd].running_mask, ~tid_bit); } -/* Prepares for being polled on all permitted threads (these will then be - * refined to only cover running ones). +/* Prepares for being polled on all permitted threads of this group ID + * (these will then be refined to only cover running ones). 
*/ -static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask) +static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), int tgid, unsigned long thread_mask) { extern void sock_conn_iocb(int); @@ -335,6 +341,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned BUG_ON(fd < 0 || fd >= global.maxsock); BUG_ON(fdtab[fd].owner != NULL); BUG_ON(fdtab[fd].state != 0); + BUG_ON(fdtab[fd].refc_tgid != 0); thread_mask &= all_threads_mask; BUG_ON(thread_mask == 0); @@ -342,6 +349,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned fdtab[fd].owner = owner; fdtab[fd].iocb = iocb; fdtab[fd].state = 0; + fdtab[fd].refc_tgid = tgid; #ifdef DEBUG_FD fdtab[fd].event_count = 0; #endif diff --git a/src/dns.c b/src/dns.c index 75949b59b..a77e71fdf 100644 --- a/src/dns.c +++ b/src/dns.c @@ -74,7 +74,7 @@ static int dns_connect_nameserver(struct dns_nameserver *ns) /* Add the fd in the fd list and update its parameters */ dgram->t.sock.fd = fd; - fd_insert(fd, dgram, dgram_fd_handler, all_threads_mask); + fd_insert(fd, dgram, dgram_fd_handler, tgid, all_threads_mask); fd_want_recv(fd); return 0; } diff --git a/src/fd.c b/src/fd.c index 9757d0ad7..f93c11719 100644 --- a/src/fd.c +++ b/src/fd.c @@ -329,6 +329,7 @@ void _fd_delete_orphan(int fd) polled_mask[fd].poll_recv = polled_mask[fd].poll_send = 0; fdtab[fd].state = 0; + fdtab[fd].refc_tgid = 0; #ifdef DEBUG_FD fdtab[fd].event_count = 0; @@ -824,8 +825,8 @@ static int init_pollers_per_thread() poller_rd_pipe = mypipe[0]; poller_wr_pipe[tid] = mypipe[1]; fd_set_nonblock(poller_rd_pipe); - fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler, tid_bit); - fd_insert(poller_wr_pipe[tid], poller_pipe_io_handler, poller_pipe_io_handler, tid_bit); + fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler, tgid, tid_bit); + fd_insert(poller_wr_pipe[tid], poller_pipe_io_handler, 
poller_pipe_io_handler, tgid, tid_bit); fd_want_recv(poller_rd_pipe); fd_stop_both(poller_wr_pipe[tid]); return 1; diff --git a/src/proto_sockpair.c b/src/proto_sockpair.c index 0798283b9..282a094bf 100644 --- a/src/proto_sockpair.c +++ b/src/proto_sockpair.c @@ -157,7 +157,7 @@ int sockpair_bind_receiver(struct receiver *rx, char **errmsg) rx->flags |= RX_F_BOUND; - fd_insert(rx->fd, rx->owner, rx->iocb, rx->bind_thread); + fd_insert(rx->fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread); return err; bind_return: diff --git a/src/sock.c b/src/sock.c index 602e9c55b..43e30df1d 100644 --- a/src/sock.c +++ b/src/sock.c @@ -703,7 +703,7 @@ void sock_accept_iocb(int fd) void sock_conn_ctrl_init(struct connection *conn) { BUG_ON(conn->flags & CO_FL_FDLESS); - fd_insert(conn->handle.fd, conn, sock_conn_iocb, tid_bit); + fd_insert(conn->handle.fd, conn, sock_conn_iocb, tgid, tid_bit); } /* This completes the release of connection by removing its FD from the diff --git a/src/sock_inet.c b/src/sock_inet.c index 9e1451ad9..d517e4c42 100644 --- a/src/sock_inet.c +++ b/src/sock_inet.c @@ -390,7 +390,7 @@ int sock_inet_bind_receiver(struct receiver *rx, char **errmsg) rx->fd = fd; rx->flags |= RX_F_BOUND; - fd_insert(fd, rx->owner, rx->iocb, rx->bind_thread); + fd_insert(fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread); /* for now, all regularly bound TCP listeners are exportable */ if (!(rx->flags & RX_F_INHERITED)) diff --git a/src/sock_unix.c b/src/sock_unix.c index 026d86c57..05d9be7a4 100644 --- a/src/sock_unix.c +++ b/src/sock_unix.c @@ -284,7 +284,7 @@ int sock_unix_bind_receiver(struct receiver *rx, char **errmsg) rx->fd = fd; rx->flags |= RX_F_BOUND; - fd_insert(fd, rx->owner, rx->iocb, rx->bind_thread); + fd_insert(fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread); /* for now, all regularly bound TCP listeners are exportable */ if (!(rx->flags & RX_F_INHERITED)) diff --git a/src/ssl_sock.c b/src/ssl_sock.c index 3c89aaa8d..00f024023 100644 
--- a/src/ssl_sock.c +++ b/src/ssl_sock.c @@ -824,7 +824,7 @@ static inline void ssl_async_process_fds(struct ssl_sock_ctx *ctx) /* We add new fds to the fdtab */ for (i=0 ; i < num_add_fds ; i++) { - fd_insert(add_fd[i], ctx, ssl_async_fd_handler, tid_bit); + fd_insert(add_fd[i], ctx, ssl_async_fd_handler, tgid, tid_bit); } num_add_fds = 0;