MINOR: mux-quic: extract code to build STREAM frames list

Extracts code responsible to generate STREAM, RESET_STREAM and
STOP_SENDING frames for each qcs instances registered in qcc send_list.
It is moved from qcc_io_send() to its owned new function
qcc_build_frms().

This commit does not bring functional change. It is a preparatory step
to adapt QUIC MUX send mechanism to allow reusing of qcc frms list
accross qcc_io_send() invokation.

As a side change, qcc_tx_frms_free() is renamed to qcc_clear_frms().
This better highlights its relationship with qcc_build_frms().

This should be bkacported up to 3.1.
This commit is contained in:
Amaury Denoyelle 2024-12-11 17:53:29 +01:00
parent e296585ae9
commit 244dc00b09

View File

@ -44,15 +44,6 @@ static int qcc_is_pacing_active(const struct connection *conn)
return !!(qc->path->cc.algo->pacing_rate); return !!(qc->path->cc.algo->pacing_rate);
} }
/* Free <qcc> STREAM frames in Tx list. */
static void qcc_tx_frms_free(struct qcc *qcc)
{
while (!LIST_ISEMPTY(&qcc->tx.frms)) {
struct quic_frame *frm = LIST_ELEM(qcc->tx.frms.n, struct quic_frame *, list);
qc_frm_free(qcc->conn->handle.qc, &frm);
}
}
static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
{ {
struct buffer buf; struct buffer buf;
@ -1357,6 +1348,15 @@ static void qcc_notify_fctl(struct qcc *qcc)
} }
} }
/* Free <qcc> STREAM frames in Tx list. */
static void qcc_clear_frms(struct qcc *qcc)
{
while (!LIST_ISEMPTY(&qcc->tx.frms)) {
struct quic_frame *frm = LIST_ELEM(qcc->tx.frms.n, struct quic_frame *, list);
qc_frm_free(qcc->conn->handle.qc, &frm);
}
}
/* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */ /* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */
void qcc_reset_stream(struct qcs *qcs, int err) void qcc_reset_stream(struct qcs *qcs, int err)
{ {
@ -2298,6 +2298,111 @@ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn)
return -1; return -1;
} }
/* Encode STREAM frames into <qcc> tx frms for streams registered into
* send_list. On each error, related stream is removed from send_list and
* inserted into <qcs_failed> list.
*
* This functions also serves to emit RESET_STREAM and STOP_SENDING frames. In
* this case, frame is emitted immediately without using <qcc> tx frms. If an
* error occured during this step, this is considered as fatal. Tx frms is
* cleared and 0 is returned.
*
* Returns the sum of encoded STREAM frames length or 0 if no frame built.
*/
static int qcc_build_frms(struct qcc *qcc, struct list *qcs_failed)
{
struct list *frms = &qcc->tx.frms;
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
int ret = 0, total = 0;
TRACE_ENTER(QMUX_EV_QCC_END, qcc->conn);
/* Frames list must first be cleared via qcc_clear_frms(). */
BUG_ON(!LIST_ISEMPTY(&qcc->tx.frms));
list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_list, el_send) {
/* Check if all QCS were processed. */
if (qcs == first_qcs)
break;
/* Stream must not be present in send_list if it has nothing to send. */
BUG_ON(!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) &&
(!qcs->stream || !qcs_prep_bytes(qcs)));
/* Each STOP_SENDING/RESET_STREAM frame is sent individually to
* guarantee its emission.
*
* TODO multiplex several frames in same datagram to optimize sending
*/
if (qcs->flags & QC_SF_TO_STOP_SENDING) {
if (qcs_send_stop_sending(qcs))
goto err;
/* Remove stream from send_list if it had only STOP_SENDING
* to send.
*/
if (!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET)) &&
(!qcs->stream || !qcs_prep_bytes(qcs))) {
LIST_DEL_INIT(&qcs->el_send);
continue;
}
}
if (qcs->flags & QC_SF_TO_RESET) {
if (qcs_send_reset(qcs))
goto err;
/* RFC 9000 3.3. Permitted Frame Types
*
* A sender MUST NOT send
* a STREAM or STREAM_DATA_BLOCKED frame for a stream in the
* "Reset Sent" state or any terminal state -- that is, after
* sending a RESET_STREAM frame.
*/
LIST_DEL_INIT(&qcs->el_send);
if (qcs_is_completed(qcs)) {
TRACE_STATE("add stream in purg_list", QMUX_EV_QCC_SEND|QMUX_EV_QCS_SEND, qcc->conn, qcs);
LIST_APPEND(&qcc->purg_list, &qcs->el_send);
}
continue;
}
/* Total sent bytes must not exceed connection window. */
BUG_ON(total > window_conn);
if (!qfctl_rblocked(&qcc->tx.fc) &&
!qfctl_rblocked(&qcs->tx.fc) && window_conn > total) {
if ((ret = qcs_send(qcs, frms, window_conn - total)) < 0) {
/* Temporarily remove QCS from send-list. */
LIST_DEL_INIT(&qcs->el_send);
LIST_APPEND(qcs_failed, &qcs->el_send);
continue;
}
total += ret;
if (ret) {
/* Move QCS with some bytes transferred at the
* end of send-list for next iterations.
*/
LIST_DEL_INIT(&qcs->el_send);
LIST_APPEND(&qcc->send_list, &qcs->el_send);
/* Remember first moved QCS as checkpoint to interrupt loop */
if (!first_qcs)
first_qcs = qcs;
}
}
}
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
return total;
err:
qcc_clear_frms(qcc);
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
return 0;
}
/* Proceed to sending. Loop through all available streams for the <qcc> /* Proceed to sending. Loop through all available streams for the <qcc>
* instance and try to send as much as possible. * instance and try to send as much as possible.
* *
@ -2308,9 +2413,9 @@ static int qcc_io_send(struct qcc *qcc)
struct list *frms = &qcc->tx.frms; struct list *frms = &qcc->tx.frms;
/* Temporary list for QCS on error. */ /* Temporary list for QCS on error. */
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; struct qcs *qcs, *qcs_tmp;
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
int ret = 0, ret_sent = 0, total = 0, resent; int ret = 0, total = 0, resent;
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
@ -2320,7 +2425,7 @@ static int qcc_io_send(struct qcc *qcc)
* apply for STREAM frames. * apply for STREAM frames.
*/ */
qcc_tx_frms_free(qcc); qcc_clear_frms(qcc);
/* Check for transport error. */ /* Check for transport error. */
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
@ -2353,78 +2458,9 @@ static int qcc_io_send(struct qcc *qcc)
} }
/* Send STREAM/STOP_SENDING/RESET_STREAM data for registered streams. */ /* Send STREAM/STOP_SENDING/RESET_STREAM data for registered streams. */
list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_list, el_send) { total = qcc_build_frms(qcc, &qcs_failed);
/* Check if all QCS were processed. */ if (!total)
if (qcs == first_qcs) goto sent_done;
break;
/* Stream must not be present in send_list if it has nothing to send. */
BUG_ON(!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) &&
(!qcs->stream || !qcs_prep_bytes(qcs)));
/* Each STOP_SENDING/RESET_STREAM frame is sent individually to
* guarantee its emission.
*
* TODO multiplex several frames in same datagram to optimize sending
*/
if (qcs->flags & QC_SF_TO_STOP_SENDING) {
if (qcs_send_stop_sending(qcs))
goto sent_done;
/* Remove stream from send_list if it had only STOP_SENDING
* to send.
*/
if (!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET)) &&
(!qcs->stream || !qcs_prep_bytes(qcs))) {
LIST_DEL_INIT(&qcs->el_send);
continue;
}
}
if (qcs->flags & QC_SF_TO_RESET) {
if (qcs_send_reset(qcs))
goto sent_done;
/* RFC 9000 3.3. Permitted Frame Types
*
* A sender MUST NOT send
* a STREAM or STREAM_DATA_BLOCKED frame for a stream in the
* "Reset Sent" state or any terminal state -- that is, after
* sending a RESET_STREAM frame.
*/
LIST_DEL_INIT(&qcs->el_send);
if (qcs_is_completed(qcs)) {
TRACE_STATE("add stream in purg_list", QMUX_EV_QCC_SEND|QMUX_EV_QCS_SEND, qcc->conn, qcs);
LIST_APPEND(&qcc->purg_list, &qcs->el_send);
}
continue;
}
/* Total sent bytes must not exceed connection window. */
BUG_ON(total > window_conn);
if (!qfctl_rblocked(&qcc->tx.fc) &&
!qfctl_rblocked(&qcs->tx.fc) && window_conn > total) {
if ((ret = qcs_send(qcs, frms, window_conn - total)) < 0) {
/* Temporarily remove QCS from send-list. */
LIST_DEL_INIT(&qcs->el_send);
LIST_APPEND(&qcs_failed, &qcs->el_send);
continue;
}
total += ret;
if (ret) {
/* Move QCS with some bytes transferred at the
* end of send-list for next iterations.
*/
LIST_DEL_INIT(&qcs->el_send);
LIST_APPEND(&qcc->send_list, &qcs->el_send);
/* Remember first moved QCS as checkpoint to interrupt loop */
if (!first_qcs)
first_qcs = qcs;
}
}
}
if (qcc_is_pacing_active(qcc->conn)) { if (qcc_is_pacing_active(qcc->conn)) {
if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) { if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
@ -2436,7 +2472,7 @@ static int qcc_io_send(struct qcc *qcc)
/* Retry sending until no frame to send, data rejected or connection /* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached. * flow-control limit reached.
*/ */
while ((ret_sent = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
window_conn = qfctl_rcap(&qcc->tx.fc); window_conn = qfctl_rcap(&qcc->tx.fc);
resent = 0; resent = 0;
@ -2467,7 +2503,7 @@ static int qcc_io_send(struct qcc *qcc)
} }
sent_done: sent_done:
if (ret_sent == 1) { if (ret == 1) {
/* qcc_send_frames cannot return 1 if pacing not used. */ /* qcc_send_frames cannot return 1 if pacing not used. */
BUG_ON(!qcc_is_pacing_active(qcc->conn)); BUG_ON(!qcc_is_pacing_active(qcc->conn));
qcc_wakeup_pacing(qcc); qcc_wakeup_pacing(qcc);
@ -2475,7 +2511,7 @@ static int qcc_io_send(struct qcc *qcc)
} }
else if (!LIST_ISEMPTY(&qcc->tx.frms)) { else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
/* Deallocate frames that the transport layer has rejected. */ /* Deallocate frames that the transport layer has rejected. */
qcc_tx_frms_free(qcc); qcc_clear_frms(qcc);
} }
else { else {
/* Everything sent */ /* Everything sent */
@ -2761,7 +2797,7 @@ static void qcc_release(struct qcc *qcc)
qc_frm_free(qcc->conn->handle.qc, &frm); qc_frm_free(qcc->conn->handle.qc, &frm);
} }
qcc_tx_frms_free(qcc); qcc_clear_frms(qcc);
if (qcc->app_ops && qcc->app_ops->release) if (qcc->app_ops && qcc->app_ops->release)
qcc->app_ops->release(qcc->ctx); qcc->app_ops->release(qcc->ctx);