MAJOR: mux-quic: switch to ncbmbuf for stream Rx

This commit is contained in:
Amaury Denoyelle 2026-04-10 11:01:43 +02:00
parent 63febbace7
commit 78e01192eb
5 changed files with 40 additions and 54 deletions

View File

@ -12,7 +12,7 @@
#include <haproxy/connection-t.h>
#include <haproxy/htx-t.h>
#include <haproxy/list-t.h>
#include <haproxy/ncbuf-t.h>
#include <haproxy/ncbmbuf-t.h>
#include <haproxy/quic_fctl-t.h>
#include <haproxy/quic_frame-t.h>
#include <haproxy/quic_pacing-t.h>
@ -153,7 +153,7 @@ enum qcs_state {
*/
struct qc_stream_rxbuf {
struct eb64_node off_node; /* base offset of current buffer, node for QCS rx.bufs */
struct ncbuf ncb; /* data storage with support for out of order offset */
struct ncbmbuf ncb; /* data storage with support for out of order offset */
uint64_t off_end; /* first offset directly outside of current buffer */
};

View File

@ -53,7 +53,8 @@ int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err);
static inline int qmux_stream_rx_bufsz(void)
{
return global.tune.bufsize - NCB_RESERVED_SZ;
ncb_sz_t size_bm = (global.tune.bufsize + 8) / 9;
return global.tune.bufsize - size_bm;
}
#define QCS_ID_TYPE_MASK 0x3

View File

