diff --git a/src/mux_h2.c b/src/mux_h2.c index 759c6a15d..5457bc2c7 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -589,7 +589,10 @@ static void h2_recv(struct connection *conn) int max; if (conn->flags & CO_FL_ERROR) - goto error; + return; + + if (h2c->flags & H2_CF_DEM_BLOCK_ANY) + return; buf = h2_get_dbuf(h2c); if (!buf) { @@ -600,32 +603,28 @@ static void h2_recv(struct connection *conn) /* note: buf->o == 0 */ max = buf->size - buf->i; if (!max) { - /* FIXME: buffer full, add a flag, stop polling and wait */ - __conn_xprt_stop_recv(conn); + h2c->flags |= H2_CF_DEM_DFULL; return; } conn->xprt->rcv_buf(conn, buf, max); if (conn->flags & CO_FL_ERROR) - goto error; + return; - if (!buf->i) + if (!buf->i) { h2_release_dbuf(h2c); - - if (buf->i == buf->size) { - /* buffer now full */ - __conn_xprt_stop_recv(conn); return; } + if (buf->i == buf->size) + h2c->flags |= H2_CF_DEM_DFULL; + /* FIXME: should we try to process streams here instead of doing it in ->wake ? */ - if (conn_xprt_read0_pending(conn)) - __conn_xprt_stop_recv(conn); + /* after streams have been processed, we should have made some room */ + if (buf->i != buf->size) + h2c->flags &= ~H2_CF_DEM_DFULL; return; - - error: - __conn_xprt_stop_recv(conn); } /* callback called on send event by the connection handler */ @@ -636,22 +635,16 @@ static void h2_send(struct connection *conn) /* FIXME: should we try to process pending streams here instead of doing it in ->wake ? */ if (conn->flags & CO_FL_ERROR) - goto error; + return; if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) { /* a handshake was requested */ return; } - if (!h2c->mbuf->o) { - /* nothing to send */ - goto done; - } - if (conn->flags & CO_FL_SOCK_WR_SH) { /* output closed, nothing to send, clear the buffer to release it */ h2c->mbuf->o = 0; - goto done; } /* pending response data, we need to try to send or subscribe to @@ -667,33 +660,39 @@ static void h2_send(struct connection *conn) * problematic for ACKs. The latter should possibly not be set * for now. */ - conn->xprt->snd_buf(conn, h2c->mbuf, 0); + if (conn->xprt->snd_buf(conn, h2c->mbuf, 0) > 0) + h2c->flags &= ~(H2_CF_MUX_MFULL | H2_CF_DEM_MROOM); if (conn->flags & CO_FL_ERROR) - goto error; - - if (!h2c->mbuf->o) - h2_release_mbuf(h2c); - - if (h2c->mbuf->o) { - /* incomplete send, the snd_buf callback has already updated - * the connection flags. - * - * FIXME: we should arm a send timeout here - */ - __conn_xprt_want_send(conn); return; +} + +/* call the wake up function of all streams attached to the connection */ +static void h2_wake_all_streams(struct h2c *h2c) +{ + struct eb32_node *node; + struct h2s *h2s; + unsigned int flags = 0; + + if (h2c->st0 >= H2_CS_ERROR || h2c->conn->flags & CO_FL_ERROR) + flags |= CS_FL_ERROR; + + if (conn_xprt_read0_pending(h2c->conn)) + flags |= CS_FL_EOS; + + node = eb32_first(&h2c->streams_by_id); + while (node) { + h2s = container_of(node, struct h2s, by_id); + node = eb32_next(node); + if (h2s->cs) { + h2s->cs->flags |= flags; + /* recv is used to force to detect CS_FL_EOS that wake() + * doesn't handle in the stream int code. + */ + h2s->cs->data_cb->recv(h2s->cs); + h2s->cs->data_cb->wake(h2s->cs); + } } - - done: - /* FIXME: release the output buffer when empty or do it in ->wake() ? */ - __conn_xprt_stop_send(conn); - return; - - error: - /* FIXME: report an error somewhere in the mux */ - __conn_xprt_stop_send(conn); - return; } /* callback called on any event by the connection handler. @@ -704,9 +703,48 @@ static int h2_wake(struct connection *conn) { struct h2c *h2c = conn->mux_ctx; - if ((conn->flags & CO_FL_ERROR) && eb_is_empty(&h2c->streams_by_id)) { - h2_release(conn); - return -1; + if (conn->flags & CO_FL_ERROR || h2c->st0 == H2_CS_ERROR2) { + h2_wake_all_streams(h2c); + + if (eb_is_empty(&h2c->streams_by_id)) { + /* no more stream, kill the connection now */ + h2_release(conn); + return -1; + } + else { + /* some streams still there, we need to signal them all and + * wait for their departure. + */ + __conn_xprt_stop_recv(conn); + __conn_xprt_stop_send(conn); + return 0; + } + } + + if (!h2c->dbuf->i) + h2_release_dbuf(h2c); + + /* stop being notified of incoming data if we can't process them */ + if (h2c->st0 >= H2_CS_ERROR || + (h2c->flags & H2_CF_DEM_BLOCK_ANY) || conn_xprt_read0_pending(conn)) { + /* FIXME: we should clear a read timeout here */ + __conn_xprt_stop_recv(conn); + } + else { + /* FIXME: we should (re-)arm a read timeout here */ + __conn_xprt_want_recv(conn); + } + + /* adjust output polling */ + if ((h2c->st0 == H2_CS_ERROR || h2c->mbuf->o) && + !(conn->flags & CO_FL_SOCK_WR_SH)) { + /* FIXME: we should (re-)arm a send timeout here */ + __conn_xprt_want_send(conn); + } + else { + /* FIXME: we should clear a send timeout here */ + h2_release_mbuf(h2c); + __conn_xprt_stop_send(conn); } return 0;