mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 23:56:57 +02:00
MEDIUM: connection: make use of the new polling functions
Now the connection handler, the handshake callbacks and the I/O callbacks make use of the connection-layer polling functions to enable or disable polling on a file descriptor. Some changes still need to be done to avoid using the FD_WAIT_* constants.
This commit is contained in:
parent
b5e2cbdcc8
commit
f9dabecd03
@ -27,6 +27,7 @@
|
|||||||
#include <common/config.h>
|
#include <common/config.h>
|
||||||
#include <types/session.h>
|
#include <types/session.h>
|
||||||
#include <types/stream_interface.h>
|
#include <types/stream_interface.h>
|
||||||
|
#include <proto/connection.h>
|
||||||
|
|
||||||
|
|
||||||
/* main event functions used to move data between sockets and buffers */
|
/* main event functions used to move data between sockets and buffers */
|
||||||
@ -167,14 +168,14 @@ static inline void si_get_to_addr(struct stream_interface *si)
|
|||||||
static inline void si_shutr(struct stream_interface *si)
|
static inline void si_shutr(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
if (stream_int_shutr(si))
|
if (stream_int_shutr(si))
|
||||||
fd_stop_recv(si_fd(si));
|
conn_data_stop_recv(&si->conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Sends a shutw to the connection using the data layer */
|
/* Sends a shutw to the connection using the data layer */
|
||||||
static inline void si_shutw(struct stream_interface *si)
|
static inline void si_shutw(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
if (stream_int_shutw(si))
|
if (stream_int_shutw(si))
|
||||||
fd_stop_send(si_fd(si));
|
conn_data_stop_send(&si->conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Calls the data state update on the stream interfaace */
|
/* Calls the data state update on the stream interfaace */
|
||||||
|
@ -31,6 +31,12 @@ int conn_fd_handler(int fd)
|
|||||||
goto leave;
|
goto leave;
|
||||||
|
|
||||||
process_handshake:
|
process_handshake:
|
||||||
|
/* The handshake callbacks are called in sequence. If either of them is
|
||||||
|
* missing something, it must enable the required polling at the socket
|
||||||
|
* layer of the connection. Polling state is not guaranteed when entering
|
||||||
|
* these handlers, so any handshake handler which does not complete its
|
||||||
|
* work must explicitly disable events it's not interested in.
|
||||||
|
*/
|
||||||
while (unlikely(conn->flags & CO_FL_HANDSHAKE)) {
|
while (unlikely(conn->flags & CO_FL_HANDSHAKE)) {
|
||||||
if (unlikely(conn->flags & CO_FL_ERROR))
|
if (unlikely(conn->flags & CO_FL_ERROR))
|
||||||
goto leave;
|
goto leave;
|
||||||
@ -40,7 +46,9 @@ int conn_fd_handler(int fd)
|
|||||||
goto leave;
|
goto leave;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* OK now we're in the data phase now */
|
/* Once we're purely in the data phase, we disable handshake polling */
|
||||||
|
if (!(conn->flags & CO_FL_POLL_SOCK))
|
||||||
|
__conn_sock_stop_both(conn);
|
||||||
|
|
||||||
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
||||||
if (!conn->data->read(conn))
|
if (!conn->data->read(conn))
|
||||||
@ -86,6 +94,9 @@ int conn_fd_handler(int fd)
|
|||||||
|
|
||||||
/* remove the events before leaving */
|
/* remove the events before leaving */
|
||||||
fdtab[fd].ev &= ~(FD_POLL_IN | FD_POLL_OUT | FD_POLL_HUP | FD_POLL_ERR);
|
fdtab[fd].ev &= ~(FD_POLL_IN | FD_POLL_OUT | FD_POLL_HUP | FD_POLL_ERR);
|
||||||
|
|
||||||
|
/* commit polling changes */
|
||||||
|
conn_cond_update_polling(conn);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -475,7 +475,9 @@ int tcp_connect_server(struct stream_interface *si)
|
|||||||
|
|
||||||
fdtab[fd].iocb = conn_fd_handler;
|
fdtab[fd].iocb = conn_fd_handler;
|
||||||
fd_insert(fd);
|
fd_insert(fd);
|
||||||
fd_want_send(fd); /* for connect status */
|
conn_sock_want_send(&si->conn); /* for connect status */
|
||||||
|
if (!(si->ob->flags & BF_OUT_EMPTY))
|
||||||
|
conn_data_want_send(&si->conn); /* prepare to send data if any */
|
||||||
|
|
||||||
si->state = SI_ST_CON;
|
si->state = SI_ST_CON;
|
||||||
si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
|
si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
|
||||||
@ -548,8 +550,11 @@ int tcp_connect_probe(struct connection *conn)
|
|||||||
* - connected (EISCONN, 0)
|
* - connected (EISCONN, 0)
|
||||||
*/
|
*/
|
||||||
if ((connect(fd, conn->peeraddr, conn->peerlen) < 0)) {
|
if ((connect(fd, conn->peeraddr, conn->peerlen) < 0)) {
|
||||||
if (errno == EALREADY || errno == EINPROGRESS)
|
if (errno == EALREADY || errno == EINPROGRESS) {
|
||||||
|
conn_sock_stop_recv(conn);
|
||||||
|
conn_sock_poll_send(conn);
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (errno && errno != EISCONN)
|
if (errno && errno != EISCONN)
|
||||||
goto out_error;
|
goto out_error;
|
||||||
@ -570,7 +575,7 @@ int tcp_connect_probe(struct connection *conn)
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
conn->flags |= CO_FL_ERROR;
|
conn->flags |= CO_FL_ERROR;
|
||||||
fd_stop_both(fd);
|
conn_sock_stop_both(conn);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,7 +284,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
|||||||
fdtab[cfd].owner = &s->si[0].conn;
|
fdtab[cfd].owner = &s->si[0].conn;
|
||||||
fdtab[cfd].flags = 0;
|
fdtab[cfd].flags = 0;
|
||||||
fdtab[cfd].iocb = conn_fd_handler;
|
fdtab[cfd].iocb = conn_fd_handler;
|
||||||
fd_want_recv(cfd);
|
conn_data_want_recv(&s->si[0].conn);
|
||||||
|
|
||||||
if (p->accept && (ret = p->accept(s)) <= 0) {
|
if (p->accept && (ret = p->accept(s)) <= 0) {
|
||||||
/* Either we had an unrecoverable error (<0) or work is
|
/* Either we had an unrecoverable error (<0) or work is
|
||||||
|
@ -102,7 +102,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si)
|
|||||||
* place and ask the consumer to hurry.
|
* place and ask the consumer to hurry.
|
||||||
*/
|
*/
|
||||||
si->flags |= SI_FL_WAIT_ROOM;
|
si->flags |= SI_FL_WAIT_ROOM;
|
||||||
fd_stop_recv(fd);
|
conn_data_stop_recv(&si->conn);
|
||||||
b->rex = TICK_ETERNITY;
|
b->rex = TICK_ETERNITY;
|
||||||
si_chk_snd(b->cons);
|
si_chk_snd(b->cons);
|
||||||
return 1;
|
return 1;
|
||||||
@ -467,7 +467,7 @@ static int sock_raw_read(struct connection *conn)
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
conn->flags |= CO_FL_ERROR;
|
conn->flags |= CO_FL_ERROR;
|
||||||
fd_stop_both(fd);
|
conn_data_stop_both(conn);
|
||||||
retval = 1;
|
retval = 1;
|
||||||
goto out_wakeup;
|
goto out_wakeup;
|
||||||
}
|
}
|
||||||
@ -628,7 +628,6 @@ static int sock_raw_write_loop(struct stream_interface *si, struct buffer *b)
|
|||||||
*/
|
*/
|
||||||
static int sock_raw_write(struct connection *conn)
|
static int sock_raw_write(struct connection *conn)
|
||||||
{
|
{
|
||||||
int fd = conn->t.sock.fd;
|
|
||||||
struct stream_interface *si = container_of(conn, struct stream_interface, conn);
|
struct stream_interface *si = container_of(conn, struct stream_interface, conn);
|
||||||
struct buffer *b = si->ob;
|
struct buffer *b = si->ob;
|
||||||
int retval = 1;
|
int retval = 1;
|
||||||
@ -660,7 +659,7 @@ static int sock_raw_write(struct connection *conn)
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
conn->flags |= CO_FL_ERROR;
|
conn->flags |= CO_FL_ERROR;
|
||||||
fd_stop_both(fd);
|
conn_data_stop_both(conn);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -700,7 +699,7 @@ static void sock_raw_read0(struct stream_interface *si)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* otherwise that's just a normal read shutdown */
|
/* otherwise that's just a normal read shutdown */
|
||||||
fd_stop_recv(si_fd(si));
|
conn_data_stop_recv(&si->conn);
|
||||||
return;
|
return;
|
||||||
|
|
||||||
do_close:
|
do_close:
|
||||||
@ -723,11 +722,10 @@ static void sock_raw_data_finish(struct stream_interface *si)
|
|||||||
{
|
{
|
||||||
struct buffer *ib = si->ib;
|
struct buffer *ib = si->ib;
|
||||||
struct buffer *ob = si->ob;
|
struct buffer *ob = si->ob;
|
||||||
int fd = si_fd(si);
|
|
||||||
|
|
||||||
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibh=%d ibt=%d obh=%d obd=%d si=%d\n",
|
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibh=%d ibt=%d obh=%d obd=%d si=%d\n",
|
||||||
now_ms, __FUNCTION__,
|
now_ms, __FUNCTION__,
|
||||||
fd, fdtab[fd].owner,
|
si_fd(si), fdtab[si_fd(fd)].owner,
|
||||||
ib, ob,
|
ib, ob,
|
||||||
ib->rex, ob->wex,
|
ib->rex, ob->wex,
|
||||||
ib->flags, ob->flags,
|
ib->flags, ob->flags,
|
||||||
@ -741,7 +739,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
|
|||||||
if (!(si->flags & SI_FL_WAIT_ROOM)) {
|
if (!(si->flags & SI_FL_WAIT_ROOM)) {
|
||||||
if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
|
if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
|
||||||
si->flags |= SI_FL_WAIT_ROOM;
|
si->flags |= SI_FL_WAIT_ROOM;
|
||||||
fd_stop_recv(fd);
|
conn_data_stop_recv(&si->conn);
|
||||||
ib->rex = TICK_ETERNITY;
|
ib->rex = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -752,7 +750,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
|
|||||||
* have updated it if there has been a completed I/O.
|
* have updated it if there has been a completed I/O.
|
||||||
*/
|
*/
|
||||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||||
fd_want_recv(fd);
|
conn_data_want_recv(&si->conn);
|
||||||
if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex))
|
if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex))
|
||||||
ib->rex = tick_add_ifset(now_ms, ib->rto);
|
ib->rex = tick_add_ifset(now_ms, ib->rto);
|
||||||
}
|
}
|
||||||
@ -766,7 +764,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
|
|||||||
if (!(si->flags & SI_FL_WAIT_DATA)) {
|
if (!(si->flags & SI_FL_WAIT_DATA)) {
|
||||||
if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0)
|
if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0)
|
||||||
si->flags |= SI_FL_WAIT_DATA;
|
si->flags |= SI_FL_WAIT_DATA;
|
||||||
fd_stop_send(fd);
|
conn_data_stop_send(&si->conn);
|
||||||
ob->wex = TICK_ETERNITY;
|
ob->wex = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -777,7 +775,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
|
|||||||
* have updated it if there has been a completed I/O.
|
* have updated it if there has been a completed I/O.
|
||||||
*/
|
*/
|
||||||
si->flags &= ~SI_FL_WAIT_DATA;
|
si->flags &= ~SI_FL_WAIT_DATA;
|
||||||
fd_want_send(fd);
|
conn_data_want_send(&si->conn);
|
||||||
if (!tick_isset(ob->wex)) {
|
if (!tick_isset(ob->wex)) {
|
||||||
ob->wex = tick_add_ifset(now_ms, ob->wto);
|
ob->wex = tick_add_ifset(now_ms, ob->wto);
|
||||||
if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
|
if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
|
||||||
@ -818,12 +816,12 @@ static void sock_raw_chk_rcv(struct stream_interface *si)
|
|||||||
/* stop reading */
|
/* stop reading */
|
||||||
if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
|
if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
|
||||||
si->flags |= SI_FL_WAIT_ROOM;
|
si->flags |= SI_FL_WAIT_ROOM;
|
||||||
fd_stop_recv(si_fd(si));
|
conn_data_stop_recv(&si->conn);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
/* (re)start reading */
|
/* (re)start reading */
|
||||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||||
fd_want_recv(si_fd(si));
|
conn_data_want_recv(&si->conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -869,7 +867,7 @@ static void sock_raw_chk_snd(struct stream_interface *si)
|
|||||||
*/
|
*/
|
||||||
si->conn.flags |= CO_FL_ERROR;
|
si->conn.flags |= CO_FL_ERROR;
|
||||||
fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY;
|
fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY;
|
||||||
fd_stop_both(si_fd(si));
|
conn_data_stop_both(&si->conn);
|
||||||
si->flags |= SI_FL_ERR;
|
si->flags |= SI_FL_ERR;
|
||||||
goto out_wakeup;
|
goto out_wakeup;
|
||||||
}
|
}
|
||||||
@ -899,7 +897,7 @@ static void sock_raw_chk_snd(struct stream_interface *si)
|
|||||||
/* Otherwise there are remaining data to be sent in the buffer,
|
/* Otherwise there are remaining data to be sent in the buffer,
|
||||||
* which means we have to poll before doing so.
|
* which means we have to poll before doing so.
|
||||||
*/
|
*/
|
||||||
fd_want_send(si_fd(si));
|
conn_data_want_send(&si->conn);
|
||||||
si->flags &= ~SI_FL_WAIT_DATA;
|
si->flags &= ~SI_FL_WAIT_DATA;
|
||||||
if (!tick_isset(ob->wex))
|
if (!tick_isset(ob->wex))
|
||||||
ob->wex = tick_add_ifset(now_ms, ob->wto);
|
ob->wex = tick_add_ifset(now_ms, ob->wto);
|
||||||
|
@ -550,10 +550,12 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
|
|||||||
|
|
||||||
conn->flags |= CO_FL_ERROR;
|
conn->flags |= CO_FL_ERROR;
|
||||||
fdtab[fd].ev &= ~FD_POLL_STICKY;
|
fdtab[fd].ev &= ~FD_POLL_STICKY;
|
||||||
fd_stop_both(fd);
|
conn_sock_stop_both(conn);
|
||||||
goto out_leave;
|
goto out_leave;
|
||||||
|
|
||||||
out_wait:
|
out_wait:
|
||||||
|
conn_sock_stop_recv(conn);
|
||||||
|
conn_sock_poll_send(conn);
|
||||||
return FD_WAIT_WRITE;
|
return FD_WAIT_WRITE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -582,7 +584,7 @@ void stream_sock_update_conn(struct connection *conn)
|
|||||||
if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
|
if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
|
||||||
(si->state == SI_ST_EST))
|
(si->state == SI_ST_EST))
|
||||||
stream_int_shutw(si);
|
stream_int_shutw(si);
|
||||||
fd_stop_send(fd);
|
conn_data_stop_send(conn);
|
||||||
si->ob->wex = TICK_ETERNITY;
|
si->ob->wex = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -627,7 +629,7 @@ void stream_sock_update_conn(struct connection *conn)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (si->flags & SI_FL_WAIT_ROOM) {
|
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||||
fd_stop_recv(fd);
|
conn_data_stop_recv(conn);
|
||||||
si->ib->rex = TICK_ETERNITY;
|
si->ib->rex = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
|
else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
|
||||||
|
Loading…
Reference in New Issue
Block a user