MINOR: quic: store streambuf in a streamdesc tree

qc_stream_desc layer is used by QUIC MUX to store emitted STREAM data
until their acknowledgement. Each stream with Tx capability can allocate
its own qc_stream_desc. In turn, each stream desc can have one or
multiple data buffers. This is useful when a MUX stream releases a
buffer and allocate a new one, to preserve bandwith without waiting to
receive all acknowledgement of the previous buffer.

Each buffer is encapsulated in a qc_stream_buf structure. Previously, it
was stored as a list into qc_stream_desc. Change this storage to use a
tree instead. Each buffer is indexed by their offset.

This commit does not introduce functional changes. However, this
rearchitecture will be necessary for future commit to extend ACK
management which require fetching individual buffer instance, not just
the first or last element of a streamdesc, by their offset.
This commit is contained in:
Amaury Denoyelle 2024-10-01 11:27:37 +02:00
parent f4a83fbb14
commit 943e48dadd
4 changed files with 18 additions and 16 deletions

View File

@ -15,8 +15,8 @@
* can be freed in strict order.
*/
struct qc_stream_buf {
struct eb64_node offset_node; /* node for qc_stream_desc buf tree */
struct buffer buf; /* STREAM payload */
struct list list; /* element for qc_stream_desc list */
int sbuf;
};
@ -36,11 +36,11 @@ struct qc_stream_desc {
struct eb64_node by_id; /* node for quic_conn tree */
struct quic_conn *qc;
struct list buf_list; /* buffers waiting for ACK, oldest offset first */
struct qc_stream_buf *buf; /* current buffer used by the MUX */
uint64_t buf_offset; /* base offset of current buffer */
uint64_t ack_offset; /* last acknowledged offset */
struct eb_root buf_tree; /* list of active and released buffers */
struct eb_root acked_frms; /* ACK frames tree for non-contiguous ACK ranges */
int flags; /* QC_SD_FL_* values */

View File

@ -25,7 +25,7 @@ void qc_stream_buf_release(struct qc_stream_desc *stream);
static inline int qc_stream_desc_done(const struct qc_stream_desc *s)
{
return (s->flags & (QC_SD_FL_RELEASE|QC_SD_FL_WAIT_FOR_FIN)) == QC_SD_FL_RELEASE &&
LIST_ISEMPTY(&s->buf_list);
eb_is_empty(&s->buf_tree);
}
/* Reports emission of STREAM frame starting at <offset> and of length <len>,

View File

@ -849,7 +849,7 @@ void qcs_send_metadata(struct qcs *qcs)
/* Reserved for stream with Tx capability. */
BUG_ON(!qcs->stream);
/* Cannot use if some data already transferred for this stream. */
BUG_ON(qcs->stream->ack_offset || !LIST_ISEMPTY(&qcs->stream->buf_list));
BUG_ON(qcs->stream->ack_offset || !eb_is_empty(&qcs->stream->buf_tree));
qcs->flags |= QC_SF_TXBUB_OOB;
qc_stream_desc_sub_room(qcs->stream, NULL);

View File

@ -6,7 +6,6 @@
#include <haproxy/buf.h>
#include <haproxy/dynbuf.h>
#include <haproxy/errors.h>
#include <haproxy/list.h>
#include <haproxy/mux_quic.h>
#include <haproxy/pool.h>
#include <haproxy/quic_conn.h>
@ -25,7 +24,7 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
struct buffer *buf = &(*stream_buf)->buf;
uint64_t free_size;
LIST_DEL_INIT(&(*stream_buf)->list);
eb64_delete(&(*stream_buf)->offset_node);
/* Reset current buf ptr if deleted instance is the same one. */
if (*stream_buf == stream->buf)
@ -73,7 +72,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void
stream->qc = qc;
stream->buf = NULL;
LIST_INIT(&stream->buf_list);
stream->buf_tree = EB_ROOT_UNIQUE;
stream->buf_offset = 0;
stream->acked_frms = EB_ROOT;
@ -156,10 +155,10 @@ int qc_stream_desc_ack(struct qc_stream_desc *stream, size_t offset, size_t len,
diff = offset + len - stream->ack_offset;
if (diff) {
/* Buf list cannot be empty if there is still unacked data. */
BUG_ON(LIST_ISEMPTY(&stream->buf_list));
BUG_ON(eb_is_empty(&stream->buf_tree));
/* get oldest buffer from buf_list */
stream_buf = LIST_NEXT(&stream->buf_list, struct qc_stream_buf *, list);
/* get oldest buffer from buf tree */
stream_buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node);
buf = &stream_buf->buf;
stream->ack_offset += diff;
@ -186,16 +185,18 @@ int qc_stream_desc_ack(struct qc_stream_desc *stream, size_t offset, size_t len,
*/
void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
{
struct qc_stream_buf *buf, *buf_back;
struct qc_stream_buf *buf;
struct quic_conn *qc = stream->qc;
struct eb64_node *frm_node;
struct eb64_node *frm_node, *buf_node;
unsigned int free_count = 0;
/* This function only deals with released streams. */
BUG_ON(!(stream->flags & QC_SD_FL_RELEASE));
/* free remaining stream buffers */
list_for_each_entry_safe(buf, buf_back, &stream->buf_list, list) {
while (!eb_is_empty(&stream->buf_tree)) {
buf_node = eb64_first(&stream->buf_tree);
buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
/* qc_stream_desc_free() can only be used after all data is
* acknowledged or on connection shutdown. In the contrary
@ -208,7 +209,7 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
else
b_free(&buf->buf);
LIST_DELETE(&buf->list);
eb64_delete(&buf->offset_node);
pool_free(pool_head_quic_stream_buf, buf);
++free_count;
}
@ -265,6 +266,7 @@ struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
return NULL;
stream->buf->buf = BUF_NULL;
stream->buf->offset_node.key = offset;
if (!small) {
stream->buf->sbuf = 0;
@ -287,7 +289,7 @@ struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
stream->buf->buf = b_make(area, global.tune.bufsize_small, 0, 0);
}
LIST_APPEND(&stream->buf_list, &stream->buf->list);
eb64_insert(&stream->buf_tree, &stream->buf->offset_node);
return &stream->buf->buf;
}
@ -311,7 +313,7 @@ struct buffer *qc_stream_buf_realloc(struct qc_stream_desc *stream)
stream->buf->sbuf = 0;
if (!b_alloc(&stream->buf->buf, DB_MUX_TX)) {
LIST_DEL_INIT(&stream->buf->list);
eb64_delete(&stream->buf->offset_node);
pool_free(pool_head_quic_stream_buf, stream->buf);
stream->buf = NULL;
return NULL;