MEDIUM: connections/mux: Add a recv and a send+recv wait list.

For struct connection, struct conn_stream, and for the h2 mux, add 2 new
lists, one that handles waiters for recv, and one that handles waiters for
recv and send. That way we can ask to subscribe for either recv or send.
This commit is contained in:
Olivier Houchard 2018-08-02 19:23:05 +02:00 committed by Willy Tarreau
parent 524344b4e0
commit 4cf7fb148f
5 changed files with 104 additions and 6 deletions

View File

@ -602,6 +602,8 @@ static inline void cs_init(struct conn_stream *cs, struct connection *conn)
cs->flags = CS_FL_NONE;
LIST_INIT(&cs->wait_list.list);
LIST_INIT(&cs->send_wait_list);
LIST_INIT(&cs->recv_wait_list);
LIST_INIT(&cs->sendrecv_wait_list);
cs->conn = conn;
cs->wait_list.wait_reason = 0;
}
@ -629,6 +631,8 @@ static inline void conn_init(struct connection *conn)
conn->proxy_netns = NULL;
LIST_INIT(&conn->list);
LIST_INIT(&conn->send_wait_list);
LIST_INIT(&conn->recv_wait_list);
LIST_INIT(&conn->sendrecv_wait_list);
}
/* sets <owner> as the connection's owner */
@ -711,8 +715,19 @@ static inline struct conn_stream *cs_new(struct connection *conn)
/* Releases a connection previously allocated by conn_new() */
static inline void conn_free(struct connection *conn)
{
LIST_DEL(&conn->send_wait_list);
LIST_INIT(&conn->send_wait_list);
struct wait_list *sw, *sw_back;
list_for_each_entry_safe(sw, sw_back, &conn->recv_wait_list, list) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
list_for_each_entry_safe(sw, sw_back, &conn->send_wait_list, list) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
list_for_each_entry_safe(sw, sw_back, &conn->sendrecv_wait_list, list) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
pool_free(pool_head_connection, conn);
}

View File

@ -375,6 +375,8 @@ struct conn_stream {
struct connection *conn; /* xprt-level connection */
struct wait_list wait_list; /* We're in a wait list for send */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */
struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */
void *data; /* pointer to upper layer's entity (eg: stream interface) */
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
void *ctx; /* mux-specific context */
@ -406,6 +408,8 @@ struct connection {
/* second cache line */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */
struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */
struct list list; /* attach point to various connection lists (idle, ...) */
int xprt_st; /* transport layer state, initialized to zero */
int tmp_early_data; /* 1st byte of early data, if any */

View File

@ -137,6 +137,15 @@ void conn_fd_handler(int fd)
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&conn->recv_wait_list, &sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
}
/* The data transfer starts here and stops on error and handshakes. Note
@ -334,11 +343,34 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
struct wait_list *sw;
switch (event_type) {
case SUB_CAN_RECV:
sw = param;
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
/* If we're already subscribed for send(), move it
* to the send+recv list
*/
if (sw->wait_reason & SUB_CAN_SEND) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
} else
LIST_ADDQ(&conn->recv_wait_list, &sw->list);
}
return 0;
case SUB_CAN_SEND:
sw = param;
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
LIST_ADDQ(&conn->send_wait_list, &sw->list);
/* If we're already subscribed for recv(), move it
* to the send+recv list
*/
if (sw->wait_reason & SUB_CAN_RECV) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
} else
LIST_ADDQ(&conn->send_wait_list, &sw->list);
}
return 0;
default:

View File

@ -121,6 +121,8 @@ struct h2c {
struct list fctl_list; /* list of streams blocked by connection's fctl */
struct buffer_wait buf_wait; /* wait list for buffer allocations */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */
struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */
struct wait_list wait_list; /* We're in a wait list, to send */
};
@ -406,6 +408,8 @@ static int h2c_frt_init(struct connection *conn)
task_queue(t);
conn_xprt_want_recv(conn);
LIST_INIT(&h2c->send_wait_list);
LIST_INIT(&h2c->recv_wait_list);
LIST_INIT(&h2c->sendrecv_wait_list);
LIST_INIT(&h2c->wait_list.list);
/* mux->wake will be called soon to complete the operation */
@ -2333,6 +2337,16 @@ static void h2_send(struct h2c *h2c)
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
while (!(LIST_ISEMPTY(&h2c->sendrecv_wait_list))) {
struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
}
/* We're done, no more to send */
@ -3456,14 +3470,37 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
{
struct wait_list *sw;
struct h2s *h2s = cs->ctx;
struct h2c *h2c = h2s->h2c;
switch (event_type) {
case SUB_CAN_RECV:
sw = param;
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
/* If we're already subscribed for send(), move it
* to the send+recv list
*/
if (sw->wait_reason & SUB_CAN_SEND) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
} else
LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
}
return 0;
case SUB_CAN_SEND:
sw = param;
if (LIST_ISEMPTY(&h2s->list) &&
!(sw->wait_reason & SUB_CAN_SEND)) {
LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list);
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
/* If we're already subscribed for recv(), move it
* to the send+recv list
*/
if (sw->wait_reason & SUB_CAN_RECV) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
} else
LIST_ADDQ(&h2c->send_wait_list, &sw->list);
}
return 0;
default:

View File

@ -752,6 +752,16 @@ wake_others:
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
LIST_ADDQ(&cs->recv_wait_list, &sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
}
return NULL;
}