MAJOR: mux-quic: allocate Tx buffers based on congestion window

Each QUIC MUX may allocate buffers for MUX stream emission. These
buffers are then shared with quic_conn to handle ACK reception and
retransmission. A limit on the number of concurrent buffers used per
connection has been defined statically and can be updated via a
configuration option. This commit replaces the limit to instead use the
current underlying congestion window size.

The purpose of this change is to remove the artificial static buffer
count limit, which may be difficult to choose. Indeed, if a connection
performs with minimal loss rate, the buffer count would severely limit
its throughput. It could be increased to fix this, but it also impacts
others connections, even with less optimal performance, causing too many
extra data buffering on the MUX layer. By using the dynamic congestion
window size, haproxy ensures that MUX buffering corresponds roughly to
the network conditions.

Using QCC <buf_in_flight>, a new buffer can be allocated if this counter is
less than the current window size. If not, QCS emission is interrupted and
haproxy stream layer will subscribe until a new buffer is ready.

One of the critical parts is to ensure that the MUX layer previously
blocked on buffer allocation is properly woken up when sending can be
retried. This occurs on two occasions :

* after an already used Tx buffer is cleared on ACK reception. This case
  is already handled by qcc_notify_buf() via quic_stream layer.

* on congestion window increase. A new qcc_notify_buf() invocation is
  added into qc_notify_send().

Finally, remove <avail_bufs> QCC field which is now unused.

This commit is labelled MAJOR as it may have unexpected effect and could
cause significant behavior change. For example, in previous
implementation QUIC MUX would be able to buffer more data even if the
congestion window is small. With this patch, data cannot be transferred
from the stream layer which may cause more streams to be shut down on
client timeout. Another effect may be more CPU consumption as the
connection limit would be hit more often, causing more streams to be
interrupted and woken up in a cycle.
This commit is contained in:
Amaury Denoyelle 2024-06-13 17:06:40 +02:00
parent 000976af58
commit aeb8c1ddc3
6 changed files with 38 additions and 25 deletions

View File

@ -67,7 +67,6 @@ struct qcc {
struct {
struct quic_fctl fc; /* stream flow control applied on sending */
int avail_bufs; /* count of available buffers for this connection */
uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */
} tx;

View File

@ -23,7 +23,7 @@ int qcs_is_close_remote(struct qcs *qcs);
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
void qcc_notify_buf(struct qcc *qcc, int free_count, uint64_t free_size);
void qcc_notify_buf(struct qcc *qcc, uint64_t free_size);
struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err);

View File

@ -1612,7 +1612,7 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
goto err;
}
TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
TRACE_STATE("buf window full", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
goto end;
}
@ -1770,7 +1770,7 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
goto err;
}
TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
TRACE_STATE("buf window full", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
goto end;
}
@ -1904,8 +1904,7 @@ static int h3_resp_data_send(struct qcs *qcs, struct htx *htx,
goto err;
}
/* Connection buf limit reached, stconn will subscribe on SEND. */
TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
TRACE_STATE("buf window full", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
goto end;
}

View File

@ -524,31 +524,40 @@ void qcs_notify_send(struct qcs *qcs)
}
}
/* Checks whether the Tx buffer window of <qcc> is saturated, i.e. the sum of
 * currently allocated Tx buffer sizes has reached the congestion window of
 * the underlying quic_conn. Returns true when no new Tx stream buffer should
 * be allocated for now.
 */
