diff --git a/include/proto/connection.h b/include/proto/connection.h index ea6b17b67..c7f25613c 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -602,6 +602,8 @@ static inline void cs_init(struct conn_stream *cs, struct connection *conn) cs->flags = CS_FL_NONE; LIST_INIT(&cs->wait_list.list); LIST_INIT(&cs->send_wait_list); + LIST_INIT(&cs->recv_wait_list); + LIST_INIT(&cs->sendrecv_wait_list); cs->conn = conn; cs->wait_list.wait_reason = 0; } @@ -629,6 +631,8 @@ static inline void conn_init(struct connection *conn) conn->proxy_netns = NULL; LIST_INIT(&conn->list); LIST_INIT(&conn->send_wait_list); + LIST_INIT(&conn->recv_wait_list); + LIST_INIT(&conn->sendrecv_wait_list); } /* sets as the connection's owner */ @@ -711,8 +715,19 @@ static inline struct conn_stream *cs_new(struct connection *conn) /* Releases a connection previously allocated by conn_new() */ static inline void conn_free(struct connection *conn) { - LIST_DEL(&conn->send_wait_list); - LIST_INIT(&conn->send_wait_list); + struct wait_list *sw, *sw_back; + list_for_each_entry_safe(sw, sw_back, &conn->recv_wait_list, list) { + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + } + list_for_each_entry_safe(sw, sw_back, &conn->send_wait_list, list) { + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + } + list_for_each_entry_safe(sw, sw_back, &conn->sendrecv_wait_list, list) { + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + } pool_free(pool_head_connection, conn); } diff --git a/include/types/connection.h b/include/types/connection.h index 9a1ba9667..421df3c02 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -375,6 +375,8 @@ struct conn_stream { struct connection *conn; /* xprt-level connection */ struct wait_list wait_list; /* We're in a wait list for send */ struct list send_wait_list; /* list of tasks to wake when we're ready to send */ + struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */ + struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */ void *data; /* pointer to upper layer's entity (eg: stream interface) */ const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */ void *ctx; /* mux-specific context */ @@ -406,6 +408,8 @@ struct connection { /* second cache line */ struct list send_wait_list; /* list of tasks to wake when we're ready to send */ + struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */ + struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */ struct list list; /* attach point to various connection lists (idle, ...) */ int xprt_st; /* transport layer state, initialized to zero */ int tmp_early_data; /* 1st byte of early data, if any */ diff --git a/src/connection.c b/src/connection.c index e303f2c3b..005e0e741 100644 --- a/src/connection.c +++ b/src/connection.c @@ -137,6 +137,15 @@ void conn_fd_handler(int fd) sw->wait_reason &= ~SUB_CAN_SEND; tasklet_wakeup(sw->task); } + while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) { + struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&conn->recv_wait_list, &sw->list); + sw->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(sw->task); + } } /* The data transfer starts here and stops on error and handshakes. Note @@ -334,11 +343,34 @@ int conn_subscribe(struct connection *conn, int event_type, void *param) struct wait_list *sw; switch (event_type) { + case SUB_CAN_RECV: + sw = param; + if (!(sw->wait_reason & SUB_CAN_RECV)) { + sw->wait_reason |= SUB_CAN_RECV; + /* If we're already subscribed for send(), move it + * to the send+recv list + */ + if (sw->wait_reason & SUB_CAN_SEND) { + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list); + } else + LIST_ADDQ(&conn->recv_wait_list, &sw->list); + } + return 0; case SUB_CAN_SEND: sw = param; if (!(sw->wait_reason & SUB_CAN_SEND)) { sw->wait_reason |= SUB_CAN_SEND; - LIST_ADDQ(&conn->send_wait_list, &sw->list); + /* If we're already subscribed for recv(), move it + * to the send+recv list + */ + if (sw->wait_reason & SUB_CAN_RECV) { + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list); + } else + LIST_ADDQ(&conn->send_wait_list, &sw->list); } return 0; default: diff --git a/src/mux_h2.c b/src/mux_h2.c index 946288d3e..3c873e9ed 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -121,6 +121,8 @@ struct h2c { struct list fctl_list; /* list of streams blocked by connection's fctl */ struct buffer_wait buf_wait; /* wait list for buffer allocations */ struct list send_wait_list; /* list of tasks to wake when we're ready to send */ + struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */ + struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */ struct wait_list wait_list; /* We're in a wait list, to send */ }; @@ -406,6 +408,8 @@ static int h2c_frt_init(struct connection *conn) task_queue(t); conn_xprt_want_recv(conn); LIST_INIT(&h2c->send_wait_list); + LIST_INIT(&h2c->recv_wait_list); + LIST_INIT(&h2c->sendrecv_wait_list); LIST_INIT(&h2c->wait_list.list); /* mux->wake will be called soon to complete the operation */ @@ -2333,6 +2337,16 @@ static void h2_send(struct h2c *h2c) sw->wait_reason &= ~SUB_CAN_SEND; tasklet_wakeup(sw->task); } + while (!(LIST_ISEMPTY(&h2c->sendrecv_wait_list))) { + struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&h2c->recv_wait_list, &sw->list); + sw->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(sw->task); + } + } /* We're done, no more to send */ @@ -3456,14 +3470,37 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) { struct wait_list *sw; struct h2s *h2s = cs->ctx; + struct h2c *h2c = h2s->h2c; switch (event_type) { + case SUB_CAN_RECV: + sw = param; + if (!(sw->wait_reason & SUB_CAN_RECV)) { + sw->wait_reason |= SUB_CAN_RECV; + /* If we're already subscribed for send(), move it + * to the send+recv list + */ + if (sw->wait_reason & SUB_CAN_SEND) { + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list); + } else + LIST_ADDQ(&h2c->recv_wait_list, &sw->list); + } + return 0; case SUB_CAN_SEND: sw = param; - if (LIST_ISEMPTY(&h2s->list) && - !(sw->wait_reason & SUB_CAN_SEND)) { - LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list); + if (!(sw->wait_reason & SUB_CAN_SEND)) { sw->wait_reason |= SUB_CAN_SEND; + /* If we're already subscribed for recv(), move it + * to the send+recv list + */ + if (sw->wait_reason & SUB_CAN_RECV) { + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list); + } else + LIST_ADDQ(&h2c->send_wait_list, &sw->list); } return 0; default: diff --git a/src/stream_interface.c b/src/stream_interface.c index 72fec21b6..cfa613a3c 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -752,6 +752,16 @@ wake_others: sw->wait_reason &= ~SUB_CAN_SEND; tasklet_wakeup(sw->task); } + while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) { + struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + LIST_ADDQ(&cs->recv_wait_list, &sw->list); + sw->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(sw->task); + } + } return NULL; }