From 244dc00b09868f52864d50fc14af8d981c81424b Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Wed, 11 Dec 2024 17:53:29 +0100 Subject: [PATCH] MINOR: mux-quic: extract code to build STREAM frames list Extracts code responsible to generate STREAM, RESET_STREAM and STOP_SENDING frames for each qcs instances registered in qcc send_list. It is moved from qcc_io_send() to its owned new function qcc_build_frms(). This commit does not bring functional change. It is a preparatory step to adapt QUIC MUX send mechanism to allow reusing of qcc frms list accross qcc_io_send() invokation. As a side change, qcc_tx_frms_free() is renamed to qcc_clear_frms(). This better highlights its relationship with qcc_build_frms(). This should be bkacported up to 3.1. --- src/mux_quic.c | 212 +++++++++++++++++++++++++++++-------------------- 1 file changed, 124 insertions(+), 88 deletions(-) diff --git a/src/mux_quic.c b/src/mux_quic.c index 59c474e1e..b48def292 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -44,15 +44,6 @@ static int qcc_is_pacing_active(const struct connection *conn) return !!(qc->path->cc.algo->pacing_rate); } -/* Free STREAM frames in Tx list. */ -static void qcc_tx_frms_free(struct qcc *qcc) -{ - while (!LIST_ISEMPTY(&qcc->tx.frms)) { - struct quic_frame *frm = LIST_ELEM(qcc->tx.frms.n, struct quic_frame *, list); - qc_frm_free(qcc->conn->handle.qc, &frm); - } -} - static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) { struct buffer buf; @@ -1357,6 +1348,15 @@ static void qcc_notify_fctl(struct qcc *qcc) } } +/* Free STREAM frames in Tx list. */ +static void qcc_clear_frms(struct qcc *qcc) +{ + while (!LIST_ISEMPTY(&qcc->tx.frms)) { + struct quic_frame *frm = LIST_ELEM(qcc->tx.frms.n, struct quic_frame *, list); + qc_frm_free(qcc->conn->handle.qc, &frm); + } +} + /* Prepare for the emission of RESET_STREAM on with error code . */ void qcc_reset_stream(struct qcs *qcs, int err) { @@ -2298,6 +2298,111 @@ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn) return -1; } +/* Encode STREAM frames into tx frms for streams registered into + * send_list. On each error, related stream is removed from send_list and + * inserted into list. + * + * This functions also serves to emit RESET_STREAM and STOP_SENDING frames. In + * this case, frame is emitted immediately without using tx frms. If an + * error occured during this step, this is considered as fatal. Tx frms is + * cleared and 0 is returned. + * + * Returns the sum of encoded STREAM frames length or 0 if no frame built. + */ +static int qcc_build_frms(struct qcc *qcc, struct list *qcs_failed) +{ + struct list *frms = &qcc->tx.frms; + struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; + uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); + int ret = 0, total = 0; + + TRACE_ENTER(QMUX_EV_QCC_END, qcc->conn); + + /* Frames list must first be cleared via qcc_clear_frms(). */ + BUG_ON(!LIST_ISEMPTY(&qcc->tx.frms)); + + list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_list, el_send) { + /* Check if all QCS were processed. */ + if (qcs == first_qcs) + break; + + /* Stream must not be present in send_list if it has nothing to send. */ + BUG_ON(!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) && + (!qcs->stream || !qcs_prep_bytes(qcs))); + + /* Each STOP_SENDING/RESET_STREAM frame is sent individually to + * guarantee its emission. + * + * TODO multiplex several frames in same datagram to optimize sending + */ + if (qcs->flags & QC_SF_TO_STOP_SENDING) { + if (qcs_send_stop_sending(qcs)) + goto err; + + /* Remove stream from send_list if it had only STOP_SENDING + * to send. + */ + if (!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET)) && + (!qcs->stream || !qcs_prep_bytes(qcs))) { + LIST_DEL_INIT(&qcs->el_send); + continue; + } + } + + if (qcs->flags & QC_SF_TO_RESET) { + if (qcs_send_reset(qcs)) + goto err; + + /* RFC 9000 3.3. Permitted Frame Types + * + * A sender MUST NOT send + * a STREAM or STREAM_DATA_BLOCKED frame for a stream in the + * "Reset Sent" state or any terminal state -- that is, after + * sending a RESET_STREAM frame. + */ + LIST_DEL_INIT(&qcs->el_send); + if (qcs_is_completed(qcs)) { + TRACE_STATE("add stream in purg_list", QMUX_EV_QCC_SEND|QMUX_EV_QCS_SEND, qcc->conn, qcs); + LIST_APPEND(&qcc->purg_list, &qcs->el_send); + } + continue; + } + + /* Total sent bytes must not exceed connection window. */ + BUG_ON(total > window_conn); + + if (!qfctl_rblocked(&qcc->tx.fc) && + !qfctl_rblocked(&qcs->tx.fc) && window_conn > total) { + if ((ret = qcs_send(qcs, frms, window_conn - total)) < 0) { + /* Temporarily remove QCS from send-list. */ + LIST_DEL_INIT(&qcs->el_send); + LIST_APPEND(qcs_failed, &qcs->el_send); + continue; + } + + total += ret; + if (ret) { + /* Move QCS with some bytes transferred at the + * end of send-list for next iterations. + */ + LIST_DEL_INIT(&qcs->el_send); + LIST_APPEND(&qcc->send_list, &qcs->el_send); + /* Remember first moved QCS as checkpoint to interrupt loop */ + if (!first_qcs) + first_qcs = qcs; + } + } + } + + TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); + return total; + + err: + qcc_clear_frms(qcc); + TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn); + return 0; +} + /* Proceed to sending. Loop through all available streams for the * instance and try to send as much as possible. * @@ -2308,9 +2413,9 @@ static int qcc_io_send(struct qcc *qcc) struct list *frms = &qcc->tx.frms; /* Temporary list for QCS on error. */ struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); - struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; + struct qcs *qcs, *qcs_tmp; uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); - int ret = 0, ret_sent = 0, total = 0, resent; + int ret = 0, total = 0, resent; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); @@ -2320,7 +2425,7 @@ static int qcc_io_send(struct qcc *qcc) * apply for STREAM frames. */ - qcc_tx_frms_free(qcc); + qcc_clear_frms(qcc); /* Check for transport error. */ if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { @@ -2353,78 +2458,9 @@ static int qcc_io_send(struct qcc *qcc) } /* Send STREAM/STOP_SENDING/RESET_STREAM data for registered streams. */ - list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_list, el_send) { - /* Check if all QCS were processed. */ - if (qcs == first_qcs) - break; - - /* Stream must not be present in send_list if it has nothing to send. */ - BUG_ON(!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) && - (!qcs->stream || !qcs_prep_bytes(qcs))); - - /* Each STOP_SENDING/RESET_STREAM frame is sent individually to - * guarantee its emission. - * - * TODO multiplex several frames in same datagram to optimize sending - */ - if (qcs->flags & QC_SF_TO_STOP_SENDING) { - if (qcs_send_stop_sending(qcs)) - goto sent_done; - - /* Remove stream from send_list if it had only STOP_SENDING - * to send. - */ - if (!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET)) && - (!qcs->stream || !qcs_prep_bytes(qcs))) { - LIST_DEL_INIT(&qcs->el_send); - continue; - } - } - - if (qcs->flags & QC_SF_TO_RESET) { - if (qcs_send_reset(qcs)) - goto sent_done; - - /* RFC 9000 3.3. Permitted Frame Types - * - * A sender MUST NOT send - * a STREAM or STREAM_DATA_BLOCKED frame for a stream in the - * "Reset Sent" state or any terminal state -- that is, after - * sending a RESET_STREAM frame. - */ - LIST_DEL_INIT(&qcs->el_send); - if (qcs_is_completed(qcs)) { - TRACE_STATE("add stream in purg_list", QMUX_EV_QCC_SEND|QMUX_EV_QCS_SEND, qcc->conn, qcs); - LIST_APPEND(&qcc->purg_list, &qcs->el_send); - } - continue; - } - - /* Total sent bytes must not exceed connection window. */ - BUG_ON(total > window_conn); - - if (!qfctl_rblocked(&qcc->tx.fc) && - !qfctl_rblocked(&qcs->tx.fc) && window_conn > total) { - if ((ret = qcs_send(qcs, frms, window_conn - total)) < 0) { - /* Temporarily remove QCS from send-list. */ - LIST_DEL_INIT(&qcs->el_send); - LIST_APPEND(&qcs_failed, &qcs->el_send); - continue; - } - - total += ret; - if (ret) { - /* Move QCS with some bytes transferred at the - * end of send-list for next iterations. - */ - LIST_DEL_INIT(&qcs->el_send); - LIST_APPEND(&qcc->send_list, &qcs->el_send); - /* Remember first moved QCS as checkpoint to interrupt loop */ - if (!first_qcs) - first_qcs = qcs; - } - } - } + total = qcc_build_frms(qcc, &qcs_failed); + if (!total) + goto sent_done; if (qcc_is_pacing_active(qcc->conn)) { if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) { @@ -2436,7 +2472,7 @@ static int qcc_io_send(struct qcc *qcc) /* Retry sending until no frame to send, data rejected or connection * flow-control limit reached. */ - while ((ret_sent = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { + while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { window_conn = qfctl_rcap(&qcc->tx.fc); resent = 0; @@ -2467,7 +2503,7 @@ static int qcc_io_send(struct qcc *qcc) } sent_done: - if (ret_sent == 1) { + if (ret == 1) { /* qcc_send_frames cannot return 1 if pacing not used. */ BUG_ON(!qcc_is_pacing_active(qcc->conn)); qcc_wakeup_pacing(qcc); @@ -2475,7 +2511,7 @@ static int qcc_io_send(struct qcc *qcc) } else if (!LIST_ISEMPTY(&qcc->tx.frms)) { /* Deallocate frames that the transport layer has rejected. */ - qcc_tx_frms_free(qcc); + qcc_clear_frms(qcc); } else { /* Everything sent */ @@ -2761,7 +2797,7 @@ static void qcc_release(struct qcc *qcc) qc_frm_free(qcc->conn->handle.qc, &frm); } - qcc_tx_frms_free(qcc); + qcc_clear_frms(qcc); if (qcc->app_ops && qcc->app_ops->release) qcc->app_ops->release(qcc->ctx);