static inline int qcc_bufwnd_full(const struct qcc *qcc)
{
	const struct quic_conn *qconn = qcc->conn->handle.qc;

	return !(qcc->tx.buf_in_flight < qconn->path->cwnd);
}
/* Report that one or several stream-desc buffers have been released for <qcc>
* connection. <free_size> represent the sum of freed buffers sizes.
 * connection. <free_size> represents the sum of freed buffer sizes. May also
 * be used to notify about a congestion window increase, in which case
 * <free_size> can be zero.
*/
void qcc_notify_buf(struct qcc *qcc, int free_count, uint64_t free_size)
void qcc_notify_buf(struct qcc *qcc, uint64_t free_size)
{
struct qcs *qcs;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
BUG_ON(qcc->tx.avail_bufs + free_count > global.tune.quic_streams_buf);
qcc->tx.avail_bufs += free_count;
/* Cannot have a negative buf_in_flight counter */
BUG_ON(qcc->tx.buf_in_flight < free_size);
qcc->tx.buf_in_flight -= free_size;
if (qcc_bufwnd_full(qcc))
return;
if (qcc->flags & QC_CF_CONN_FULL) {
TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn);
TRACE_STATE("buf window now available", QMUX_EV_QCC_WAKE, qcc->conn);
qcc->flags &= ~QC_CF_CONN_FULL;
}
/* TODO a simple optimization would be to only wake up <free_count> QCS
* instances. But it may not work if a woken QCS is in error and does
* not try to allocate a buffer, leaving the unwoken QCS indefinitely
* in the buflist.
/* TODO an optimization would be to only wake up a limited count of QCS
* instances based on <free_size>. But it may not work if a woken QCS
* is in error and does not try to allocate a buffer, leaving the
* unwoken QCS indefinitely in the buflist.
*/
while (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf);
@ -1019,7 +1028,7 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs)
*
* <err> is an output argument which is useful to differentiate the failure
* cause when the buffer cannot be allocated. It is set to 0 if the connection
* buffer limit is reached. For fatal errors, its value is non-zero.
* buffer window is full. For fatal errors, its value is non-zero.
*
* Streams reserved for application protocol metadata transfer are not subject
* to the buffer limit per connection. Hence, for them only a memory error
@ -1047,8 +1056,8 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
goto out;
}
if (!qcc->tx.avail_bufs) {
TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
if (qcc_bufwnd_full(qcc)) {
TRACE_STATE("no more room", QMUX_EV_QCS_SEND, qcc->conn, qcs);
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
tot_time_start(&qcs->timer.buf);
qcc->flags |= QC_CF_CONN_FULL;
@ -1063,10 +1072,8 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
goto out;
}
if (likely(!unlimited)) {
--qcc->tx.avail_bufs;
if (likely(!unlimited))
qcc->tx.buf_in_flight += global.tune.bufsize;
}
}
out:
@ -2733,7 +2740,6 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
qcc->tx.avail_bufs = global.tune.quic_streams_buf;
qcc->tx.buf_in_flight = 0;
if (conn_is_back(conn)) {

View File

@ -1717,6 +1717,9 @@ int qc_notify_send(struct quic_conn *qc)
{
const struct quic_pktns *pktns = qc->apktns;
/* Wake up MUX for new emission unless there is no congestion room or
* connection FD is not ready.
*/
if (qc->subs && qc->subs->events & SUB_RETRY_SEND) {
/* RFC 9002 7.5. Probe Timeout
*
@ -1733,6 +1736,12 @@ int qc_notify_send(struct quic_conn *qc)
}
}
/* Wake up streams layer waiting for buffer. Useful after congestion
* window increase.
*/
if (qc->mux_state == QC_MUX_READY && (qc->qcc->flags & QC_CF_CONN_FULL))
qcc_notify_buf(qc->qcc, 0);
return 0;
}

View File

@ -46,7 +46,7 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
if (qc->mux_state == QC_MUX_READY) {
if (!(stream->flags & QC_SD_FL_OOB_BUF)) {
/* notify MUX about available buffers. */
qcc_notify_buf(qc->qcc, 1, free_size);
qcc_notify_buf(qc->qcc, free_size);
}
}
}
@ -224,7 +224,7 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
if (qc->mux_state == QC_MUX_READY) {
if (!(stream->flags & QC_SD_FL_OOB_BUF)) {
/* notify MUX about available buffers. */
qcc_notify_buf(qc->qcc, free_count, free_size);
qcc_notify_buf(qc->qcc, free_size);
}
}
}