MINOR: mux-quic: implement qstream send callback

Each time a QUIC frame is emitted, mux-quic layer is notified via a
callback to update the underlying QCS. For QUIC, this is performed via
qc_stream_desc element.

In QMux protocol, this can be simplified as there is no
qc_stream_desc/quic_conn layer interaction. Instead, each time snd_buf
is called, QCS can be updated immediately using its return value. This
is performed via a new function qstrm_ctrl_send().

Its work is similar to the QUIC equivalent but in a simpler mode. In
particular, sent data can be immediately removed from the Tx buffer as
there is no need for retransmission when running above TCP.
This commit is contained in:
Amaury Denoyelle 2026-03-27 10:16:56 +01:00
parent e8d9eb4f7a
commit 621f21f6fd
3 changed files with 86 additions and 5 deletions

View File

@ -0,0 +1,14 @@
#ifndef _HAPROXY_MUX_QUIC_PRIV_H
#define _HAPROXY_MUX_QUIC_PRIV_H
/* This header file should only be used by QUIC-MUX layer internally. */
#include <haproxy/mux_quic-t.h>
void qcs_idle_open(struct qcs *qcs);
void qcs_close_local(struct qcs *qcs);
int qcs_is_completed(struct qcs *qcs);
uint64_t qcs_prep_bytes(const struct qcs *qcs);
#endif /* _HAPROXY_MUX_QUIC_PRIV_H */

View File

@ -1,4 +1,5 @@
#include <haproxy/mux_quic.h>
#include <haproxy/mux_quic_priv.h>
#include <import/eb64tree.h>
@ -409,7 +410,7 @@ static void qcc_refresh_timeout(struct qcc *qcc)
/* Mark a stream as open if it was idle. This can be used on every
* successful emission/reception operation to update the stream state.
*/
static void qcs_idle_open(struct qcs *qcs)
void qcs_idle_open(struct qcs *qcs)
{
/* This operation must not be used if the stream is already closed. */
BUG_ON_HOT(qcs->st == QC_SS_CLO);
@ -421,7 +422,7 @@ static void qcs_idle_open(struct qcs *qcs)
}
/* Close the local channel of <qcs> instance. */
static void qcs_close_local(struct qcs *qcs)
void qcs_close_local(struct qcs *qcs)
{
TRACE_STATE("closing stream locally", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@ -445,7 +446,7 @@ static void qcs_close_local(struct qcs *qcs)
}
/* Returns true if <qcs> can be purged. */
static int qcs_is_completed(struct qcs *qcs)
int qcs_is_completed(struct qcs *qcs)
{
/* A stream is completed if fully closed and stconn released, or simply
* detached and everything already sent.
@ -594,7 +595,7 @@ struct buffer *qcs_tx_buf(struct qcs *qcs)
}
/* Returns total number of bytes not already sent to quic-conn layer. */
static uint64_t qcs_prep_bytes(const struct qcs *qcs)
uint64_t qcs_prep_bytes(const struct qcs *qcs)
{
const struct buffer *out = qcs_tx_buf_const(qcs);
uint64_t diff, base_off;

View File

@ -5,7 +5,10 @@
#include <haproxy/chunk.h>
#include <haproxy/connection.h>
#include <haproxy/mux_quic.h>
#include <haproxy/mux_quic_priv.h>
#include <haproxy/proxy.h>
#include <haproxy/qmux_trace.h>
#include <haproxy/quic_fctl.h>
#include <haproxy/quic_frame.h>
#include <haproxy/trace.h>
@ -148,6 +151,69 @@ int qcc_qstrm_recv(struct qcc *qcc)
return -1;
}
/* Updates a <qcs> stream after a successful emission of data of length <data>. */
static void qstrm_ctrl_send(struct qcs *qcs, uint64_t data)
{
struct qcc *qcc = qcs->qcc;
struct quic_fctl *fc_conn = &qcc->tx.fc;
struct quic_fctl *fc_strm = &qcs->tx.fc;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
qcs_idle_open(qcs);
/* Ensure real offset never exceeds soft value. */
BUG_ON(fc_conn->off_real + data > fc_conn->off_soft);
BUG_ON(fc_strm->off_real + data > fc_strm->off_soft);
/* increase offset on connection */
if (qfctl_rinc(fc_conn, data)) {
TRACE_STATE("connection flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn);
}
/* increase offset on stream */
if (qfctl_rinc(fc_strm, data)) {
TRACE_STATE("stream flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
b_del(&qcs->tx.qstrm_buf, data);
/* Release buffer if everything sent and stream is waiting for room. */
if (!qcs_prep_bytes(qcs) && (qcs->flags & QC_SF_BLK_MROOM)) {
qcs->flags &= ~QC_SF_BLK_MROOM;
qcs_notify_send(qcs);
}
/* Add measurement for send rate. This is done at the MUX layer
* to account only for STREAM frames without retransmission.
*/
increment_send_rate(data, 0);
if (!qcs_prep_bytes(qcs)) {
/* Remove stream from send_list if all was sent. */
LIST_DEL_INIT(&qcs->el_send);
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
/* Close stream locally. */
qcs_close_local(qcs);
if (qcs->flags & QC_SF_FIN_STREAM) {
/* Reset flag to not emit multiple FIN STREAM frames. */
qcs->flags &= ~QC_SF_FIN_STREAM;
}
if (qcs_is_completed(qcs)) {
TRACE_STATE("add stream in purg_list", QMUX_EV_QCS_SEND, qcc->conn, qcs);
LIST_APPEND(&qcc->purg_list, &qcs->el_send);
}
}
}
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
/* Sends <frms> list of frames for <qcc> connection.
*
* Returns 0 if all data are emitted or a positive value if sending should be
@ -211,7 +277,7 @@ int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms)
}
if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F)
/* TODO notify MUX */
qstrm_ctrl_send(frm->stream.stream, frm->stream.len);
LIST_DEL_INIT(&frm->list);
if (split_frm) {