From f4d1bd0b7676e39c20bfe609d32f0da021c4aba7 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Tue, 13 Aug 2024 11:57:50 +0200 Subject: [PATCH] MINOR: mux-quic: account stream txbuf in QCC A limit per connection is put on the number of buffers allocated by QUIC MUX for emission accross all its streams. This ensures memory consumption remains under control. This limit is simply explained as a count of buffers which can be concurrently allocated for each connection. As such, quic_conn structure was used to account currently allocated buffers. However, a quic_conn nevers allocates new stream buffers. This is only done at QUIC MUX layer. As such, this commit moves buffer accounting inside QCC structure. This simplifies the API, most notably qc_stream_buf_alloc() usage. Note that this commit inverts the accounting. Previously, it was initially set to 0 and increment for each allocated buffer. Now, it is set to the maximum value and decrement for each buf usage. This is considered as clearer to use. --- include/haproxy/mux_quic-t.h | 1 + include/haproxy/mux_quic.h | 2 +- include/haproxy/quic_conn-t.h | 1 - include/haproxy/quic_stream.h | 2 +- src/mux_quic.c | 45 ++++++++++++++++++--------------- src/quic_conn.c | 1 - src/quic_stream.c | 47 ++++++++--------------------------- 7 files changed, 38 insertions(+), 61 deletions(-) diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index ee3fbe54c..22fd1b910 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -67,6 +67,7 @@ struct qcc { struct { struct quic_fctl fc; /* stream flow control applied on sending */ + int avail_bufs; /* count of available buffers for this connection */ } tx; uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */ diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 1ed8ad1dc..28a5af7b8 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -22,7 +22,7 @@ int qcs_is_close_remote(struct qcs *qcs); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); -int qcc_notify_buf(struct qcc *qcc); +void qcc_notify_buf(struct qcc *qcc, int free_count); struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs); struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err); diff --git a/include/haproxy/quic_conn-t.h b/include/haproxy/quic_conn-t.h index ed5ec01c9..5ac3f1ff4 100644 --- a/include/haproxy/quic_conn-t.h +++ b/include/haproxy/quic_conn-t.h @@ -398,7 +398,6 @@ struct quic_conn { struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */ struct eb_root streams_by_id; /* qc_stream_desc tree */ - int stream_buf_count; /* total count of allocated stream buffers for this connection */ /* MUX */ struct qcc *qcc; diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index 7788d9039..c7a237969 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -16,7 +16,7 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing); struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream); struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, - uint64_t offset, int *avail); + uint64_t offset); void qc_stream_buf_release(struct qc_stream_desc *stream); #endif /* USE_QUIC */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 30882ab2e..bdd366f4f 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -523,34 +524,36 @@ void qcs_notify_send(struct qcs *qcs) } } -/* Notify on a new stream-desc buffer available for connection. - * - * Returns true if a stream was woken up. If false is returned, this indicates - * to the caller that it's currently unnecessary to notify for the rest of the - * available buffers. +/* Report that stream-desc buffer have been released for + * connection. */ -int qcc_notify_buf(struct qcc *qcc) +void qcc_notify_buf(struct qcc *qcc, int free_count) { struct qcs *qcs; - int ret = 0; TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + BUG_ON(qcc->tx.avail_bufs + free_count > global.tune.quic_streams_buf); + qcc->tx.avail_bufs += free_count; + if (qcc->flags & QC_CF_CONN_FULL) { TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn); qcc->flags &= ~QC_CF_CONN_FULL; } - if (!LIST_ISEMPTY(&qcc->buf_wait_list)) { + /* TODO a simple optimization would be to only wake up QCS + * instances. But it may not work if a woken QCS is in error and does + * not try to allocate a buffer, leaving the unwoken QCS indefinitely + * in the buflist. + */ + while (!LIST_ISEMPTY(&qcc->buf_wait_list)) { qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf); LIST_DEL_INIT(&qcs->el_buf); tot_time_stop(&qcs->timer.buf); qcs_notify_send(qcs); - ret = 1; } TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); - return ret; } /* A fatal error is detected locally for connection. It should be closed @@ -1007,7 +1010,6 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs) struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err) { struct qcc *qcc = qcs->qcc; - int buf_avail; struct buffer *out = qc_stream_buf_get(qcs->stream); /* Stream must not try to reallocate a buffer if currently waiting for one. */ @@ -1022,21 +1024,22 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err) goto out; } - out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real, - &buf_avail); - if (!out) { - if (buf_avail) { - TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); - *err = 1; - goto out; - } - + if (!qcc->tx.avail_bufs) { TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs); LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); tot_time_start(&qcs->timer.buf); qcc->flags |= QC_CF_CONN_FULL; goto out; } + + out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real); + if (!out) { + TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); + *err = 1; + goto out; + } + + --qcc->tx.avail_bufs; } out: @@ -2703,6 +2706,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni; + qcc->tx.avail_bufs = global.tune.quic_streams_buf; + if (conn_is_back(conn)) { qcc->next_bidi_l = 0x00; qcc->largest_bidi_r = 0x01; diff --git a/src/quic_conn.c b/src/quic_conn.c index 651669696..a07ffd2f4 100644 --- a/src/quic_conn.c +++ b/src/quic_conn.c @@ -1181,7 +1181,6 @@ struct quic_conn *qc_new_conn(const struct quic_version *qv, int ipv4, quic_cc_path_init(qc->path, ipv4, server ? l->bind_conf->max_cwnd : 0, cc_algo ? cc_algo : default_quic_cc_algo, qc); - qc->stream_buf_count = 0; memcpy(&qc->local_addr, local_addr, sizeof(qc->local_addr)); memcpy(&qc->peer_addr, peer_addr, sizeof qc->peer_addr); diff --git a/src/quic_stream.c b/src/quic_stream.c index a21391346..b45bac7bb 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -41,15 +41,9 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream, *stream_buf = NULL; /* notify MUX about available buffers. */ - --qc->stream_buf_count; if (qc->mux_state == QC_MUX_READY) { - /* notify MUX about available buffers. - * - * TODO several streams may be woken up even if a single buffer - * is available for now. - */ - while (qcc_notify_buf(qc->qcc)) - ; + /* notify MUX about available buffers. */ + qcc_notify_buf(qc->qcc, 1); } } @@ -222,15 +216,9 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing) if (free_count) { offer_buffers(NULL, free_count); - qc->stream_buf_count -= free_count; if (qc->mux_state == QC_MUX_READY) { - /* notify MUX about available buffers. - * - * TODO several streams may be woken up even if a single buffer - * is available for now. - */ - while (qcc_notify_buf(qc->qcc)) - ; + /* notify MUX about available buffers. */ + qcc_notify_buf(qc->qcc, free_count); } } @@ -265,45 +253,30 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream) return &stream->buf->buf; } -/* Returns the count of available buffer left for . */ -static int qc_stream_buf_avail(struct quic_conn *qc) -{ - BUG_ON(qc->stream_buf_count > global.tune.quic_streams_buf); - return global.tune.quic_streams_buf - qc->stream_buf_count; -} - -/* Allocate a new current buffer for . The buffer limit count for the - * connection is checked first. This function is not allowed if current buffer - * is not NULL prior to this call. The new buffer represents stream payload at - * offset . +/* Allocate a new current buffer for . This function is not allowed if + * current buffer is not NULL prior to this call. The new buffer represents + * stream payload at offset . * - * Returns the buffer or NULL on error. Caller may check to ensure if - * the connection buffer limit was reached or a fatal error was encountered. + * Returns the buffer or NULL on error. */ struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, - uint64_t offset, int *avail) + uint64_t offset) { - struct quic_conn *qc = stream->qc; - /* current buffer must be released first before allocate a new one. */ BUG_ON(stream->buf); - *avail = qc_stream_buf_avail(qc); - if (!*avail) - return NULL; - stream->buf_offset = offset; stream->buf = pool_alloc(pool_head_quic_stream_buf); if (!stream->buf) return NULL; + stream->buf->buf = BUF_NULL; if (!b_alloc(&stream->buf->buf, DB_MUX_TX)) { pool_free(pool_head_quic_stream_buf, stream->buf); stream->buf = NULL; return NULL; } - ++qc->stream_buf_count; LIST_APPEND(&stream->buf_list, &stream->buf->list); return &stream->buf->buf;