From 670e8b2f9a3fcb52a728bca7126dcb58155bb973 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Wed, 10 Dec 2025 15:02:01 +0100 Subject: [PATCH] MINOR: mux-quic: merge the common code from QOS to QMUX --- Makefile | 2 +- include/haproxy/mux_quic_qos.h | 8 + src/mux_quic.c | 318 +++++++-------------------------- src/mux_quic_qos.c | 80 +++++++++ 4 files changed, 151 insertions(+), 257 deletions(-) create mode 100644 include/haproxy/mux_quic_qos.h create mode 100644 src/mux_quic_qos.c diff --git a/Makefile b/Makefile index 006875048..5d67b228e 100644 --- a/Makefile +++ b/Makefile @@ -670,7 +670,7 @@ OPTIONS_OBJS += src/mux_quic.o src/h3.o src/quic_rx.o src/quic_tx.o \ src/quic_cc_nocc.o src/quic_cc.o src/quic_pacing.o \ src/h3_stats.o src/quic_stats.o src/qpack-enc.o \ src/qpack-tbl.o src/quic_cc_drs.o src/quic_fctl.o \ - src/quic_enc.o + src/quic_enc.o src/mux_quic_qos.o endif ifneq ($(USE_QUIC_OPENSSL_COMPAT:0=),) diff --git a/include/haproxy/mux_quic_qos.h b/include/haproxy/mux_quic_qos.h new file mode 100644 index 000000000..79dcdf943 --- /dev/null +++ b/include/haproxy/mux_quic_qos.h @@ -0,0 +1,8 @@ +#ifndef _HAPROXY_MUX_QUIC_QOS_H +#define _HAPROXY_MUX_QUIC_QOS_H + +#include + +int qcc_qos_recv(struct qcc *qcc); + +#endif /* _HAPROXY_MUX_QUIC_QOS_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 61eca75a1..120505150 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -33,6 +33,8 @@ #include #include +#include + DECLARE_TYPED_POOL(pool_head_qcc, "qcc", struct qcc); DECLARE_TYPED_POOL(pool_head_qcs, "qcs", struct qcs); DECLARE_STATIC_TYPED_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", struct qc_stream_rxbuf); @@ -3275,6 +3277,11 @@ static int qcc_io_recv(struct qcc *qcc) if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV)) qcc_wait_for_hs(qcc); + if (!qmux_is_quic(qcc)) { + if (!(qcc->wait_event.events & SUB_RETRY_RECV)) + qcc_qos_recv(qcc); + } + while (!LIST_ISEMPTY(&qcc->recv_list)) { qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv); /* No need to add an uni local stream in recv_list. */ @@ -3768,6 +3775,8 @@ static void _qcc_init(struct qcc *qcc) LIST_INIT(&qcc->tx.frms); } +struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status); + static int qmux_init(struct connection *conn, struct proxy *prx, struct session *sess, struct buffer *input) { @@ -3791,30 +3800,47 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->glitches = 0; qcc->err = quic_err_transport(QC_ERR_NO_ERROR); - /* Server parameters, params used for RX flow control. */ - lparams = &conn->handle.qc->rx.params; + if (qmux_is_quic(qcc)) { + /* Server parameters, params used for RX flow control. */ + lparams = &conn->handle.qc->rx.params; - qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi; - qcc->lfctl.ms_uni = lparams->initial_max_streams_uni; - qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local; - qcc->lfctl.msd_bidi_r = lparams->initial_max_stream_data_bidi_remote; - qcc->lfctl.msd_uni_r = lparams->initial_max_stream_data_uni; - qcc->lfctl.cl_bidi_r = 0; + qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi; + qcc->lfctl.ms_uni = lparams->initial_max_streams_uni; + qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local; + qcc->lfctl.msd_bidi_r = lparams->initial_max_stream_data_bidi_remote; + qcc->lfctl.msd_uni_r = lparams->initial_max_stream_data_uni; + qcc->lfctl.cl_bidi_r = 0; - qcc->lfctl.md = qcc->lfctl.md_init = lparams->initial_max_data; - qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0; + qcc->lfctl.md = qcc->lfctl.md_init = lparams->initial_max_data; + qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0; - rparams = &conn->handle.qc->tx.params; - qfctl_init(&qcc->tx.fc, rparams->initial_max_data); - qcc->rfctl.ms_uni = rparams->initial_max_streams_uni; - qcc->rfctl.ms_bidi = rparams->initial_max_streams_bidi; - qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local; - qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; - qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni; + rparams = &conn->handle.qc->tx.params; + qfctl_init(&qcc->tx.fc, rparams->initial_max_data); + qcc->rfctl.ms_uni = rparams->initial_max_streams_uni; + qcc->rfctl.ms_bidi = rparams->initial_max_streams_bidi; + qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local; + qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; + qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni; + } + else { + /* hardcoded inital TP values. Is this really necessary? */ + qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = 16384; + qcc->lfctl.ms_uni = 3; + qcc->lfctl.msd_bidi_l = 16384; + qcc->lfctl.msd_bidi_r = 16384; + qcc->lfctl.msd_uni_r = 16384; + qcc->lfctl.cl_bidi_r = 0; + + qcc->lfctl.md = qcc->lfctl.md_init = 16384; + qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0; + + qfctl_init(&qcc->tx.fc, 0); + qcc->rfctl.ms_uni = 3; + } qcc->tx.buf_in_flight = 0; - if (qcc_is_pacing_active(conn)) { + if (qmux_is_quic(qcc) && qcc_is_pacing_active(conn)) { quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc); qcc->tx.paced_sent_ctr = 0; @@ -3855,7 +3881,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, LIST_INIT(&qcc->buf_wait_list); LIST_INIT(&qcc->purg_list); - qcc->wait_event.tasklet->process = qcc_io_cb; + qcc->wait_event.tasklet->process = qmux_is_quic(qcc) ? qcc_io_cb : qcc_qos_io_cb; qcc->wait_event.tasklet->context = qcc; qcc->wait_event.tasklet->state |= TASK_F_WANTS_TIME; qcc->wait_event.events = 0; @@ -3888,14 +3914,17 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc_reset_idle_start(qcc); LIST_INIT(&qcc->opening_list); - HA_ATOMIC_STORE(&conn->handle.qc->qcc, qcc); + if (qmux_is_quic(qcc)) + HA_ATOMIC_STORE(&conn->handle.qc->qcc, qcc); /* Register conn as app_ops may use it. */ qcc->conn = conn; - if (qcc_install_app_ops(qcc, conn->handle.qc->app_ops)) { - TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn); - goto err; + if (qmux_is_quic(qcc)) { + if (qcc_install_app_ops(qcc, conn->handle.qc->app_ops)) { + TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn); + goto err; + } } if (qcc->app_ops == &h3_ops && !conn_is_back(conn)) @@ -3963,11 +3992,13 @@ static int qmux_init(struct connection *conn, struct proxy *prx, return 0; err: - /* Prepare CONNECTION_CLOSE, using INTERNAL_ERROR as fallback code if unset. */ - if (!(conn->handle.qc->flags & QUIC_FL_CONN_IMMEDIATE_CLOSE)) { - struct quic_err err = qcc && qcc->err.code ? - qcc->err : quic_err_transport(QC_ERR_INTERNAL_ERROR); - quic_set_connection_close(conn->handle.qc, err); + if (qmux_is_quic(qcc)) { + /* Prepare CONNECTION_CLOSE, using INTERNAL_ERROR as fallback code if unset. */ + if (!(conn->handle.qc->flags & QUIC_FL_CONN_IMMEDIATE_CLOSE)) { + struct quic_err err = qcc && qcc->err.code ? + qcc->err : quic_err_transport(QC_ERR_INTERNAL_ERROR); + quic_set_connection_close(conn->handle.qc, err); + } } if (qcc) { @@ -4688,103 +4719,6 @@ static struct mux_proto_list mux_proto_quic = INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_quic); -static int qcc_qos_recv(struct qcc *qcc) -{ - struct connection *conn = qcc->conn; - struct quic_frame frm; - const unsigned char *pos, *end; - int ret; - - TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); - - chunk_reset(&trash); - ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, NULL, 0, 0); - BUG_ON(ret < 0); - - if (ret) { - b_add(&trash, ret); - - pos = (unsigned char *)b_head(&trash); - end = (unsigned char *)b_tail(&trash); - ret = qc_parse_frm(&frm, NULL, &pos, end, NULL); - BUG_ON(!ret); - - if (frm.type == QUIC_FT_QS_TP) { - struct qf_qs_tp *qs_tp_frm = &frm.qs_tp; - fprintf(stderr, "got qs_transport_parameters frame\n"); - fprintf(stderr, " max_idle_timeout=%llu\n", (ullong)qs_tp_frm->tps.max_idle_timeout); - fprintf(stderr, " initial_max_data=%llu\n", (ullong)qs_tp_frm->tps.initial_max_data); - qfctl_set_max(&qcc->tx.fc, qs_tp_frm->tps.initial_max_data, NULL, NULL); - fprintf(stderr, " initial_max_stream_data_bidi_local=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_local); - qcc->rfctl.msd_bidi_l = qs_tp_frm->tps.initial_max_stream_data_bidi_local; - fprintf(stderr, " initial_max_stream_data_bidi_remote=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_remote); - qcc->rfctl.msd_bidi_r = qs_tp_frm->tps.initial_max_stream_data_bidi_remote; - fprintf(stderr, " initial_max_stream_data_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_uni); - qcc->rfctl.msd_uni_l = qs_tp_frm->tps.initial_max_stream_data_uni; - fprintf(stderr, " initial_max_streams_bidi=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_bidi); - fprintf(stderr, " initial_max_streams_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_uni); - } - else if (frm.type >= QUIC_FT_STREAM_8 && - frm.type <= QUIC_FT_STREAM_F) { - struct qf_stream *strm_frm = &frm.stream; - - qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset, - (frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data); - } - else if (frm.type == QUIC_FT_RESET_STREAM) { - struct qf_reset_stream *rst_frm = &frm.reset_stream; - qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size); - } - else { - ABORT_NOW(); - } - - } - else { - BUG_ON(!trash.size); - if (!conn_xprt_read0_pending(qcc->conn)) { - conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, - &qcc->wait_event); - } - } - - TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); - return ret; - - err: - return -1; -} - -static int qcc_qos_io_recv(struct qcc *qcc) -{ - struct qcs *qcs; - - TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); - - if (qcc->flags & QC_CF_ERRL) { - TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn); - TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); - return 0; - } - - if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV)) - qcc_wait_for_hs(qcc); - - if (!(qcc->wait_event.events & SUB_RETRY_RECV)) - qcc_qos_recv(qcc); - - while (!LIST_ISEMPTY(&qcc->recv_list)) { - qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv); - /* no need to add an uni local stream in recv_list. */ - BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_local(qcc, qcs->id)); - qcc_decode_qcs(qcc, qcs); - LIST_DEL_INIT(&qcs->el_recv); - } - - TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); - return 0; -} - static int qcc_qos_send_tp(struct qcc *qcc) { struct quic_frame *frm; @@ -4906,12 +4840,12 @@ struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status) TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); - qcc_qos_io_recv(qcc); + qcc_io_recv(qcc); if (!(qcc->wait_event.events & SUB_RETRY_SEND)) qcc_qos_io_send(qcc); - qcc_qos_io_recv(qcc); + qcc_io_recv(qcc); if (qcc_io_process(qcc)) { TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn); @@ -4933,136 +4867,8 @@ struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status) return NULL; } -static int qmux_qos_init(struct connection *conn, struct proxy *prx, - struct session *sess, struct buffer *input) -{ - struct qcc *qcc; - - TRACE_ENTER(QMUX_EV_QCC_NEW); - - qcc = pool_alloc(pool_head_qcc); - if (!qcc) { - TRACE_ERROR("alloc failure", QMUX_EV_QCC_NEW); - goto err; - } - - _qcc_init(qcc); - conn->ctx = qcc; - qcc->nb_hreq = qcc->nb_sc = 0; - qcc->flags = QC_CF_QOS; - qcc->app_st = QCC_APP_ST_NULL; - qcc->glitches = 0; - qcc->err = quic_err_transport(QC_ERR_NO_ERROR); - - /* hardcoded inital TP values. Is this really necessary? */ - qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = 16384; - qcc->lfctl.ms_uni = 3; - qcc->lfctl.msd_bidi_l = 16384; - qcc->lfctl.msd_bidi_r = 16384; - qcc->lfctl.msd_uni_r = 16384; - qcc->lfctl.cl_bidi_r = 0; - - qcc->lfctl.md = qcc->lfctl.md_init = 16384; - qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0; - - qfctl_init(&qcc->tx.fc, 0); - qcc->rfctl.ms_uni = 3; - - qcc->tx.buf_in_flight = 0; - - if (conn_is_back(conn)) { - qcc->next_bidi_l = 0x00; - qcc->largest_bidi_r = 0x01; - qcc->next_uni_l = 0x02; - qcc->largest_uni_r = 0x03; - } - else { - qcc->largest_bidi_r = 0x00; - qcc->next_bidi_l = 0x01; - qcc->largest_uni_r = 0x02; - qcc->next_uni_l = 0x03; - } - - qcc->wait_event.tasklet = tasklet_new(); - if (!qcc->wait_event.tasklet) { - TRACE_ERROR("taslket alloc failure", QMUX_EV_QCC_NEW); - goto err; - } - - LIST_INIT(&qcc->recv_list); - LIST_INIT(&qcc->send_list); - LIST_INIT(&qcc->fctl_list); - LIST_INIT(&qcc->buf_wait_list); - LIST_INIT(&qcc->purg_list); - - qcc->wait_event.tasklet->process = qcc_qos_io_cb; - qcc->wait_event.tasklet->context = qcc; - qcc->wait_event.tasklet->state |= TASK_F_WANTS_TIME; - qcc->wait_event.events = 0; - - qcc->proxy = prx; - /* haproxy timeouts */ - if (conn_is_back(conn)) { - qcc->timeout = prx->timeout.server; - qcc->shut_timeout = tick_isset(prx->timeout.serverfin) ? - prx->timeout.serverfin : prx->timeout.server; - } - else { - qcc->timeout = prx->timeout.client; - qcc->shut_timeout = tick_isset(prx->timeout.clientfin) ? - prx->timeout.clientfin : prx->timeout.client; - } - - /* Always allocate task even if timeout is unset. In MUX code, if task - * is NULL, it indicates that a timeout has stroke earlier. - */ - qcc->task = task_new_here(); - if (!qcc->task) { - TRACE_ERROR("timeout task alloc failure", QMUX_EV_QCC_NEW); - goto err; - } - qcc->task->process = qcc_timeout_task; - qcc->task->context = qcc; - qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout); - - qcc_reset_idle_start(qcc); - LIST_INIT(&qcc->opening_list); - - /* Register conn as app_ops may use it. */ - qcc->conn = conn; - - /* TODO hardcoded HTTP/3 ops */ - if (qcc_install_app_ops(qcc, &h3_ops)) { - TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn); - goto err; - } - - if (qcc->app_ops == &h3_ops) - proxy_inc_fe_cum_sess_ver_ctr(sess->listener, prx, 3); - - /* Register conn for idle front closing. This is done once everything is allocated. */ - if (!conn_is_back(conn)) - LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list); - - /* init read cycle */ - tasklet_wakeup(qcc->wait_event.tasklet); - - TRACE_LEAVE(QMUX_EV_QCC_NEW, conn); - return 0; - - err: - if (qcc) { - /* In case of MUX init failure, session will ensure connection is freed. */ - qcc->conn = NULL; - qcc_release(qcc); - } - - TRACE_DEVEL("leaving on error", QMUX_EV_QCC_NEW, conn); - return -1; -} - static const struct mux_ops qmux_qos_ops = { - .init = qmux_qos_init, + .init = qmux_init, .destroy = qmux_destroy, .detach = qmux_strm_detach, .rcv_buf = qmux_strm_rcv_buf, diff --git a/src/mux_quic_qos.c b/src/mux_quic_qos.c new file mode 100644 index 000000000..c86579811 --- /dev/null +++ b/src/mux_quic_qos.c @@ -0,0 +1,80 @@ +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +int qcc_qos_recv(struct qcc *qcc) +{ + struct connection *conn = qcc->conn; + struct quic_frame frm; + const unsigned char *pos, *end; + int ret; + + TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); + + chunk_reset(&trash); + ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, NULL, 0, 0); + BUG_ON(ret < 0); + + if (ret) { + b_add(&trash, ret); + + pos = (unsigned char *)b_head(&trash); + end = (unsigned char *)b_tail(&trash); + ret = qc_parse_frm(&frm, NULL, &pos, end, NULL); + BUG_ON(!ret); + + if (frm.type == QUIC_FT_QS_TP) { + struct qf_qs_tp *qs_tp_frm = &frm.qs_tp; + fprintf(stderr, "got qs_transport_parameters frame\n"); + fprintf(stderr, " max_idle_timeout=%llu\n", (ullong)qs_tp_frm->tps.max_idle_timeout); + fprintf(stderr, " initial_max_data=%llu\n", (ullong)qs_tp_frm->tps.initial_max_data); + qfctl_set_max(&qcc->tx.fc, qs_tp_frm->tps.initial_max_data, NULL, NULL); + fprintf(stderr, " initial_max_stream_data_bidi_local=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_local); + qcc->rfctl.msd_bidi_l = qs_tp_frm->tps.initial_max_stream_data_bidi_local; + fprintf(stderr, " initial_max_stream_data_bidi_remote=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_remote); + qcc->rfctl.msd_bidi_r = qs_tp_frm->tps.initial_max_stream_data_bidi_remote; + fprintf(stderr, " initial_max_stream_data_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_uni); + qcc->rfctl.msd_uni_l = qs_tp_frm->tps.initial_max_stream_data_uni; + fprintf(stderr, " initial_max_streams_bidi=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_bidi); + fprintf(stderr, " initial_max_streams_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_uni); + } + else if (frm.type >= QUIC_FT_STREAM_8 && + frm.type <= QUIC_FT_STREAM_F) { + struct qf_stream *strm_frm = &frm.stream; + + qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset, + (frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data); + } + else if (frm.type == QUIC_FT_RESET_STREAM) { + struct qf_reset_stream *rst_frm = &frm.reset_stream; + qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size); + } + else { + ABORT_NOW(); + } + + } + else { + BUG_ON(!trash.size); + if (!conn_xprt_read0_pending(qcc->conn)) { + conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, + &qcc->wait_event); + } + } + + TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); + return ret; + + err: + return -1; +}