MINOR: mux-quic: reorganize flow-control frames emission

Adjust the mechanism for MAX_STREAMS_BIDI emission. When a bidirectional
stream is removed, current flow-control level is checked. If needed, a
MAX_STREAMS_BIDI frame is generated and inserted in a new list in the
QCS instance. The new frames will be emitted at the start of qc_send().

This has no impact on the current MAX_STREAMS_BIDI behavior. However,
this mechanism is more flexible and will allow to implement quickly
MAX_STREAM_DATA/MAX_DATA emission.
This commit is contained in:
Amaury Denoyelle 2022-05-16 14:29:59 +02:00
parent 3a0864067a
commit c985cb167d
2 changed files with 44 additions and 42 deletions

View File

@ -50,6 +50,7 @@ struct qcc {
/* flow-control fields set by us enforced on our side. */ /* flow-control fields set by us enforced on our side. */
struct { struct {
struct list frms; /* prepared frames related to flow-control */
uint64_t ms_bidi_init; /* max initial sub-ID of bidi stream allowed for the peer */ uint64_t ms_bidi_init; /* max initial sub-ID of bidi stream allowed for the peer */
uint64_t ms_bidi; /* max sub-ID of bidi stream allowed for the peer */ uint64_t ms_bidi; /* max sub-ID of bidi stream allowed for the peer */
uint64_t msd_bidi_l; /* initial max-stream-data on local streams */ uint64_t msd_bidi_l; /* initial max-stream-data on local streams */

View File

@ -506,9 +506,35 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
return 0; return 0;
} }
static int qc_is_max_streams_needed(struct qcc *qcc) /* Signal the closing of remote stream with id <id>. Flow-control for new
* streams may be allocated for the peer if needed.
*/
static int qcc_release_remote_stream(struct qcc *qcc, uint64_t id)
{ {
return qcc->lfctl.cl_bidi_r > qcc->lfctl.ms_bidi_init / 2; struct quic_frame *frm;
if (quic_stream_is_bidi(id)) {
++qcc->lfctl.cl_bidi_r;
if (qcc->lfctl.cl_bidi_r > qcc->lfctl.ms_bidi_init / 2) {
frm = pool_zalloc(pool_head_quic_frame);
BUG_ON(!frm); /* TODO handle this properly */
LIST_INIT(&frm->reflist);
frm->type = QUIC_FT_MAX_STREAMS_BIDI;
frm->max_streams_bidi.max_streams = qcc->lfctl.ms_bidi +
qcc->lfctl.cl_bidi_r;
LIST_APPEND(&qcc->lfctl.frms, &frm->list);
tasklet_wakeup(qcc->wait_event.tasklet);
qcc->lfctl.ms_bidi += qcc->lfctl.cl_bidi_r;
qcc->lfctl.cl_bidi_r = 0;
}
}
else {
/* TODO */
}
return 0;
} }
/* detaches the QUIC stream from its QCC and releases it to the QCS pool. */ /* detaches the QUIC stream from its QCC and releases it to the QCS pool. */
@ -519,13 +545,8 @@ static void qcs_destroy(struct qcs *qcs)
TRACE_ENTER(QMUX_EV_QCS_END, conn, qcs); TRACE_ENTER(QMUX_EV_QCS_END, conn, qcs);
if (quic_stream_is_remote(qcs->qcc, id)) { if (quic_stream_is_remote(qcs->qcc, id))
if (quic_stream_is_bidi(id)) { qcc_release_remote_stream(qcs->qcc, id);
++qcs->qcc->lfctl.cl_bidi_r;
if (qc_is_max_streams_needed(qcs->qcc))
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
}
}
qcs_free(qcs); qcs_free(qcs);
@ -584,6 +605,12 @@ static void qc_release(struct qcc *qcc)
qcs_free(qcs); qcs_free(qcs);
} }
while (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
struct quic_frame *frm = LIST_ELEM(&qcc->lfctl.frms, struct quic_frame *, list);
LIST_DELETE(&frm->list);
pool_free(pool_head_quic_frame, frm);
}
pool_free(pool_head_qcc, qcc); pool_free(pool_head_qcc, qcc);
if (conn) { if (conn) {
@ -858,37 +885,6 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms)
return 0; return 0;
} }
/* Send a MAX_STREAM_BIDI frame to update the limit of bidirectional streams
* allowed to be opened by the peer. The caller should have first checked if
* this is required with qc_is_max_streams_needed.
*
* Returns 0 on success else non-zero.
*/
static int qc_send_max_streams(struct qcc *qcc)
{
struct list frms = LIST_HEAD_INIT(frms);
struct quic_frame *frm;
frm = pool_zalloc(pool_head_quic_frame);
BUG_ON(!frm); /* TODO handle this properly */
LIST_INIT(&frm->reflist);
frm->type = QUIC_FT_MAX_STREAMS_BIDI;
frm->max_streams_bidi.max_streams = qcc->lfctl.ms_bidi +
qcc->lfctl.cl_bidi_r;
TRACE_DEVEL("sending MAX_STREAMS frame", QMUX_EV_SEND_FRM, qcc->conn, NULL, frm);
LIST_APPEND(&frms, &frm->list);
if (qc_send_frames(qcc, &frms))
return 1;
/* save the new limit if the frame has been send. */
qcc->lfctl.ms_bidi += qcc->lfctl.cl_bidi_r;
qcc->lfctl.cl_bidi_r = 0;
return 0;
}
/* Used internally by qc_send function. Proceed to send for <qcs>. This will /* Used internally by qc_send function. Proceed to send for <qcs>. This will
* transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
* is then generated and inserted in <frms> list. <qcc_max_data> is the current * is then generated and inserted in <frms> list. <qcc_max_data> is the current
@ -966,8 +962,12 @@ static int qc_send(struct qcc *qcc)
return 0; return 0;
} }
if (qc_is_max_streams_needed(qcc)) if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
qc_send_max_streams(qcc); if (qc_send_frames(qcc, &qcc->lfctl.frms)) {
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
goto out;
}
}
if (qcc->flags & QC_CF_BLK_MFCTL) if (qcc->flags & QC_CF_BLK_MFCTL)
return 0; return 0;
@ -1213,6 +1213,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni; qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni;
qcc->strms[QCS_SRV_UNI].tx.max_data = 0; qcc->strms[QCS_SRV_UNI].tx.max_data = 0;
LIST_INIT(&qcc->lfctl.frms);
qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi; qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local; qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local;
qcc->lfctl.msd_bidi_r = lparams->initial_max_stream_data_bidi_remote; qcc->lfctl.msd_bidi_r = lparams->initial_max_stream_data_bidi_remote;