diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index f4300de57..a98234253 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -104,7 +104,7 @@ struct qcc { #define QC_SF_NONE 0x00000000 #define QC_SF_SIZE_KNOWN 0x00000001 /* last frame received for this stream */ #define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */ -/* unused 0x00000004 */ +#define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */ #define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */ /* unused 0x00000010 */ #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 425924212..a35dddc9b 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -24,6 +24,8 @@ void qcs_notify_send(struct qcs *qcs); struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs); struct buffer *qcc_get_stream_txbuf(struct qcs *qcs); +int qcc_release_stream_txbuf(struct qcs *qcs); +int qcc_stream_can_send(const struct qcs *qcs); void qcc_reset_stream(struct qcs *qcs, int err); void qcc_send_stream(struct qcs *qcs, int urg, int count); void qcc_abort_stream_read(struct qcs *qcs); diff --git a/src/h3.c b/src/h3.c index 778f5bb69..431922f01 100644 --- a/src/h3.c +++ b/src/h3.c @@ -1707,6 +1707,7 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) list[hdr].n = ist(""); + start: if (!(res = qcc_get_stream_txbuf(qcs))) { TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); h3c->err = H3_INTERNAL_ERROR; @@ -1715,9 +1716,12 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) /* At least 9 bytes to store frame type + length as a varint max size */ if (b_room(res) < 9) { - /* TODO */ TRACE_STATE("not enough room for trailers frame", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); - ABORT_NOW(); + if (qcc_release_stream_txbuf(qcs)) + goto end; + + /* Buffer released, restart processing. */ + goto start; } /* Force buffer realignment as size required to encode headers is unknown. */ @@ -1727,9 +1731,12 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) headers_buf = b_make(b_peek(res, b_data(res) + 9), b_contig_space(res) - 9, 0, 0); if (qpack_encode_field_section_line(&headers_buf)) { - /* TODO */ TRACE_STATE("not enough room for trailers section line", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); - ABORT_NOW(); + if (qcc_release_stream_txbuf(qcs)) + goto end; + + /* Buffer released, restart processing. */ + goto start; } tail = b_tail(&headers_buf); @@ -1749,9 +1756,12 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) } if (qpack_encode_header(&headers_buf, list[hdr].n, list[hdr].v)) { - /* TODO */ TRACE_STATE("not enough room for all trailers", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); - ABORT_NOW(); + if (qcc_release_stream_txbuf(qcs)) + goto end; + + /* Buffer released, restart processing. */ + goto start; } } @@ -1871,10 +1881,17 @@ static int h3_resp_data_send(struct qcs *qcs, struct htx *htx, /* TODO buffer can be realign only if no data waiting for ACK. */ outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0); + /* Not enough room for headers and at least one data byte, try to + * release the current buffer and allocate a new one. If not possible, + * stconn layer will subscribe on SEND. + */ if (b_size(&outbuf) <= hsize) { - /* TODO */ TRACE_STATE("not enough room for data frame", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs); - ABORT_NOW(); + if (qcc_release_stream_txbuf(qcs)) + goto end; + + /* Buffer released, restart processing. */ + goto new_frame; } if (b_size(&outbuf) < hsize + fsize) @@ -1925,7 +1942,8 @@ static size_t h3_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count) htx = htx_from_buf(buf); - while (count && !htx_is_empty(htx) && !h3c->err) { + while (count && !htx_is_empty(htx) && qcc_stream_can_send(qcs) && + !h3c->err) { idx = htx_get_head(htx); blk = htx_get_blk(htx, idx); @@ -2028,6 +2046,7 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count) TRACE_ENTER(H3_EV_STRM_SEND, qcs->qcc->conn, qcs); + start: if (!(res = qcc_get_stream_txbuf(qcs))) { qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF; goto end; @@ -2042,9 +2061,13 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count) * on SEND. */ if (b_contig_space(res) <= hsize) { - /* TODO */ - qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; - goto end; + if (qcc_release_stream_txbuf(qcs)) { + qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + goto end; + } + + /* Buffer released, restart processing. */ + goto start; } /* Cannot forward more than available room in output buffer */ diff --git a/src/hq_interop.c b/src/hq_interop.c index 261db276b..0d0e47f59 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -98,7 +98,7 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, htx = htx_from_buf(buf); - while (count && !htx_is_empty(htx)) { + while (count && !htx_is_empty(htx) && qcc_stream_can_send(qcs)) { /* Not implemented : QUIC on backend side */ idx = htx_get_head(htx); blk = htx_get_blk(htx, idx); @@ -144,8 +144,9 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, fsize = b_contig_space(res); if (!fsize) { - /* TODO */ - ABORT_NOW(); + /* Release buf and restart parsing if sending still possible. */ + qcc_release_stream_txbuf(qcs); + continue; } b_putblk(res, htx_get_blk_ptr(htx, blk), fsize); @@ -181,6 +182,7 @@ static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count) int ret = 0; struct buffer *res; + start: res = qcc_get_stream_txbuf(qcs); if (!res) { qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; @@ -190,8 +192,12 @@ static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count) } if (!b_room(res)) { - /* TODO */ - ABORT_NOW(); + if (qcc_release_stream_txbuf(qcs)) { + qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + goto end; + } + + goto start; } /* No header required for HTTP/0.9, no need to reserve an offset. */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 0024b5d45..408e20c01 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -962,6 +962,35 @@ static uint64_t qcs_prep_bytes(const struct qcs *qcs) return b_data(out) - diff; } +/* Release the current Tx buffer. This is useful if space left is not + * enough anymore. A new instance can then be allocated to continue sending. + * + * This operation fails if there is not yet sent bytes in the buffer. In this + * case, stream layer should interrupt sending until further notification. + * + * Returns 0 if buffer is released and a new one can be allocated or non-zero + * if there is still remaining data. + */ +int qcc_release_stream_txbuf(struct qcs *qcs) +{ + const uint64_t bytes = qcs_prep_bytes(qcs); + + /* Cannot release buffer if prepared data is not fully sent. */ + if (bytes) { + qcs->flags |= QC_SF_BLK_MROOM; + return 1; + } + + qc_stream_buf_release(qcs->stream); + return 0; +} + +/* Returns true if stream layer can proceed to emission via . */ +int qcc_stream_can_send(const struct qcs *qcs) +{ + return !(qcs->flags & QC_SF_BLK_MROOM); +} + /* Wakes up every streams of which are currently waiting for sending but * are blocked on connection flow control. */ @@ -1726,8 +1755,11 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) QMUX_EV_QCS_SEND, qcc->conn, qcs); } /* Release buffer if everything sent and buf is full or stream is waiting for room. */ - if (!qcs_prep_bytes(qcs) && (b_full(&qcs->stream->buf->buf))) { + if (!qcs_prep_bytes(qcs) && + (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) { qc_stream_buf_release(qcs->stream); + qcs->flags &= ~QC_SF_BLK_MROOM; + qcs_notify_send(qcs); } /* Add measurement for send rate. This is done at the MUX layer