MAJOR: fd: replace all EV_FD_* macros with new fd_*_* inline calls

These functions have a more explicity meaning and will offer provisions
for explicit polling.

EV_FD_ISSET() has been left for now as it is still in use in checks.
This commit is contained in:
Willy Tarreau 2012-08-09 12:11:58 +02:00
parent 4a36b56909
commit 49b046dddf
10 changed files with 78 additions and 57 deletions

View File

@ -1,22 +1,22 @@
/* /*
include/proto/fd.h * include/proto/fd.h
File descriptors states. * File descriptors states.
*
Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu * Copyright (C) 2000-2012 Willy Tarreau - w@1wt.eu
*
This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation, version 2.1 * License as published by the Free Software Foundation, version 2.1
exclusively. * exclusively.
*
This library is distributed in the hope that it will be useful, * This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of * but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details. * Lesser General Public License for more details.
*
You should have received a copy of the GNU Lesser General Public * You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software * License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/ */
#ifndef _PROTO_FD_H #ifndef _PROTO_FD_H
@ -70,12 +70,33 @@ int list_pollers(FILE *out);
*/ */
void run_poller(); void run_poller();
#define EV_FD_SET(fd, ev) (cur_poller.set((fd), (ev)))
#define EV_FD_CLR(fd, ev) (cur_poller.clr((fd), (ev)))
#define EV_FD_ISSET(fd, ev) (cur_poller.is_set((fd), (ev))) #define EV_FD_ISSET(fd, ev) (cur_poller.is_set((fd), (ev)))
#define EV_FD_REM(fd) (cur_poller.rem(fd))
#define EV_FD_CLO(fd) (cur_poller.clo(fd))
/* event manipulation primitives for use by I/O callbacks */
static inline void fd_want_recv(int fd)
{
cur_poller.set(fd, DIR_RD);
}
static inline void fd_stop_recv(int fd)
{
cur_poller.clr(fd, DIR_RD);
}
static inline void fd_want_send(int fd)
{
cur_poller.set(fd, DIR_WR);
}
static inline void fd_stop_send(int fd)
{
cur_poller.clr(fd, DIR_WR);
}
static inline void fd_stop_both(int fd)
{
cur_poller.rem(fd);
}
/* Prepares <fd> for being polled */ /* Prepares <fd> for being polled */
static inline void fd_insert(int fd) static inline void fd_insert(int fd)

View File

@ -167,14 +167,14 @@ static inline void si_get_to_addr(struct stream_interface *si)
static inline void si_shutr(struct stream_interface *si) static inline void si_shutr(struct stream_interface *si)
{ {
if (stream_int_shutr(si)) if (stream_int_shutr(si))
EV_FD_CLR(si_fd(si), DIR_RD); fd_stop_recv(si_fd(si));
} }
/* Sends a shutw to the connection using the data layer */ /* Sends a shutw to the connection using the data layer */
static inline void si_shutw(struct stream_interface *si) static inline void si_shutw(struct stream_interface *si)
{ {
if (stream_int_shutw(si)) if (stream_int_shutw(si))
EV_FD_CLR(si_fd(si), DIR_WR); fd_stop_send(si_fd(si));
} }
/* Calls the data state update on the stream interfaace */ /* Calls the data state update on the stream interfaace */

View File

