[BUG] fix ev_sepoll again, this time with a new state machine

It was possible for ev_sepoll() to ignore certain events when all
speculative events had been processed at once, because the epoll_wait()
timeout was not cleared, thus delaying their delivery.

The state machine had become too complicated, so it has been rewritten.
The new one appears to be both faster and more correct.
Willy Tarreau 2007-05-13 01:52:05 +02:00
parent 49fa3a1453
commit 6653d17b8d


@@ -105,6 +105,10 @@ static _syscall4 (int, epoll_wait, int, epfd, struct epoll_event *, events, int,
#define FD_EV_MASK (FD_EV_MASK_W | FD_EV_MASK_R)
/* This is the minimum number of events successfully processed in speculative
* mode above which we agree to return without checking epoll() (one time out
* of two).
*/
#define MIN_RETURN_EVENTS 25
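/* Illustrative sketch (editor's note, not part of the patch): this threshold
 * is checked after the speculative pass in _do_poll() further down; roughly
 * one call out of two, a sufficiently productive pass returns without going
 * through epoll_wait() at all:
 *
 *	if (status >= MIN_RETURN_EVENTS) {
 *		if (++last_skipped <= 1) {
 *			tv_now(&now);
 *			return;
 *		}
 *	}
 *	last_skipped = 0;
 */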
/* descriptor of one FD.
* FIXME: should be a bit field */
@@ -215,6 +219,7 @@ REGPRM2 static int __fd_clr(const int fd, int dir)
return 0;
}
/* normally unused */
REGPRM1 static void __fd_rem(int fd)
{
__fd_clr(fd, DIR_RD);
@@ -232,148 +237,151 @@ REGPRM1 static void __fd_clo(int fd)
fd_list[fd].e &= ~(FD_EV_MASK);
}
/*
* Operations to perform when reaching _do_poll() for some FDs in the spec
* queue, depending on their state. This is mainly used to clean up FDs
* which are in the STOP state. It is also used to compute EPOLL* flags when
* switching from SPEC to WAIT.
*/
static struct ev_to_epoll {
char op; // epoll opcode to switch from spec to wait, 0 if none
char m; // inverted mask for existing events
char ev; // remaining epoll events after change
char pad;
} ev_to_epoll[16] = {
[FD_EV_IDLE_W | FD_EV_STOP_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_R },
[FD_EV_SPEC_W | FD_EV_STOP_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_R },
[FD_EV_STOP_W | FD_EV_IDLE_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_W },
[FD_EV_STOP_W | FD_EV_SPEC_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_W },
[FD_EV_WAIT_W | FD_EV_STOP_R] = { .op=EPOLL_CTL_MOD, .m=FD_EV_MASK_R, .ev=EPOLLOUT },
[FD_EV_STOP_W | FD_EV_WAIT_R] = { .op=EPOLL_CTL_MOD, .m=FD_EV_MASK_W, .ev=EPOLLIN },
[FD_EV_STOP_W | FD_EV_STOP_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_R|FD_EV_MASK_W },
[FD_EV_SPEC_W | FD_EV_WAIT_R] = { .ev=EPOLLIN },
[FD_EV_WAIT_W | FD_EV_SPEC_R] = { .ev=EPOLLOUT },
[FD_EV_WAIT_W | FD_EV_WAIT_R] = { .ev=EPOLLIN|EPOLLOUT },
};
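/* Illustrative sketch (editor's note, not part of the patch): the table above
 * is indexed directly by an fd's combined read/write state, so switching a
 * descriptor from speculative to polled mode reduces to a single lookup, as
 * done later in _do_poll():
 *
 *	opcode = ev_to_epoll[fd_list[fd].e].op;
 *	if (opcode) {
 *		ev.data.fd = fd;
 *		ev.events = ev_to_epoll[fd_list[fd].e].ev;
 *		fd_list[fd].e &= ~(unsigned int)ev_to_epoll[fd_list[fd].e].m;
 *		epoll_ctl(epoll_fd, opcode, fd, &ev);
 *	}
 */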
/*
* speculative epoll() poller
*/
REGPRM2 static void _do_poll(struct poller *p, int wait_time)
{
static unsigned int last_skipped;
int status;
int status, eo;
int fd, opcode;
int count;
int spec_idx;
/* Here we have two options :
* - either walk the list forwards and hope to atch more events
* - either walk the list forwards and hope to match more events
* - or walk it backwards to minimize the number of changes and
* to make better use of the cache.
* Tests have shown that walking backwards improves perf by 0.2%.
*/
status = 0;
spec_idx = nbspec;
while (likely(spec_idx > 0)) {
spec_idx--;
fd = spec_list[spec_idx];
eo = fd_list[fd].e; /* save old events */
/*
* Process the speculative events.
*
* Principle: events which are marked FD_EV_SPEC are processed
* with their assigned function. If the function returns 0, it
* means there is nothing doable without polling first. We will
* then convert the event to a pollable one by assigning them
* the WAIT status.
*/
fdtab[fd].ev = 0;
if ((eo & FD_EV_MASK_R) == FD_EV_SPEC_R) {
/* The owner is interested in reading from this FD */
if (fdtab[fd].state != FD_STCLOSE && fdtab[fd].state != FD_STERROR) {
/* Pretend there is something to read */
fdtab[fd].ev |= FD_POLL_IN;
if (!fdtab[fd].cb[DIR_RD].f(fd))
fd_list[fd].e ^= (FD_EV_WAIT_R ^ FD_EV_SPEC_R);
else
status++;
}
}
else if ((eo & FD_EV_MASK_R) == FD_EV_STOP_R) {
/* This FD was being polled and is now being removed. */
fd_list[fd].e &= ~FD_EV_MASK_R;
}
if ((eo & FD_EV_MASK_W) == FD_EV_SPEC_W) {
/* The owner is interested in writing to this FD */
if (fdtab[fd].state != FD_STCLOSE && fdtab[fd].state != FD_STERROR) {
/* Pretend there is something to write */
fdtab[fd].ev |= FD_POLL_OUT;
if (!fdtab[fd].cb[DIR_WR].f(fd))
fd_list[fd].e ^= (FD_EV_WAIT_W ^ FD_EV_SPEC_W);
else
status++;
}
}
else if ((eo & FD_EV_MASK_W) == FD_EV_STOP_W) {
/* This FD was being polled and is now being removed. */
fd_list[fd].e &= ~FD_EV_MASK_W;
}
/* Now, we will adjust the event in the poll list. Indeed, it
* is possible that an event which was previously in the poll
* list now goes out, and the opposite is possible too. We can
* have opposite changes for READ and WRITE too.
*/
if ((eo ^ fd_list[fd].e) & FD_EV_RW_PL) {
/* poll status changed */
if ((fd_list[fd].e & FD_EV_RW_PL) == 0) {
/* fd removed from poll list */
opcode = EPOLL_CTL_DEL;
}
else if ((eo & FD_EV_RW_PL) == 0) {
/* new fd in the poll list */
opcode = EPOLL_CTL_ADD;
}
else {
/* fd status changed */
opcode = EPOLL_CTL_MOD;
}
/* construct the epoll events based on new state */
ev.events = 0;
if (fd_list[fd].e & FD_EV_WAIT_R)
ev.events |= EPOLLIN;
if (fd_list[fd].e & FD_EV_WAIT_W)
ev.events |= EPOLLOUT;
opcode = ev_to_epoll[fd_list[fd].e].op;
if (opcode) {
ev.data.fd = fd;
ev.events = ev_to_epoll[fd_list[fd].e].ev;
fd_list[fd].e &= ~(unsigned int)ev_to_epoll[fd_list[fd].e].m;
epoll_ctl(epoll_fd, opcode, fd, &ev);
}
if (!(fd_list[fd].e & FD_EV_RW_SL)) {
// This one must be removed. Let's clear it now.
/* This fd switched to combinations of either WAIT or
* IDLE. It must be removed from the spec list.
*/
delete_spec_entry(spec_idx);
continue;
}
/* OK so now we do not have any event marked STOP anymore in
* the list. We can simply try to execute functions for the
* events we have found, and requeue them in case of EAGAIN.
*/
status = 0;
fdtab[fd].ev = 0;
if ((fd_list[fd].e & FD_EV_MASK_R) == FD_EV_SPEC_R) {
if (fdtab[fd].state != FD_STCLOSE && fdtab[fd].state != FD_STERROR) {
fdtab[fd].ev |= FD_POLL_IN;
if (fdtab[fd].cb[DIR_RD].f(fd) == 0)
status |= EPOLLIN;
}
}
if ((fd_list[fd].e & FD_EV_MASK_W) == FD_EV_SPEC_W) {
if (fdtab[fd].state != FD_STCLOSE && fdtab[fd].state != FD_STERROR) {
fdtab[fd].ev |= FD_POLL_OUT;
if (fdtab[fd].cb[DIR_WR].f(fd) == 0)
status |= EPOLLOUT;
}
}
if (status) {
/* Some speculative accesses have failed, we must
* switch to the WAIT state.
*/
ev.events = status;
ev.data.fd = fd;
if (fd_list[fd].e & FD_EV_RW_PL) {
// Event already in poll list
ev.events |= ev_to_epoll[fd_list[fd].e].ev;
opcode = EPOLL_CTL_MOD;
} else {
// Event not in poll list yet
opcode = EPOLL_CTL_ADD;
}
epoll_ctl(epoll_fd, opcode, fd, &ev);
if (status & EPOLLIN) {
fd_list[fd].e &= ~FD_EV_MASK_R;
fd_list[fd].e |= FD_EV_WAIT_R;
}
if (status & EPOLLOUT) {
fd_list[fd].e &= ~FD_EV_MASK_W;
fd_list[fd].e |= FD_EV_WAIT_W;
}
if ((fd_list[fd].e & FD_EV_MASK_R) != FD_EV_SPEC_R &&
(fd_list[fd].e & FD_EV_MASK_W) != FD_EV_SPEC_W) {
delete_spec_entry(spec_idx);
continue;
}
}
}
/* If some speculative events remain, we must not set the timeout in
* epoll_wait(). Also, if some speculative events remain, it means
* that some have been immediately processed, otherwise they would
* have been disabled.
/* It may make sense to immediately return here if there are enough
* processed events, without passing through epoll_wait(), because we
* have effectively just performed a poll.
* Measures have shown a great performance increase if we call the
* epoll_wait() only the second time after speculative accesses have
* succeeded. This reduces the number of unsuccessful calls to
* epoll_wait() by a factor of about 3, and the total number of calls
* by about 2.
*/
if (nbspec) {
if (!last_skipped++) {
/* Measures have shown a great performance increase if
* we call the epoll_wait() only the second time after
* speculative accesses have succeeded. This reduces
* the number of unsuccessful calls to epoll_wait() by
* a factor of about 3, and the total number of calls
* by about 2.
*/
if (status >= MIN_RETURN_EVENTS) {
/* We have processed at least MIN_RETURN_EVENTS, it's worth
* returning now without checking epoll_wait().
*/
if (++last_skipped <= 1) {
tv_now(&now);
return;
}
wait_time = 0;
}
last_skipped = 0;
/* now let's wait for events */
if (nbspec || status) {
/* Maybe we have processed some events that we must report, or
* maybe we still have events in the spec list, so we must not
* wait in epoll() otherwise we will delay their delivery by
* the next timeout.
*/
wait_time = 0;
}
/* now let's wait for real events */
status = epoll_wait(epoll_fd, epoll_events, maxfd, wait_time);
tv_now(&now);
for (count = 0; count < status; count++) {