diff --git a/include/haproxy/stconn-t.h b/include/haproxy/stconn-t.h index 78c77de9e..ce06a5cce 100644 --- a/include/haproxy/stconn-t.h +++ b/include/haproxy/stconn-t.h @@ -33,6 +33,10 @@ enum iobuf_flags { IOBUF_FL_NO_FF = 0x00000001, /* Fast-forwarding is not supported */ IOBUF_FL_NO_SPLICING = 0x00000002, /* Splicing is not supported or unusable for this stream */ IOBUF_FL_FF_BLOCKED = 0x00000004, /* Fast-forwarding is blocked (buffer allocation/full) */ + + IOBUF_FL_INTERIM_FF = 0x00000008, /* Producer side warn it will immediately retry a fast-forward. + * .done_fastfwd() on consumer side must take care of this flag + */ }; struct iobuf { diff --git a/src/mux_h1.c b/src/mux_h1.c index 7ee61c393..b022c0d01 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -4588,6 +4588,9 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) goto out; } + retry: + ret = 0; + if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) && count > h1m->curr_len) count = h1m->curr_len; @@ -4603,7 +4606,7 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) /* Fast forwading is not supported by the consumer */ h1c->flags = (h1c->flags & ~H1C_F_WANT_FASTFWD) | H1C_F_CANT_FASTFWD; TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", H1_EV_STRM_RECV, h1c->conn, h1s); - goto end; + goto out; } if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) { se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); @@ -4626,6 +4629,7 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) goto end; } total += ret; + count -= ret; if (!ret) { TRACE_STATE("failed to receive data, subscribing", H1_EV_STRM_RECV, h1c->conn); h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); @@ -4642,16 +4646,30 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) } b_sub(sdo->iobuf.buf, sdo->iobuf.offset); total += ret; + count -= ret; sdo->iobuf.data += ret; } + /* Till now, we forwarded less than a buffer, we can immediately retry + * to fast-forward more data. Instruct the consumer it is an interim + * fast-forward. It is of course only possible if there is still data to + * fast-forward (count > 0), if the previous attempt was a full success + * (0 > ret == try) and if we are not splicing (iobuf.buf != NULL). + */ + if (ret > 0 && ret == try && count && sdo->iobuf.buf && total < b_size(sdo->iobuf.buf)) { + sdo->iobuf.flags |= IOBUF_FL_INTERIM_FF; + se_done_ff(sdo); + goto retry; + } + + out: if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN))) { if (total > h1m->curr_len) { h1s->flags |= H1S_F_PARSING_ERROR; se_fl_set(h1s->sd, SE_FL_ERROR); TRACE_ERROR("too much payload, more than announced", H1_EV_STRM_RECV|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); - goto out; + goto end; } h1m->curr_len -= total; if (!h1m->curr_len) { @@ -4677,17 +4695,6 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) } } - HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, total); - ret = total; - se_done_ff(sdo); - - if (sdo->iobuf.pipe) { - se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); - } - - TRACE_DEVEL("Data fast-forwarded", H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){ret}); - - out: if (conn_xprt_read0_pending(h1c->conn)) { se_fl_set(h1s->sd, SE_FL_EOS); TRACE_STATE("report EOS to SE", H1_EV_STRM_RECV, h1c->conn, h1s); @@ -4711,7 +4718,19 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s); } + + sdo->iobuf.flags &= ~IOBUF_FL_INTERIM_FF; + se_done_ff(sdo); + + ret = total; + HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, total); + + if (sdo->iobuf.pipe) { + se_fl_set(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); + } + end: + if (!(h1c->flags & H1C_F_WANT_FASTFWD)) { TRACE_STATE("notify the mux can't use fast-forward anymore", H1_EV_STRM_RECV, h1c->conn, h1s); se_fl_clr(h1s->sd, SE_FL_MAY_FASTFWD); diff --git a/src/mux_h2.c b/src/mux_h2.c index 98b796b81..ab5e3cf60 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -6912,6 +6912,9 @@ static size_t h2_nego_ff(struct stconn *sc, struct buffer *input, size_t count, /* If we were not just woken because we wanted to send but couldn't, * and there's somebody else that is waiting to send, do nothing, * we will subscribe later and be put at the end of the list + * + * WARNING: h2_done_ff() is responsible to remove H2_SF_NOTIFIED flags + * depending on iobuf flags. */ if (!(h2s->flags & H2_SF_NOTIFIED) && (!LIST_ISEMPTY(&h2c->send_list) || !LIST_ISEMPTY(&h2c->fctl_list))) { @@ -6924,7 +6927,6 @@ static size_t h2_nego_ff(struct stconn *sc, struct buffer *input, size_t count, h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; goto end; } - h2s->flags &= ~H2_SF_NOTIFIED; if (h2s_mws(h2s) <= 0) { h2s->flags |= H2_SF_BLK_SFCTL; @@ -7081,6 +7083,9 @@ static size_t h2_done_ff(struct stconn *sc) sd->iobuf.offset = 0; sd->iobuf.data = 0; + if (!(sd->iobuf.flags & IOBUF_FL_INTERIM_FF)) + h2s->flags &= ~H2_SF_NOTIFIED; + TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s); return total; }