@ -820,7 +820,7 @@ static int event_srv_chk_w(int fd)
t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check); t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check);
task_queue(t); task_queue(t);
} }
EV_FD_SET(fd, DIR_RD); /* prepare for reading reply */ fd_want_recv(fd); /* prepare for reading reply */
goto out_nowake; goto out_nowake;
} }
else if (ret == 0 || errno == EAGAIN) else if (ret == 0 || errno == EAGAIN)
@ -878,7 +878,7 @@ static int event_srv_chk_w(int fd)
out_wakeup: out_wakeup:
task_wakeup(t, TASK_WOKEN_IO); task_wakeup(t, TASK_WOKEN_IO);
out_nowake: out_nowake:
EV_FD_CLR(fd, DIR_WR); /* nothing more to write */ fd_stop_send(fd); /* nothing more to write */
fdtab[fd].ev &= ~FD_POLL_OUT; fdtab[fd].ev &= ~FD_POLL_OUT;
return 1; return 1;
out_poll: out_poll:
@ -1239,7 +1239,7 @@ static int event_srv_chk_r(int fd)
/* Close the connection... */ /* Close the connection... */
shutdown(fd, SHUT_RDWR); shutdown(fd, SHUT_RDWR);
EV_FD_CLR(fd, DIR_RD); fd_stop_recv(fd);
task_wakeup(t, TASK_WOKEN_IO); task_wakeup(t, TASK_WOKEN_IO);
fdtab[fd].ev &= ~FD_POLL_IN; fdtab[fd].ev &= ~FD_POLL_IN;
return 1; return 1;
@ -1484,7 +1484,7 @@ static struct task *process_chk(struct task *t)
fdtab[fd].owner = t; fdtab[fd].owner = t;
fdtab[fd].iocb = &check_iocb; fdtab[fd].iocb = &check_iocb;
fdtab[fd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY; fdtab[fd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY;
EV_FD_SET(fd, DIR_WR); /* for connect status */ fd_want_send(fd); /* for connect status */
#ifdef DEBUG_FULL #ifdef DEBUG_FULL
assert (!EV_FD_ISSET(fd, DIR_RD)); assert (!EV_FD_ISSET(fd, DIR_RD));
#endif #endif

View File

@ -37,7 +37,7 @@ int nbpollers = 0;
*/ */
void fd_delete(int fd) void fd_delete(int fd)
{ {
EV_FD_CLO(fd); cur_poller.clo(fd);
port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port); port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
fdinfo[fd].port_range = NULL; fdinfo[fd].port_range = NULL;
close(fd); close(fd);

View File

@ -212,7 +212,7 @@ int frontend_accept(struct session *s)
stream_int_retnclose(&s->si[0], &msg); /* forge a 200 response */ stream_int_retnclose(&s->si[0], &msg); /* forge a 200 response */
s->req->analysers = 0; s->req->analysers = 0;
s->task->expire = s->rep->wex; s->task->expire = s->rep->wex;
EV_FD_CLR(cfd, DIR_RD); fd_stop_recv(cfd);
} }
else if (unlikely(s->fe->mode == PR_MODE_HEALTH)) { /* health check mode, no client reading */ else if (unlikely(s->fe->mode == PR_MODE_HEALTH)) { /* health check mode, no client reading */
struct chunk msg; struct chunk msg;
@ -220,7 +220,7 @@ int frontend_accept(struct session *s)
stream_int_retnclose(&s->si[0], &msg); /* forge an "OK" response */ stream_int_retnclose(&s->si[0], &msg); /* forge an "OK" response */
s->req->analysers = 0; s->req->analysers = 0;
s->task->expire = s->rep->wex; s->task->expire = s->rep->wex;
EV_FD_CLR(cfd, DIR_RD); fd_stop_recv(cfd);
} }
/* everything's OK, let's go on */ /* everything's OK, let's go on */
return 1; return 1;

View File

@ -475,7 +475,7 @@ int tcp_connect_server(struct stream_interface *si)
fdtab[fd].iocb = conn_fd_handler; fdtab[fd].iocb = conn_fd_handler;
fd_insert(fd); fd_insert(fd);
EV_FD_SET(fd, DIR_WR); /* for connect status */ fd_want_send(fd); /* for connect status */
si->state = SI_ST_CON; si->state = SI_ST_CON;
si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */ si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
@ -570,7 +570,7 @@ int tcp_connect_probe(struct connection *conn)
*/ */
conn->flags |= CO_FL_ERROR; conn->flags |= CO_FL_ERROR;
EV_FD_REM(fd); fd_stop_both(fd);
return 1; return 1;
} }

View File

