mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-05-04 12:41:00 +02:00
MEDIUM: pollers: Remember the state for read and write for each threads.
In the poller code, instead of just remembering if we're currently polling a fd or not, remember if we're polling it for writing and/or for reading, that way, we can avoid to modify the polling if it's already polled as needed.
This commit is contained in:
parent
305d5ab469
commit
53055055c5
@ -37,7 +37,11 @@
|
||||
|
||||
extern volatile struct fdlist update_list;
|
||||
|
||||
extern unsigned long *polled_mask;
|
||||
|
||||
extern struct polled_mask {
|
||||
unsigned long poll_recv;
|
||||
unsigned long poll_send;
|
||||
} *polled_mask;
|
||||
|
||||
extern THREAD_LOCAL int *fd_updt; // FD updates list
|
||||
extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
|
||||
|
||||
@ -53,7 +53,7 @@ static THREAD_LOCAL struct epoll_event ev;
|
||||
REGPRM1 static void __fd_clo(int fd)
|
||||
{
|
||||
if (unlikely(fdtab[fd].cloned)) {
|
||||
unsigned long m = polled_mask[fd];
|
||||
unsigned long m = polled_mask[fd].poll_recv | polled_mask[fd].poll_send;
|
||||
int i;
|
||||
|
||||
for (i = global.nbthread - 1; i >= 0; i--)
|
||||
@ -68,13 +68,35 @@ static void _update_fd(int fd)
|
||||
|
||||
en = fdtab[fd].state;
|
||||
|
||||
if (polled_mask[fd] & tid_bit) {
|
||||
if ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit) {
|
||||
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
|
||||
/* fd removed from poll list */
|
||||
opcode = EPOLL_CTL_DEL;
|
||||
_HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
|
||||
if (polled_mask[fd].poll_recv & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
if (polled_mask[fd].poll_send & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
}
|
||||
else {
|
||||
if (((en & FD_EV_POLLED_R) != 0) ==
|
||||
((polled_mask[fd].poll_recv & tid_bit) != 0) &&
|
||||
((en & FD_EV_POLLED_W) != 0) ==
|
||||
((polled_mask[fd].poll_send & tid_bit) != 0))
|
||||
return;
|
||||
if (en & FD_EV_POLLED_R) {
|
||||
if (!(polled_mask[fd].poll_recv & tid_bit))
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
|
||||
} else {
|
||||
if (polled_mask[fd].poll_recv & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
}
|
||||
if (en & FD_EV_POLLED_W) {
|
||||
if (!(polled_mask[fd].poll_send & tid_bit))
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
|
||||
} else {
|
||||
if (polled_mask[fd].poll_send & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
}
|
||||
/* fd status changed */
|
||||
opcode = EPOLL_CTL_MOD;
|
||||
}
|
||||
@ -82,7 +104,10 @@ static void _update_fd(int fd)
|
||||
else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
|
||||
/* new fd in the poll list */
|
||||
opcode = EPOLL_CTL_ADD;
|
||||
_HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
|
||||
if (en & FD_EV_POLLED_R)
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
|
||||
if (en & FD_EV_POLLED_W)
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
|
||||
}
|
||||
else {
|
||||
return;
|
||||
@ -188,7 +213,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
|
||||
/* FD has been migrated */
|
||||
activity[tid].poll_skip++;
|
||||
epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@ -74,18 +74,36 @@ static void _update_fd(int fd)
|
||||
en = fdtab[fd].state;
|
||||
|
||||
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
|
||||
if (!(polled_mask[fd] & tid_bit)) {
|
||||
if (!(polled_mask[fd].poll_recv & tid_bit) &&
|
||||
!(polled_mask[fd].poll_send & tid_bit)) {
|
||||
/* fd was not watched, it's still not */
|
||||
return;
|
||||
}
|
||||
/* fd totally removed from poll list */
|
||||
events = 0;
|
||||
_HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
|
||||
if (polled_mask[fd].poll_recv & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
if (polled_mask[fd].poll_send & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
}
|
||||
else {
|
||||
/* OK fd has to be monitored, it was either added or changed */
|
||||
events = evports_state_to_events(en);
|
||||
_HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
|
||||
if (en & FD_EV_POLLED_R) {
|
||||
if (!(polled_mask[fd].poll_recv & tid_bit))
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
|
||||
} else {
|
||||
if (polled_mask[fd].poll_recv & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
}
|
||||
if (en & FD_EV_POLLED_W) {
|
||||
if (!(polled_mask[fd].poll_send & tid_bit))
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
|
||||
} else {
|
||||
if (polled_mask[fd].poll_send & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
}
|
||||
|
||||
}
|
||||
evports_resync_fd(fd, events);
|
||||
}
|
||||
|
||||
@ -44,29 +44,44 @@ static int _update_fd(int fd, int start)
|
||||
en = fdtab[fd].state;
|
||||
|
||||
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
|
||||
if (!(polled_mask[fd] & tid_bit)) {
|
||||
if (!(polled_mask[fd].poll_recv & tid_bit) &&
|
||||
!(polled_mask[fd].poll_send & tid_bit)) {
|
||||
/* fd was not watched, it's still not */
|
||||
return changes;
|
||||
}
|
||||
/* fd totally removed from poll list */
|
||||
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
|
||||
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
|
||||
if (polled_mask[fd].poll_recv & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
if (polled_mask[fd].poll_send & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
}
|
||||
else {
|
||||
/* OK fd has to be monitored, it was either added or changed */
|
||||
|
||||
if (en & FD_EV_POLLED_R)
|
||||
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
|
||||
else if (polled_mask[fd] & tid_bit)
|
||||
if (en & FD_EV_POLLED_R) {
|
||||
if (!(polled_mask[fd].poll_recv & tid_bit)) {
|
||||
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
|
||||
}
|
||||
}
|
||||
else if (polled_mask[fd].poll_recv & tid_bit) {
|
||||
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
|
||||
HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
}
|
||||
|
||||
if (en & FD_EV_POLLED_W)
|
||||
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
|
||||
else if (polled_mask[fd] & tid_bit)
|
||||
if (en & FD_EV_POLLED_W) {
|
||||
if (!(polled_mask[fd].poll_send & tid_bit)) {
|
||||
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
|
||||
}
|
||||
}
|
||||
else if (polled_mask[fd].poll_send & tid_bit) {
|
||||
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
}
|
||||
|
||||
_HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
|
||||
}
|
||||
return changes;
|
||||
}
|
||||
|
||||
@ -58,28 +58,38 @@ static void _update_fd(int fd, int *max_add_fd)
|
||||
* takes it for every other one.
|
||||
*/
|
||||
if (!(en & FD_EV_POLLED_RW)) {
|
||||
if (!polled_mask[fd]) {
|
||||
if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
|
||||
/* fd was not watched, it's still not */
|
||||
return;
|
||||
}
|
||||
/* fd totally removed from poll list */
|
||||
hap_fd_clr(fd, fd_evts[DIR_RD]);
|
||||
hap_fd_clr(fd, fd_evts[DIR_WR]);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd], 0);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, 0);
|
||||
}
|
||||
else {
|
||||
/* OK fd has to be monitored, it was either added or changed */
|
||||
if (!(en & FD_EV_POLLED_R))
|
||||
if (!(en & FD_EV_POLLED_R)) {
|
||||
hap_fd_clr(fd, fd_evts[DIR_RD]);
|
||||
else
|
||||
if (polled_mask[fd].poll_recv & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
} else {
|
||||
hap_fd_set(fd, fd_evts[DIR_RD]);
|
||||
if (!(polled_mask[fd].poll_recv & tid_bit))
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
|
||||
}
|
||||
|
||||
if (!(en & FD_EV_POLLED_W))
|
||||
if (!(en & FD_EV_POLLED_W)) {
|
||||
hap_fd_clr(fd, fd_evts[DIR_WR]);
|
||||
else
|
||||
if (polled_mask[fd].poll_send & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
}else {
|
||||
hap_fd_set(fd, fd_evts[DIR_WR]);
|
||||
if (!(polled_mask[fd].poll_send & tid_bit))
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
|
||||
}
|
||||
|
||||
_HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
|
||||
if (fd > *max_add_fd)
|
||||
*max_add_fd = fd;
|
||||
}
|
||||
|
||||
@ -49,28 +49,38 @@ static void _update_fd(int fd, int *max_add_fd)
|
||||
* takes it for every other one.
|
||||
*/
|
||||
if (!(en & FD_EV_POLLED_RW)) {
|
||||
if (!polled_mask[fd]) {
|
||||
if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
|
||||
/* fd was not watched, it's still not */
|
||||
return;
|
||||
}
|
||||
/* fd totally removed from poll list */
|
||||
hap_fd_clr(fd, fd_evts[DIR_RD]);
|
||||
hap_fd_clr(fd, fd_evts[DIR_WR]);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd], 0);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0);
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0);
|
||||
}
|
||||
else {
|
||||
/* OK fd has to be monitored, it was either added or changed */
|
||||
if (!(en & FD_EV_POLLED_R))
|
||||
if (!(en & FD_EV_POLLED_R)) {
|
||||
hap_fd_clr(fd, fd_evts[DIR_RD]);
|
||||
else
|
||||
if (polled_mask[fd].poll_recv & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
|
||||
} else {
|
||||
hap_fd_set(fd, fd_evts[DIR_RD]);
|
||||
if (!(polled_mask[fd].poll_recv & tid_bit))
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
|
||||
}
|
||||
|
||||
if (!(en & FD_EV_POLLED_W))
|
||||
if (!(en & FD_EV_POLLED_W)) {
|
||||
hap_fd_clr(fd, fd_evts[DIR_WR]);
|
||||
else
|
||||
if (polled_mask[fd].poll_send & tid_bit)
|
||||
_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
|
||||
} else {
|
||||
hap_fd_set(fd, fd_evts[DIR_WR]);
|
||||
if (!(polled_mask[fd].poll_send & tid_bit))
|
||||
_HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
|
||||
}
|
||||
|
||||
_HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
|
||||
if (fd > *max_add_fd)
|
||||
*max_add_fd = fd;
|
||||
}
|
||||
|
||||
6
src/fd.c
6
src/fd.c
@ -122,7 +122,7 @@
|
||||
#include <proto/port_range.h>
|
||||
|
||||
struct fdtab *fdtab = NULL; /* array of all the file descriptors */
|
||||
unsigned long *polled_mask = NULL; /* Array for the polled_mask of each fd */
|
||||
struct polled_mask *polled_mask = NULL; /* Array for the polled_mask of each fd */
|
||||
struct fdinfo *fdinfo = NULL; /* less-often used infos for file descriptors */
|
||||
int totalconn; /* total # of terminated sessions */
|
||||
int actconn; /* # of active sessions */
|
||||
@ -338,7 +338,7 @@ static void fd_dodelete(int fd, int do_close)
|
||||
fdtab[fd].owner = NULL;
|
||||
fdtab[fd].thread_mask = 0;
|
||||
if (do_close) {
|
||||
polled_mask[fd] = 0;
|
||||
polled_mask[fd].poll_recv = polled_mask[fd].poll_send = 0;
|
||||
close(fd);
|
||||
_HA_ATOMIC_SUB(&ha_used_fds, 1);
|
||||
}
|
||||
@ -525,7 +525,7 @@ int init_pollers()
|
||||
if ((fdtab = calloc(global.maxsock, sizeof(struct fdtab))) == NULL)
|
||||
goto fail_tab;
|
||||
|
||||
if ((polled_mask = calloc(global.maxsock, sizeof(unsigned long))) == NULL)
|
||||
if ((polled_mask = calloc(global.maxsock, sizeof(*polled_mask))) == NULL)
|
||||
goto fail_polledmask;
|
||||
|
||||
if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user