diff --git a/include/haproxy/quic_stream-t.h b/include/haproxy/quic_stream-t.h index f1feb5c3d..e10ca6da0 100644 --- a/include/haproxy/quic_stream-t.h +++ b/include/haproxy/quic_stream-t.h @@ -6,6 +6,18 @@ #include #include +#include + +/* A QUIC STREAM buffer used for Tx. + * + * Currently, no offset is associated with an offset. The qc_stream_desc must + * store them in order and keep the offset of the oldest buffer. The buffers + * can be freed in strict order. + */ +struct qc_stream_buf { + struct buffer buf; /* STREAM payload */ + struct list list; /* element for qc_stream_desc list */ +}; /* QUIC STREAM descriptor. * @@ -20,7 +32,10 @@ struct qc_stream_desc { struct eb64_node by_id; /* node for quic_conn tree */ struct quic_conn *qc; - struct buffer buf; /* buffer for STREAM data on Tx, emptied on acknowledge */ + struct list buf_list; /* buffers waiting for ACK, oldest offset first */ + struct qc_stream_buf *buf; /* current buffer used by the MUX */ + uint64_t buf_offset; /* base offset of current buffer */ + uint64_t ack_offset; /* last acknowledged offset */ struct eb_root acked_frms; /* ACK frames tree for non-contiguous ACK ranges */ diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index 38a180ba2..bd26a489f 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -10,7 +10,13 @@ struct quic_conn; struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx, struct quic_conn *qc); void qc_stream_desc_release(struct qc_stream_desc *stream); -int qc_stream_desc_free(struct qc_stream_desc *stream); +void qc_stream_desc_free(struct qc_stream_desc *stream); + +struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream); +struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, + uint64_t offset); +void qc_stream_buf_release(struct qc_stream_desc *stream); +int qc_stream_desc_free_buf(struct qc_stream_desc *stream); #endif /* USE_QUIC */ #endif /* _HAPROXY_QUIC_STREAM_H_ */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 63a75e294..d0fd0b2a7 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -627,16 +627,24 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, struct qcc *qcc = qcs->qcc; struct quic_frame *frm; int head, total; + uint64_t base_off; TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); - /* cf buffer schema in qcs_xfer_data */ - head = qcs->tx.sent_offset - qcs->stream->ack_offset; + /* if ack_offset < buf_offset, it points to an older buffer. */ + base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset); + BUG_ON(qcs->tx.sent_offset < base_off); + + head = qcs->tx.sent_offset - base_off; total = b_data(out) - head; + BUG_ON(total < 0); + if (!total) { TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); return 0; } + BUG_ON(qcs->tx.sent_offset >= qcs->tx.offset); + BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset); frm = pool_zalloc(pool_head_quic_frame); if (!frm) @@ -689,6 +697,7 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) uint64_t diff; BUG_ON(offset > qcs->tx.sent_offset); + BUG_ON(offset >= qcs->tx.offset); /* check if the STREAM frame has already been notified. It can happen * for retransmission. @@ -707,8 +716,14 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) /* increase offset on stream */ qcs->tx.sent_offset += diff; BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd); + BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset); if (qcs->tx.sent_offset == qcs->tx.msd) qcs->flags |= QC_SF_BLK_SFCTL; + + if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) { + qc_stream_buf_release(qcs->stream); + tasklet_wakeup(qcc->wait_event.tasklet); + } } /* Wrapper for send on transport layer. Send a list of frames for the @@ -847,7 +862,7 @@ static int qc_send(struct qcc *qcc) while (node) { struct qcs *qcs = eb64_entry(node, struct qcs, by_id); struct buffer *buf = &qcs->tx.buf; - struct buffer *out = &qcs->stream->buf; + struct buffer *out = qc_stream_buf_get(qcs->stream); /* TODO * for the moment, unidirectional streams have their own @@ -864,6 +879,24 @@ static int qc_send(struct qcc *qcc) continue; } + if (!b_data(buf) && !out) { + node = eb64_next(node); + continue; + } + + if (!out) { + struct connection *conn = qcc->conn; + + out = qc_stream_buf_alloc(qcs->stream, + qcs->tx.offset); + if (!out) { + conn->xprt->subscribe(conn, conn->xprt_ctx, + SUB_RETRY_SEND, &qcc->wait_event); + node = eb64_next(node); + continue; + } + } + /* Prepare buffer with data from . */ if (b_data(buf)) { int ret = qcs_xfer_data(qcs, out, buf, @@ -887,7 +920,7 @@ static int qc_send(struct qcc *qcc) } /* Build a new STREAM frame with buffer. */ - if (b_data(out)) { + if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) { int ret; char fin = !!(qcs->flags & QC_SF_FIN_STREAM); diff --git a/src/quic_stream.c b/src/quic_stream.c index 69fafbb34..72474ac19 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -11,6 +11,9 @@ DECLARE_STATIC_POOL(pool_head_quic_conn_stream, "qc_stream_desc", sizeof(struct qc_stream_desc)); +DECLARE_STATIC_POOL(pool_head_quic_conn_stream_buf, "qc_stream_buf", + sizeof(struct qc_stream_buf)); + /* Allocate a new stream descriptor with id . The caller is responsible to * store the stream in the appropriate tree. @@ -30,7 +33,10 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx, eb64_insert(&qc->streams_by_id, &stream->by_id); stream->qc = qc; - stream->buf = BUF_NULL; + stream->buf = NULL; + LIST_INIT(&stream->buf_list); + stream->buf_offset = 0; + stream->acked_frms = EB_ROOT; stream->ack_offset = 0; stream->release = 0; @@ -50,46 +56,144 @@ void qc_stream_desc_release(struct qc_stream_desc *stream) stream->release = 1; stream->ctx = NULL; - if (!b_data(&stream->buf)) + if (LIST_ISEMPTY(&stream->buf_list)) { + /* if no buffer left we can free the stream. */ qc_stream_desc_free(stream); + } + else { + /* A released stream does not use . */ + stream->buf = NULL; + } } -/* Free the stream descriptor buffer. This function should be used - * when all its data have been acknowledged. If the stream was released by the - * upper layer, the stream descriptor will be freed. - * - * Returns 0 if the stream was not freed else non-zero. +/* Free the stream descriptor content. This function should be used + * when all its data have been acknowledged or on full connection closing. It + * must only be called after the stream is released. */ -int qc_stream_desc_free(struct qc_stream_desc *stream) +void qc_stream_desc_free(struct qc_stream_desc *stream) { - b_free(&stream->buf); + struct qc_stream_buf *buf, *buf_back; + struct eb64_node *frm_node; + unsigned int free_count = 0; + + /* This function only deals with released streams. */ + BUG_ON(!stream->release); + + /* free remaining stream buffers */ + list_for_each_entry_safe(buf, buf_back, &stream->buf_list, list) { + if (!(b_data(&buf->buf))) { + b_free(&buf->buf); + LIST_DELETE(&buf->list); + pool_free(pool_head_quic_conn_stream_buf, buf); + + ++free_count; + } + } + + if (free_count) + offer_buffers(NULL, free_count); + + /* qc_stream_desc might be freed before having received all its ACKs. + * This is the case if some frames were retransmitted. + */ + frm_node = eb64_first(&stream->acked_frms); + while (frm_node) { + struct quic_stream *strm; + struct quic_frame *frm; + + strm = eb64_entry(&frm_node->node, struct quic_stream, offset); + + frm_node = eb64_next(frm_node); + eb64_delete(&strm->offset); + + frm = container_of(strm, struct quic_frame, stream); + LIST_DELETE(&frm->list); + quic_tx_packet_refdec(frm->pkt); + pool_free(pool_head_quic_frame, frm); + } + + eb64_delete(&stream->by_id); + pool_free(pool_head_quic_conn_stream, stream); +} + +/* Return the current buffer of . May be NULL if not allocated. */ +struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream) +{ + if (!stream->buf) + return NULL; + + return &stream->buf->buf; +} + +/* Allocate a new current buffer for . This function is not allowed if + * current buffer is not NULL prior to this call. The new buffer represents + * stream payload at offset . + * + * Returns the buffer or NULL. + */ +struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, + uint64_t offset) +{ + /* current buffer must be released first before allocate a new one. */ + BUG_ON(stream->buf); + + stream->buf_offset = offset; + stream->buf = pool_alloc(pool_head_quic_conn_stream_buf); + if (!stream->buf) + return NULL; + + stream->buf->buf = BUF_NULL; + LIST_APPEND(&stream->buf_list, &stream->buf->list); + + return &stream->buf->buf; +} + +/* Release the current buffer of . It will be kept internally by + * the . The current buffer cannot be NULL. + */ +void qc_stream_buf_release(struct qc_stream_desc *stream) +{ + /* current buffer already released */ + BUG_ON(!stream->buf); + + stream->buf = NULL; + stream->buf_offset = 0; +} + +/* Free the oldest buffer of . If the stream was already released and + * does not references any buffers, it is freed. This function must only be + * called if at least one buffer is present. Use qc_stream_desc_free() to free + * a released stream. + * + * Returns 0 if the stream is not yet freed else 1. + */ +int qc_stream_desc_free_buf(struct qc_stream_desc *stream) +{ + struct qc_stream_buf *stream_buf; + + BUG_ON(LIST_ISEMPTY(&stream->buf_list) && !stream->buf); + + if (!LIST_ISEMPTY(&stream->buf_list)) { + /* get first buffer from buf_list and remove it */ + stream_buf = LIST_NEXT(&stream->buf_list, struct qc_stream_buf *, + list); + LIST_DELETE(&stream_buf->list); + } + else { + stream_buf = stream->buf; + stream->buf = NULL; + } + + b_free(&stream_buf->buf); + pool_free(pool_head_quic_conn_stream_buf, stream_buf); + offer_buffers(NULL, 1); - if (stream->release) { - /* Free frames still waiting for an ACK. Even if the stream buf - * is NULL, some frames could still be not acknowledged. This - * is notably the case for retransmission where multiple frames - * points to the same buffer content. - */ - struct eb64_node *frm_node = eb64_first(&stream->acked_frms); - while (frm_node) { - struct quic_stream *strm; - struct quic_frame *frm; - - strm = eb64_entry(&frm_node->node, struct quic_stream, offset); - - frm_node = eb64_next(frm_node); - eb64_delete(&strm->offset); - - frm = container_of(strm, struct quic_frame, stream); - LIST_DELETE(&frm->list); - quic_tx_packet_refdec(frm->pkt); - pool_free(pool_head_quic_frame, frm); - } - - eb64_delete(&stream->by_id); - pool_free(pool_head_quic_conn_stream, stream); - + /* If qc_stream_desc is released and does not contains any buffers, we + * can free it now. + */ + if (stream->release && LIST_ISEMPTY(&stream->buf_list)) { + qc_stream_desc_free(stream); return 1; } diff --git a/src/xprt_quic.c b/src/xprt_quic.c index 47ec2c416..85cc76458 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -1457,6 +1457,12 @@ static int quic_stream_try_to_consume(struct quic_conn *qc, stream->ack_offset; stream->ack_offset += diff; b_del(strm->buf, diff); + if (!b_data(strm->buf)) { + if (qc_stream_desc_free_buf(stream)) { + /* stream is freed here */ + return 1; + } + } ret = 1; } @@ -1469,11 +1475,6 @@ static int quic_stream_try_to_consume(struct quic_conn *qc, pool_free(pool_head_quic_frame, frm); } - if (!b_data(&stream->buf)) { - if (qc_stream_desc_free(stream)) - TRACE_PROTO("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc); - } - return ret; } @@ -1521,7 +1522,7 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc, stream_acked = 1; if (!b_data(strm_frm->buf)) { - if (qc_stream_desc_free(stream)) { + if (qc_stream_desc_free_buf(stream)) { /* stream is freed at this stage, * no need to continue. */