diff --git a/src/mux_h1.c b/src/mux_h1.c index ff54adc56..6b00320bd 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -1463,14 +1463,16 @@ static int h1_recv(struct h1c *h1c) { struct connection *conn = h1c->conn; struct h1s *h1s = h1c->h1s; - size_t ret, max; + size_t ret = 0, max; int rcvd = 0; if (h1c->wait_event.wait_reason & SUB_CAN_RECV) return 0; - if (!h1_recv_allowed(h1c)) + if (!h1_recv_allowed(h1c)) { + rcvd = 1; goto end; + } if (h1s && (h1s->flags & (H1S_F_BUF_FLUSH|H1S_F_SPLICED_DATA))) { rcvd = 1; @@ -1482,7 +1484,6 @@ static int h1_recv(struct h1c *h1c) goto end; } - ret = 0; max = b_room(&h1c->ibuf); if (max) { h1c->flags &= ~H1C_F_IN_FULL; @@ -1503,6 +1504,13 @@ static int h1_recv(struct h1c *h1c) rcvd = 1; end: + if ((ret > 0 || (conn->flags & CO_FL_ERROR) || + conn_xprt_read0_pending(conn)) && h1s && h1s->recv_wait) { + h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(h1s->recv_wait->task); + h1s->recv_wait = NULL; + + } if (!b_data(&h1c->ibuf)) h1_release_buf(h1c, &h1c->ibuf); else if (b_full(&h1c->ibuf)) @@ -1544,6 +1552,13 @@ static int h1_send(struct h1c *h1c) } end: + if (!(h1c->flags & H1C_F_OUT_FULL) && h1c->h1s && h1c->h1s->send_wait) { + struct h1s *h1s = h1c->h1s; + + h1s->send_wait->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(h1s->send_wait->task); + h1s->send_wait = NULL; + } /* We're done, no more to send */ if (!b_data(&h1c->obuf)) { h1_release_buf(h1c, &h1c->obuf); @@ -1558,38 +1573,6 @@ static int h1_send(struct h1c *h1c) } -static void h1_wake_stream(struct h1c *h1c) -{ - struct connection *conn = h1c->conn; - struct h1s *h1s = h1c->h1s; - uint32_t flags = 0; - int dont_wake = 0; - - if (!h1s || !h1s->cs) - return; - - if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR)) - flags |= CS_FL_ERROR; - if (conn_xprt_read0_pending(conn)) - flags |= CS_FL_REOS; - - h1s->cs->flags |= flags; - if (h1s->recv_wait) { - h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV; - tasklet_wakeup(h1s->recv_wait->task); - h1s->recv_wait = NULL; - dont_wake = 1; - } - if (h1s->send_wait) { - h1s->send_wait->wait_reason &= ~SUB_CAN_SEND; - tasklet_wakeup(h1s->send_wait->task); - h1s->send_wait = NULL; - dont_wake = 1; - } - if (!dont_wake && h1s->cs->data_cb->wake) - h1s->cs->data_cb->wake(h1s->cs); -} - /* callback called on any event by the connection handler. * It applies changes and returns zero, or < 0 if it wants immediate * destruction of the connection. @@ -1623,7 +1606,18 @@ static int h1_process(struct h1c * h1c) if (b_data(&h1c->ibuf) && h1s->csinfo.t_idle == -1) h1s->csinfo.t_idle = tv_ms_elapsed(&h1s->csinfo.tv_create, &now) - h1s->csinfo.t_handshake; - h1_wake_stream(h1c); + if (!b_data(&h1c->ibuf) && h1s && h1s->cs && h1s->cs->data_cb->wake && + (conn_xprt_read0_pending(conn) || h1c->flags & H1C_F_CS_ERROR || + conn->flags & CO_FL_ERROR)) { + int flags = 0; + + if (h1c->flags & H1C_F_CS_ERROR || conn->flags & CO_FL_ERROR) + flags |= CS_FL_ERROR; + if (conn_xprt_read0_pending(conn)) + flags |= CS_FL_REOS; + h1s->cs->flags |= flags; + h1s->cs->data_cb->wake(h1s->cs); + } end: return 0; @@ -1650,9 +1644,17 @@ static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status) static int h1_wake(struct connection *conn) { struct h1c *h1c = conn->mux_ctx; + int ret; h1_send(h1c); - return h1_process(h1c); + ret = h1_process(h1c); + if (ret == 0) { + struct h1s *h1s = h1c->h1s; + + if (h1s && h1s->cs && h1s->cs->data_cb->wake) + ret = h1s->cs->data_cb->wake(h1s->cs); + } + return ret; } /*******************************************/ @@ -1921,10 +1923,6 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun */ if (!b_data(buf)) total = count; - else if (total != count) { - if (!(h1c->wait_event.wait_reason & SUB_CAN_SEND)) - cs->conn->xprt->subscribe(cs->conn, SUB_CAN_SEND, &h1c->wait_event); - } return total; }