mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-06 15:17:01 +02:00
MAJOR: remove the stream interface and task management code from sock_*
The socket data layer code must only focus on moving data between a socket and a buffer. We need a special stream interface handler to update the stream interface and the file descriptor status. At the moment the code works but suffers from a race condition caused by its API: the read/write callbacks still make use of the fd instead of using the connection. And when a double shutdown is performed, a call to ->write() after ->read() has processed an error results in dereferencing a NULL fdtab[]->owner. This is only a temporary issue which doesn't need to be fixed now, since it will automatically go away when the functions change to use the connection instead.
This commit is contained in:
parent
076be25ab8
commit
fd31e53139
@ -34,6 +34,7 @@ int stream_int_check_timeouts(struct stream_interface *si);
|
||||
void stream_int_report_error(struct stream_interface *si);
|
||||
void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg);
|
||||
int conn_si_send_proxy(struct connection *conn, unsigned int flag);
|
||||
void stream_sock_update_conn(struct connection *conn);
|
||||
|
||||
extern struct sock_ops stream_int_embedded;
|
||||
extern struct sock_ops stream_int_task;
|
||||
|
@ -38,6 +38,7 @@ enum {
|
||||
CO_FL_WAIT_L4_CONN = 0x00000002, /* waiting for L4 to be connected */
|
||||
/* flags below are used for connection handshakes */
|
||||
CO_FL_SI_SEND_PROXY = 0x00000004, /* send a valid PROXY protocol header */
|
||||
CO_FL_NOTIFY_SI = 0x00000008, /* notify stream interface about changes */
|
||||
};
|
||||
|
||||
/* This structure describes a connection with its methods and data.
|
||||
|
@ -60,6 +60,9 @@ int conn_fd_handler(int fd)
|
||||
}
|
||||
|
||||
leave:
|
||||
if (conn->flags & CO_FL_NOTIFY_SI)
|
||||
stream_sock_update_conn(conn);
|
||||
|
||||
/* remove the events before leaving */
|
||||
fdtab[fd].ev &= ~(FD_POLL_IN | FD_POLL_OUT | FD_POLL_HUP | FD_POLL_ERR);
|
||||
return ret;
|
||||
|
@ -467,6 +467,7 @@ int tcp_connect_server(struct stream_interface *si)
|
||||
fdtab[fd].owner = &si->conn;
|
||||
fdtab[fd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY;
|
||||
si->conn.flags = CO_FL_WAIT_L4_CONN; /* connection in progress */
|
||||
si->conn.flags |= CO_FL_NOTIFY_SI; /* we're on a stream_interface */
|
||||
|
||||
/* Prepare to send a few handshakes related to the on-wire protocol. */
|
||||
if (si->send_proxy_ofs)
|
||||
@ -574,11 +575,8 @@ int tcp_connect_probe(int fd)
|
||||
*/
|
||||
conn->flags &= ~CO_FL_WAIT_L4_CONN;
|
||||
si->exp = TICK_ETERNITY;
|
||||
return si_data(si)->write(fd);
|
||||
|
||||
out_wakeup:
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
|
||||
out_ignore:
|
||||
return retval;
|
||||
|
||||
|
@ -87,7 +87,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
||||
s->term_trace = 0;
|
||||
s->si[0].conn.t.sock.fd = cfd;
|
||||
s->si[0].conn.ctrl = l->proto;
|
||||
s->si[0].conn.flags = CO_FL_NONE;
|
||||
s->si[0].conn.flags = CO_FL_NONE | CO_FL_NOTIFY_SI; /* we're on a stream_interface */
|
||||
s->si[0].addr.from = *addr;
|
||||
s->si[0].conn.peeraddr = (struct sockaddr *)&s->si[0].addr.from;
|
||||
s->si[0].conn.peerlen = sizeof(s->si[0].addr.from);
|
||||
|
@ -240,6 +240,9 @@ static int sock_raw_read(int fd)
|
||||
|
||||
retval = 1;
|
||||
|
||||
if (!conn)
|
||||
goto out_wakeup;
|
||||
|
||||
/* stop immediately on errors. Note that we DON'T want to stop on
|
||||
* POLL_ERR, as the poller might report a write error while there
|
||||
* are still data available in the recv buffer. This typically
|
||||
@ -447,43 +450,6 @@ static int sock_raw_read(int fd)
|
||||
} /* while (1) */
|
||||
|
||||
out_wakeup:
|
||||
/* We might have some data the consumer is waiting for.
|
||||
* We can do fast-forwarding, but we avoid doing this for partial
|
||||
* buffers, because it is very likely that it will be done again
|
||||
* immediately afterwards once the following data is parsed (eg:
|
||||
* HTTP chunking).
|
||||
*/
|
||||
if (b->pipe || /* always try to send spliced data */
|
||||
(b->i == 0 && (b->cons->flags & SI_FL_WAIT_DATA))) {
|
||||
int last_len = b->pipe ? b->pipe->data : 0;
|
||||
|
||||
si_chk_snd(b->cons);
|
||||
|
||||
/* check if the consumer has freed some space */
|
||||
if (!(b->flags & BF_FULL) &&
|
||||
(!last_len || !b->pipe || b->pipe->data < last_len))
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
}
|
||||
|
||||
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||
EV_FD_CLR(fd, DIR_RD);
|
||||
b->rex = TICK_ETERNITY;
|
||||
}
|
||||
else if ((b->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL)
|
||||
b->rex = tick_add_ifset(now_ms, b->rto);
|
||||
|
||||
/* we have to wake up if there is a special event or if we don't have
|
||||
* any more data to forward.
|
||||
*/
|
||||
if ((b->flags & (BF_READ_NULL|BF_READ_ERROR)) ||
|
||||
si->state != SI_ST_EST ||
|
||||
(si->flags & SI_FL_ERR) ||
|
||||
((b->flags & BF_READ_PARTIAL) && (!b->to_forward || b->cons->state != SI_ST_EST)))
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
|
||||
if (b->flags & BF_READ_ACTIVITY)
|
||||
b->flags &= ~BF_READ_DONTWAIT;
|
||||
|
||||
return retval;
|
||||
|
||||
out_shutdown_r:
|
||||
@ -677,6 +643,9 @@ static int sock_raw_write(int fd)
|
||||
#endif
|
||||
|
||||
retval = 1;
|
||||
if (!conn)
|
||||
goto out_wakeup;
|
||||
|
||||
if (conn->flags & CO_FL_ERROR)
|
||||
goto out_error;
|
||||
|
||||
@ -688,58 +657,7 @@ static int sock_raw_write(int fd)
|
||||
if (retval < 0)
|
||||
goto out_error;
|
||||
|
||||
if (b->flags & BF_OUT_EMPTY) {
|
||||
/* the connection is established but we can't write. Either the
|
||||
* buffer is empty, or we just refrain from sending because the
|
||||
* ->o limit was reached. Maybe we just wrote the last
|
||||
* chunk and need to close.
|
||||
*/
|
||||
if (((b->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
|
||||
(si->state == SI_ST_EST)) {
|
||||
sock_raw_shutw(si);
|
||||
goto out_wakeup;
|
||||
}
|
||||
|
||||
if ((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_FULL|BF_HIJACK)) == 0)
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
|
||||
EV_FD_CLR(fd, DIR_WR);
|
||||
b->wex = TICK_ETERNITY;
|
||||
}
|
||||
|
||||
if (b->flags & BF_WRITE_ACTIVITY) {
|
||||
/* update timeout if we have written something */
|
||||
if ((b->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
|
||||
b->wex = tick_add_ifset(now_ms, b->wto);
|
||||
|
||||
out_wakeup:
|
||||
if (tick_isset(si->ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
|
||||
/* Note: to prevent the client from expiring read timeouts
|
||||
* during writes, we refresh it. We only do this if the
|
||||
* interface is not configured for "independent streams",
|
||||
* because for some applications it's better not to do this,
|
||||
* for instance when continuously exchanging small amounts
|
||||
* of data which can fill the socket buffers long before a
|
||||
* write timeout is detected.
|
||||
*/
|
||||
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
|
||||
}
|
||||
|
||||
/* the producer might be waiting for more room to store data */
|
||||
if (likely((b->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
|
||||
(b->prod->flags & SI_FL_WAIT_ROOM)))
|
||||
si_chk_rcv(b->prod);
|
||||
|
||||
/* we have to wake up if there is a special event or if we don't have
|
||||
* any more data to forward and it's not planned to send any more.
|
||||
*/
|
||||
if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
|
||||
((b->flags & BF_OUT_EMPTY) && !b->to_forward) ||
|
||||
si->state != SI_ST_EST ||
|
||||
b->prod->state != SI_ST_EST))
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
|
||||
out_wakeup:
|
||||
return retval;
|
||||
|
||||
out_error:
|
||||
@ -754,7 +672,6 @@ static int sock_raw_write(int fd)
|
||||
fdtab[fd].ev &= ~FD_POLL_STICKY;
|
||||
EV_FD_REM(fd);
|
||||
si->flags |= SI_FL_ERR;
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -33,6 +33,8 @@
|
||||
#include <proto/stream_interface.h>
|
||||
#include <proto/task.h>
|
||||
|
||||
#include <types/pipe.h>
|
||||
|
||||
/* socket functions used when running a stream interface as a task */
|
||||
static void stream_int_update(struct stream_interface *si);
|
||||
static void stream_int_update_embedded(struct stream_interface *si);
|
||||
@ -486,6 +488,95 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
|
||||
return FD_WAIT_WRITE;
|
||||
}
|
||||
|
||||
/* Function to be called on stream sockets after all I/O handlers have run.
 * It retrieves the stream interface which embeds <conn>, then uses the poll
 * events found in fdtab[fd].ev to update the stream interface flags, the
 * buffers' read/write timeouts and the fd polling state, first for the
 * consumer (write) side and then for the producer (read) side. It finally
 * wakes the owner task up when one of the conditions tested at the end of
 * the function is met. It returns nothing.
 *
 * conn_fd_handler() calls it at its "leave" label when the connection
 * carries CO_FL_NOTIFY_SI, before it clears the poll events from
 * fdtab[fd].ev.
 */
|
||||
void stream_sock_update_conn(struct connection *conn)
|
||||
{
|
||||
int fd = conn->t.sock.fd;
|
||||
/* <conn> is expected to be the "conn" member of a stream_interface */
struct stream_interface *si = container_of(conn, struct stream_interface, conn);
|
||||
|
||||
/* debug trace of the interface state and of both buffers' flags */
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
|
||||
__FUNCTION__,
|
||||
si, si->state, si->ib->flags, si->ob->flags);
|
||||
|
||||
/* process consumer side, only once if possible. This part only runs
 * when a write or error event was reported for this fd.
 */
|
||||
if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) {
|
||||
/* The output buffer is reported empty. If a write shutdown was
 * requested (BF_SHUTW_NOW alone among SHUTW/HIJACK/SHUTW_NOW) on an
 * established interface, perform it now. In all cases, stop polling
 * for writes and disarm the write timeout.
 */
if (si->ob->flags & BF_OUT_EMPTY) {
|
||||
if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
|
||||
(si->state == SI_ST_EST))
|
||||
si_shutw(si);
|
||||
EV_FD_CLR(fd, DIR_WR);
|
||||
si->ob->wex = TICK_ETERNITY;
|
||||
}
|
||||
|
||||
/* The output buffer is neither full, shut for writes, about to be
 * shut, nor hijacked: mark the interface as waiting for data. The
 * flags are re-read here, after the possible si_shutw() call above.
 */
if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
|
||||
if (si->ob->flags & BF_WRITE_ACTIVITY) {
|
||||
/* update timeouts if we have written something. The write timeout
 * is only refreshed after a partial write on a non-empty, non-shut
 * buffer, and only if it was already armed.
 */
|
||||
if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
|
||||
if (tick_isset(si->ob->wex))
|
||||
si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
|
||||
|
||||
/* Also refresh an armed read timeout on the input buffer so that it
 * does not expire during writes, unless the interface is flagged
 * SI_FL_INDEP_STR, in which case both timeouts are left independent.
 */
if (!(si->flags & SI_FL_INDEP_STR))
|
||||
if (tick_isset(si->ib->rex))
|
||||
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
|
||||
|
||||
/* the producer might be waiting for more room to store data: let it
 * check again after a partial write left the buffer non-full.
 */
if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
|
||||
(si->ob->prod->flags & SI_FL_WAIT_ROOM)))
|
||||
si_chk_rcv(si->ob->prod);
|
||||
}
|
||||
}
|
||||
|
||||
/* process producer side, only once if possible. This part only runs
 * when a read, hangup or error event was reported for this fd.
 */
