MEDIUM: mux-quic: respect peer connection data limit

This commit is similar to the previous one, but this time on the
connection level instead of the stream.

When the connection limit is reached, the connection is flagged with
QC_CF_BLK_MFCTL. This flag is checked in qc_send.

qcs_push_frame uses a new parameter which is used to not exceed the
connection flow-limit while calling it repeatdly over multiple streams
instance before transfering data to the transport layer.
This commit is contained in:
Amaury Denoyelle 2022-03-08 10:35:42 +01:00
parent 6ea781919a
commit 05ce55e582
2 changed files with 30 additions and 7 deletions

View File

@ -22,7 +22,8 @@ enum qcs_type {
QCS_MAX_TYPES QCS_MAX_TYPES
}; };
#define QC_CF_CC_RECV 0x00000001 #define QC_CF_CC_RECV 0x00000001
#define QC_CF_BLK_MFCTL 0x00000002 /* sending blocked due to connection flow-control */
struct qcc { struct qcc {
struct connection *conn; struct connection *conn;
@ -51,6 +52,7 @@ struct qcc {
/* flow-control fields set by the peer which we must respect. */ /* flow-control fields set by the peer which we must respect. */
struct { struct {
uint64_t md; /* connection flow control limit updated on MAX_DATA frames reception */
uint64_t msd_bidi_l; /* initial max-stream-data for peer local streams */ uint64_t msd_bidi_l; /* initial max-stream-data for peer local streams */
uint64_t msd_bidi_r; /* initial max-stream-data for peer remote streams */ uint64_t msd_bidi_r; /* initial max-stream-data for peer remote streams */
} rfctl; } rfctl;
@ -59,7 +61,7 @@ struct qcc {
uint64_t max_data; /* Maximum number of bytes which may be received */ uint64_t max_data; /* Maximum number of bytes which may be received */
} rx; } rx;
struct { struct {
uint64_t max_data; /* Maximum number of bytes which may be sent */ uint64_t sent_offsets; /* sum of all offset sent */
} tx; } tx;
struct eb_root streams_by_id; /* all active streams by their ID */ struct eb_root streams_by_id; /* all active streams by their ID */

View File

@ -361,14 +361,18 @@ static void qc_release(struct qcc *qcc)
* buffer. The frame is then pushed to <frm_list>. If <fin> is set, and the * buffer. The frame is then pushed to <frm_list>. If <fin> is set, and the
* <payload> buf is emptied after transfer, FIN bit is set on the STREAM frame. * <payload> buf is emptied after transfer, FIN bit is set on the STREAM frame.
* Transfer is automatically adjusted to not exceed the stream flow-control * Transfer is automatically adjusted to not exceed the stream flow-control
* limit. * limit. <max_data> must contains the current sum offsets for the connection.
* This is useful to not exceed the connection flow-control limit when using
* repeatdly this function on multiple streams before passing the data to the
* lower layer.
* *
* Returns the total bytes of newly transferred data or a negative error code. * Returns the total bytes of newly transferred data or a negative error code.
*/ */
static int qcs_push_frame(struct qcs *qcs, struct buffer *out, static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
struct buffer *payload, int fin, struct buffer *payload, int fin,
struct list *frm_list) struct list *frm_list, uint64_t max_data)
{ {
struct qcc *qcc = qcs->qcc;
struct quic_frame *frm; struct quic_frame *frm;
int head, left, to_xfer; int head, left, to_xfer;
int total = 0; int total = 0;
@ -403,6 +407,11 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
if (qcs->tx.offset + to_xfer > qcs->tx.msd) if (qcs->tx.offset + to_xfer > qcs->tx.msd)
to_xfer = qcs->tx.msd - qcs->tx.offset; to_xfer = qcs->tx.msd - qcs->tx.offset;
BUG_ON_HOT(max_data > qcc->rfctl.md);
/* do not overcome flow control limit on connection */
if (max_data + to_xfer > qcc->rfctl.md)
to_xfer = qcc->rfctl.md - max_data;
if (!left && !to_xfer) if (!left && !to_xfer)
goto out; goto out;
@ -449,7 +458,8 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
*/ */
void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
{ {
uint64_t diff = data; struct qcc *qcc = qcs->qcc;
uint64_t diff;
BUG_ON(offset > qcs->tx.sent_offset); BUG_ON(offset > qcs->tx.sent_offset);
@ -461,6 +471,12 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
diff = offset + data - qcs->tx.sent_offset; diff = offset + data - qcs->tx.sent_offset;
/* increase offset sum on connection */
qcc->tx.sent_offsets += diff;
BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md);
if (qcc->tx.sent_offsets == qcc->rfctl.md)
qcc->flags |= QC_CF_BLK_MFCTL;
/* increase offset on stream */ /* increase offset on stream */
qcs->tx.sent_offset += diff; qcs->tx.sent_offset += diff;
BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd); BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
@ -549,6 +565,9 @@ static int qc_send(struct qcc *qcc)
fprintf(stderr, "%s\n", __func__); fprintf(stderr, "%s\n", __func__);
if (qcc->flags & QC_CF_BLK_MFCTL)
return 0;
/* loop through all streams, construct STREAM frames if data available. /* loop through all streams, construct STREAM frames if data available.
* TODO optimize the loop to favor streams which are not too heavy. * TODO optimize the loop to favor streams which are not too heavy.
*/ */
@ -577,7 +596,8 @@ static int qc_send(struct qcc *qcc)
int ret; int ret;
char fin = qcs->flags & QC_SF_FIN_STREAM; char fin = qcs->flags & QC_SF_FIN_STREAM;
ret = qcs_push_frame(qcs, out, buf, fin, &frms); ret = qcs_push_frame(qcs, out, buf, fin, &frms,
qcc->tx.sent_offsets + total);
BUG_ON(ret < 0); /* TODO handle this properly */ BUG_ON(ret < 0); /* TODO handle this properly */
if (ret > 0) { if (ret > 0) {
@ -742,7 +762,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
lparams = &conn->qc->rx.params; lparams = &conn->qc->rx.params;
qcc->rx.max_data = lparams->initial_max_data; qcc->rx.max_data = lparams->initial_max_data;
qcc->tx.max_data = 0; qcc->tx.sent_offsets = 0;
/* Client initiated streams must respect the server flow control. */ /* Client initiated streams must respect the server flow control. */
qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi; qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;
@ -774,6 +794,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
qcc->lfctl.cl_bidi_r = 0; qcc->lfctl.cl_bidi_r = 0;
rparams = &conn->qc->tx.params; rparams = &conn->qc->tx.params;
qcc->rfctl.md = rparams->initial_max_data;
qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local; qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;