From 6ae22a50e5ebd6c915834e7a15d90766b04db468 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Fri, 27 Mar 2026 14:39:34 +0100 Subject: [PATCH] MEDIUM: mux-quic: implement QMux receive This patch implements a new function qcc_qstrm_recv() dedicated to the new QMux protocol. It is responsible to perform data reception via rcv_buf() callback. This is defined in a new mux_quic_strm module. Read data are parsed in frames. Each frame is handled via standard mux-quic functions. Currently, only STREAM and RESET_STREAM types are implemented. One major difference between QUIC and QMux is that mux-quic is passive on the reception side on the former protocol. For the new one, mux-quic becomes active. Thus, a new call to qcc_qstrm_recv() is performed via qcc_io_recv(). --- Makefile | 2 +- include/haproxy/mux_quic_qstrm.h | 8 +++ src/mux_quic.c | 6 ++ src/mux_quic_qstrm.c | 112 +++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 include/haproxy/mux_quic_qstrm.h create mode 100644 src/mux_quic_qstrm.c diff --git a/Makefile b/Makefile index 064fed3af..44d014784 100644 --- a/Makefile +++ b/Makefile @@ -670,7 +670,7 @@ OPTIONS_OBJS += src/mux_quic.o src/h3.o src/quic_rx.o src/quic_tx.o \ src/quic_cc_nocc.o src/quic_cc.o src/quic_pacing.o \ src/h3_stats.o src/quic_stats.o src/qpack-enc.o \ src/qpack-tbl.o src/quic_cc_drs.o src/quic_fctl.o \ - src/quic_enc.o + src/quic_enc.o src/mux_quic_qstrm.o endif ifneq ($(USE_QUIC_OPENSSL_COMPAT:0=),) diff --git a/include/haproxy/mux_quic_qstrm.h b/include/haproxy/mux_quic_qstrm.h new file mode 100644 index 000000000..3e537d416 --- /dev/null +++ b/include/haproxy/mux_quic_qstrm.h @@ -0,0 +1,8 @@ +#ifndef _HAPROXY_MUX_QUIC_QSTRM_H +#define _HAPROXY_MUX_QUIC_QSTRM_H + +#include + +int qcc_qstrm_recv(struct qcc *qcc); + +#endif /* _HAPROXY_MUX_QUIC_QSTRM_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 2c36d8d9d..f696941ff 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -3182,6 +3183,11 @@ static int qcc_io_recv(struct qcc *qcc) if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV)) qcc_wait_for_hs(qcc); + if (!conn_is_quic(qcc->conn)) { + if (!(qcc->wait_event.events & SUB_RETRY_RECV)) + qcc_qstrm_recv(qcc); + } + while (!LIST_ISEMPTY(&qcc->recv_list)) { qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv); /* No need to add an uni local stream in recv_list. */ diff --git a/src/mux_quic_qstrm.c b/src/mux_quic_qstrm.c new file mode 100644 index 000000000..17ab97713 --- /dev/null +++ b/src/mux_quic_qstrm.c @@ -0,0 +1,112 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +/* Returns true if type can be used for QMux protocol. */ +static int qstrm_is_frm_valid(const struct quic_frame *frm) +{ + return frm->type == QUIC_FT_PADDING || + frm->type == QUIC_FT_RESET_STREAM || + frm->type == QUIC_FT_STOP_SENDING || + (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) || + frm->type == QUIC_FT_MAX_DATA || + frm->type == QUIC_FT_MAX_STREAM_DATA || + frm->type == QUIC_FT_MAX_STREAMS_BIDI || + frm->type == QUIC_FT_MAX_STREAMS_UNI || + frm->type == QUIC_FT_DATA_BLOCKED || + frm->type == QUIC_FT_STREAM_DATA_BLOCKED || + frm->type == QUIC_FT_STREAMS_BLOCKED_BIDI || + frm->type == QUIC_FT_STREAMS_BLOCKED_UNI || + frm->type == QUIC_FT_CONNECTION_CLOSE || + frm->type == QUIC_FT_CONNECTION_CLOSE_APP; +} + +/* Parse the next frame in and handle it by the MUX layer. + * + * Returns the frame length on success. If frame is truncated, 0 is returned. + * A negative error code is used for fatal failures. + */ +static int qstrm_parse_frm(struct qcc *qcc, struct buffer *buf) +{ + struct quic_frame frm; + const unsigned char *pos, *old, *end; + int ret; + + old = pos = (unsigned char *)b_head(buf); + end = (unsigned char *)b_head(buf) + b_data(buf); + ret = qc_parse_frm_type(&frm, &pos, end, NULL); + BUG_ON(!ret); + + if (!qstrm_is_frm_valid(&frm)) { + /* TODO close connection with FRAME_ENCODING_ERROR */ + b_reset(buf); + return -1; + } + + ret = qc_parse_frm_payload(&frm, &pos, end, NULL); + BUG_ON(!ret); + + if (frm.type >= QUIC_FT_STREAM_8 && + frm.type <= QUIC_FT_STREAM_F) { + struct qf_stream *strm_frm = &frm.stream; + + qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset, + (frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data); + } + else if (frm.type == QUIC_FT_RESET_STREAM) { + struct qf_reset_stream *rst_frm = &frm.reset_stream; + qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size); + } + else { + ABORT_NOW(); + } + + return pos - old; +} + +/* Perform data reception for connection. Content is parsed as QMux + * frames. These operations are performed in loop until read returns no data. + * + * Returns the total amount of read data or -1 on error. + */ +int qcc_qstrm_recv(struct qcc *qcc) +{ + /* TODO add a buffer on the connection for incomplete data read */ + struct connection *conn = qcc->conn; + int total = 0, frm_ret; + size_t ret; + + TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); + + do { + b_reset(&trash); + ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, NULL, 0, 0); + BUG_ON(conn->flags & CO_FL_ERROR); + + total += ret; + while (b_data(&trash)) { + frm_ret = qstrm_parse_frm(qcc, &trash); + BUG_ON(!frm_ret); + + b_del(&trash, frm_ret); + } + } while (ret > 0); + + if (!conn_xprt_read0_pending(qcc->conn)) { + conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, + &qcc->wait_event); + } + + TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); + return total; + + err: + return -1; +}