From e8d9eb4f7ae731ed2eaa47867f6770d78fc3f3ef Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Fri, 27 Mar 2026 14:41:40 +0100 Subject: [PATCH] MEDIUM: mux-quic: implement QMux send This patchs implement mux-quic reception for the new QMux protocol. This is performed via the new function qcc_qstrm_send_frames(). Its interface is similar to the QUIC equivalent : it takes a list of frames and encodes them in a buffer before sending it via snd_buf. Contrary to QUIC, a check on CO_FL_ERROR flag is performed prior to every qcc_qstrm_send_frames() invokation to interrupt emission. This is necessary as the transport layer may set it during snd_buf. This is not the case currently for quic_conn layer, but maybe a similar mechanism should be implemented as well for QUIC in the future. --- include/haproxy/mux_quic_qstrm.h | 2 + src/mux_quic.c | 39 ++++++++++----- src/mux_quic_qstrm.c | 85 ++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 13 deletions(-) diff --git a/include/haproxy/mux_quic_qstrm.h b/include/haproxy/mux_quic_qstrm.h index 3e537d416..6b81f3e65 100644 --- a/include/haproxy/mux_quic_qstrm.h +++ b/include/haproxy/mux_quic_qstrm.h @@ -5,4 +5,6 @@ int qcc_qstrm_recv(struct qcc *qcc); +int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms); + #endif /* _HAPROXY_MUX_QUIC_QSTRM_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 1335dfe58..806d2e420 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -2567,24 +2567,13 @@ static int qcc_subscribe_send(struct qcc *qcc) return 1; } -/* Wrapper for send on transport layer. Send a list of frames for the - * connection . - * - * Returns 0 if all data sent with success. On fatal error, a negative error - * code is returned. A positive 1 is used if emission should be paced. - */ -static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) +static int qcc_quic_send_frames(struct qcc *qcc, struct list *frms, int stream) { enum quic_tx_err ret; struct quic_pacer *pacer = NULL; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); - if (LIST_ISEMPTY(frms)) { - TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn); - return -1; - } - if (stream && qcc_is_pacing_active(qcc->conn)) pacer = &qcc->tx.pacer; @@ -2612,6 +2601,23 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) return -1; } +/* Wrapper for send on transport layer. Send a list of frames for the + * connection . + * + * Returns 0 if all data sent with success. On fatal error, a negative error + * code is returned. A positive 1 is used if emission should be paced. + */ +static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream) +{ + if (LIST_ISEMPTY(frms)) { + TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn); + return -1; + } + + return conn_is_quic(qcc->conn) ? qcc_quic_send_frames(qcc, frms, stream) : + qcc_qstrm_send_frames(qcc, frms); +} + /* Emit a RESET_STREAM on . * * Returns 0 if the frame has been successfully sent else non-zero. @@ -3072,6 +3078,12 @@ static int qcc_io_send(struct qcc *qcc) * flow-control limit reached. */ while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { + /* TODO should this check also be performed for QUIC ? */ + if (!conn_is_quic(qcc->conn) && (qcc->conn->flags & CO_FL_ERROR)) { + TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); + goto out; + } + window_conn = qfctl_rcap(&qcc->tx.fc); resent = 0; @@ -3083,7 +3095,8 @@ static int qcc_io_send(struct qcc *qcc) * new qc_stream_desc should be present in send_list as * long as transport layer can handle all data. */ - BUG_ON(qcs->tx.stream->buf && !qfctl_rblocked(&qcs->tx.fc)); + BUG_ON((!conn_is_quic(qcc->conn) || qcs->tx.stream->buf) && + !qfctl_rblocked(&qcs->tx.fc)); /* Total sent bytes must not exceed connection window. */ BUG_ON(resent > window_conn); diff --git a/src/mux_quic_qstrm.c b/src/mux_quic_qstrm.c index 25bd37ea6..12f54c396 100644 --- a/src/mux_quic_qstrm.c +++ b/src/mux_quic_qstrm.c @@ -147,3 +147,88 @@ int qcc_qstrm_recv(struct qcc *qcc) err: return -1; } + +/* Sends list of frames for connection. + * + * Returns 0 if all data are emitted or a positive value if sending should be + * retry later. A negative error code is used for a fatal failure. + */ +int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms) +{ + struct connection *conn = qcc->conn; + struct quic_frame *frm, *frm_old; + struct quic_frame *split_frm, *orig_frm; + unsigned char *pos, *old, *end; + size_t ret; + + TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); + list_for_each_entry_safe(frm, frm_old, frms, list) { + loop: + split_frm = NULL; + b_reset(&trash); + old = pos = (unsigned char *)b_orig(&trash); + end = (unsigned char *)b_wrap(&trash); + + BUG_ON(!frm); + TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QMUX_EV_QCC_SEND, qcc->conn, 0, 0, 0, + "frm type %02llx", (ullong)frm->type); + + if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) { + size_t flen, split_size; + + flen = quic_strm_frm_fillbuf(end - pos, frm, &split_size); + if (!flen) + continue; + + if (split_size) { + split_frm = quic_strm_frm_split(frm, split_size); + if (!split_frm) { + ABORT_NOW(); + continue; + } + + orig_frm = frm; + frm = split_frm; + } + } + + qc_build_frm(frm, &pos, end, NULL); + BUG_ON(pos - old > global.tune.bufsize); + BUG_ON(pos == old); + b_add(&trash, pos - old); + + ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &trash, b_data(&trash), NULL, 0, 0); + if (!ret) { + TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn); + if (split_frm) + LIST_INSERT(frms, &split_frm->list); + break; + } + + if (ret != b_data(&trash)) { + /* TODO */ + ABORT_NOW(); + } + + if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) + /* TODO notify MUX */ + + LIST_DEL_INIT(&frm->list); + if (split_frm) { + frm = orig_frm; + goto loop; + } + } + + if (conn->flags & CO_FL_ERROR) { + /* TODO */ + //ABORT_NOW(); + } + else if (!LIST_ISEMPTY(frms) && !(qcc->wait_event.events & SUB_RETRY_SEND)) { + conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event); + return 1; + } + + TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); + return 0; +}