@ -39,7 +39,7 @@ void enable_listener(struct listener *listener)
{ {
if (listener->state == LI_LISTEN) { if (listener->state == LI_LISTEN) {
if (listener->nbconn < listener->maxconn) { if (listener->nbconn < listener->maxconn) {
EV_FD_SET(listener->fd, DIR_RD); fd_want_recv(listener->fd);
listener->state = LI_READY; listener->state = LI_READY;
} else { } else {
listener->state = LI_FULL; listener->state = LI_FULL;
@ -56,7 +56,7 @@ void disable_listener(struct listener *listener)
if (listener->state < LI_READY) if (listener->state < LI_READY)
return; return;
if (listener->state == LI_READY) if (listener->state == LI_READY)
EV_FD_CLR(listener->fd, DIR_RD); fd_stop_recv(listener->fd);
if (listener->state == LI_LIMITED) if (listener->state == LI_LIMITED)
LIST_DEL(&listener->wait_queue); LIST_DEL(&listener->wait_queue);
listener->state = LI_LISTEN; listener->state = LI_LISTEN;
@ -86,7 +86,7 @@ int pause_listener(struct listener *l)
if (l->state == LI_LIMITED) if (l->state == LI_LIMITED)
LIST_DEL(&l->wait_queue); LIST_DEL(&l->wait_queue);
EV_FD_CLR(l->fd, DIR_RD); fd_stop_recv(l->fd);
l->state = LI_PAUSED; l->state = LI_PAUSED;
return 1; return 1;
} }
@ -116,7 +116,7 @@ int resume_listener(struct listener *l)
return 1; return 1;
} }
EV_FD_SET(l->fd, DIR_RD); fd_want_recv(l->fd);
l->state = LI_READY; l->state = LI_READY;
return 1; return 1;
} }
@ -130,7 +130,7 @@ void listener_full(struct listener *l)
if (l->state == LI_LIMITED) if (l->state == LI_LIMITED)
LIST_DEL(&l->wait_queue); LIST_DEL(&l->wait_queue);
EV_FD_CLR(l->fd, DIR_RD); fd_stop_recv(l->fd);
l->state = LI_FULL; l->state = LI_FULL;
} }
} }
@ -142,7 +142,7 @@ void limit_listener(struct listener *l, struct list *list)
{ {
if (l->state == LI_READY) { if (l->state == LI_READY) {
LIST_ADDQ(list, &l->wait_queue); LIST_ADDQ(list, &l->wait_queue);
EV_FD_CLR(l->fd, DIR_RD); fd_stop_recv(l->fd);
l->state = LI_LIMITED; l->state = LI_LIMITED;
} }
} }
@ -198,7 +198,7 @@ void dequeue_all_listeners(struct list *list)
int unbind_listener(struct listener *listener) int unbind_listener(struct listener *listener)
{ {
if (listener->state == LI_READY) if (listener->state == LI_READY)
EV_FD_CLR(listener->fd, DIR_RD); fd_stop_recv(listener->fd);
if (listener->state == LI_LIMITED) if (listener->state == LI_LIMITED)
LIST_DEL(&listener->wait_queue); LIST_DEL(&listener->wait_queue);

View File

@ -284,7 +284,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
fdtab[cfd].owner = &s->si[0].conn; fdtab[cfd].owner = &s->si[0].conn;
fdtab[cfd].flags = 0; fdtab[cfd].flags = 0;
fdtab[cfd].iocb = conn_fd_handler; fdtab[cfd].iocb = conn_fd_handler;
EV_FD_SET(cfd, DIR_RD); fd_want_recv(cfd);
if (p->accept && (ret = p->accept(s)) <= 0) { if (p->accept && (ret = p->accept(s)) <= 0) {
/* Either we had an unrecoverable error (<0) or work is /* Either we had an unrecoverable error (<0) or work is

View File

@ -102,7 +102,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si)
* place and ask the consumer to hurry. * place and ask the consumer to hurry.
*/ */
si->flags |= SI_FL_WAIT_ROOM; si->flags |= SI_FL_WAIT_ROOM;
EV_FD_CLR(fd, DIR_RD); fd_stop_recv(fd);
b->rex = TICK_ETERNITY; b->rex = TICK_ETERNITY;
si_chk_snd(b->cons); si_chk_snd(b->cons);
return 1; return 1;
@ -467,7 +467,7 @@ static int sock_raw_read(struct connection *conn)
*/ */
conn->flags |= CO_FL_ERROR; conn->flags |= CO_FL_ERROR;
EV_FD_REM(fd); fd_stop_both(fd);
retval = 1; retval = 1;
goto out_wakeup; goto out_wakeup;
} }
@ -660,7 +660,7 @@ static int sock_raw_write(struct connection *conn)
*/ */
conn->flags |= CO_FL_ERROR; conn->flags |= CO_FL_ERROR;
EV_FD_REM(fd); fd_stop_both(fd);
return 1; return 1;
} }
@ -700,7 +700,7 @@ static void sock_raw_read0(struct stream_interface *si)
} }
/* otherwise that's just a normal read shutdown */ /* otherwise that's just a normal read shutdown */
EV_FD_CLR(si_fd(si), DIR_RD); fd_stop_recv(si_fd(si));
return; return;
do_close: do_close:
@ -741,7 +741,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
if (!(si->flags & SI_FL_WAIT_ROOM)) { if (!(si->flags & SI_FL_WAIT_ROOM)) {
if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL) if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
si->flags |= SI_FL_WAIT_ROOM; si->flags |= SI_FL_WAIT_ROOM;
EV_FD_CLR(fd, DIR_RD); fd_stop_recv(fd);
ib->rex = TICK_ETERNITY; ib->rex = TICK_ETERNITY;
} }
} }
@ -752,7 +752,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
* have updated it if there has been a completed I/O. * have updated it if there has been a completed I/O.
*/ */
si->flags &= ~SI_FL_WAIT_ROOM; si->flags &= ~SI_FL_WAIT_ROOM;
EV_FD_SET(fd, DIR_RD); fd_want_recv(fd);
if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex)) if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex))
ib->rex = tick_add_ifset(now_ms, ib->rto); ib->rex = tick_add_ifset(now_ms, ib->rto);
} }
@ -766,7 +766,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
if (!(si->flags & SI_FL_WAIT_DATA)) { if (!(si->flags & SI_FL_WAIT_DATA)) {
if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0) if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0)
si->flags |= SI_FL_WAIT_DATA; si->flags |= SI_FL_WAIT_DATA;
EV_FD_CLR(fd, DIR_WR); fd_stop_send(fd);
ob->wex = TICK_ETERNITY; ob->wex = TICK_ETERNITY;
} }
} }
@ -777,7 +777,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
* have updated it if there has been a completed I/O. * have updated it if there has been a completed I/O.
*/ */
si->flags &= ~SI_FL_WAIT_DATA; si->flags &= ~SI_FL_WAIT_DATA;
EV_FD_SET(fd, DIR_WR); fd_want_send(fd);
if (!tick_isset(ob->wex)) { if (!tick_isset(ob->wex)) {
ob->wex = tick_add_ifset(now_ms, ob->wto); ob->wex = tick_add_ifset(now_ms, ob->wto);
if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) { if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
@ -818,12 +818,12 @@ static void sock_raw_chk_rcv(struct stream_interface *si)
/* stop reading */ /* stop reading */
if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL) if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
si->flags |= SI_FL_WAIT_ROOM; si->flags |= SI_FL_WAIT_ROOM;
EV_FD_CLR(si_fd(si), DIR_RD); fd_stop_recv(si_fd(si));
} }
else { else {
/* (re)start reading */ /* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM; si->flags &= ~SI_FL_WAIT_ROOM;
EV_FD_SET(si_fd(si), DIR_RD); fd_want_recv(si_fd(si));
} }
} }
@ -869,7 +869,7 @@ static void sock_raw_chk_snd(struct stream_interface *si)
*/ */
si->conn.flags |= CO_FL_ERROR; si->conn.flags |= CO_FL_ERROR;
fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY; fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY;
EV_FD_REM(si_fd(si)); fd_stop_both(si_fd(si));
si->flags |= SI_FL_ERR; si->flags |= SI_FL_ERR;
goto out_wakeup; goto out_wakeup;
} }
@ -899,7 +899,7 @@ static void sock_raw_chk_snd(struct stream_interface *si)
/* Otherwise there are remaining data to be sent in the buffer, /* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so. * which means we have to poll before doing so.
*/ */
EV_FD_SET(si_fd(si), DIR_WR); fd_want_send(si_fd(si));
si->flags &= ~SI_FL_WAIT_DATA; si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(ob->wex)) if (!tick_isset(ob->wex))
ob->wex = tick_add_ifset(now_ms, ob->wto); ob->wex = tick_add_ifset(now_ms, ob->wto);

View File

@ -550,7 +550,7 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
conn->flags |= CO_FL_ERROR; conn->flags |= CO_FL_ERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY; fdtab[fd].ev &= ~FD_POLL_STICKY;
EV_FD_REM(fd); fd_stop_both(fd);
goto out_leave; goto out_leave;
out_wait: out_wait:
@ -582,7 +582,7 @@ void stream_sock_update_conn(struct connection *conn)
if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) && if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
(si->state == SI_ST_EST)) (si->state == SI_ST_EST))
stream_int_shutw(si); stream_int_shutw(si);
EV_FD_CLR(fd, DIR_WR); fd_stop_send(fd);
si->ob->wex = TICK_ETERNITY; si->ob->wex = TICK_ETERNITY;
} }
@ -627,7 +627,7 @@ void stream_sock_update_conn(struct connection *conn)
} }
if (si->flags & SI_FL_WAIT_ROOM) { if (si->flags & SI_FL_WAIT_ROOM) {
EV_FD_CLR(fd, DIR_RD); fd_stop_recv(fd);
si->ib->rex = TICK_ETERNITY; si->ib->rex = TICK_ETERNITY;
} }
else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) { else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {