MEDIUM: mux-h1: Revamp the way subscriptions are handled.

Don't always wake the tasklets subscribed to recv or send events as soon as
we had any I/O event, and don't call the wake() method if there were no
subscription, instead, wake the recv tasklet if we received data in h2_recv(),
and wake the send tasklet if we were able to send data in h2_send(), and the
buffer is not full anymore.
Only call the data_cb->wake() method if we get an error/a read 0, just in
case the stream was not subscribed to receive events.
This commit is contained in:
Olivier Houchard 2018-12-03 18:46:09 +01:00 committed by Christopher Faulet
parent c490efd625
commit 75159a96de

View File

@ -1463,14 +1463,16 @@ static int h1_recv(struct h1c *h1c)
{ {
struct connection *conn = h1c->conn; struct connection *conn = h1c->conn;
struct h1s *h1s = h1c->h1s; struct h1s *h1s = h1c->h1s;
size_t ret, max; size_t ret = 0, max;
int rcvd = 0; int rcvd = 0;
if (h1c->wait_event.wait_reason & SUB_CAN_RECV) if (h1c->wait_event.wait_reason & SUB_CAN_RECV)
return 0; return 0;
if (!h1_recv_allowed(h1c)) if (!h1_recv_allowed(h1c)) {
rcvd = 1;
goto end; goto end;
}
if (h1s && (h1s->flags & (H1S_F_BUF_FLUSH|H1S_F_SPLICED_DATA))) { if (h1s && (h1s->flags & (H1S_F_BUF_FLUSH|H1S_F_SPLICED_DATA))) {
rcvd = 1; rcvd = 1;
@ -1482,7 +1484,6 @@ static int h1_recv(struct h1c *h1c)
goto end; goto end;
} }
ret = 0;
max = b_room(&h1c->ibuf); max = b_room(&h1c->ibuf);
if (max) { if (max) {
h1c->flags &= ~H1C_F_IN_FULL; h1c->flags &= ~H1C_F_IN_FULL;
@ -1503,6 +1504,13 @@ static int h1_recv(struct h1c *h1c)
rcvd = 1; rcvd = 1;
end: end:
if ((ret > 0 || (conn->flags & CO_FL_ERROR) ||
conn_xprt_read0_pending(conn)) && h1s && h1s->recv_wait) {
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h1s->recv_wait->task);
h1s->recv_wait = NULL;
}
if (!b_data(&h1c->ibuf)) if (!b_data(&h1c->ibuf))
h1_release_buf(h1c, &h1c->ibuf); h1_release_buf(h1c, &h1c->ibuf);
else if (b_full(&h1c->ibuf)) else if (b_full(&h1c->ibuf))
@ -1544,6 +1552,13 @@ static int h1_send(struct h1c *h1c)
} }
end: end:
if (!(h1c->flags & H1C_F_OUT_FULL) && h1c->h1s && h1c->h1s->send_wait) {
struct h1s *h1s = h1c->h1s;
h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(h1s->send_wait->task);
h1s->send_wait = NULL;
}
/* We're done, no more to send */ /* We're done, no more to send */
if (!b_data(&h1c->obuf)) { if (!b_data(&h1c->obuf)) {
h1_release_buf(h1c, &h1c->obuf); h1_release_buf(h1c, &h1c->obuf);
@ -1558,38 +1573,6 @@ static int h1_send(struct h1c *h1c)
} }
static void h1_wake_stream(struct h1c *h1c)
{
struct connection *conn = h1c->conn;
struct h1s *h1s = h1c->h1s;
uint32_t flags = 0;
int dont_wake = 0;
if (!h1s || !h1s->cs)
return;
if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR))
flags |= CS_FL_ERROR;
if (conn_xprt_read0_pending(conn))
flags |= CS_FL_REOS;
h1s->cs->flags |= flags;
if (h1s->recv_wait) {
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h1s->recv_wait->task);
h1s->recv_wait = NULL;
dont_wake = 1;
}
if (h1s->send_wait) {
h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(h1s->send_wait->task);
h1s->send_wait = NULL;
dont_wake = 1;
}
if (!dont_wake && h1s->cs->data_cb->wake)
h1s->cs->data_cb->wake(h1s->cs);
}
/* callback called on any event by the connection handler. /* callback called on any event by the connection handler.
* It applies changes and returns zero, or < 0 if it wants immediate * It applies changes and returns zero, or < 0 if it wants immediate
* destruction of the connection. * destruction of the connection.
@ -1623,7 +1606,18 @@ static int h1_process(struct h1c * h1c)
if (b_data(&h1c->ibuf) && h1s->csinfo.t_idle == -1) if (b_data(&h1c->ibuf) && h1s->csinfo.t_idle == -1)
h1s->csinfo.t_idle = tv_ms_elapsed(&h1s->csinfo.tv_create, &now) - h1s->csinfo.t_handshake; h1s->csinfo.t_idle = tv_ms_elapsed(&h1s->csinfo.tv_create, &now) - h1s->csinfo.t_handshake;
h1_wake_stream(h1c); if (!b_data(&h1c->ibuf) && h1s && h1s->cs && h1s->cs->data_cb->wake &&
(conn_xprt_read0_pending(conn) || h1c->flags & H1C_F_CS_ERROR ||
conn->flags & CO_FL_ERROR)) {
int flags = 0;
if (h1c->flags & H1C_F_CS_ERROR || conn->flags & CO_FL_ERROR)
flags |= CS_FL_ERROR;
if (conn_xprt_read0_pending(conn))
flags |= CS_FL_REOS;
h1s->cs->flags |= flags;
h1s->cs->data_cb->wake(h1s->cs);
}
end: end:
return 0; return 0;
@ -1650,9 +1644,17 @@ static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status)
static int h1_wake(struct connection *conn) static int h1_wake(struct connection *conn)
{ {
struct h1c *h1c = conn->mux_ctx; struct h1c *h1c = conn->mux_ctx;
int ret;
h1_send(h1c); h1_send(h1c);
return h1_process(h1c); ret = h1_process(h1c);
if (ret == 0) {
struct h1s *h1s = h1c->h1s;
if (h1s && h1s->cs && h1s->cs->data_cb->wake)
ret = h1s->cs->data_cb->wake(h1s->cs);
}
return ret;
} }
/*******************************************/ /*******************************************/
@ -1921,10 +1923,6 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
*/ */
if (!b_data(buf)) if (!b_data(buf))
total = count; total = count;
else if (total != count) {
if (!(h1c->wait_event.wait_reason & SUB_CAN_SEND))
cs->conn->xprt->subscribe(cs->conn, SUB_CAN_SEND, &h1c->wait_event);
}
return total; return total;
} }