diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index ee145de40..1f69d8838 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -86,6 +86,7 @@ struct qcc { struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */ struct list fctl_list; /* list of sending qcs blocked on conn flow control */ struct list buf_wait_list; /* list of qcs blocked on stream desc buf */ + struct list purg_list; /* list of qcs which can be purged */ struct wait_event wait_event; /* To be used if we're waiting for I/Os */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 4695d1607..59c474e1e 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -456,6 +456,16 @@ static void qcs_close_local(struct qcs *qcs) } } +/* Returns true if can be purged. */ +static int qcs_is_completed(struct qcs *qcs) +{ + /* A stream is completed if fully closed and stconn released, or simply + * detached and everything already sent. + */ + return (qcs->st == QC_SS_CLO && !qcs_sc(qcs)) || + (qcs_is_close_local(qcs) && (qcs->flags & QC_SF_DETACH)); +} + /* Close the remote channel of instance. */ static void qcs_close_remote(struct qcs *qcs) { @@ -475,6 +485,12 @@ static void qcs_close_remote(struct qcs *qcs) BUG_ON_HOT(quic_stream_is_local(qcs->qcc, qcs->id)); qcs->st = QC_SS_CLO; } + + if (qcs_is_completed(qcs)) { + BUG_ON(LIST_INLIST(&qcs->el_send)); + TRACE_STATE("add stream in purg_list", QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs); + LIST_APPEND(&qcs->qcc->purg_list, &qcs->el_send); + } } int qcs_is_close_local(struct qcs *qcs) @@ -661,6 +677,11 @@ static void qmux_ctrl_send(struct qc_stream_desc *stream, uint64_t data, uint64_ /* Unsubscribe from streamdesc when everything sent. */ qc_stream_desc_sub_send(qcs->stream, NULL); + + if (qcs_is_completed(qcs)) { + TRACE_STATE("add stream in purg_list", QMUX_EV_QCS_SEND, qcc->conn, qcs); + LIST_APPEND(&qcc->purg_list, &qcs->el_send); + } } } @@ -2372,6 +2393,10 @@ static int qcc_io_send(struct qcc *qcc) * sending a RESET_STREAM frame. */ LIST_DEL_INIT(&qcs->el_send); + if (qcs_is_completed(qcs)) { + TRACE_STATE("add stream in purg_list", QMUX_EV_QCC_SEND|QMUX_EV_QCS_SEND, qcc->conn, qcs); + LIST_APPEND(&qcc->purg_list, &qcs->el_send); + } continue; } @@ -2545,43 +2570,22 @@ static int qcc_io_recv(struct qcc *qcc) } -/* Release all streams which have their transfer operation achieved. - * - * Returns true if at least one stream is released. - */ -static int qcc_purge_streams(struct qcc *qcc) +/* Release all streams which have their transfer operation achieved. */ +static void qcc_purge_streams(struct qcc *qcc) { - struct eb64_node *node; - int release = 0; + struct qcs *qcs; TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); - node = eb64_first(&qcc->streams_by_id); - while (node) { - struct qcs *qcs = eb64_entry(node, struct qcs, by_id); - node = eb64_next(node); + while (!LIST_ISEMPTY(&qcc->purg_list)) { + qcs = LIST_ELEM(qcc->purg_list.n, struct qcs *, el_send); - /* Release not attached closed streams. */ - if (qcs->st == QC_SS_CLO && !qcs_sc(qcs)) { - TRACE_STATE("purging closed stream", QMUX_EV_QCC_WAKE, qcs->qcc->conn, qcs); - qcs_destroy(qcs); - release = 1; - continue; - } - - /* Release detached streams with empty buffer. */ - if (qcs->flags & QC_SF_DETACH) { - if (qcs_is_close_local(qcs)) { - TRACE_STATE("purging detached stream", QMUX_EV_QCC_WAKE, qcs->qcc->conn, qcs); - qcs_destroy(qcs); - release = 1; - continue; - } - } + TRACE_STATE("purging stream", QMUX_EV_QCC_WAKE, qcs->qcc->conn, qcs); + BUG_ON_HOT(!qcs_is_completed(qcs)); + qcs_destroy(qcs); } TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); - return release; } /* Execute application layer shutdown. If this operation is not defined, a @@ -2659,7 +2663,8 @@ static int qcc_wake_some_streams(struct qcc *qcc) */ static int qcc_io_process(struct qcc *qcc) { - qcc_purge_streams(qcc); + if (!LIST_ISEMPTY(&qcc->purg_list)) + qcc_purge_streams(qcc); /* Check if a soft-stop is in progress. * @@ -2991,6 +2996,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, LIST_INIT(&qcc->send_list); LIST_INIT(&qcc->fctl_list); LIST_INIT(&qcc->buf_wait_list); + LIST_INIT(&qcc->purg_list); qcc->wait_event.tasklet->process = qcc_io_cb; qcc->wait_event.tasklet->context = qcc;