diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index ee3fbe54c..22fd1b910 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -67,6 +67,7 @@ struct qcc { struct { struct quic_fctl fc; /* stream flow control applied on sending */ + int avail_bufs; /* count of available buffers for this connection */ } tx; uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */ diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 1ed8ad1dc..28a5af7b8 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -22,7 +22,7 @@ int qcs_is_close_remote(struct qcs *qcs); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); -int qcc_notify_buf(struct qcc *qcc); +void qcc_notify_buf(struct qcc *qcc, int free_count); struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs); struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err); diff --git a/include/haproxy/quic_conn-t.h b/include/haproxy/quic_conn-t.h index ed5ec01c9..5ac3f1ff4 100644 --- a/include/haproxy/quic_conn-t.h +++ b/include/haproxy/quic_conn-t.h @@ -398,7 +398,6 @@ struct quic_conn { struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */ struct eb_root streams_by_id; /* qc_stream_desc tree */ - int stream_buf_count; /* total count of allocated stream buffers for this connection */ /* MUX */ struct qcc *qcc; diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index 7788d9039..c7a237969 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -16,7 +16,7 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing); struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream); struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, - uint64_t offset, int *avail); + uint64_t offset); void qc_stream_buf_release(struct qc_stream_desc *stream); #endif /* USE_QUIC */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 30882ab2e..bdd366f4f 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -523,34 +524,36 @@ void qcs_notify_send(struct qcs *qcs) } } -/* Notify on a new stream-desc buffer available for connection. - * - * Returns true if a stream was woken up. If false is returned, this indicates - * to the caller that it's currently unnecessary to notify for the rest of the - * available buffers. +/* Report that stream-desc buffer have been released for + * connection. */ -int qcc_notify_buf(struct qcc *qcc) +void qcc_notify_buf(struct qcc *qcc, int free_count) { struct qcs *qcs; - int ret = 0; TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + BUG_ON(qcc->tx.avail_bufs + free_count > global.tune.quic_streams_buf); + qcc->tx.avail_bufs += free_count; + if (qcc->flags & QC_CF_CONN_FULL) { TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn); qcc->flags &= ~QC_CF_CONN_FULL; } - if (!LIST_ISEMPTY(&qcc->buf_wait_list)) { + /* TODO a simple optimization would be to only wake up QCS + * instances. But it may not work if a woken QCS is in error and does + * not try to allocate a buffer, leaving the unwoken QCS indefinitely + * in the buflist. + */ + while (!LIST_ISEMPTY(&qcc->buf_wait_list)) { qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf); LIST_DEL_INIT(&qcs->el_buf); tot_time_stop(&qcs->timer.buf); qcs_notify_send(qcs); - ret = 1; } TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); - return ret; } /* A fatal error is detected locally for connection. It should be closed @@ -1007,7 +1010,6 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs) struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err) { struct qcc *qcc = qcs->qcc; - int buf_avail; struct buffer *out = qc_stream_buf_get(qcs->stream); /* Stream must not try to reallocate a buffer if currently waiting for one. */ @@ -1022,21 +1024,22 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err) goto out; } - out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real, - &buf_avail); - if (!out) { - if (buf_avail) { - TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); - *err = 1; - goto out; - } - + if (!qcc->tx.avail_bufs) { TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs); LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); tot_time_start(&qcs->timer.buf); qcc->flags |= QC_CF_CONN_FULL; goto out; } + + out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real); + if (!out) { + TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); + *err = 1; + goto out; + } + + --qcc->tx.avail_bufs; } out: @@ -2703,6 +2706,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni; + qcc->tx.avail_bufs = global.tune.quic_streams_buf; + if (conn_is_back(conn)) { qcc->next_bidi_l = 0x00; qcc->largest_bidi_r = 0x01; diff --git a/src/quic_conn.c b/src/quic_conn.c index 651669696..a07ffd2f4 100644 --- a/src/quic_conn.c +++ b/src/quic_conn.c @@ -1181,7 +1181,6 @@ struct quic_conn *qc_new_conn(const struct quic_version *qv, int ipv4, quic_cc_path_init(qc->path, ipv4, server ? l->bind_conf->max_cwnd : 0, cc_algo ? cc_algo : default_quic_cc_algo, qc); - qc->stream_buf_count = 0; memcpy(&qc->local_addr, local_addr, sizeof(qc->local_addr)); memcpy(&qc->peer_addr, peer_addr, sizeof qc->peer_addr); diff --git a/src/quic_stream.c b/src/quic_stream.c index a21391346..b45bac7bb 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -41,15 +41,9 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream, *stream_buf = NULL; /* notify MUX about available buffers. */ - --qc->stream_buf_count; if (qc->mux_state == QC_MUX_READY) { - /* notify MUX about available buffers. - * - * TODO several streams may be woken up even if a single buffer - * is available for now. - */ - while (qcc_notify_buf(qc->qcc)) - ; + /* notify MUX about available buffers. */ + qcc_notify_buf(qc->qcc, 1); } } @@ -222,15 +216,9 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing) if (free_count) { offer_buffers(NULL, free_count); - qc->stream_buf_count -= free_count; if (qc->mux_state == QC_MUX_READY) { - /* notify MUX about available buffers. - * - * TODO several streams may be woken up even if a single buffer - * is available for now. - */ - while (qcc_notify_buf(qc->qcc)) - ; + /* notify MUX about available buffers. */ + qcc_notify_buf(qc->qcc, free_count); } } @@ -265,45 +253,30 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream) return &stream->buf->buf; } -/* Returns the count of available buffer left for . */ -static int qc_stream_buf_avail(struct quic_conn *qc) -{ - BUG_ON(qc->stream_buf_count > global.tune.quic_streams_buf); - return global.tune.quic_streams_buf - qc->stream_buf_count; -} - -/* Allocate a new current buffer for . The buffer limit count for the - * connection is checked first. This function is not allowed if current buffer - * is not NULL prior to this call. The new buffer represents stream payload at - * offset . +/* Allocate a new current buffer for . This function is not allowed if + * current buffer is not NULL prior to this call. The new buffer represents + * stream payload at offset . * - * Returns the buffer or NULL on error. Caller may check to ensure if - * the connection buffer limit was reached or a fatal error was encountered. + * Returns the buffer or NULL on error. */ struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, - uint64_t offset, int *avail) + uint64_t offset) { - struct quic_conn *qc = stream->qc; - /* current buffer must be released first before allocate a new one. */ BUG_ON(stream->buf); - *avail = qc_stream_buf_avail(qc); - if (!*avail) - return NULL; - stream->buf_offset = offset; stream->buf = pool_alloc(pool_head_quic_stream_buf); if (!stream->buf) return NULL; + stream->buf->buf = BUF_NULL; if (!b_alloc(&stream->buf->buf, DB_MUX_TX)) { pool_free(pool_head_quic_stream_buf, stream->buf); stream->buf = NULL; return NULL; } - ++qc->stream_buf_count; LIST_APPEND(&stream->buf_list, &stream->buf->list); return &stream->buf->buf;