|
||||
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) {
|
||||
/* We might have some data the consumer is waiting for.
|
||||
* We can do fast-forwarding, but we avoid doing this for partial
|
||||
* buffers, because it is very likely that it will be done again
|
||||
* immediately afterwards once the following data is parsed (eg:
|
||||
* HTTP chunking).
|
||||
*/
|
||||
if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
|
||||
(si->ib->pipe /* always try to send spliced data */ ||
|
||||
(si->ib->i == 0 && (si->ib->cons->flags & SI_FL_WAIT_DATA)))) {
|
||||
/* remember how much data the pipe holds before the consumer runs */
int last_len = si->ib->pipe ? si->ib->pipe->data : 0;
|
||||
|
||||
si_chk_snd(si->ib->cons);
|
||||
|
||||
/* check if the consumer has freed some space: the buffer must not be
 * full and, when a pipe was in use, it must be gone or hold less
 * data than before the call.
 */
|
||||
if (!(si->ib->flags & BF_FULL) &&
|
||||
(!last_len || !si->ib->pipe || si->ib->pipe->data < last_len))
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
}
|
||||
|
||||
/* Still waiting for room: stop polling for reads and disarm the read
 * timeout. Otherwise, after a partial read with none of the other
 * tested flags set, refresh the read timeout if it was already armed.
 */
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||
EV_FD_CLR(fd, DIR_RD);
|
||||
si->ib->rex = TICK_ETERNITY;
|
||||
}
|
||||
else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
|
||||
if (tick_isset(si->ib->rex))
|
||||
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
|
||||
}
|
||||
}
|
||||
|
||||
/* wake the task up only when needed: on a special event (read/write
 * null or error, interface error, interface not established), or when
 * some data were read or written and there is nothing more to forward
 * or the other side is not established.
 */
|
||||
if (/* changes on the production side */
|
||||
(si->ib->flags & (BF_READ_NULL|BF_READ_ERROR)) ||
|
||||
si->state != SI_ST_EST ||
|
||||
(si->flags & SI_FL_ERR) ||
|
||||
((si->ib->flags & BF_READ_PARTIAL) &&
|
||||
(!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) ||
|
||||
|
||||
/* changes on the consumption side */
|
||||
(si->ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR)) ||
|
||||
((si->ob->flags & BF_WRITE_ACTIVITY) &&
|
||||
((si->ob->flags & BF_SHUTW) ||
|
||||
si->ob->prod->state != SI_ST_EST ||
|
||||
((si->ob->flags & BF_OUT_EMPTY) && !si->ob->to_forward)))) {
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
/* any read activity on the input buffer clears BF_READ_DONTWAIT */
if (si->ib->flags & BF_READ_ACTIVITY)
|
||||
si->ib->flags &= ~BF_READ_DONTWAIT;
|
||||
}
|
||||
|
||||
/*
|
||||
* Local variables:
|
||||
|
Loading…
Reference in New Issue
Block a user