diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 543df319e..33f617a9a 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -105,7 +106,7 @@ struct qcc { #define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */ #define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */ #define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */ -#define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */ +/* unused 0x00000010 */ #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ #define QC_SF_READ_ABORTED 0x00000040 /* Rx closed using STOP_SENDING*/ #define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */ @@ -155,10 +156,11 @@ struct qcs { uint64_t msd_init; /* initial max-stream-data */ } rx; struct { + struct quic_fctl fc; /* stream flow control applied on sending */ + uint64_t offset; /* last offset of data ready to be sent */ uint64_t sent_offset; /* last offset sent by transport layer */ struct buffer buf; /* transmit buffer before sending via xprt */ - uint64_t msd; /* fctl bytes limit to respect on emission */ } tx; struct eb64_node by_id; diff --git a/src/h3.c b/src/h3.c index 00eed1259..6c2cad7dd 100644 --- a/src/h3.c +++ b/src/h3.c @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -1462,6 +1463,11 @@ static int h3_control_send(struct qcs *qcs, void *ctx) b_quic_enc_int(&pos, h3_settings_max_field_section_size, 0); } + if (qfctl_sblocked(&qcs->tx.fc)) { + TRACE_ERROR("not enough initial credit for control stream", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs); + goto err; + } + if (!(res = qcc_get_stream_txbuf(qcs))) { TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs); goto err; @@ -2229,7 +2235,8 @@ static int h3_send_goaway(struct h3c *h3c) b_quic_enc_int(&pos, h3c->id_goaway, 0); res = qcc_get_stream_txbuf(qcs); - if (!res || b_room(res) < b_data(&pos)) { + if (!res || b_room(res) < b_data(&pos) || + qfctl_sblocked(&qcs->tx.fc)) { /* Do not try forcefully to emit GOAWAY if no space left. */ TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs); goto err; diff --git a/src/mux_quic.c b/src/mux_quic.c index b75d4fff7..6e6654943 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -113,19 +114,15 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->id = qcs->by_id.key = id; eb64_insert(&qcc->streams_by_id, &qcs->by_id); - /* If stream is local, use peer remote-limit, or else the opposite. */ + /* Different limits can be set by the peer for local and remote bidi streams. */ if (quic_stream_is_bidi(id)) { - qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r : - qcc->rfctl.msd_bidi_l; + qfctl_init(&qcs->tx.fc, quic_stream_is_local(qcc, id) ? + qcc->rfctl.msd_bidi_r : qcc->rfctl.msd_bidi_l); } else if (quic_stream_is_local(qcc, id)) { - qcs->tx.msd = qcc->rfctl.msd_uni_l; + qfctl_init(&qcs->tx.fc, qcc->rfctl.msd_uni_l); } - /* Properly set flow-control blocking if initial MSD is nul. */ - if (!qcs->tx.msd) - qcs->flags |= QC_SF_BLK_SFCTL; - qcs->rx.ncbuf = NCBUF_NULL; qcs->rx.app_buf = BUF_NULL; qcs->rx.offset = qcs->rx.offset_max = 0; @@ -970,7 +967,7 @@ void qcc_reset_stream(struct qcs *qcs, int err) /* Register stream for emission of STREAM, STOP_SENDING or RESET_STREAM. * Set to 1 if stream content should be treated in priority compared to * other streams. For STREAM emission, must contains the size of the - * frame payload. + * frame payload. This is used for flow control accounting. */ void qcc_send_stream(struct qcs *qcs, int urg, int count) { @@ -990,6 +987,9 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count) LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send); } + if (count) + qfctl_sinc(&qcs->tx.fc, count); + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); } @@ -1256,16 +1256,18 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max) goto err; if (qcs) { - TRACE_PROTO("receiving MAX_STREAM_DATA", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); - if (max > qcs->tx.msd) { - qcs->tx.msd = max; - TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); + int unblock_soft = 0, unblock_real = 0; - if (qcs->flags & QC_SF_BLK_SFCTL) { - qcs->flags &= ~QC_SF_BLK_SFCTL; + TRACE_PROTO("receiving MAX_STREAM_DATA", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); + if (qfctl_set_max(&qcs->tx.fc, max, &unblock_soft, &unblock_real)) { + TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); + if (unblock_real) { /* TODO optim: only wakeup IO-CB if stream has data to sent. */ tasklet_wakeup(qcc->wait_event.tasklet); } + + if (unblock_soft) + qcs_notify_send(qcs); } } @@ -1561,11 +1563,11 @@ static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in) left = qcs->tx.offset - qcs->tx.sent_offset; to_xfer = QUIC_MIN(b_data(in), b_room(out)); - BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd); + BUG_ON_HOT(qcs->tx.offset > qcs->tx.fc.limit); /* do not exceed flow control limit */ - if (qcs->tx.offset + to_xfer > qcs->tx.msd) { + if (qcs->tx.offset + to_xfer > qcs->tx.fc.limit) { TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); - to_xfer = qcs->tx.msd - qcs->tx.offset; + to_xfer = qcs->tx.fc.limit - qcs->tx.offset; } BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md); @@ -1730,6 +1732,11 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) diff = offset + data - qcs->tx.sent_offset; if (diff) { + struct quic_fctl *fc_strm = &qcs->tx.fc; + + /* Ensure real offset never exceeds soft value. */ + BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft); + /* increase offset sum on connection */ qcc->tx.sent_offsets += diff; BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md); @@ -1740,11 +1747,10 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) /* increase offset on stream */ qcs->tx.sent_offset += diff; - BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd); BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset); - if (qcs->tx.sent_offset == qcs->tx.msd) { - qcs->flags |= QC_SF_BLK_SFCTL; - TRACE_STATE("stream flow-control reached", QMUX_EV_QCS_SEND, qcc->conn, qcs); + if (qfctl_rinc(fc_strm, diff)) { + TRACE_STATE("stream flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn, qcs); } /* If qcs.stream.buf is full, release it to the lower layer. */ @@ -1969,9 +1975,7 @@ static int qcs_send(struct qcs *qcs, struct list *frms) } qcs->tx.offset += xfer; - BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd); qcc->tx.offsets += xfer; - BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md); /* out buffer cannot be emptied if qcs offsets differ. */ BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset); @@ -2099,7 +2103,7 @@ static int qcc_io_send(struct qcc *qcc) } if (!(qcc->flags & QC_CF_BLK_MFCTL) && - !(qcs->flags & QC_SF_BLK_SFCTL)) { + !qfctl_rblocked(&qcs->tx.fc)) { if ((ret = qcs_send(qcs, &frms)) < 0) { /* Temporarily remove QCS from send-list. */ LIST_DEL_INIT(&qcs->el_send); @@ -2133,9 +2137,9 @@ static int qcc_io_send(struct qcc *qcc) * new qc_stream_desc should be present in send_list as * long as transport layer can handle all data. */ - BUG_ON(qcs->stream->buf && !(qcs->flags & QC_SF_BLK_SFCTL)); + BUG_ON(qcs->stream->buf && !qfctl_rblocked(&qcs->tx.fc)); - if (!(qcs->flags & QC_SF_BLK_SFCTL)) { + if (!qfctl_rblocked(&qcs->tx.fc)) { if ((ret = qcs_send(qcs, &frms)) < 0) { LIST_DEL_INIT(&qcs->el_send); LIST_APPEND(&qcs_failed, &qcs->el_send); @@ -2824,6 +2828,12 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, goto end; } + if (qfctl_sblocked(&qcs->tx.fc)) { + TRACE_DEVEL("leaving on flow-control reached", + QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + goto end; + } + ret = qcs_http_snd_buf(qcs, buf, count, &fin); if (fin) { TRACE_STATE("reached stream fin", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); @@ -2877,6 +2887,12 @@ static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input, goto end; } + if (qfctl_sblocked(&qcs->tx.fc)) { + TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + goto end; + } + /* Alawys disable splicing */ qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING; diff --git a/src/qmux_trace.c b/src/qmux_trace.c index b213ed453..298f4a35b 100644 --- a/src/qmux_trace.c +++ b/src/qmux_trace.c @@ -84,7 +84,7 @@ static void qmux_trace(enum trace_level level, uint64_t mask, qcs, (ullong)qcs->id, qcs_st_to_str(qcs->st)); chunk_appendf(&trace_buf, " msd=%llu/%llu/%llu", - (ullong)qcs->tx.msd, (ullong)qcs->tx.offset, (ullong)qcs->tx.sent_offset); + (ullong)qcs->tx.fc.limit, (ullong)qcs->tx.offset, (ullong)qcs->tx.sent_offset); } if (mask & QMUX_EV_QCC_NQCS) {