diff --git a/src/mux_quic.c b/src/mux_quic.c index e30c5fcc8..f52e3c951 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -1552,15 +1552,14 @@ static void qcs_destroy(struct qcs *qcs) TRACE_LEAVE(QMUX_EV_QCS_END, conn); } -/* Transfer as much as possible data on from to . This is done - * in respect with available flow-control at stream and connection level. +/* Transfer as much as possible data on from to . * * Returns the total bytes of transferred data or a negative error code. */ static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in) { struct qcc *qcc = qcs->qcc; - int left, to_xfer; + int to_xfer; int total = 0; TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -1588,24 +1587,9 @@ static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in) BUG_ON_HOT(qcs->tx.offset < qcs->tx.fc.off_real); BUG_ON_HOT(qcc->tx.offsets < qcc->tx.fc.off_real); - left = qcs->tx.offset - qcs->tx.fc.off_real; to_xfer = QUIC_MIN(b_data(in), b_room(out)); - BUG_ON_HOT(qcs->tx.offset > qcs->tx.fc.limit); - /* do not exceed flow control limit */ - if (qcs->tx.offset + to_xfer > qcs->tx.fc.limit) { - TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); - to_xfer = qcs->tx.fc.limit - qcs->tx.offset; - } - - BUG_ON_HOT(qcc->tx.offsets > qcc->tx.fc.limit); - /* do not overcome flow control limit on connection */ - if (qcc->tx.offsets + to_xfer > qcc->tx.fc.limit) { - TRACE_DATA("do not exceed conn flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); - to_xfer = qcc->tx.fc.limit - qcc->tx.offsets; - } - - if (!left && !to_xfer) + if (!to_xfer) goto out; total = b_force_xfer(out, in, to_xfer); @@ -1629,17 +1613,20 @@ static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in) /* Prepare a STREAM frame for instance using as payload. The frame * is appended in . Set if this is supposed to be the last * stream frame. If is NULL an empty STREAM frame is built : this may be - * useful if FIN needs to be sent without any data left. + * useful if FIN needs to be sent without any data left. Frame length will be + * truncated if greater than . This allows to prepare several + * frames in a loop while respecting connection flow control window. * * Returns the payload length of the STREAM frame or a negative error code. */ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, - struct list *frm_list) + struct list *frm_list, uint64_t window_conn) { struct qcc *qcc = qcs->qcc; struct quic_frame *frm; int head, total; uint64_t base_off; + const uint64_t window_stream = qfctl_rcap(&qcs->tx.fc); TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -1651,6 +1638,22 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, total = out ? b_data(out) - head : 0; BUG_ON(total < 0); + /* do not exceed stream flow control limit */ + if (total > window_stream) { + TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); + total = window_stream; + } + + /* do not exceed connection flow control limit */ + if (total > window_conn) { + TRACE_DATA("do not exceed conn flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); + total = window_conn; + } + + /* Reset FIN if bytes to send is capped by flow control. */ + if (out && total < b_data(out) - head) + fin = 0; + if (!total && !fin) { /* No need to send anything if total is NULL and no FIN to signal. */ TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -1952,18 +1955,18 @@ static int qcs_send_stop_sending(struct qcs *qcs) /* Used internally by qcc_io_send function. Proceed to send for . This will * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame - * is then generated and inserted in list. + * is then generated and inserted in list. Frame length will be + * truncated if greater than . This allows to prepare several + * frames in a loop while respecting connection flow control window. * - * Returns the total bytes transferred between qcs and quic_stream buffers. Can - * be null if out buffer cannot be allocated. On error a negative error code is - * used. + * Returns the payload length of the STREAM frame or a negative error code. */ -static int qcs_send(struct qcs *qcs, struct list *frms) +static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn) { struct qcc *qcc = qcs->qcc; struct buffer *buf = &qcs->tx.buf; struct buffer *out = qc_stream_buf_get(qcs->stream); - int xfer = 0, buf_avail; + int xfer = 0, flen = 0, buf_avail; char fin = 0; TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -2022,13 +2025,14 @@ static int qcs_send(struct qcs *qcs, struct list *frms) goto err; } - if (qcs_build_stream_frm(qcs, out, fin, frms) < 0) + flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn); + if (flen < 0) goto err; } out: TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); - return xfer; + return flen; err: TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -2046,7 +2050,8 @@ static int qcc_io_send(struct qcc *qcc) /* Temporary list for QCS on error. */ struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; - int ret, total = 0; + uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); + int ret, total = 0, resent; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); @@ -2129,9 +2134,12 @@ static int qcc_io_send(struct qcc *qcc) continue; } + /* Total sent bytes must not exceed connection window. */ + BUG_ON(total > window_conn); + if (!qfctl_rblocked(&qcc->tx.fc) && - !qfctl_rblocked(&qcs->tx.fc)) { - if ((ret = qcs_send(qcs, &frms)) < 0) { + !qfctl_rblocked(&qcs->tx.fc) && window_conn > total) { + if ((ret = qcs_send(qcs, &frms, window_conn - total)) < 0) { /* Temporarily remove QCS from send-list. */ LIST_DEL_INIT(&qcs->el_send); LIST_APPEND(&qcs_failed, &qcs->el_send); @@ -2156,6 +2164,9 @@ static int qcc_io_send(struct qcc *qcc) * flow-control limit reached. */ while (qcc_send_frames(qcc, &frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { + window_conn = qfctl_rcap(&qcc->tx.fc); + resent = 0; + /* Reloop over . Useful for streams which have * fulfilled their qc_stream_desc buf and have now release it. */ @@ -2166,14 +2177,18 @@ static int qcc_io_send(struct qcc *qcc) */ BUG_ON(qcs->stream->buf && !qfctl_rblocked(&qcs->tx.fc)); - if (!qfctl_rblocked(&qcs->tx.fc)) { - if ((ret = qcs_send(qcs, &frms)) < 0) { + /* Total sent bytes must not exceed connection window. */ + BUG_ON(resent > window_conn); + + if (!qfctl_rblocked(&qcs->tx.fc) && window_conn > resent) { + if ((ret = qcs_send(qcs, &frms, window_conn - resent)) < 0) { LIST_DEL_INIT(&qcs->el_send); LIST_APPEND(&qcs_failed, &qcs->el_send); continue; } total += ret; + resent += ret; } } }