MEDIUM: mux-quic: implement QMux receive

This patch implements a new function qcc_qstrm_recv() dedicated to the
new QMux protocol. It is responsible to perform data reception via
rcv_buf() callback. This is defined in a new mux_quic_strm module.

Read data are parsed in frames. Each frame is handled via standard
mux-quic functions. Currently, only STREAM and RESET_STREAM types are
implemented.

One major difference between QUIC and QMux is that mux-quic is passive
on the reception side on the former protocol. For the new one, mux-quic
becomes active. Thus, a new call to qcc_qstrm_recv() is performed via
qcc_io_recv().
This commit is contained in:
Amaury Denoyelle 2026-03-27 14:39:34 +01:00
parent f16c851625
commit 6ae22a50e5
4 changed files with 127 additions and 1 deletions

View File

@ -670,7 +670,7 @@ OPTIONS_OBJS += src/mux_quic.o src/h3.o src/quic_rx.o src/quic_tx.o \
src/quic_cc_nocc.o src/quic_cc.o src/quic_pacing.o \
src/h3_stats.o src/quic_stats.o src/qpack-enc.o \
src/qpack-tbl.o src/quic_cc_drs.o src/quic_fctl.o \
src/quic_enc.o
src/quic_enc.o src/mux_quic_qstrm.o
endif
ifneq ($(USE_QUIC_OPENSSL_COMPAT:0=),)

View File

@ -0,0 +1,8 @@
#ifndef _HAPROXY_MUX_QUIC_QSTRM_H
#define _HAPROXY_MUX_QUIC_QSTRM_H
#include <haproxy/mux_quic.h>
int qcc_qstrm_recv(struct qcc *qcc);
#endif /* _HAPROXY_MUX_QUIC_QSTRM_H */

View File

@ -10,6 +10,7 @@
#include <haproxy/global-t.h>
#include <haproxy/h3.h>
#include <haproxy/list.h>
#include <haproxy/mux_quic_qstrm.h>
#include <haproxy/ncbuf.h>
#include <haproxy/pool.h>
#include <haproxy/proxy.h>
@ -3182,6 +3183,11 @@ static int qcc_io_recv(struct qcc *qcc)
if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV))
qcc_wait_for_hs(qcc);
if (!conn_is_quic(qcc->conn)) {
if (!(qcc->wait_event.events & SUB_RETRY_RECV))
qcc_qstrm_recv(qcc);
}
while (!LIST_ISEMPTY(&qcc->recv_list)) {
qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv);
/* No need to add an uni local stream in recv_list. */

112
src/mux_quic_qstrm.c Normal file
View File

@ -0,0 +1,112 @@
#include <haproxy/mux_quic_qstrm.h>
#include <haproxy/api.h>
#include <haproxy/buf.h>
#include <haproxy/chunk.h>
#include <haproxy/connection.h>
#include <haproxy/mux_quic.h>
#include <haproxy/qmux_trace.h>
#include <haproxy/quic_frame.h>
#include <haproxy/trace.h>
/* Returns true if <frm> type can be used for QMux protocol. */
static int qstrm_is_frm_valid(const struct quic_frame *frm)
{
return frm->type == QUIC_FT_PADDING ||
frm->type == QUIC_FT_RESET_STREAM ||
frm->type == QUIC_FT_STOP_SENDING ||
(frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) ||
frm->type == QUIC_FT_MAX_DATA ||
frm->type == QUIC_FT_MAX_STREAM_DATA ||
frm->type == QUIC_FT_MAX_STREAMS_BIDI ||
frm->type == QUIC_FT_MAX_STREAMS_UNI ||
frm->type == QUIC_FT_DATA_BLOCKED ||
frm->type == QUIC_FT_STREAM_DATA_BLOCKED ||
frm->type == QUIC_FT_STREAMS_BLOCKED_BIDI ||
frm->type == QUIC_FT_STREAMS_BLOCKED_UNI ||
frm->type == QUIC_FT_CONNECTION_CLOSE ||
frm->type == QUIC_FT_CONNECTION_CLOSE_APP;
}
/* Parse the next frame in <buf> and handle it by the MUX layer.
*
* Returns the frame length on success. If frame is truncated, 0 is returned.
* A negative error code is used for fatal failures.
*/
static int qstrm_parse_frm(struct qcc *qcc, struct buffer *buf)
{
struct quic_frame frm;
const unsigned char *pos, *old, *end;
int ret;
old = pos = (unsigned char *)b_head(buf);
end = (unsigned char *)b_head(buf) + b_data(buf);
ret = qc_parse_frm_type(&frm, &pos, end, NULL);
BUG_ON(!ret);
if (!qstrm_is_frm_valid(&frm)) {
/* TODO close connection with FRAME_ENCODING_ERROR */
b_reset(buf);
return -1;
}
ret = qc_parse_frm_payload(&frm, &pos, end, NULL);
BUG_ON(!ret);
if (frm.type >= QUIC_FT_STREAM_8 &&
frm.type <= QUIC_FT_STREAM_F) {
struct qf_stream *strm_frm = &frm.stream;
qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset,
(frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data);
}
else if (frm.type == QUIC_FT_RESET_STREAM) {
struct qf_reset_stream *rst_frm = &frm.reset_stream;
qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size);
}
else {
ABORT_NOW();
}
return pos - old;
}
/* Perform data reception for <qcc> connection. Content is parsed as QMux
* frames. These operations are performed in loop until read returns no data.
*
* Returns the total amount of read data or -1 on error.
*/
int qcc_qstrm_recv(struct qcc *qcc)
{
/* TODO add a buffer on the connection for incomplete data read */
struct connection *conn = qcc->conn;
int total = 0, frm_ret;
size_t ret;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
do {
b_reset(&trash);
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, NULL, 0, 0);
BUG_ON(conn->flags & CO_FL_ERROR);
total += ret;
while (b_data(&trash)) {
frm_ret = qstrm_parse_frm(qcc, &trash);
BUG_ON(!frm_ret);
b_del(&trash, frm_ret);
}
} while (ret > 0);
if (!conn_xprt_read0_pending(qcc->conn)) {
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV,
&qcc->wait_event);
}
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
return total;
err:
return -1;
}