diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 0c8ce6008..44fb4d627 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -26,6 +26,7 @@ enum qcs_type { }; #define QC_CF_BLK_MFCTL 0x00000001 /* sending blocked due to connection flow-control */ +#define QC_CF_CONN_FULL 0x00000002 /* no stream buffers available on connection */ struct qcc { struct connection *conn; diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index 0550f4f0c..6d9359d0f 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -14,6 +14,7 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len void qc_stream_desc_free(struct qc_stream_desc *stream); struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream); +int qc_stream_buf_avail(struct quic_conn *qc); struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, uint64_t offset); void qc_stream_buf_release(struct qc_stream_desc *stream); diff --git a/include/haproxy/xprt_quic-t.h b/include/haproxy/xprt_quic-t.h index ab714ea50..6c4336cc4 100644 --- a/include/haproxy/xprt_quic-t.h +++ b/include/haproxy/xprt_quic-t.h @@ -750,6 +750,7 @@ struct quic_conn { struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */ struct eb_root streams_by_id; /* qc_stream_desc tree */ + int stream_buf_count; /* total count of allocated stream buffers for this connection */ /* MUX */ struct qcc *qcc; diff --git a/src/mux_quic.c b/src/mux_quic.c index d0fd0b2a7..b969d3a4b 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -722,7 +722,14 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) { qc_stream_buf_release(qcs->stream); - tasklet_wakeup(qcc->wait_event.tasklet); + + /* reschedule send if buffers available */ + if (qc_stream_buf_avail(qcc->conn->handle.qc)) { + tasklet_wakeup(qcc->wait_event.tasklet); + } + else { + qcc->flags |= QC_CF_CONN_FULL; + } } } @@ -884,14 +891,15 @@ static int qc_send(struct qcc *qcc) continue; } - if (!out) { - struct connection *conn = qcc->conn; + if (!out && (qcc->flags & QC_CF_CONN_FULL)) { + node = eb64_next(node); + continue; + } - out = qc_stream_buf_alloc(qcs->stream, - qcs->tx.offset); + if (!out) { + out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset); if (!out) { - conn->xprt->subscribe(conn, conn->xprt_ctx, - SUB_RETRY_SEND, &qcc->wait_event); + qcc->flags |= QC_CF_CONN_FULL; node = eb64_next(node); continue; } diff --git a/src/quic_stream.c b/src/quic_stream.c index 0e58366e2..2dd9b1c4c 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -75,11 +75,11 @@ void qc_stream_desc_release(struct qc_stream_desc *stream) * Returns the count of byte removed from stream. Do not forget to check if * is NULL after invocation. */ -int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, - size_t len) +int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len) { struct qc_stream_desc *s = *stream; struct qc_stream_buf *stream_buf; + struct quic_conn *qc = s->qc; struct buffer *buf; size_t diff; @@ -115,6 +115,15 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, pool_free(pool_head_quic_conn_stream_buf, stream_buf); offer_buffers(NULL, 1); + /* notify MUX about available buffers. */ + --qc->stream_buf_count; + if (qc->mux_state == QC_MUX_READY) { + if (qc->qcc->flags & QC_CF_CONN_FULL) { + qc->qcc->flags &= ~QC_CF_CONN_FULL; + tasklet_wakeup(qc->qcc->wait_event.tasklet); + } + } + /* Free stream instance if already released and no buffers left. */ if (s->release && LIST_ISEMPTY(&s->buf_list)) { qc_stream_desc_free(s); @@ -131,6 +140,7 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, void qc_stream_desc_free(struct qc_stream_desc *stream) { struct qc_stream_buf *buf, *buf_back; + struct quic_conn *qc = stream->qc; struct eb64_node *frm_node; unsigned int free_count = 0; @@ -148,9 +158,19 @@ void qc_stream_desc_free(struct qc_stream_desc *stream) } } - if (free_count) + if (free_count) { offer_buffers(NULL, free_count); + qc->stream_buf_count -= free_count; + if (qc->mux_state == QC_MUX_READY) { + /* notify MUX about available buffers. */ + if (qc->qcc->flags & QC_CF_CONN_FULL) { + qc->qcc->flags &= ~QC_CF_CONN_FULL; + tasklet_wakeup(qc->qcc->wait_event.tasklet); + } + } + } + /* qc_stream_desc might be freed before having received all its ACKs. * This is the case if some frames were retransmitted. */ @@ -183,18 +203,35 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream) return &stream->buf->buf; } -/* Allocate a new current buffer for . This function is not allowed if - * current buffer is not NULL prior to this call. The new buffer represents - * stream payload at offset . +/* Check if a new stream buffer can be allocated for the connection . + * Returns a boolean. + */ +int qc_stream_buf_avail(struct quic_conn *qc) +{ + /* TODO use a global tune settings for max */ + return qc->stream_buf_count < 30; +} + +/* Allocate a new current buffer for . The buffer limit count for the + * connection is checked first. This function is not allowed if current buffer + * is not NULL prior to this call. The new buffer represents stream payload at + * offset . * * Returns the buffer or NULL. */ struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, uint64_t offset) { + struct quic_conn *qc = stream->qc; + /* current buffer must be released first before allocate a new one. */ BUG_ON(stream->buf); + if (!qc_stream_buf_avail(qc)) + return NULL; + + ++qc->stream_buf_count; + stream->buf_offset = offset; stream->buf = pool_alloc(pool_head_quic_conn_stream_buf); if (!stream->buf) diff --git a/src/xprt_quic.c b/src/xprt_quic.c index 543fa2c23..21abd8557 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -4057,6 +4057,7 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4, MT_LIST_INIT(&qc->accept_list); qc->streams_by_id = EB_ROOT_UNIQUE; + qc->stream_buf_count = 0; TRACE_LEAVE(QUIC_EV_CONN_INIT, qc);