mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 15:47:01 +02:00
REORG: fd: uninline fd_update_events()
This function has become a monster (80 lines and 2/3 of a kB), it doesn't benefit from being static nor inline anymore, let's move it to fd.c.
This commit is contained in:
parent
53a16187fd
commit
84c7922c52
@ -117,6 +117,7 @@ void run_poller();
|
|||||||
void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
|
void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
|
||||||
void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
|
void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
|
||||||
void updt_fd_polling(const int fd);
|
void updt_fd_polling(const int fd);
|
||||||
|
void fd_update_events(int fd, uint evts);
|
||||||
|
|
||||||
/* Called from the poller to acknowledge we read an entry from the global
|
/* Called from the poller to acknowledge we read an entry from the global
|
||||||
* update list, to remove our bit from the update_mask, and remove it from
|
* update list, to remove our bit from the update_mask, and remove it from
|
||||||
@ -349,97 +350,6 @@ static inline long fd_clr_running(int fd)
|
|||||||
return _HA_ATOMIC_AND_FETCH(&fdtab[fd].running_mask, ~tid_bit);
|
return _HA_ATOMIC_AND_FETCH(&fdtab[fd].running_mask, ~tid_bit);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Update events seen for FD <fd> and its state if needed. This should be
|
|
||||||
* called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_*
|
|
||||||
* doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are
|
|
||||||
* allowed to be reported regardless of R/W readiness.
|
|
||||||
*/
|
|
||||||
static inline void fd_update_events(int fd, uint evts)
|
|
||||||
{
|
|
||||||
unsigned long locked;
|
|
||||||
uint old, new;
|
|
||||||
uint new_flags, must_stop;
|
|
||||||
|
|
||||||
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
|
||||||
|
|
||||||
/* do nothing if the FD was taken over under us */
|
|
||||||
if (fd_set_running(fd) == -1)
|
|
||||||
return;
|
|
||||||
|
|
||||||
locked = (fdtab[fd].thread_mask != tid_bit);
|
|
||||||
|
|
||||||
/* OK now we are guaranteed that our thread_mask was present and
|
|
||||||
* that we're allowed to update the FD.
|
|
||||||
*/
|
|
||||||
|
|
||||||
new_flags =
|
|
||||||
((evts & FD_EV_READY_R) ? FD_POLL_IN : 0) |
|
|
||||||
((evts & FD_EV_READY_W) ? FD_POLL_OUT : 0) |
|
|
||||||
((evts & FD_EV_SHUT_R) ? FD_POLL_HUP : 0) |
|
|
||||||
((evts & FD_EV_ERR_RW) ? FD_POLL_ERR : 0);
|
|
||||||
|
|
||||||
/* SHUTW reported while FD was active for writes is an error */
|
|
||||||
if ((fdtab[fd].state & FD_EV_ACTIVE_W) && (evts & FD_EV_SHUT_W))
|
|
||||||
new_flags |= FD_POLL_ERR;
|
|
||||||
|
|
||||||
/* compute the inactive events reported late that must be stopped */
|
|
||||||
must_stop = 0;
|
|
||||||
if (unlikely(!fd_active(fd))) {
|
|
||||||
/* both sides stopped */
|
|
||||||
must_stop = FD_POLL_IN | FD_POLL_OUT;
|
|
||||||
}
|
|
||||||
else if (unlikely(!fd_recv_active(fd) && (evts & (FD_EV_READY_R | FD_EV_SHUT_R | FD_EV_ERR_RW)))) {
|
|
||||||
/* only send remains */
|
|
||||||
must_stop = FD_POLL_IN;
|
|
||||||
}
|
|
||||||
else if (unlikely(!fd_send_active(fd) && (evts & (FD_EV_READY_W | FD_EV_SHUT_W | FD_EV_ERR_RW)))) {
|
|
||||||
/* only recv remains */
|
|
||||||
must_stop = FD_POLL_OUT;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (new_flags & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
|
||||||
new_flags |= FD_EV_READY_R;
|
|
||||||
|
|
||||||
if (new_flags & (FD_POLL_OUT | FD_POLL_ERR))
|
|
||||||
new_flags |= FD_EV_READY_W;
|
|
||||||
|
|
||||||
old = fdtab[fd].state;
|
|
||||||
new = (old & ~FD_POLL_UPDT_MASK) | new_flags;
|
|
||||||
|
|
||||||
if (unlikely(locked)) {
|
|
||||||
/* Locked FDs (those with more than 2 threads) are atomically updated */
|
|
||||||
while (unlikely(new != old && !_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)))
|
|
||||||
new = (old & ~FD_POLL_UPDT_MASK) | new_flags;
|
|
||||||
} else {
|
|
||||||
if (new != old)
|
|
||||||
fdtab[fd].state = new;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fdtab[fd].iocb && fd_active(fd)) {
|
|
||||||
fdtab[fd].iocb(fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* another thread might have attempted to close this FD in the mean
|
|
||||||
* time (e.g. timeout task) striking on a previous thread and closing.
|
|
||||||
* This is detected by both thread_mask and running_mask being 0 after
|
|
||||||
* we remove ourselves last.
|
|
||||||
*/
|
|
||||||
if ((fdtab[fd].running_mask & tid_bit) &&
|
|
||||||
fd_clr_running(fd) == 0 && !fdtab[fd].thread_mask) {
|
|
||||||
_fd_delete_orphan(fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* we had to stop this FD and it still must be stopped after the I/O
|
|
||||||
* cb's changes, so let's program an update for this.
|
|
||||||
*/
|
|
||||||
if (must_stop && !(fdtab[fd].update_mask & tid_bit)) {
|
|
||||||
if (((must_stop & FD_POLL_IN) && !fd_recv_active(fd)) ||
|
|
||||||
((must_stop & FD_POLL_OUT) && !fd_send_active(fd)))
|
|
||||||
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
|
|
||||||
fd_updt[fd_nbupdt++] = fd;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Prepares <fd> for being polled */
|
/* Prepares <fd> for being polled */
|
||||||
static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
|
static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
|
||||||
{
|
{
|
||||||
|
91
src/fd.c
91
src/fd.c
@ -445,6 +445,97 @@ void updt_fd_polling(const int fd)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Update events seen for FD <fd> and its state if needed. This should be
|
||||||
|
* called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_*
|
||||||
|
* doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are
|
||||||
|
* allowed to be reported regardless of R/W readiness.
|
||||||
|
*/
|
||||||
|
void fd_update_events(int fd, uint evts)
|
||||||
|
{
|
||||||
|
unsigned long locked;
|
||||||
|
uint old, new;
|
||||||
|
uint new_flags, must_stop;
|
||||||
|
|
||||||
|
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
||||||
|
|
||||||
|
/* do nothing if the FD was taken over under us */
|
||||||
|
if (fd_set_running(fd) == -1)
|
||||||
|
return;
|
||||||
|
|
||||||
|
locked = (fdtab[fd].thread_mask != tid_bit);
|
||||||
|
|
||||||
|
/* OK now we are guaranteed that our thread_mask was present and
|
||||||
|
* that we're allowed to update the FD.
|
||||||
|
*/
|
||||||
|
|
||||||
|
new_flags =
|
||||||
|
((evts & FD_EV_READY_R) ? FD_POLL_IN : 0) |
|
||||||
|
((evts & FD_EV_READY_W) ? FD_POLL_OUT : 0) |
|
||||||
|
((evts & FD_EV_SHUT_R) ? FD_POLL_HUP : 0) |
|
||||||
|
((evts & FD_EV_ERR_RW) ? FD_POLL_ERR : 0);
|
||||||
|
|
||||||
|
/* SHUTW reported while FD was active for writes is an error */
|
||||||
|
if ((fdtab[fd].state & FD_EV_ACTIVE_W) && (evts & FD_EV_SHUT_W))
|
||||||
|
new_flags |= FD_POLL_ERR;
|
||||||
|
|
||||||
|
/* compute the inactive events reported late that must be stopped */
|
||||||
|
must_stop = 0;
|
||||||
|
if (unlikely(!fd_active(fd))) {
|
||||||
|
/* both sides stopped */
|
||||||
|
must_stop = FD_POLL_IN | FD_POLL_OUT;
|
||||||
|
}
|
||||||
|
else if (unlikely(!fd_recv_active(fd) && (evts & (FD_EV_READY_R | FD_EV_SHUT_R | FD_EV_ERR_RW)))) {
|
||||||
|
/* only send remains */
|
||||||
|
must_stop = FD_POLL_IN;
|
||||||
|
}
|
||||||
|
else if (unlikely(!fd_send_active(fd) && (evts & (FD_EV_READY_W | FD_EV_SHUT_W | FD_EV_ERR_RW)))) {
|
||||||
|
/* only recv remains */
|
||||||
|
must_stop = FD_POLL_OUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (new_flags & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
||||||
|
new_flags |= FD_EV_READY_R;
|
||||||
|
|
||||||
|
if (new_flags & (FD_POLL_OUT | FD_POLL_ERR))
|
||||||
|
new_flags |= FD_EV_READY_W;
|
||||||
|
|
||||||
|
old = fdtab[fd].state;
|
||||||
|
new = (old & ~FD_POLL_UPDT_MASK) | new_flags;
|
||||||
|
|
||||||
|
if (unlikely(locked)) {
|
||||||
|
/* Locked FDs (those with more than 2 threads) are atomically updated */
|
||||||
|
while (unlikely(new != old && !_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)))
|
||||||
|
new = (old & ~FD_POLL_UPDT_MASK) | new_flags;
|
||||||
|
} else {
|
||||||
|
if (new != old)
|
||||||
|
fdtab[fd].state = new;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fdtab[fd].iocb && fd_active(fd)) {
|
||||||
|
fdtab[fd].iocb(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* another thread might have attempted to close this FD in the mean
|
||||||
|
* time (e.g. timeout task) striking on a previous thread and closing.
|
||||||
|
* This is detected by both thread_mask and running_mask being 0 after
|
||||||
|
* we remove ourselves last.
|
||||||
|
*/
|
||||||
|
if ((fdtab[fd].running_mask & tid_bit) &&
|
||||||
|
fd_clr_running(fd) == 0 && !fdtab[fd].thread_mask) {
|
||||||
|
_fd_delete_orphan(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we had to stop this FD and it still must be stopped after the I/O
|
||||||
|
* cb's changes, so let's program an update for this.
|
||||||
|
*/
|
||||||
|
if (must_stop && !(fdtab[fd].update_mask & tid_bit)) {
|
||||||
|
if (((must_stop & FD_POLL_IN) && !fd_recv_active(fd)) ||
|
||||||
|
((must_stop & FD_POLL_OUT) && !fd_send_active(fd)))
|
||||||
|
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
|
||||||
|
fd_updt[fd_nbupdt++] = fd;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>
|
/* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>
|
||||||
* optionally followed by a newline if <nl> is non-null, to file descriptor
|
* optionally followed by a newline if <nl> is non-null, to file descriptor
|
||||||
* <fd>. The message is sent atomically using writev(). It may be truncated to
|
* <fd>. The message is sent atomically using writev(). It may be truncated to
|
||||||
|
Loading…
Reference in New Issue
Block a user