diff --git a/src/mux_h2.c b/src/mux_h2.c index aeea4ecf6..4517b2fe4 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -77,6 +78,7 @@ struct h2c { struct eb_root streams_by_id; /* all active streams by their ID */ struct list send_list; /* list of blocked streams requesting to send */ struct list fctl_list; /* list of streams blocked by connection's fctl */ + struct buffer_wait dbuf_wait; /* wait list for demux buffer allocation */ }; /* H2 stream state, in h2s->st */ @@ -131,6 +133,52 @@ static int h2_settings_initial_window_size = 65535; /* initial value */ static int h2_settings_max_concurrent_streams = 100; +/*****************************************************/ +/* functions below are for dynamic buffer management */ +/*****************************************************/ + +/* re-enables receiving on mux after a buffer was allocated. It returns + * 1 if the allocation succeeds, in which case the connection is woken up, or 0 + * if it's impossible to wake up and we prefer to be woken up later. + */ +static int h2_dbuf_available(void *target) +{ + struct h2c *h2c = target; + + /* take the buffer now as we'll get scheduled waiting for ->wake() */ + if (b_alloc_margin(&h2c->dbuf, 0)) { + conn_xprt_want_recv(h2c->conn); + return 1; + } + return 0; +} + +static inline struct buffer *h2_get_dbuf(struct h2c *h2c) +{ + struct buffer *buf = NULL; + + if (likely(LIST_ISEMPTY(&h2c->dbuf_wait.list)) && + unlikely((buf = b_alloc_margin(&h2c->dbuf, 0)) == NULL)) { + h2c->dbuf_wait.target = h2c->conn; + h2c->dbuf_wait.wakeup_cb = h2_dbuf_available; + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); + LIST_ADDQ(&buffer_wq, &h2c->dbuf_wait.list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); + __conn_xprt_stop_recv(h2c->conn); + } + return buf; +} + +static inline void h2_release_dbuf(struct h2c *h2c) +{ + if (h2c->dbuf->size) { + b_free(&h2c->dbuf); + offer_buffers(h2c->dbuf_wait.target, + tasks_run_queue + applets_active_queue); + } +} + + /*****************************************************************/ /* functions below are dedicated to the mux setup and management */ /*****************************************************************/ @@ -169,6 +217,7 @@ static int h2c_frt_init(struct connection *conn) h2c->streams_by_id = EB_ROOT_UNIQUE; LIST_INIT(&h2c->send_list); LIST_INIT(&h2c->fctl_list); + LIST_INIT(&h2c->dbuf_wait.list); conn->mux_ctx = h2c; conn_xprt_want_recv(conn); @@ -205,6 +254,10 @@ static void h2_release(struct connection *conn) if (h2c) { hpack_dht_free(h2c->ddht); + h2_release_dbuf(h2c); + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); + LIST_DEL(&h2c->dbuf_wait.list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); pool_free2(pool2_h2c, h2c); } @@ -227,12 +280,16 @@ static void h2_release(struct connection *conn) static void h2_recv(struct connection *conn) { struct h2c *h2c = conn->mux_ctx; - struct buffer *buf = h2c->dbuf; + struct buffer *buf; int max; if (conn->flags & CO_FL_ERROR) goto error; + buf = h2_get_dbuf(h2c); + if (!buf) + return; + /* note: buf->o == 0 */ max = buf->size - buf->i; if (!max) { @@ -245,6 +302,9 @@ static void h2_recv(struct connection *conn) if (conn->flags & CO_FL_ERROR) goto error; + if (!buf->i) + h2_release_dbuf(h2c); + if (buf->i == buf->size) { /* buffer now full */ __conn_xprt_stop_recv(conn);