mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 15:47:01 +02:00
MEDIUM: fd: add the tgid to the fd and pass it to fd_insert()
The file descriptors will need to know the thread group ID in addition to the mask. This extends fd_insert() to take the tgid, and will store it into the FD. In the FD, the tgid is stored as a combination of tgid on the lower 16 bits and a refcount on the higher 16 bits. This allows to know when it's really possible to trust the tgid and the running mask. If a refcount is higher than 1 it indeed indicates another thread else might be in the process of updating these values. Since a closed FD must necessarily have a zero refcount, a test was added to fd_insert() to make sure that it is the case.
This commit is contained in:
parent
512dd2dc1c
commit
9464bb1f05
@ -153,6 +153,11 @@ struct fdlist {
|
||||
|
||||
/* info about one given fd. Note: only align on cache lines when using threads;
|
||||
* 32-bit small archs can put everything in 32-bytes when threads are disabled.
|
||||
* refc_tgid is an atomic 32-bit composite value made of 16 higher bits
|
||||
* containing a refcount on tgid and the running_mask, and 16 lower bits
|
||||
* containing a thread group ID. The tgid may only be changed when refc is zero
|
||||
* and running may only be checked/changed when refc is held and shows the
|
||||
* reader is alone. An FD with tgid zero belongs to nobody.
|
||||
*/
|
||||
struct fdtab {
|
||||
unsigned long running_mask; /* mask of thread IDs currently using the fd */
|
||||
@ -162,6 +167,7 @@ struct fdtab {
|
||||
void (*iocb)(int fd); /* I/O handler */
|
||||
void *owner; /* the connection or listener associated with this fd, NULL if closed */
|
||||
unsigned int state; /* FD state for read and write directions (FD_EV_*) + FD_POLL_* */
|
||||
unsigned int refc_tgid; /* refcounted tgid, updated atomically */
|
||||
#ifdef DEBUG_FD
|
||||
unsigned int event_count; /* number of events reported */
|
||||
#endif
|
||||
|
@ -314,6 +314,12 @@ static inline void fd_want_send(int fd)
|
||||
updt_fd_polling(fd);
|
||||
}
|
||||
|
||||
/* returns the tgid from an fd (masks the refcount) */
|
||||
static forceinline int fd_tgid(int fd)
|
||||
{
|
||||
return _HA_ATOMIC_LOAD(&fdtab[fd].refc_tgid) & 0xFFFF;
|
||||
}
|
||||
|
||||
/* remove tid_bit from the fd's running mask and returns the bits that remain
|
||||
* after the atomic operation.
|
||||
*/
|
||||
@ -322,10 +328,10 @@ static inline long fd_clr_running(int fd)
|
||||
return _HA_ATOMIC_AND_FETCH(&fdtab[fd].running_mask, ~tid_bit);
|
||||
}
|
||||
|
||||
/* Prepares <fd> for being polled on all permitted threads (these will then be
|
||||
* refined to only cover running ones).
|
||||
/* Prepares <fd> for being polled on all permitted threads of this group ID
|
||||
* (these will then be refined to only cover running ones).
|
||||
*/
|
||||
static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
|
||||
static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), int tgid, unsigned long thread_mask)
|
||||
{
|
||||
extern void sock_conn_iocb(int);
|
||||
|
||||
@ -335,6 +341,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
|
||||
BUG_ON(fd < 0 || fd >= global.maxsock);
|
||||
BUG_ON(fdtab[fd].owner != NULL);
|
||||
BUG_ON(fdtab[fd].state != 0);
|
||||
BUG_ON(fdtab[fd].refc_tgid != 0);
|
||||
|
||||
thread_mask &= all_threads_mask;
|
||||
BUG_ON(thread_mask == 0);
|
||||
@ -342,6 +349,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
|
||||
fdtab[fd].owner = owner;
|
||||
fdtab[fd].iocb = iocb;
|
||||
fdtab[fd].state = 0;
|
||||
fdtab[fd].refc_tgid = tgid;
|
||||
#ifdef DEBUG_FD
|
||||
fdtab[fd].event_count = 0;
|
||||
#endif
|
||||
|
@ -74,7 +74,7 @@ static int dns_connect_nameserver(struct dns_nameserver *ns)
|
||||
|
||||
/* Add the fd in the fd list and update its parameters */
|
||||
dgram->t.sock.fd = fd;
|
||||
fd_insert(fd, dgram, dgram_fd_handler, all_threads_mask);
|
||||
fd_insert(fd, dgram, dgram_fd_handler, tgid, all_threads_mask);
|
||||
fd_want_recv(fd);
|
||||
return 0;
|
||||
}
|
||||
|
5
src/fd.c
5
src/fd.c
@ -329,6 +329,7 @@ void _fd_delete_orphan(int fd)
|
||||
polled_mask[fd].poll_recv = polled_mask[fd].poll_send = 0;
|
||||
|
||||
fdtab[fd].state = 0;
|
||||
fdtab[fd].refc_tgid = 0;
|
||||
|
||||
#ifdef DEBUG_FD
|
||||
fdtab[fd].event_count = 0;
|
||||
@ -824,8 +825,8 @@ static int init_pollers_per_thread()
|
||||
poller_rd_pipe = mypipe[0];
|
||||
poller_wr_pipe[tid] = mypipe[1];
|
||||
fd_set_nonblock(poller_rd_pipe);
|
||||
fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler, tid_bit);
|
||||
fd_insert(poller_wr_pipe[tid], poller_pipe_io_handler, poller_pipe_io_handler, tid_bit);
|
||||
fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler, tgid, tid_bit);
|
||||
fd_insert(poller_wr_pipe[tid], poller_pipe_io_handler, poller_pipe_io_handler, tgid, tid_bit);
|
||||
fd_want_recv(poller_rd_pipe);
|
||||
fd_stop_both(poller_wr_pipe[tid]);
|
||||
return 1;
|
||||
|
@ -157,7 +157,7 @@ int sockpair_bind_receiver(struct receiver *rx, char **errmsg)
|
||||
|
||||
rx->flags |= RX_F_BOUND;
|
||||
|
||||
fd_insert(rx->fd, rx->owner, rx->iocb, rx->bind_thread);
|
||||
fd_insert(rx->fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread);
|
||||
return err;
|
||||
|
||||
bind_return:
|
||||
|
@ -703,7 +703,7 @@ void sock_accept_iocb(int fd)
|
||||
void sock_conn_ctrl_init(struct connection *conn)
|
||||
{
|
||||
BUG_ON(conn->flags & CO_FL_FDLESS);
|
||||
fd_insert(conn->handle.fd, conn, sock_conn_iocb, tid_bit);
|
||||
fd_insert(conn->handle.fd, conn, sock_conn_iocb, tgid, tid_bit);
|
||||
}
|
||||
|
||||
/* This completes the release of connection <conn> by removing its FD from the
|
||||
|
@ -390,7 +390,7 @@ int sock_inet_bind_receiver(struct receiver *rx, char **errmsg)
|
||||
rx->fd = fd;
|
||||
rx->flags |= RX_F_BOUND;
|
||||
|
||||
fd_insert(fd, rx->owner, rx->iocb, rx->bind_thread);
|
||||
fd_insert(fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread);
|
||||
|
||||
/* for now, all regularly bound TCP listeners are exportable */
|
||||
if (!(rx->flags & RX_F_INHERITED))
|
||||
|
@ -284,7 +284,7 @@ int sock_unix_bind_receiver(struct receiver *rx, char **errmsg)
|
||||
rx->fd = fd;
|
||||
rx->flags |= RX_F_BOUND;
|
||||
|
||||
fd_insert(fd, rx->owner, rx->iocb, rx->bind_thread);
|
||||
fd_insert(fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread);
|
||||
|
||||
/* for now, all regularly bound TCP listeners are exportable */
|
||||
if (!(rx->flags & RX_F_INHERITED))
|
||||
|
@ -824,7 +824,7 @@ static inline void ssl_async_process_fds(struct ssl_sock_ctx *ctx)
|
||||
|
||||
/* We add new fds to the fdtab */
|
||||
for (i=0 ; i < num_add_fds ; i++) {
|
||||
fd_insert(add_fd[i], ctx, ssl_async_fd_handler, tid_bit);
|
||||
fd_insert(add_fd[i], ctx, ssl_async_fd_handler, tgid, tid_bit);
|
||||
}
|
||||
|
||||
num_add_fds = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user