From f9dabecd03860440ea0efde0db4061b5a7adffdf Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Fri, 17 Aug 2012 17:33:53 +0200 Subject: [PATCH] MEDIUM: connection: make use of the new polling functions Now the connection handler, the handshake callbacks and the I/O callbacks make use of the connection-layer polling functions to enable or disable polling on a file descriptor. Some changes still need to be done to avoid using the FD_WAIT_* constants. --- include/proto/stream_interface.h | 5 +++-- src/connection.c | 13 ++++++++++++- src/proto_tcp.c | 11 ++++++++--- src/session.c | 2 +- src/sock_raw.c | 28 +++++++++++++--------------- src/stream_interface.c | 8 +++++--- 6 files changed, 42 insertions(+), 25 deletions(-) diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 3ddc56dd8..224ce8499 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -27,6 +27,7 @@ #include #include #include +#include /* main event functions used to move data between sockets and buffers */ @@ -167,14 +168,14 @@ static inline void si_get_to_addr(struct stream_interface *si) static inline void si_shutr(struct stream_interface *si) { if (stream_int_shutr(si)) - fd_stop_recv(si_fd(si)); + conn_data_stop_recv(&si->conn); } /* Sends a shutw to the connection using the data layer */ static inline void si_shutw(struct stream_interface *si) { if (stream_int_shutw(si)) - fd_stop_send(si_fd(si)); + conn_data_stop_send(&si->conn); } /* Calls the data state update on the stream interfaace */ diff --git a/src/connection.c b/src/connection.c index 712dfbaa3..a3f38eb84 100644 --- a/src/connection.c +++ b/src/connection.c @@ -31,6 +31,12 @@ int conn_fd_handler(int fd) goto leave; process_handshake: + /* The handshake callbacks are called in sequence. If either of them is + * missing something, it must enable the required polling at the socket + * layer of the connection. Polling state is not guaranteed when entering + * these handlers, so any handshake handler which does not complete its + * work must explicitly disable events it's not interested in. + */ while (unlikely(conn->flags & CO_FL_HANDSHAKE)) { if (unlikely(conn->flags & CO_FL_ERROR)) goto leave; @@ -40,7 +46,9 @@ int conn_fd_handler(int fd) goto leave; } - /* OK now we're in the data phase now */ + /* Once we're purely in the data phase, we disable handshake polling */ + if (!(conn->flags & CO_FL_POLL_SOCK)) + __conn_sock_stop_both(conn); if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) if (!conn->data->read(conn)) @@ -86,6 +94,9 @@ int conn_fd_handler(int fd) /* remove the events before leaving */ fdtab[fd].ev &= ~(FD_POLL_IN | FD_POLL_OUT | FD_POLL_HUP | FD_POLL_ERR); + + /* commit polling changes */ + conn_cond_update_polling(conn); return ret; } diff --git a/src/proto_tcp.c b/src/proto_tcp.c index 7de238a21..2f5679714 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -475,7 +475,9 @@ int tcp_connect_server(struct stream_interface *si) fdtab[fd].iocb = conn_fd_handler; fd_insert(fd); - fd_want_send(fd); /* for connect status */ + conn_sock_want_send(&si->conn); /* for connect status */ + if (!(si->ob->flags & BF_OUT_EMPTY)) + conn_data_want_send(&si->conn); /* prepare to send data if any */ si->state = SI_ST_CON; si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */ @@ -548,8 +550,11 @@ int tcp_connect_probe(struct connection *conn) * - connected (EISCONN, 0) */ if ((connect(fd, conn->peeraddr, conn->peerlen) < 0)) { - if (errno == EALREADY || errno == EINPROGRESS) + if (errno == EALREADY || errno == EINPROGRESS) { + conn_sock_stop_recv(conn); + conn_sock_poll_send(conn); return 0; + } if (errno && errno != EISCONN) goto out_error; @@ -570,7 +575,7 @@ int tcp_connect_probe(struct connection *conn) */ conn->flags |= CO_FL_ERROR; - fd_stop_both(fd); + conn_sock_stop_both(conn); return 1; } diff --git a/src/session.c b/src/session.c index ffb683ef3..6612f0a43 100644 --- a/src/session.c +++ b/src/session.c @@ -284,7 +284,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) fdtab[cfd].owner = &s->si[0].conn; fdtab[cfd].flags = 0; fdtab[cfd].iocb = conn_fd_handler; - fd_want_recv(cfd); + conn_data_want_recv(&s->si[0].conn); if (p->accept && (ret = p->accept(s)) <= 0) { /* Either we had an unrecoverable error (<0) or work is diff --git a/src/sock_raw.c b/src/sock_raw.c index eb2bfbdcc..c48480a81 100644 --- a/src/sock_raw.c +++ b/src/sock_raw.c @@ -102,7 +102,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) * place and ask the consumer to hurry. */ si->flags |= SI_FL_WAIT_ROOM; - fd_stop_recv(fd); + conn_data_stop_recv(&si->conn); b->rex = TICK_ETERNITY; si_chk_snd(b->cons); return 1; @@ -467,7 +467,7 @@ static int sock_raw_read(struct connection *conn) */ conn->flags |= CO_FL_ERROR; - fd_stop_both(fd); + conn_data_stop_both(conn); retval = 1; goto out_wakeup; } @@ -628,7 +628,6 @@ static int sock_raw_write_loop(struct stream_interface *si, struct buffer *b) */ static int sock_raw_write(struct connection *conn) { - int fd = conn->t.sock.fd; struct stream_interface *si = container_of(conn, struct stream_interface, conn); struct buffer *b = si->ob; int retval = 1; @@ -660,7 +659,7 @@ static int sock_raw_write(struct connection *conn) */ conn->flags |= CO_FL_ERROR; - fd_stop_both(fd); + conn_data_stop_both(conn); return 1; } @@ -700,7 +699,7 @@ static void sock_raw_read0(struct stream_interface *si) } /* otherwise that's just a normal read shutdown */ - fd_stop_recv(si_fd(si)); + conn_data_stop_recv(&si->conn); return; do_close: @@ -723,11 +722,10 @@ static void sock_raw_data_finish(struct stream_interface *si) { struct buffer *ib = si->ib; struct buffer *ob = si->ob; - int fd = si_fd(si); DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibh=%d ibt=%d obh=%d obd=%d si=%d\n", now_ms, __FUNCTION__, - fd, fdtab[fd].owner, + si_fd(si), fdtab[si_fd(fd)].owner, ib, ob, ib->rex, ob->wex, ib->flags, ob->flags, @@ -741,7 +739,7 @@ static void sock_raw_data_finish(struct stream_interface *si) if (!(si->flags & SI_FL_WAIT_ROOM)) { if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL) si->flags |= SI_FL_WAIT_ROOM; - fd_stop_recv(fd); + conn_data_stop_recv(&si->conn); ib->rex = TICK_ETERNITY; } } @@ -752,7 +750,7 @@ static void sock_raw_data_finish(struct stream_interface *si) * have updated it if there has been a completed I/O. */ si->flags &= ~SI_FL_WAIT_ROOM; - fd_want_recv(fd); + conn_data_want_recv(&si->conn); if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex)) ib->rex = tick_add_ifset(now_ms, ib->rto); } @@ -766,7 +764,7 @@ static void sock_raw_data_finish(struct stream_interface *si) if (!(si->flags & SI_FL_WAIT_DATA)) { if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0) si->flags |= SI_FL_WAIT_DATA; - fd_stop_send(fd); + conn_data_stop_send(&si->conn); ob->wex = TICK_ETERNITY; } } @@ -777,7 +775,7 @@ static void sock_raw_data_finish(struct stream_interface *si) * have updated it if there has been a completed I/O. */ si->flags &= ~SI_FL_WAIT_DATA; - fd_want_send(fd); + conn_data_want_send(&si->conn); if (!tick_isset(ob->wex)) { ob->wex = tick_add_ifset(now_ms, ob->wto); if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) { @@ -818,12 +816,12 @@ static void sock_raw_chk_rcv(struct stream_interface *si) /* stop reading */ if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL) si->flags |= SI_FL_WAIT_ROOM; - fd_stop_recv(si_fd(si)); + conn_data_stop_recv(&si->conn); } else { /* (re)start reading */ si->flags &= ~SI_FL_WAIT_ROOM; - fd_want_recv(si_fd(si)); + conn_data_want_recv(&si->conn); } } @@ -869,7 +867,7 @@ static void sock_raw_chk_snd(struct stream_interface *si) */ si->conn.flags |= CO_FL_ERROR; fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY; - fd_stop_both(si_fd(si)); + conn_data_stop_both(&si->conn); si->flags |= SI_FL_ERR; goto out_wakeup; } @@ -899,7 +897,7 @@ static void sock_raw_chk_snd(struct stream_interface *si) /* Otherwise there are remaining data to be sent in the buffer, * which means we have to poll before doing so. */ - fd_want_send(si_fd(si)); + conn_data_want_send(&si->conn); si->flags &= ~SI_FL_WAIT_DATA; if (!tick_isset(ob->wex)) ob->wex = tick_add_ifset(now_ms, ob->wto); diff --git a/src/stream_interface.c b/src/stream_interface.c index 45707dd2c..c5da3f110 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -550,10 +550,12 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) conn->flags |= CO_FL_ERROR; fdtab[fd].ev &= ~FD_POLL_STICKY; - fd_stop_both(fd); + conn_sock_stop_both(conn); goto out_leave; out_wait: + conn_sock_stop_recv(conn); + conn_sock_poll_send(conn); return FD_WAIT_WRITE; } @@ -582,7 +584,7 @@ void stream_sock_update_conn(struct connection *conn) if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) && (si->state == SI_ST_EST)) stream_int_shutw(si); - fd_stop_send(fd); + conn_data_stop_send(conn); si->ob->wex = TICK_ETERNITY; } @@ -627,7 +629,7 @@ void stream_sock_update_conn(struct connection *conn) } if (si->flags & SI_FL_WAIT_ROOM) { - fd_stop_recv(fd); + conn_data_stop_recv(conn); si->ib->rex = TICK_ETERNITY; } else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {