diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index a5d4d2d23..c3aac1860 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -51,6 +51,8 @@ struct qcc { /* flow-control fields set by the peer which we must respect. */ struct { + uint64_t msd_bidi_l; /* initial max-stream-data for peer local streams */ + uint64_t msd_bidi_r; /* initial max-stream-data for peer remote streams */ } rfctl; struct { @@ -78,6 +80,7 @@ struct qcc { #define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */ #define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */ #define QC_SF_DETACH 0x00000008 /* cs is detached but there is remaining data to send */ +#define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */ struct qcs { struct qcc *qcc; @@ -97,6 +100,7 @@ struct qcs { uint64_t ack_offset; /* last acked ordered byte offset */ struct buffer buf; /* transmit buffer before sending via xprt */ struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */ + uint64_t msd; /* fctl bytes limit to respect on emission */ } tx; struct eb64_node by_id; /* place in qcc's streams_by_id */ diff --git a/src/mux_quic.c b/src/mux_quic.c index e23985670..eda63470d 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -33,6 +33,11 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) eb64_insert(&qcc->streams_by_id, &qcs->by_id); qcc->strms[type].nb_streams++; + /* If stream is local, use peer remote-limit, or else the opposite. */ + /* TODO use uni limit for unidirectional streams */ + qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r : + qcc->rfctl.msd_bidi_l; + qcs->rx.buf = BUF_NULL; qcs->rx.app_buf = BUF_NULL; qcs->rx.offset = 0; @@ -355,6 +360,8 @@ static void qc_release(struct qcc *qcc) * to buffer. The STREAM frame payload points to the * buffer. The frame is then pushed to . If is set, and the * buf is emptied after transfer, FIN bit is set on the STREAM frame. + * Transfer is automatically adjusted to not exceed the stream flow-control + * limit. * * Returns the total bytes of newly transferred data or a negative error code. */ @@ -390,6 +397,12 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out, head = qcs->tx.sent_offset - qcs->tx.ack_offset; left = qcs->tx.offset - qcs->tx.sent_offset; to_xfer = QUIC_MIN(b_data(payload), b_room(out)); + + BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd); + /* do not exceed flow control limit */ + if (qcs->tx.offset + to_xfer > qcs->tx.msd) + to_xfer = qcs->tx.msd - qcs->tx.offset; + if (!left && !to_xfer) goto out; @@ -450,6 +463,9 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) /* increase offset on stream */ qcs->tx.sent_offset += diff; + BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd); + if (qcs->tx.sent_offset == qcs->tx.msd) + qcs->flags |= QC_SF_BLK_SFCTL; } /* Wrapper for send on transport layer. Send a list of frames for the @@ -552,6 +568,11 @@ static int qc_send(struct qcc *qcc) continue; } + if (qcs->flags & QC_SF_BLK_SFCTL) { + node = eb64_next(node); + continue; + } + if (b_data(buf) || b_data(out)) { int ret; char fin = qcs->flags & QC_SF_FIN_STREAM; @@ -703,7 +724,7 @@ static int qc_init(struct connection *conn, struct proxy *prx, struct session *sess, struct buffer *input) { struct qcc *qcc; - struct quic_transport_params *lparams; + struct quic_transport_params *lparams, *rparams; qcc = pool_alloc(pool_head_qcc); if (!qcc) @@ -752,6 +773,10 @@ static int qc_init(struct connection *conn, struct proxy *prx, qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi; qcc->lfctl.cl_bidi_r = 0; + rparams = &conn->qc->tx.params; + qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local; + qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; + qcc->wait_event.tasklet = tasklet_new(); if (!qcc->wait_event.tasklet) goto fail_no_tasklet;