MINOR: xprt_qstrm: implement Rx buffering

Implement buffering for reception on xprt_qstrm layer. This is necessary
to handle reception of a truncated QMux transport parameters frame.

This is performed via a new dedicated <rxbuf> member in xprt_qstrm
context. Read is performed by reusing the buffer until a whole frame can
be read.
This commit is contained in:
Amaury Denoyelle 2026-04-07 15:19:47 +02:00
parent c63e6ecd4b
commit 890831f292

View File

@ -1,6 +1,7 @@
#include <haproxy/api.h>
#include <haproxy/buf.h>
#include <haproxy/connection.h>
#include <haproxy/dynbuf.h>
#include <haproxy/fd.h>
#include <haproxy/global.h>
#include <haproxy/mux_quic.h>
@ -18,6 +19,8 @@ struct xprt_qstrm_ctx {
struct quic_transport_params lparams;
struct quic_transport_params rparams;
struct buffer rxbuf;
};
DECLARE_STATIC_TYPED_POOL(xprt_qstrm_ctx_pool, "xprt_qstrm_ctx", struct xprt_qstrm_ctx);
@ -37,8 +40,9 @@ const struct quic_transport_params *xprt_qstrm_rparams(const void *context)
int conn_recv_qstrm(struct connection *conn, struct xprt_qstrm_ctx *ctx, int flag)
{
struct quic_frame frm;
struct buffer *buf = &ctx->rxbuf;
const unsigned char *pos, *end;
int ret;
size_t ret;
if (!conn_ctrl_ready(conn))
goto fail;
@ -48,26 +52,27 @@ int conn_recv_qstrm(struct connection *conn, struct xprt_qstrm_ctx *ctx, int fla
if (!fd_recv_ready(conn->handle.fd))
goto not_ready;
while (1) {
ret = ctx->ops_lower->rcv_buf(conn, ctx->ctx_lower, &trash, trash.size, NULL, 0, MSG_PEEK);
BUG_ON(conn->flags & CO_FL_ERROR); /* TODO handle fatal errors */
trash.data = ret;
break;
}
if (!b_size(buf) && !b_alloc(buf, DB_MUX_RX))
goto fail;
if (!trash.data)
do {
ret = ctx->ops_lower->rcv_buf(conn, ctx->ctx_lower, buf, b_room(buf), NULL, 0, 0);
BUG_ON(conn->flags & CO_FL_ERROR);
} while (ret);
if (!b_data(buf))
goto not_ready;
pos = (unsigned char *)b_orig(&trash);
end = (unsigned char *)(b_orig(&trash) + b_data(&trash));
ret = qc_parse_frm_type(&frm, &pos, end, NULL);
BUG_ON(!ret); /* TODO handle a truncated frame, recv must be performed again. */
pos = (unsigned char *)b_orig(buf);
end = (unsigned char *)(b_orig(buf) + b_data(buf));
if (!qc_parse_frm_type(&frm, &pos, end, NULL))
goto not_ready;
/* TODO close connection with TRANSPORT_PARAMETER_ERROR if frame not present. */
BUG_ON(frm.type != QUIC_FT_QX_TRANSPORT_PARAMETERS);
ret = qc_parse_frm_payload(&frm, &pos, end, NULL);
BUG_ON(!ret); /* TODO handle a truncated frame, recv must be performed again. */
if (!qc_parse_frm_payload(&frm, &pos, end, NULL))
goto not_ready;
ctx->rparams = frm.qmux_transport_params.params;
@ -149,6 +154,8 @@ struct task *xprt_qstrm_io_cb(struct task *t, void *context, unsigned int state)
conn->xprt = ctx->ops_lower;
conn->mux->wake(conn);
b_free(&ctx->rxbuf);
tasklet_free(ctx->wait_event.tasklet);
pool_free(xprt_qstrm_ctx_pool, ctx);
t = NULL;
@ -195,6 +202,8 @@ static int xprt_qstrm_init(struct connection *conn, void **xprt_ctx)
ctx->ctx_lower = NULL;
ctx->ops_lower = NULL;
ctx->rxbuf = BUF_NULL;
memset(&ctx->rparams, 0, sizeof(struct quic_transport_params));
/* TP configuration advertised by us */