MINOR: mux-quic: merge the common code from QOS to QMUX

This commit is contained in:
Amaury Denoyelle 2025-12-10 15:02:01 +01:00
parent c530030121
commit 670e8b2f9a
4 changed files with 151 additions and 257 deletions

View File

@ -670,7 +670,7 @@ OPTIONS_OBJS += src/mux_quic.o src/h3.o src/quic_rx.o src/quic_tx.o \
src/quic_cc_nocc.o src/quic_cc.o src/quic_pacing.o \
src/h3_stats.o src/quic_stats.o src/qpack-enc.o \
src/qpack-tbl.o src/quic_cc_drs.o src/quic_fctl.o \
src/quic_enc.o
src/quic_enc.o src/mux_quic_qos.o
endif
ifneq ($(USE_QUIC_OPENSSL_COMPAT:0=),)

View File

@ -0,0 +1,8 @@
#ifndef _HAPROXY_MUX_QUIC_QOS_H
#define _HAPROXY_MUX_QUIC_QOS_H
#include <haproxy/mux_quic.h>
int qcc_qos_recv(struct qcc *qcc);
#endif /* _HAPROXY_MUX_QUIC_QOS_H */

View File

@ -33,6 +33,8 @@
#include <haproxy/trace.h>
#include <haproxy/xref.h>
#include <haproxy/mux_quic_qos.h>
DECLARE_TYPED_POOL(pool_head_qcc, "qcc", struct qcc);
DECLARE_TYPED_POOL(pool_head_qcs, "qcs", struct qcs);
DECLARE_STATIC_TYPED_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", struct qc_stream_rxbuf);
@ -3275,6 +3277,11 @@ static int qcc_io_recv(struct qcc *qcc)
if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV))
qcc_wait_for_hs(qcc);
if (!qmux_is_quic(qcc)) {
if (!(qcc->wait_event.events & SUB_RETRY_RECV))
qcc_qos_recv(qcc);
}
while (!LIST_ISEMPTY(&qcc->recv_list)) {
qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv);
/* No need to add an uni local stream in recv_list. */
@ -3768,6 +3775,8 @@ static void _qcc_init(struct qcc *qcc)
LIST_INIT(&qcc->tx.frms);
}
struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status);
static int qmux_init(struct connection *conn, struct proxy *prx,
struct session *sess, struct buffer *input)
{
@ -3791,30 +3800,47 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc->glitches = 0;
qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
/* Server parameters, params used for RX flow control. */
lparams = &conn->handle.qc->rx.params;
if (qmux_is_quic(qcc)) {
/* Server parameters, params used for RX flow control. */
lparams = &conn->handle.qc->rx.params;
qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
qcc->lfctl.ms_uni = lparams->initial_max_streams_uni;
qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local;
qcc->lfctl.msd_bidi_r = lparams->initial_max_stream_data_bidi_remote;
qcc->lfctl.msd_uni_r = lparams->initial_max_stream_data_uni;
qcc->lfctl.cl_bidi_r = 0;
qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
qcc->lfctl.ms_uni = lparams->initial_max_streams_uni;
qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local;
qcc->lfctl.msd_bidi_r = lparams->initial_max_stream_data_bidi_remote;
qcc->lfctl.msd_uni_r = lparams->initial_max_stream_data_uni;
qcc->lfctl.cl_bidi_r = 0;
qcc->lfctl.md = qcc->lfctl.md_init = lparams->initial_max_data;
qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
qcc->lfctl.md = qcc->lfctl.md_init = lparams->initial_max_data;
qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
rparams = &conn->handle.qc->tx.params;
qfctl_init(&qcc->tx.fc, rparams->initial_max_data);
qcc->rfctl.ms_uni = rparams->initial_max_streams_uni;
qcc->rfctl.ms_bidi = rparams->initial_max_streams_bidi;
qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
rparams = &conn->handle.qc->tx.params;
qfctl_init(&qcc->tx.fc, rparams->initial_max_data);
qcc->rfctl.ms_uni = rparams->initial_max_streams_uni;
qcc->rfctl.ms_bidi = rparams->initial_max_streams_bidi;
qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
}
else {
/* hardcoded inital TP values. Is this really necessary? */
qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = 16384;
qcc->lfctl.ms_uni = 3;
qcc->lfctl.msd_bidi_l = 16384;
qcc->lfctl.msd_bidi_r = 16384;
qcc->lfctl.msd_uni_r = 16384;
qcc->lfctl.cl_bidi_r = 0;
qcc->lfctl.md = qcc->lfctl.md_init = 16384;
qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
qfctl_init(&qcc->tx.fc, 0);
qcc->rfctl.ms_uni = 3;
}
qcc->tx.buf_in_flight = 0;
if (qcc_is_pacing_active(conn)) {
if (qmux_is_quic(qcc) && qcc_is_pacing_active(conn)) {
quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc);
qcc->tx.paced_sent_ctr = 0;
@ -3855,7 +3881,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
LIST_INIT(&qcc->buf_wait_list);
LIST_INIT(&qcc->purg_list);
qcc->wait_event.tasklet->process = qcc_io_cb;
qcc->wait_event.tasklet->process = qmux_is_quic(qcc) ? qcc_io_cb : qcc_qos_io_cb;
qcc->wait_event.tasklet->context = qcc;
qcc->wait_event.tasklet->state |= TASK_F_WANTS_TIME;
qcc->wait_event.events = 0;
@ -3888,14 +3914,17 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc_reset_idle_start(qcc);
LIST_INIT(&qcc->opening_list);
HA_ATOMIC_STORE(&conn->handle.qc->qcc, qcc);
if (qmux_is_quic(qcc))
HA_ATOMIC_STORE(&conn->handle.qc->qcc, qcc);
/* Register conn as app_ops may use it. */
qcc->conn = conn;
if (qcc_install_app_ops(qcc, conn->handle.qc->app_ops)) {
TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn);
goto err;
if (qmux_is_quic(qcc)) {
if (qcc_install_app_ops(qcc, conn->handle.qc->app_ops)) {
TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn);
goto err;
}
}
if (qcc->app_ops == &h3_ops && !conn_is_back(conn))
@ -3963,11 +3992,13 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
return 0;
err:
/* Prepare CONNECTION_CLOSE, using INTERNAL_ERROR as fallback code if unset. */
if (!(conn->handle.qc->flags & QUIC_FL_CONN_IMMEDIATE_CLOSE)) {
struct quic_err err = qcc && qcc->err.code ?
qcc->err : quic_err_transport(QC_ERR_INTERNAL_ERROR);
quic_set_connection_close(conn->handle.qc, err);
if (qmux_is_quic(qcc)) {
/* Prepare CONNECTION_CLOSE, using INTERNAL_ERROR as fallback code if unset. */
if (!(conn->handle.qc->flags & QUIC_FL_CONN_IMMEDIATE_CLOSE)) {
struct quic_err err = qcc && qcc->err.code ?
qcc->err : quic_err_transport(QC_ERR_INTERNAL_ERROR);
quic_set_connection_close(conn->handle.qc, err);
}
}
if (qcc) {
@ -4688,103 +4719,6 @@ static struct mux_proto_list mux_proto_quic =
INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_quic);
static int qcc_qos_recv(struct qcc *qcc)
{
struct connection *conn = qcc->conn;
struct quic_frame frm;
const unsigned char *pos, *end;
int ret;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
chunk_reset(&trash);
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, NULL, 0, 0);
BUG_ON(ret < 0);
if (ret) {
b_add(&trash, ret);
pos = (unsigned char *)b_head(&trash);
end = (unsigned char *)b_tail(&trash);
ret = qc_parse_frm(&frm, NULL, &pos, end, NULL);
BUG_ON(!ret);
if (frm.type == QUIC_FT_QS_TP) {
struct qf_qs_tp *qs_tp_frm = &frm.qs_tp;
fprintf(stderr, "got qs_transport_parameters frame\n");
fprintf(stderr, " max_idle_timeout=%llu\n", (ullong)qs_tp_frm->tps.max_idle_timeout);
fprintf(stderr, " initial_max_data=%llu\n", (ullong)qs_tp_frm->tps.initial_max_data);
qfctl_set_max(&qcc->tx.fc, qs_tp_frm->tps.initial_max_data, NULL, NULL);
fprintf(stderr, " initial_max_stream_data_bidi_local=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_local);
qcc->rfctl.msd_bidi_l = qs_tp_frm->tps.initial_max_stream_data_bidi_local;
fprintf(stderr, " initial_max_stream_data_bidi_remote=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_remote);
qcc->rfctl.msd_bidi_r = qs_tp_frm->tps.initial_max_stream_data_bidi_remote;
fprintf(stderr, " initial_max_stream_data_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_uni);
qcc->rfctl.msd_uni_l = qs_tp_frm->tps.initial_max_stream_data_uni;
fprintf(stderr, " initial_max_streams_bidi=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_bidi);
fprintf(stderr, " initial_max_streams_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_uni);
}
else if (frm.type >= QUIC_FT_STREAM_8 &&
frm.type <= QUIC_FT_STREAM_F) {
struct qf_stream *strm_frm = &frm.stream;
qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset,
(frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data);
}
else if (frm.type == QUIC_FT_RESET_STREAM) {
struct qf_reset_stream *rst_frm = &frm.reset_stream;
qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size);
}
else {
ABORT_NOW();
}
}
else {
BUG_ON(!trash.size);
if (!conn_xprt_read0_pending(qcc->conn)) {
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV,
&qcc->wait_event);
}
}
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
return ret;
err:
return -1;
}
static int qcc_qos_io_recv(struct qcc *qcc)
{
struct qcs *qcs;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
if (qcc->flags & QC_CF_ERRL) {
TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn);
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
return 0;
}
if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV))
qcc_wait_for_hs(qcc);
if (!(qcc->wait_event.events & SUB_RETRY_RECV))
qcc_qos_recv(qcc);
while (!LIST_ISEMPTY(&qcc->recv_list)) {
qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv);
/* no need to add an uni local stream in recv_list. */
BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_local(qcc, qcs->id));
qcc_decode_qcs(qcc, qcs);
LIST_DEL_INIT(&qcs->el_recv);
}
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
return 0;
}
static int qcc_qos_send_tp(struct qcc *qcc)
{
struct quic_frame *frm;
@ -4906,12 +4840,12 @@ struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status)
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
qcc_qos_io_recv(qcc);
qcc_io_recv(qcc);
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
qcc_qos_io_send(qcc);
qcc_qos_io_recv(qcc);
qcc_io_recv(qcc);
if (qcc_io_process(qcc)) {
TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn);
@ -4933,136 +4867,8 @@ struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status)
return NULL;
}
static int qmux_qos_init(struct connection *conn, struct proxy *prx,
struct session *sess, struct buffer *input)
{
struct qcc *qcc;
TRACE_ENTER(QMUX_EV_QCC_NEW);
qcc = pool_alloc(pool_head_qcc);
if (!qcc) {
TRACE_ERROR("alloc failure", QMUX_EV_QCC_NEW);
goto err;
}
_qcc_init(qcc);
conn->ctx = qcc;
qcc->nb_hreq = qcc->nb_sc = 0;
qcc->flags = QC_CF_QOS;
qcc->app_st = QCC_APP_ST_NULL;
qcc->glitches = 0;
qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
/* hardcoded inital TP values. Is this really necessary? */
qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = 16384;
qcc->lfctl.ms_uni = 3;
qcc->lfctl.msd_bidi_l = 16384;
qcc->lfctl.msd_bidi_r = 16384;
qcc->lfctl.msd_uni_r = 16384;
qcc->lfctl.cl_bidi_r = 0;
qcc->lfctl.md = qcc->lfctl.md_init = 16384;
qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
qfctl_init(&qcc->tx.fc, 0);
qcc->rfctl.ms_uni = 3;
qcc->tx.buf_in_flight = 0;
if (conn_is_back(conn)) {
qcc->next_bidi_l = 0x00;
qcc->largest_bidi_r = 0x01;
qcc->next_uni_l = 0x02;
qcc->largest_uni_r = 0x03;
}
else {
qcc->largest_bidi_r = 0x00;
qcc->next_bidi_l = 0x01;
qcc->largest_uni_r = 0x02;
qcc->next_uni_l = 0x03;
}
qcc->wait_event.tasklet = tasklet_new();
if (!qcc->wait_event.tasklet) {
TRACE_ERROR("taslket alloc failure", QMUX_EV_QCC_NEW);
goto err;
}
LIST_INIT(&qcc->recv_list);
LIST_INIT(&qcc->send_list);
LIST_INIT(&qcc->fctl_list);
LIST_INIT(&qcc->buf_wait_list);
LIST_INIT(&qcc->purg_list);
qcc->wait_event.tasklet->process = qcc_qos_io_cb;
qcc->wait_event.tasklet->context = qcc;
qcc->wait_event.tasklet->state |= TASK_F_WANTS_TIME;
qcc->wait_event.events = 0;
qcc->proxy = prx;
/* haproxy timeouts */
if (conn_is_back(conn)) {
qcc->timeout = prx->timeout.server;
qcc->shut_timeout = tick_isset(prx->timeout.serverfin) ?
prx->timeout.serverfin : prx->timeout.server;
}
else {
qcc->timeout = prx->timeout.client;
qcc->shut_timeout = tick_isset(prx->timeout.clientfin) ?
prx->timeout.clientfin : prx->timeout.client;
}
/* Always allocate task even if timeout is unset. In MUX code, if task
* is NULL, it indicates that a timeout has stroke earlier.
*/
qcc->task = task_new_here();
if (!qcc->task) {
TRACE_ERROR("timeout task alloc failure", QMUX_EV_QCC_NEW);
goto err;
}
qcc->task->process = qcc_timeout_task;
qcc->task->context = qcc;
qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
qcc_reset_idle_start(qcc);
LIST_INIT(&qcc->opening_list);
/* Register conn as app_ops may use it. */
qcc->conn = conn;
/* TODO hardcoded HTTP/3 ops */
if (qcc_install_app_ops(qcc, &h3_ops)) {
TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn);
goto err;
}
if (qcc->app_ops == &h3_ops)
proxy_inc_fe_cum_sess_ver_ctr(sess->listener, prx, 3);
/* Register conn for idle front closing. This is done once everything is allocated. */
if (!conn_is_back(conn))
LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list);
/* init read cycle */
tasklet_wakeup(qcc->wait_event.tasklet);
TRACE_LEAVE(QMUX_EV_QCC_NEW, conn);
return 0;
err:
if (qcc) {
/* In case of MUX init failure, session will ensure connection is freed. */
qcc->conn = NULL;
qcc_release(qcc);
}
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_NEW, conn);
return -1;
}
static const struct mux_ops qmux_qos_ops = {
.init = qmux_qos_init,
.init = qmux_init,
.destroy = qmux_destroy,
.detach = qmux_strm_detach,
.rcv_buf = qmux_strm_rcv_buf,

80
src/mux_quic_qos.c Normal file
View File

@ -0,0 +1,80 @@
#include <haproxy/mux_quic_qos.h>
#include <stdio.h>
#include <haproxy/api.h>
#include <haproxy/buf.h>
#include <haproxy/chunk.h>
#include <haproxy/connection.h>
#include <haproxy/mux_quic.h>
#include <haproxy/qmux_trace.h>
#include <haproxy/quic_fctl.h>
#include <haproxy/quic_frame.h>
#include <haproxy/trace.h>
int qcc_qos_recv(struct qcc *qcc)
{
struct connection *conn = qcc->conn;
struct quic_frame frm;
const unsigned char *pos, *end;
int ret;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
chunk_reset(&trash);
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, NULL, 0, 0);
BUG_ON(ret < 0);
if (ret) {
b_add(&trash, ret);
pos = (unsigned char *)b_head(&trash);
end = (unsigned char *)b_tail(&trash);
ret = qc_parse_frm(&frm, NULL, &pos, end, NULL);
BUG_ON(!ret);
if (frm.type == QUIC_FT_QS_TP) {
struct qf_qs_tp *qs_tp_frm = &frm.qs_tp;
fprintf(stderr, "got qs_transport_parameters frame\n");
fprintf(stderr, " max_idle_timeout=%llu\n", (ullong)qs_tp_frm->tps.max_idle_timeout);
fprintf(stderr, " initial_max_data=%llu\n", (ullong)qs_tp_frm->tps.initial_max_data);
qfctl_set_max(&qcc->tx.fc, qs_tp_frm->tps.initial_max_data, NULL, NULL);
fprintf(stderr, " initial_max_stream_data_bidi_local=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_local);
qcc->rfctl.msd_bidi_l = qs_tp_frm->tps.initial_max_stream_data_bidi_local;
fprintf(stderr, " initial_max_stream_data_bidi_remote=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_remote);
qcc->rfctl.msd_bidi_r = qs_tp_frm->tps.initial_max_stream_data_bidi_remote;
fprintf(stderr, " initial_max_stream_data_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_uni);
qcc->rfctl.msd_uni_l = qs_tp_frm->tps.initial_max_stream_data_uni;
fprintf(stderr, " initial_max_streams_bidi=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_bidi);
fprintf(stderr, " initial_max_streams_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_uni);
}
else if (frm.type >= QUIC_FT_STREAM_8 &&
frm.type <= QUIC_FT_STREAM_F) {
struct qf_stream *strm_frm = &frm.stream;
qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset,
(frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data);
}
else if (frm.type == QUIC_FT_RESET_STREAM) {
struct qf_reset_stream *rst_frm = &frm.reset_stream;
qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size);
}
else {
ABORT_NOW();
}
}
else {
BUG_ON(!trash.size);
if (!conn_xprt_read0_pending(qcc->conn)) {
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV,
&qcc->wait_event);
}
}
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
return ret;
err:
return -1;
}