diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index f05bf9c3c..f4300de57 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -72,8 +72,6 @@ struct qcc { struct { struct quic_fctl fc; /* stream flow control applied on sending */ - - uint64_t offsets; /* sum of all offsets prepared */ } tx; uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */ @@ -159,8 +157,6 @@ struct qcs { } rx; struct { struct quic_fctl fc; /* stream flow control applied on sending */ - - uint64_t offset; /* last offset of data ready to be sent */ } tx; struct eb64_node by_id; diff --git a/src/mux_quic.c b/src/mux_quic.c index 06b97226e..0024b5d45 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -137,8 +137,6 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) } qcs->rx.msd_init = qcs->rx.msd; - qcs->tx.offset = 0; - qcs->wait_event.tasklet = NULL; qcs->wait_event.events = 0; qcs->subs = NULL; @@ -949,6 +947,21 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs) return out; } +/* Returns total number of bytes not already sent to quic-conn layer. */ +static uint64_t qcs_prep_bytes(const struct qcs *qcs) +{ + struct buffer *out = qc_stream_buf_get(qcs->stream); + uint64_t diff, base_off; + + if (!out) + return 0; + + /* if ack_offset < buf_offset, it points to an older buffer. */ + base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset); + diff = qcs->tx.fc.off_real - base_off; + return b_data(out) - diff; +} + /* Wakes up every streams of which are currently waiting for sending but * are blocked on connection flow control. */ @@ -967,6 +980,7 @@ static void qcc_notify_fctl(struct qcc *qcc) void qcc_reset_stream(struct qcs *qcs, int err) { struct qcc *qcc = qcs->qcc; + const uint64_t diff = qcs_prep_bytes(qcs); if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs)) return; @@ -975,23 +989,19 @@ void qcc_reset_stream(struct qcs *qcs, int err) qcs->flags |= QC_SF_TO_RESET; qcs->err = err; - /* Remove prepared stream data from connection flow-control calcul. */ - if (qcs->tx.offset > qcs->tx.fc.off_real) { - const uint64_t diff = qcs->tx.offset - qcs->tx.fc.off_real; - BUG_ON(qcc->tx.offsets - diff < qcc->tx.fc.off_real); - qcc->tx.offsets -= diff; - /* Reset qcs offset to prevent BUG_ON() on qcs_destroy(). */ - qcs->tx.offset = qcs->tx.fc.off_real; - } - - /* Substract to conn flow control data amount prepared on stream not yet sent. */ - if (qcs->tx.fc.off_soft > qcs->tx.fc.off_real) { + if (diff) { const int soft_blocked = qfctl_sblocked(&qcc->tx.fc); - qcc->tx.fc.off_soft -= (qcs->tx.fc.off_soft - qcs->tx.fc.off_real); + /* Soft offset cannot be inferior to real one. */ + BUG_ON(qcc->tx.fc.off_soft - diff < qcc->tx.fc.off_real); + /* Substract to conn flow control data amount prepared on stream not yet sent. */ + qcc->tx.fc.off_soft -= diff; if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc)) qcc_notify_fctl(qcc); + + /* Reset QCS soft off to prevent BUG_ON() on qcs_destroy(). */ + qcs->tx.fc.off_soft = qcs->tx.fc.off_real; } /* Report send error to stream-endpoint layer. */ @@ -1030,9 +1040,6 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count) if (count) { qfctl_sinc(&qcc->tx.fc, count); qfctl_sinc(&qcs->tx.fc, count); - - qcs->tx.offset += count; - qcs->qcc->tx.offsets += count; } TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -1559,7 +1566,7 @@ static void qcs_destroy(struct qcs *qcs) /* MUST not removed a stream with sending prepared data left. This is * to ensure consistency on connection flow-control calculation. */ - BUG_ON(qcs->tx.offset < qcs->tx.fc.off_real); + BUG_ON(qcs->tx.fc.off_soft != qcs->tx.fc.off_real); if (!(qcc->flags & QC_CF_ERRL)) { if (quic_stream_is_remote(qcc, id)) @@ -1585,19 +1592,16 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, { struct qcc *qcc = qcs->qcc; struct quic_frame *frm; - int head, total; - uint64_t base_off; const uint64_t window_stream = qfctl_rcap(&qcs->tx.fc); + const uint64_t bytes = qcs_prep_bytes(qcs); + uint64_t total; TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); - /* if ack_offset < buf_offset, it points to an older buffer. */ - base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset); - BUG_ON(qcs->tx.fc.off_real < base_off); + /* This must only be called if there is data left, or at least a standalone FIN. */ + BUG_ON((!out || !b_data(out)) && !fin); - head = qcs->tx.fc.off_real - base_off; - total = out ? b_data(out) - head : 0; - BUG_ON(total < 0); + total = bytes; /* do not exceed stream flow control limit */ if (total > window_stream) { @@ -1612,7 +1616,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, } /* Reset FIN if bytes to send is capped by flow control. */ - if (out && total < b_data(out) - head) + if (total < bytes) fin = 0; if (!total && !fin) { @@ -1620,10 +1624,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); return 0; } - BUG_ON((!total && qcs->tx.fc.off_real > qcs->tx.offset) || - (total && qcs->tx.fc.off_real >= qcs->tx.offset)); - BUG_ON(qcs->tx.fc.off_real + total > qcs->tx.offset); - BUG_ON(qcc->tx.fc.off_real + total > qcc->tx.fc.limit); TRACE_PROTO("sending STREAM frame", QMUX_EV_QCS_SEND, qcc->conn, qcs); frm = qc_frm_alloc(QUIC_FT_STREAM_8); @@ -1639,7 +1639,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, if (total) { frm->stream.buf = out; - frm->stream.data = (unsigned char *)b_peek(out, head); + frm->stream.data = (unsigned char *)b_peek(out, b_data(out) - bytes); } else { /* Empty STREAM frame. */ @@ -1681,13 +1681,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, return -1; } -/* Return true if has data to send in new STREAM frames. */ -static forceinline int qcs_need_sending(struct qcs *qcs) -{ - return qcs->tx.fc.off_real < qcs->tx.offset || - qcs->flags & QC_SF_FIN_STREAM; -} - /* This function must be called by the upper layer to inform about the sending * of a STREAM frame for instance. The frame is of length and on * . @@ -1699,8 +1692,8 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); + /* Real off MUST always be the greatest offset sent. */ BUG_ON(offset > qcs->tx.fc.off_real); - BUG_ON(offset + data > qcs->tx.offset); /* check if the STREAM frame has already been notified. It can happen * for retransmission. @@ -1732,11 +1725,8 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) TRACE_STATE("stream flow-control reached", QMUX_EV_QCS_SEND, qcc->conn, qcs); } - BUG_ON_HOT(qcs->tx.fc.off_real > qcs->tx.offset); - - /* If qcs.stream.buf is full, release it to the lower layer. */ - if (qcs->tx.offset == qcs->tx.fc.off_real && - b_full(&qcs->stream->buf->buf)) { + /* Release buffer if everything sent and buf is full or stream is waiting for room. */ + if (!qcs_prep_bytes(qcs) && (b_full(&qcs->stream->buf->buf))) { qc_stream_buf_release(qcs->stream); } @@ -1746,7 +1736,7 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) increment_send_rate(diff, 0); } - if (qcs->tx.offset == qcs->tx.fc.off_real) { + if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) { /* Remove stream from send_list if all was sent. */ LIST_DEL_INIT(&qcs->el_send); TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -1925,7 +1915,7 @@ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn) BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id)); /* This function must not be called if there is nothing to send. */ - BUG_ON(!fin && !qcs_need_sending(qcs)); + BUG_ON(!fin && !qcs_prep_bytes(qcs)); /* Skip STREAM frame allocation if already subscribed for send. * Happens on sendto transient error or network congestion. @@ -2009,8 +1999,8 @@ static int qcc_io_send(struct qcc *qcc) break; /* Stream must not be present in send_list if it has nothing to send. */ - BUG_ON(!(qcs->flags & (QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) && - !qcs_need_sending(qcs)); + BUG_ON(!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) && + !qcs_prep_bytes(qcs)); /* Each STOP_SENDING/RESET_STREAM frame is sent individually to * guarantee its emission. @@ -2024,7 +2014,8 @@ static int qcc_io_send(struct qcc *qcc) /* Remove stream from send_list if it had only STOP_SENDING * to send. */ - if (!(qcs->flags & QC_SF_TO_RESET) && !qcs_need_sending(qcs)) { + if (!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET)) && + !qcs_prep_bytes(qcs)) { LIST_DEL_INIT(&qcs->el_send); continue; } @@ -2515,8 +2506,6 @@ static int qmux_init(struct connection *conn, struct proxy *prx, /* Server parameters, params used for RX flow control. */ lparams = &conn->handle.qc->rx.params; - qcc->tx.offsets = 0; - qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi; qcc->lfctl.ms_uni = lparams->initial_max_streams_uni; qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local; @@ -2763,8 +2752,7 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, int flags) { struct qcs *qcs = __sc_mux_strm(sc); - struct buffer *out = qc_stream_buf_get(qcs->stream); - const size_t old_data = out ? b_data(out) : 0; + const size_t old_data = qcs_prep_bytes(qcs); size_t ret = 0; char fin; @@ -2807,7 +2795,7 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, } if (ret || fin) { - const size_t data = b_data(qc_stream_buf_get(qcs->stream)) - (old_data); + const size_t data = qcs_prep_bytes(qcs) - old_data; if (data || fin) qcc_send_stream(qcs, 0, data); if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) diff --git a/src/qmux_trace.c b/src/qmux_trace.c index a1f3fd196..254ebb017 100644 --- a/src/qmux_trace.c +++ b/src/qmux_trace.c @@ -76,15 +76,15 @@ static void qmux_trace(enum trace_level level, uint64_t mask, if (qcc->conn->handle.qc) chunk_appendf(&trace_buf, " qc=%p", qcc->conn->handle.qc); - chunk_appendf(&trace_buf, " md=%llu/%llu/%llu", - (ullong)qcc->tx.fc.limit, (ullong)qcc->tx.offsets, (ullong)qcc->tx.fc.off_real); + chunk_appendf(&trace_buf, " md=%llu/%llu", + (ullong)qcc->tx.fc.limit, (ullong)qcc->tx.fc.off_real); if (qcs) { chunk_appendf(&trace_buf, " qcs=%p .id=%llu .st=%s", qcs, (ullong)qcs->id, qcs_st_to_str(qcs->st)); - chunk_appendf(&trace_buf, " msd=%llu/%llu/%llu", - (ullong)qcs->tx.fc.limit, (ullong)qcs->tx.offset, (ullong)qcs->tx.fc.off_real); + chunk_appendf(&trace_buf, " msd=%llu/%llu", + (ullong)qcs->tx.fc.limit, (ullong)qcs->tx.fc.off_real); } if (mask & QMUX_EV_QCC_NQCS) {