diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index f9b8b2fd9..0c8ce6008 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -106,6 +106,7 @@ struct qcs { uint64_t msd; /* fctl bytes limit to respect on emission */ } tx; + struct eb64_node by_id; uint64_t id; struct qc_stream_desc *stream; diff --git a/include/haproxy/quic_stream-t.h b/include/haproxy/quic_stream-t.h index 6a451acd3..ff88053b3 100644 --- a/include/haproxy/quic_stream-t.h +++ b/include/haproxy/quic_stream-t.h @@ -10,14 +10,14 @@ /* QUIC STREAM descriptor. * * This structure is the low-level counterpart of the QUIC STREAM at the MUX - * layer. It provides a node for tree-storage and buffering for Tx. + * layer. It is stored in the quic-conn and provides facility for Tx buffering. * * Once the MUX has finished to transfer data on a STREAM, it must release its * QUIC STREAM descriptor. The descriptor will be kept by the quic_conn until * all acknowledgement has been received. */ struct qc_stream_desc { - struct eb64_node by_id; /* id of the stream used for tree */ + struct eb64_node by_id; /* node for quic_conn tree */ struct buffer buf; /* buffer for STREAM data on Tx, emptied on acknowledge */ uint64_t ack_offset; /* last acknowledged offset */ diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index c524d1066..38a180ba2 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -7,9 +7,9 @@ struct quic_conn; -struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx); -void qc_stream_desc_release(struct qc_stream_desc *stream, - struct quic_conn *qc); +struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx, + struct quic_conn *qc); +void qc_stream_desc_release(struct qc_stream_desc *stream); int qc_stream_desc_free(struct qc_stream_desc *stream); #endif /* USE_QUIC */ diff --git a/include/haproxy/xprt_quic-t.h b/include/haproxy/xprt_quic-t.h index 247833239..ab714ea50 100644 --- a/include/haproxy/xprt_quic-t.h +++ b/include/haproxy/xprt_quic-t.h @@ -749,7 +749,7 @@ struct quic_conn { struct listener *li; /* only valid for frontend connections */ struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */ - struct eb_root streams_by_id; /* storage for released qc_stream_desc */ + struct eb_root streams_by_id; /* qc_stream_desc tree */ /* MUX */ struct qcc *qcc; diff --git a/src/mux_quic.c b/src/mux_quic.c index 642da7206..63a75e294 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -112,8 +112,12 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) if (!qcs) goto out; - /* allocate transport layer stream descriptor */ - stream = qc_stream_desc_new(id, qcs); + /* allocate transport layer stream descriptor + * + * TODO qc_stream_desc is only useful for Tx buffering. It should not + * be required for unidirectional remote streams. + */ + stream = qc_stream_desc_new(id, qcs, qcc->conn->handle.qc); if (!stream) { pool_free(pool_head_qcs, qcs); qcs = NULL; @@ -134,9 +138,9 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->endp->ctx = qcc->conn; qcs->endp->flags |= (CS_EP_T_MUX|CS_EP_ORPHAN|CS_EP_NOT_FIRST); - qcs->id = id; + qcs->id = qcs->by_id.key = id; /* store transport layer stream descriptor in qcc tree */ - eb64_insert(&qcc->streams_by_id, &stream->by_id); + eb64_insert(&qcc->streams_by_id, &qcs->by_id); qcc->strms[type].nb_streams++; @@ -175,11 +179,12 @@ void qcs_free(struct qcs *qcs) BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams); --qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams; - /* stream desc must be removed from MUX tree before release it */ - eb64_delete(&qcs->stream->by_id); - qc_stream_desc_release(qcs->stream, qcs->qcc->conn->handle.qc); + qc_stream_desc_release(qcs->stream); + BUG_ON(qcs->endp && !(qcs->endp->flags & CS_EP_ORPHAN)); cs_endpoint_free(qcs->endp); + + eb64_delete(&qcs->by_id); pool_free(pool_head_qcs, qcs); } @@ -239,24 +244,22 @@ void qcs_notify_send(struct qcs *qcs) */ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) { - struct qc_stream_desc *stream; unsigned int strm_type; int64_t sub_id; - struct eb64_node *strm_node; + struct eb64_node *node; struct qcs *qcs = NULL; strm_type = id & QCS_ID_TYPE_MASK; sub_id = id >> QCS_ID_TYPE_SHIFT; - strm_node = NULL; + node = NULL; if (quic_stream_is_local(qcc, id)) { /* Local streams: this stream must be already opened. */ - strm_node = eb64_lookup(&qcc->streams_by_id, id); - if (!strm_node) { + node = eb64_lookup(&qcc->streams_by_id, id); + if (!node) { /* unknown stream id */ goto out; } - stream = eb64_entry(strm_node, struct qc_stream_desc, by_id); - qcs = stream->ctx; + qcs = eb64_entry(node, struct qcs, by_id); } else { /* Remote streams. */ @@ -304,11 +307,9 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) qcs = tmp_qcs; } else { - strm_node = eb64_lookup(strms, id); - if (strm_node) { - stream = eb64_entry(strm_node, struct qc_stream_desc, by_id); - qcs = stream->ctx; - } + node = eb64_lookup(strms, id); + if (node) + qcs = eb64_entry(node, struct qcs, by_id); } } @@ -414,14 +415,12 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max) */ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max) { - struct qc_stream_desc *stream; struct qcs *qcs; struct eb64_node *node; node = eb64_lookup(&qcc->streams_by_id, id); if (node) { - stream = eb64_entry(node, struct qc_stream_desc, by_id); - qcs = stream->ctx; + qcs = eb64_entry(node, struct qcs, by_id); if (max > qcs->tx.msd) { qcs->tx.msd = max; @@ -522,9 +521,9 @@ static void qc_release(struct qcc *qcc) /* liberate remaining qcs instances */ node = eb64_first(&qcc->streams_by_id); while (node) { - struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id); + struct qcs *qcs = eb64_entry(node, struct qcs, by_id); node = eb64_next(node); - qcs_free(stream->ctx); + qcs_free(qcs); } pool_free(pool_head_qcc, qcc); @@ -846,8 +845,7 @@ static int qc_send(struct qcc *qcc) */ node = eb64_first(&qcc->streams_by_id); while (node) { - struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id); - struct qcs *qcs = stream->ctx; + struct qcs *qcs = eb64_entry(node, struct qcs, by_id); struct buffer *buf = &qcs->tx.buf; struct buffer *out = &qcs->stream->buf; @@ -921,8 +919,7 @@ static int qc_release_detached_streams(struct qcc *qcc) node = eb64_first(&qcc->streams_by_id); while (node) { - struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id); - struct qcs *qcs = stream->ctx; + struct qcs *qcs = eb64_entry(node, struct qcs, by_id); node = eb64_next(node); if (qcs->flags & QC_SF_DETACH) { @@ -1263,14 +1260,12 @@ static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev */ static int qc_wake_some_streams(struct qcc *qcc) { - struct qc_stream_desc *stream; struct qcs *qcs; struct eb64_node *node; for (node = eb64_first(&qcc->streams_by_id); node; node = eb64_next(node)) { - stream = eb64_entry(node, struct qc_stream_desc, by_id); - qcs = stream->ctx; + qcs = eb64_entry(node, struct qcs, by_id); if (!qcs->cs) continue; diff --git a/src/quic_stream.c b/src/quic_stream.c index 4213e50fc..6bdd00dce 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -17,7 +17,8 @@ DECLARE_STATIC_POOL(pool_head_quic_conn_stream, "qc_stream_desc", * * Returns the newly allocated instance on success or else NULL. */ -struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx) +struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx, + struct quic_conn *qc) { struct qc_stream_desc *stream; @@ -26,7 +27,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx) return NULL; stream->by_id.key = id; - stream->by_id.node.leaf_p = NULL; + eb64_insert(&qc->streams_by_id, &stream->by_id); stream->buf = BUF_NULL; stream->acked_frms = EB_ROOT; @@ -37,23 +38,19 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx) return stream; } -/* Mark the stream descriptor as released by the upper layer. It will - * be freed as soon as all its buffered data are acknowledged. In the meantime, - * the stream is stored in the tree : thus it must have been removed from - * any other tree before calling this function. +/* Mark the stream descriptor as released. It will be freed as soon as + * all its buffered data are acknowledged. */ -void qc_stream_desc_release(struct qc_stream_desc *stream, - struct quic_conn *qc) +void qc_stream_desc_release(struct qc_stream_desc *stream) { - BUG_ON(stream->by_id.node.leaf_p); + /* A stream can be released only one time. */ + BUG_ON(stream->release); stream->release = 1; stream->ctx = NULL; if (!b_data(&stream->buf)) qc_stream_desc_free(stream); - else - eb64_insert(&qc->streams_by_id, &stream->by_id); } /* Free the stream descriptor buffer. This function should be used diff --git a/src/xprt_quic.c b/src/xprt_quic.c index fde571d60..47ec2c416 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -1494,20 +1494,13 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc, /* do not use strm_frm->stream as the qc_stream_desc instance * might be freed at this stage. Use the id to do a proper - * lookup. First search in the MUX then in the released stream - * list. + * lookup. * * TODO if lookup operation impact on the perf is noticeable, * implement a refcount on qc_stream_desc instances. */ - if (qc->mux_state == QC_MUX_READY) - stream = qcc_get_stream(qc->qcc, strm_frm->id); - if (!stream) { - node = eb64_lookup(&qc->streams_by_id, strm_frm->id); - stream = eb64_entry(node, struct qc_stream_desc, by_id); - } - - if (!stream) { + node = eb64_lookup(&qc->streams_by_id, strm_frm->id); + if (!node) { TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm); LIST_DELETE(&frm->list); quic_tx_packet_refdec(frm->pkt); @@ -1516,6 +1509,7 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc, /* early return */ return; } + stream = eb64_entry(node, struct qc_stream_desc, by_id); TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream); if (strm_frm->offset.key <= stream->ack_offset) {