@ -42,6 +42,8 @@ static inline ncb_sz_t ncbmb_size(const struct ncbmbuf *buf)
int ncbmb_is_empty(const struct ncbmbuf *buf);
int ncbmb_is_full(const struct ncbmbuf *buf);
ncb_sz_t ncbmb_data(const struct ncbmbuf *buf, ncb_sz_t offset);
enum ncb_ret ncbmb_add(struct ncbmbuf *buf, ncb_sz_t off,

View File

@ -12,7 +12,7 @@
#include <haproxy/h3.h>
#include <haproxy/list.h>
#include <haproxy/mux_quic_qstrm.h>
#include <haproxy/ncbuf.h>
#include <haproxy/ncbmbuf.h>
#include <haproxy/pool.h>
#include <haproxy/proxy.h>
#include <haproxy/qmux_http.h>
@ -53,16 +53,16 @@ static int qcc_is_pacing_active(const struct connection *conn)
/* Free <rxbuf> instance and its inner data storage attached to <qcs> stream. */
static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
{
struct ncbuf *ncbuf;
struct ncbmbuf *ncbuf;
struct buffer buf;
ncbuf = &rxbuf->ncb;
if (!ncb_is_null(ncbuf)) {
if (!ncbmb_is_null(ncbuf)) {
buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
b_free(&buf);
offer_buffers(NULL, 1);
}
rxbuf->ncb = NCBUF_NULL;
rxbuf->ncb = NCBMBUF_NULL;
/* Reset DEM_FULL as buffer is released. This ensures mux is not woken
* up from rcv_buf stream callback when demux was previously blocked.
@ -504,16 +504,16 @@ int qcs_is_close_remote(struct qcs *qcs)
*
* Returns the buffer instance or NULL on allocation failure.
*/
static struct ncbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
static struct ncbmbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbmbuf *ncbuf)
{
struct buffer buf = BUF_NULL;
if (ncb_is_null(ncbuf)) {
if (ncbmb_is_null(ncbuf)) {
if (!b_alloc(&buf, DB_MUX_RX))
return NULL;
*ncbuf = ncb_make(buf.area, buf.size, 0);
ncb_init(ncbuf, 0);
*ncbuf = ncbmb_make(buf.area, buf.size, 0);
ncbmb_init(ncbuf, 0);
}
return ncbuf;
@ -1151,8 +1151,8 @@ int qcc_get_qcs(struct qcc *qcc, uint64_t id, int receive_only, int send_only,
static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
{
if (b) {
const struct ncbuf *ncb = &b->ncb;
return b_make(ncb_orig(ncb), ncb->size, ncb->head, ncb_data(ncb, 0));
const struct ncbmbuf *ncb = &b->ncb;
return b_make(ncbmb_orig(ncb), ncb->size, ncb->head, ncbmb_data(ncb, 0));
}
else {
return BUF_NULL;
@ -1174,7 +1174,7 @@ static int qcs_transfer_rx_data(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
size_t to_copy;
int ret = 1;
BUG_ON(ncb_is_full(&rxbuf->ncb));
BUG_ON(ncbmb_is_full(&rxbuf->ncb));
next = eb64_next(&rxbuf->off_node);
if (!next)
@ -1182,19 +1182,19 @@ static int qcs_transfer_rx_data(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
rxbuf_next = container_of(next, struct qc_stream_rxbuf, off_node);
if (rxbuf_next->off_node.key == rxbuf->off_end &&
ncb_data(&rxbuf_next->ncb, 0)) {
ncbmb_data(&rxbuf_next->ncb, 0)) {
eb64_delete(&rxbuf->off_node);
eb64_delete(next);
b = qcs_b_dup(rxbuf);
b_next = qcs_b_dup(rxbuf_next);
to_copy = MIN(b_data(&b_next), ncb_size(&rxbuf->ncb) - b_data(&b));
to_copy = MIN(b_data(&b_next), ncbmb_size(&rxbuf->ncb) - b_data(&b));
ncb_ret = ncb_add(&rxbuf->ncb, ncb_data(&rxbuf->ncb, 0),
b_head(&b_next), to_copy, NCB_ADD_COMPARE);
ncb_ret = ncbmb_add(&rxbuf->ncb, ncbmb_data(&rxbuf->ncb, 0),
b_head(&b_next), to_copy, NCB_ADD_OVERWRT);
BUG_ON(ncb_ret != NCB_RET_OK);
ncb_ret = ncb_advance(&rxbuf_next->ncb, to_copy);
ncb_ret = ncbmb_advance(&rxbuf_next->ncb, to_copy);
BUG_ON(ncb_ret != NCB_RET_OK);
rxbuf->off_node.key = qcs->rx.offset;
@ -1251,7 +1251,7 @@ static struct qc_stream_rxbuf *qcs_get_curr_rxbuf(struct qcs *qcs)
static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
{
struct qc_stream_rxbuf *b = qcs_get_curr_rxbuf(qcs);
return b ? ncb_data(&b->ncb, 0) : 0;
return b ? ncbmb_data(&b->ncb, 0) : 0;
}
/* Remove <bytes> from <buf> current Rx buffer of <qcs> stream. Flow-control
@ -1270,7 +1270,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes, struct qc_stream_rxbuf
BUG_ON_HOT(buf->off_node.key > qcs->rx.offset ||
qcs->rx.offset >= buf->off_end);
ret = ncb_advance(&buf->ncb, bytes);
ret = ncbmb_advance(&buf->ncb, bytes);
if (ret) {
ABORT_NOW(); /* should not happens because removal only in data */
}
@ -1396,7 +1396,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
* restart decoding.
*/
if (!ret && rxbuf && !(qcs->flags & QC_SF_DEM_FULL) &&
qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end) {
qcs->rx.offset + ncbmb_data(&rxbuf->ncb, 0) == rxbuf->off_end) {
if (!qcs_transfer_rx_data(qcs, rxbuf)) {
TRACE_DEVEL("restart parsing after data realignment", QMUX_EV_QCS_RECV, qcc->conn, qcs);
goto restart;
@ -1419,7 +1419,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
if (ret)
qcs_consume(qcs, ret, rxbuf);
if (ncb_is_empty(&rxbuf->ncb)) {
if (ncbmb_is_empty(&rxbuf->ncb)) {
qcs_free_rxbuf(qcs, rxbuf);
/* Close QCS remotely if only one Rx buffer remains and
@ -1787,7 +1787,7 @@ static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset,
struct qcc *qcc = qcs->qcc;
struct eb64_node *node;
struct qc_stream_rxbuf *buf;
struct ncbuf *ncbuf;
struct ncbmbuf *ncbuf;
TRACE_ENTER(QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
@ -1805,7 +1805,7 @@ static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset,
goto err;
}
buf->ncb = NCBUF_NULL;
buf->ncb = NCBMBUF_NULL;
buf->off_node.key = aligned_off;
buf->off_end = aligned_off + qmux_stream_rx_bufsz();
eb64_insert(&qcs->rx.bufs, &buf->off_node);
@ -1813,7 +1813,7 @@ static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset,
}
ncbuf = &buf->ncb;
if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
if (!qcs_get_ncbuf(qcs, ncbuf) || ncbmb_is_null(ncbuf)) {
TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
goto err;
}
@ -1938,8 +1938,6 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
left = len;
while (left) {
struct qc_stream_rxbuf *buf;
struct proxy *px;
struct quic_counters *prx_counters;
ncb_sz_t ncb_off;
buf = qcs_get_rxbuf(qcs, offset, &len);
@ -1952,34 +1950,13 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
/* For oldest buffer, ncb_advance() may already have been performed. */
ncb_off = offset - MAX(qcs->rx.offset, buf->off_node.key);
ncb_ret = ncb_add(&buf->ncb, ncb_off, data, len, NCB_ADD_COMPARE);
ncb_ret = ncbmb_add(&buf->ncb, ncb_off, data, len, NCB_ADD_OVERWRT);
switch (ncb_ret) {
case NCB_RET_OK:
break;
case NCB_RET_DATA_REJ:
/* RFC 9000 2.2. Sending and Receiving Data
*
* An endpoint could receive data for a stream at the
* same stream offset multiple times. Data that has
* already been received can be discarded. The data at
* a given offset MUST NOT change if it is sent
* multiple times; an endpoint MAY treat receipt of
* different data at the same offset within a stream as
* a connection error of type PROTOCOL_VIOLATION.
*/
TRACE_ERROR("overlapping data rejected", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV|QMUX_EV_PROTO_ERR,
qcc->conn, qcs);
qcc_set_error(qcc, QC_ERR_PROTOCOL_VIOLATION, 0);
return 1;
case NCB_RET_GAP_SIZE:
TRACE_DATA("cannot bufferize frame due to gap size limit", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
qcc->conn, qcs);
px = qcc->proxy;
prx_counters = EXTRA_COUNTERS_GET(px->extra_counters_fe, &quic_stats_module);
HA_ATOMIC_INC(&prx_counters->ncbuf_gap_limit);
return 1;
default:
ABORT_NOW();
}
offset += len;

View File

@ -218,8 +218,14 @@ int ncbmb_is_empty(const struct ncbmbuf *buf)
int ncbmb_is_full(const struct ncbmbuf *buf)
{
/* TODO */
return 0;
size_t i = 0;
for (i = 0; i < buf->size_bm; ++i) {
if (!buf->bitmap[i])
return 0;
}
return 1;
}
int ncbmb_is_fragmented(const struct ncbmbuf *buf)