diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 61441091a..f05bf9c3c 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -32,7 +32,7 @@ enum qcs_type { #define QC_CF_ERRL 0x00000001 /* fatal error detected locally, connection should be closed soon */ #define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */ /* unused 0x00000004 */ -#define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */ +/* unused 0x00000008 */ #define QC_CF_APP_SHUT 0x00000010 /* Application layer shutdown done. */ #define QC_CF_ERR_CONN 0x00000020 /* fatal error reported by transport layer */ @@ -106,7 +106,7 @@ struct qcc { #define QC_SF_NONE 0x00000000 #define QC_SF_SIZE_KNOWN 0x00000001 /* last frame received for this stream */ #define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */ -#define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */ +/* unused 0x00000004 */ #define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */ /* unused 0x00000010 */ #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ @@ -161,7 +161,6 @@ struct qcs { struct quic_fctl fc; /* stream flow control applied on sending */ uint64_t offset; /* last offset of data ready to be sent */ - struct buffer buf; /* transmit buffer before sending via xprt */ } tx; struct eb64_node by_id; diff --git a/src/h3.c b/src/h3.c index ec8edfea6..778f5bb69 100644 --- a/src/h3.c +++ b/src/h3.c @@ -1715,9 +1715,9 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) /* At least 9 bytes to store frame type + length as a varint max size */ if (b_room(res) < 9) { + /* TODO */ TRACE_STATE("not enough room for trailers frame", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); - qcs->flags |= QC_SF_BLK_MROOM; - goto end; + ABORT_NOW(); } /* Force buffer realignment as size required to encode headers is unknown. */ @@ -1727,9 +1727,9 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) headers_buf = b_make(b_peek(res, b_data(res) + 9), b_contig_space(res) - 9, 0, 0); if (qpack_encode_field_section_line(&headers_buf)) { + /* TODO */ TRACE_STATE("not enough room for trailers section line", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); - qcs->flags |= QC_SF_BLK_MROOM; - goto end; + ABORT_NOW(); } tail = b_tail(&headers_buf); @@ -1749,9 +1749,9 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) } if (qpack_encode_header(&headers_buf, list[hdr].n, list[hdr].v)) { + /* TODO */ TRACE_STATE("not enough room for all trailers", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); - qcs->flags |= QC_SF_BLK_MROOM; - goto end; + ABORT_NOW(); } } @@ -1868,22 +1868,13 @@ static int h3_resp_data_send(struct qcs *qcs, struct htx *htx, if (fsize > count) fsize = count; - while (1) { - b_reset(&outbuf); - outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0); - if (b_size(&outbuf) > hsize || !b_space_wraps(res)) - break; - b_slow_realign(res, trash.area, b_data(res)); - } + /* TODO buffer can be realign only if no data waiting for ACK. */ + outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0); - /* Not enough room for headers and at least one data byte, block the - * stream. It is expected that the stream connector layer will subscribe - * on SEND. - */ if (b_size(&outbuf) <= hsize) { + /* TODO */ TRACE_STATE("not enough room for data frame", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs); - qcs->flags |= QC_SF_BLK_MROOM; - goto end; + ABORT_NOW(); } if (b_size(&outbuf) < hsize + fsize) @@ -1934,8 +1925,7 @@ static size_t h3_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count) htx = htx_from_buf(buf); - while (count && !htx_is_empty(htx) && - !(qcs->flags & QC_SF_BLK_MROOM) && !h3c->err) { + while (count && !htx_is_empty(htx) && !h3c->err) { idx = htx_get_head(htx); blk = htx_get_blk(htx, idx); @@ -2045,18 +2035,14 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count) /* h3 DATA headers : 1-byte frame type + varint frame length */ hsize = 1 + QUIC_VARINT_MAX_SIZE; - while (1) { - if (b_contig_space(res) >= hsize || !b_space_wraps(res)) - break; - b_slow_realign(res, trash.area, b_data(res)); - } + /* TODO buffer can be realign only if no data waiting for ACK. */ /* Not enough room for headers and at least one data byte, block the * stream. It is expected that the stream connector layer will subscribe * on SEND. */ if (b_contig_space(res) <= hsize) { - qcs->flags |= QC_SF_BLK_MROOM; + /* TODO */ qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; goto end; } diff --git a/src/hq_interop.c b/src/hq_interop.c index 690c13bad..261db276b 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -89,19 +89,16 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count) { enum htx_blk_type btype; - struct htx *htx; + struct htx *htx = NULL; struct htx_blk *blk; int32_t idx; uint32_t bsize, fsize; - struct buffer *res, outbuf; + struct buffer *res = NULL; size_t total = 0; - res = qcc_get_stream_txbuf(qcs); - outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0); - htx = htx_from_buf(buf); - while (count && !htx_is_empty(htx) && !(qcs->flags & QC_SF_BLK_MROOM)) { + while (count && !htx_is_empty(htx)) { /* Not implemented : QUIC on backend side */ idx = htx_get_head(htx); blk = htx_get_blk(htx, idx); @@ -112,6 +109,12 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, switch (btype) { case HTX_BLK_DATA: + res = qcc_get_stream_txbuf(qcs); + if (!res) { + /* TODO */ + ABORT_NOW(); + } + if (unlikely(fsize == count && !b_data(res) && htx_nbblks(htx) == 1 && btype == HTX_BLK_DATA)) { @@ -137,15 +140,15 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, if (fsize > count) fsize = count; - if (b_room(&outbuf) < fsize) - fsize = b_room(&outbuf); + if (b_contig_space(res) < fsize) + fsize = b_contig_space(res); if (!fsize) { - qcs->flags |= QC_SF_BLK_MROOM; - goto end; + /* TODO */ + ABORT_NOW(); } - b_putblk(&outbuf, htx_get_blk_ptr(htx, blk), fsize); + b_putblk(res, htx_get_blk_ptr(htx, blk), fsize); total += fsize; count -= fsize; @@ -168,7 +171,6 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, } end: - b_add(res, b_data(&outbuf)); htx_to_buf(htx, buf); return total; @@ -176,12 +178,20 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count) { - struct buffer *res = qcc_get_stream_txbuf(qcs); + int ret = 0; + struct buffer *res; - if (!b_room(res)) { - qcs->flags |= QC_SF_BLK_MROOM; + res = qcc_get_stream_txbuf(qcs); + if (!res) { qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; goto end; + /* TODO */ + ABORT_NOW(); + } + + if (!b_room(res)) { + /* TODO */ + ABORT_NOW(); } /* No header required for HTTP/0.9, no need to reserve an offset. */ @@ -189,8 +199,9 @@ static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count) qcs->sd->iobuf.offset = 0; qcs->sd->iobuf.data = 0; + ret = MIN(count, b_contig_space(res)); end: - return MIN(b_contig_space(res), count); + return ret; } static size_t hq_interop_done_ff(struct qcs *qcs) diff --git a/src/mux_quic.c b/src/mux_quic.c index f52e3c951..06b97226e 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -72,9 +72,8 @@ static void qcs_free(struct qcs *qcs) /* Release qc_stream_desc buffer from quic-conn layer. */ qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real); - /* Free Rx/Tx buffers. */ + /* Free Rx buffer. */ qcs_free_ncbuf(qcs, &qcs->rx.ncbuf); - b_free(&qcs->tx.buf); /* Remove qcs from qcc tree. */ eb64_delete(&qcs->by_id); @@ -138,7 +137,6 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) } qcs->rx.msd_init = qcs->rx.msd; - qcs->tx.buf = BUF_NULL; qcs->tx.offset = 0; qcs->wait_event.tasklet = NULL; @@ -931,7 +929,24 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs) */ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs) { - return b_alloc(&qcs->tx.buf); + struct qcc *qcc = qcs->qcc; + int buf_avail; + struct buffer *out = qc_stream_buf_get(qcs->stream); + + if (!out) { + out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real, + &buf_avail); + if (!out) + goto out; + + if (!b_alloc(out)) { + TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); + goto out; + } + } + + out: + return out; } /* Wakes up every streams of which are currently waiting for sending but @@ -974,6 +989,7 @@ void qcc_reset_stream(struct qcs *qcs, int err) const int soft_blocked = qfctl_sblocked(&qcc->tx.fc); qcc->tx.fc.off_soft -= (qcs->tx.fc.off_soft - qcs->tx.fc.off_real); + if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc)) qcc_notify_fctl(qcc); } @@ -1014,6 +1030,9 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count) if (count) { qfctl_sinc(&qcc->tx.fc, count); qfctl_sinc(&qcs->tx.fc, count); + + qcs->tx.offset += count; + qcs->qcc->tx.offsets += count; } TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -1552,64 +1571,6 @@ static void qcs_destroy(struct qcs *qcs) TRACE_LEAVE(QMUX_EV_QCS_END, conn); } -/* Transfer as much as possible data on from to . - * - * Returns the total bytes of transferred data or a negative error code. - */ -static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in) -{ - struct qcc *qcc = qcs->qcc; - int to_xfer; - int total = 0; - - TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); - - if (!b_alloc(out)) { - TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); - goto err; - } - - /* - * QCS out buffer diagram - * head left to_xfer - * -------------> ----------> -----> - * -------------------------------------------------- - * |...............|xxxxxxxxxxx|<<<<< - * -------------------------------------------------- - * ^ ack-off ^ sent-off ^ off - * - * STREAM frame - * ^ ^ - * |xxxxxxxxxxxxxxxxx| - */ - - BUG_ON_HOT(qcs->tx.fc.off_real < qcs->stream->ack_offset); - BUG_ON_HOT(qcs->tx.offset < qcs->tx.fc.off_real); - BUG_ON_HOT(qcc->tx.offsets < qcc->tx.fc.off_real); - - to_xfer = QUIC_MIN(b_data(in), b_room(out)); - - if (!to_xfer) - goto out; - - total = b_force_xfer(out, in, to_xfer); - - out: - { - struct qcs_xfer_data_trace_arg arg = { - .prep = b_data(out), .xfer = total, - }; - TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_XFER_DATA, - qcc->conn, qcs, &arg); - } - - return total; - - err: - TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs); - return -1; -} - /* Prepare a STREAM frame for instance using as payload. The frame * is appended in . Set if this is supposed to be the last * stream frame. If is NULL an empty STREAM frame is built : this may be @@ -1720,21 +1681,11 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, return -1; } -/* Check after transferring data from qcs.tx.buf if FIN must be set on the next - * STREAM frame for . - * - * Returns true if FIN must be set else false. - */ -static int qcs_stream_fin(struct qcs *qcs) -{ - return qcs->flags & QC_SF_FIN_STREAM && !b_data(&qcs->tx.buf); -} - /* Return true if has data to send in new STREAM frames. */ static forceinline int qcs_need_sending(struct qcs *qcs) { - return b_data(&qcs->tx.buf) || qcs->tx.fc.off_real < qcs->tx.offset || - qcs_stream_fin(qcs); + return qcs->tx.fc.off_real < qcs->tx.offset || + qcs->flags & QC_SF_FIN_STREAM; } /* This function must be called by the upper layer to inform about the sending @@ -1795,7 +1746,7 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) increment_send_rate(diff, 0); } - if (qcs->tx.offset == qcs->tx.fc.off_real && !b_data(&qcs->tx.buf)) { + if (qcs->tx.offset == qcs->tx.fc.off_real) { /* Remove stream from send_list if all was sent. */ LIST_DEL_INIT(&qcs->el_send); TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -1953,82 +1904,42 @@ static int qcs_send_stop_sending(struct qcs *qcs) return 0; } -/* Used internally by qcc_io_send function. Proceed to send for . This will - * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame - * is then generated and inserted in list. Frame length will be - * truncated if greater than . This allows to prepare several - * frames in a loop while respecting connection flow control window. +/* Used internally by qcc_io_send function. Proceed to send for . A STREAM + * frame is generated poiting to QCS stream descriptor content and inserted in + * list. Frame length will be truncated if greater than . + * This allows to prepare several frames in a loop while respecting connection + * flow control window. * * Returns the payload length of the STREAM frame or a negative error code. */ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn) { struct qcc *qcc = qcs->qcc; - struct buffer *buf = &qcs->tx.buf; struct buffer *out = qc_stream_buf_get(qcs->stream); - int xfer = 0, flen = 0, buf_avail; - char fin = 0; + int flen = 0; + const char fin = qcs->flags & QC_SF_FIN_STREAM; TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); /* Cannot send STREAM on remote unidirectional streams. */ BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id)); - if (b_data(buf)) { - /* Allocate buffer if not already done. */ - if (!out) { - if (qcc->flags & QC_CF_CONN_FULL) - goto out; + /* This function must not be called if there is nothing to send. */ + BUG_ON(!fin && !qcs_need_sending(qcs)); - out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset, - &buf_avail); - if (!out) { - if (buf_avail) { - TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); - goto err; - } - - TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs); - qcc->flags |= QC_CF_CONN_FULL; - goto out; - } - } - - /* Transfer data from to . */ - xfer = qcs_xfer_data(qcs, out, buf); - if (xfer < 0) - goto err; - - if (xfer > 0) { - qcs_notify_send(qcs); - qcs->flags &= ~QC_SF_BLK_MROOM; - } - - qcs->tx.offset += xfer; - qcc->tx.offsets += xfer; - - /* If out buffer is empty, QCS offsets must be equal. */ - BUG_ON(!b_data(out) && qcs->tx.fc.off_real != qcs->tx.offset); + /* Skip STREAM frame allocation if already subscribed for send. + * Happens on sendto transient error or network congestion. + */ + if (qcc->wait_event.events & SUB_RETRY_SEND) { + TRACE_DEVEL("already subscribed for sending", + QMUX_EV_QCS_SEND, qcc->conn, qcs); + goto err; } - /* FIN is set if all incoming data were transferred. */ - fin = qcs_stream_fin(qcs); - /* Build a new STREAM frame with buffer. */ - if (qcs->tx.fc.off_real != qcs->tx.offset || fin) { - /* Skip STREAM frame allocation if already subscribed for send. - * Happens on sendto transient error or network congestion. - */ - if (qcc->wait_event.events & SUB_RETRY_SEND) { - TRACE_DEVEL("already subscribed for sending", - QMUX_EV_QCS_SEND, qcc->conn, qcs); - goto err; - } - - flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn); - if (flen < 0) - goto err; - } + flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn); + if (flen < 0) + goto err; out: TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -2852,7 +2763,8 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, int flags) { struct qcs *qcs = __sc_mux_strm(sc); - const size_t old_data = b_data(&qcs->tx.buf); + struct buffer *out = qc_stream_buf_get(qcs->stream); + const size_t old_data = out ? b_data(out) : 0; size_t ret = 0; char fin; @@ -2895,7 +2807,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, } if (ret || fin) { - qcc_send_stream(qcs, 0, b_data(&qcs->tx.buf) - old_data); + const size_t data = b_data(qc_stream_buf_get(qcs->stream)) - (old_data); + if (data || fin) + qcc_send_stream(qcs, 0, data); if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) tasklet_wakeup(qcs->qcc->wait_event.tasklet); } @@ -3007,14 +2921,12 @@ static size_t qmux_strm_done_ff(struct stconn *sc) data += sd->iobuf.offset; total = qcs->qcc->app_ops->done_ff(qcs); - qcc_send_stream(qcs, 0, data); + if (data || qcs->flags & QC_SF_FIN_STREAM) + qcc_send_stream(qcs, 0, data); if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) tasklet_wakeup(qcc->wait_event.tasklet); end: - if (!b_data(&qcs->tx.buf)) - b_free(&qcs->tx.buf); - TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); return total; } diff --git a/src/quic_stream.c b/src/quic_stream.c index a4b984d04..aa8895ae7 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -37,10 +37,7 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream, /* notify MUX about available buffers. */ --qc->stream_buf_count; if (qc->mux_state == QC_MUX_READY) { - if (qc->qcc->flags & QC_CF_CONN_FULL) { - qc->qcc->flags &= ~QC_CF_CONN_FULL; - tasklet_wakeup(qc->qcc->wait_event.tasklet); - } + /* TODO notify MUX for available buffer. */ } } @@ -202,11 +199,7 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing) qc->stream_buf_count -= free_count; if (qc->mux_state == QC_MUX_READY) { - /* notify MUX about available buffers. */ - if (qc->qcc->flags & QC_CF_CONN_FULL) { - qc->qcc->flags &= ~QC_CF_CONN_FULL; - tasklet_wakeup(qc->qcc->wait_event.tasklet); - } + /* TODO notify MUX for available buffer. */ } }