diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 251fe7496..ee145de40 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -82,6 +82,7 @@ struct qcc { struct eb_root streams_by_id; /* all active streams by their ID */ + struct list recv_list; /* list of qcs for which demux can be resumed */ struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */ struct list fctl_list; /* list of sending qcs blocked on conn flow control */ struct list buf_wait_list; /* list of qcs blocked on stream desc buf */ @@ -150,6 +151,7 @@ struct qcs { uint64_t id; struct qc_stream_desc *stream; + struct list el_recv; /* element of qcc.recv_list */ struct list el_send; /* element of qcc.send_list */ struct list el_opening; /* element of qcc.opening_list */ struct list el_fctl; /* element of qcc.fctl_list */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 7a254901e..4695d1607 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -85,6 +85,7 @@ static void qcs_free(struct qcs *qcs) /* Safe to use even if already removed from the list. */ LIST_DEL_INIT(&qcs->el_opening); + LIST_DEL_INIT(&qcs->el_recv); LIST_DEL_INIT(&qcs->el_send); LIST_DEL_INIT(&qcs->el_fctl); LIST_DEL_INIT(&qcs->el_buf); @@ -138,6 +139,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) * These fields must be initialed before. */ LIST_INIT(&qcs->el_opening); + LIST_INIT(&qcs->el_recv); LIST_INIT(&qcs->el_send); LIST_INIT(&qcs->el_fctl); LIST_INIT(&qcs->el_buf); @@ -1605,6 +1607,7 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, if ((ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) || fin) { qcc_decode_qcs(qcc, qcs); + LIST_DEL_INIT(&qcs->el_recv); qcc_refresh_timeout(qcc); } @@ -2509,14 +2512,13 @@ static void qcc_wait_for_hs(struct qcc *qcc) TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); } -/* Proceed on receiving. Loop through all streams from and use decode_qcs - * operation. +/* Proceed on receiving. Loop on streams subscribed in recv_list and performed + * STREAM frames decoding upon them. * * Returns 0 on success else non-zero. */ static int qcc_io_recv(struct qcc *qcc) { - struct eb64_node *node; struct qcs *qcs; TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); @@ -2530,25 +2532,12 @@ static int qcc_io_recv(struct qcc *qcc) if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV)) qcc_wait_for_hs(qcc); - node = eb64_first(&qcc->streams_by_id); - while (node) { - uint64_t id; - - qcs = eb64_entry(node, struct qcs, by_id); - id = qcs->id; - - if (!ncb_data(&qcs->rx.ncbuf, 0) || (qcs->flags & QC_SF_DEM_FULL)) { - node = eb64_next(node); - continue; - } - - if (quic_stream_is_uni(id) && quic_stream_is_local(qcc, id)) { - node = eb64_next(node); - continue; - } - + while (!LIST_ISEMPTY(&qcc->recv_list)) { + qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv); + /* No need to add an uni local stream in recv_list. */ + BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_local(qcc, qcs->id)); qcc_decode_qcs(qcc, qcs); - node = eb64_next(node); + LIST_DEL_INIT(&qcs->el_recv); } TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); @@ -2998,6 +2987,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, goto err; } + LIST_INIT(&qcc->recv_list); LIST_INIT(&qcc->send_list); LIST_INIT(&qcc->fctl_list); LIST_INIT(&qcc->buf_wait_list); @@ -3213,8 +3203,10 @@ static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf, BUG_ON(!ncb_data(&qcs->rx.ncbuf, 0)); qcs->flags &= ~QC_SF_DEM_FULL; - if (!(qcc->flags & QC_CF_ERRL)) + if (!(qcc->flags & QC_CF_ERRL)) { + LIST_APPEND(&qcc->recv_list, &qcs->el_recv); qcc_wakeup(qcc); + } } TRACE_LEAVE(QMUX_EV_STRM_RECV, qcc->conn, qcs);