mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 14:21:25 +02:00
MINOR: quic: limit total stream buffers per connection
MUX streams can now allocate multiple buffers for sending. quic-conn is responsible to limit the total count of allowed allocated buffers. A counter is stored in the new field <stream_buf_count>. For the moment, the value is hardcoded to 30. On stream buffer allocation failure, the qcc MUX is flagged with QC_CF_CONN_FULL. The MUX is then woken up as soon as a buffer is freed, most notably on ACK reception.
This commit is contained in:
parent
1b81dda3e0
commit
d2f80a2e63
@ -26,6 +26,7 @@ enum qcs_type {
|
|||||||
};
|
};
|
||||||
|
|
||||||
#define QC_CF_BLK_MFCTL 0x00000001 /* sending blocked due to connection flow-control */
|
#define QC_CF_BLK_MFCTL 0x00000001 /* sending blocked due to connection flow-control */
|
||||||
|
#define QC_CF_CONN_FULL 0x00000002 /* no stream buffers available on connection */
|
||||||
|
|
||||||
struct qcc {
|
struct qcc {
|
||||||
struct connection *conn;
|
struct connection *conn;
|
||||||
|
@ -14,6 +14,7 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len
|
|||||||
void qc_stream_desc_free(struct qc_stream_desc *stream);
|
void qc_stream_desc_free(struct qc_stream_desc *stream);
|
||||||
|
|
||||||
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
|
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
|
||||||
|
int qc_stream_buf_avail(struct quic_conn *qc);
|
||||||
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
||||||
uint64_t offset);
|
uint64_t offset);
|
||||||
void qc_stream_buf_release(struct qc_stream_desc *stream);
|
void qc_stream_buf_release(struct qc_stream_desc *stream);
|
||||||
|
@ -750,6 +750,7 @@ struct quic_conn {
|
|||||||
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
|
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
|
||||||
|
|
||||||
struct eb_root streams_by_id; /* qc_stream_desc tree */
|
struct eb_root streams_by_id; /* qc_stream_desc tree */
|
||||||
|
int stream_buf_count; /* total count of allocated stream buffers for this connection */
|
||||||
|
|
||||||
/* MUX */
|
/* MUX */
|
||||||
struct qcc *qcc;
|
struct qcc *qcc;
|
||||||
|
@ -722,7 +722,14 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
|
|||||||
|
|
||||||
if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
|
if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
|
||||||
qc_stream_buf_release(qcs->stream);
|
qc_stream_buf_release(qcs->stream);
|
||||||
tasklet_wakeup(qcc->wait_event.tasklet);
|
|
||||||
|
/* reschedule send if buffers available */
|
||||||
|
if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
|
||||||
|
tasklet_wakeup(qcc->wait_event.tasklet);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
qcc->flags |= QC_CF_CONN_FULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -884,14 +891,15 @@ static int qc_send(struct qcc *qcc)
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!out) {
|
if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
|
||||||
struct connection *conn = qcc->conn;
|
node = eb64_next(node);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
out = qc_stream_buf_alloc(qcs->stream,
|
if (!out) {
|
||||||
qcs->tx.offset);
|
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
|
||||||
if (!out) {
|
if (!out) {
|
||||||
conn->xprt->subscribe(conn, conn->xprt_ctx,
|
qcc->flags |= QC_CF_CONN_FULL;
|
||||||
SUB_RETRY_SEND, &qcc->wait_event);
|
|
||||||
node = eb64_next(node);
|
node = eb64_next(node);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -75,11 +75,11 @@ void qc_stream_desc_release(struct qc_stream_desc *stream)
|
|||||||
* Returns the count of byte removed from stream. Do not forget to check if
|
* Returns the count of byte removed from stream. Do not forget to check if
|
||||||
* <stream> is NULL after invocation.
|
* <stream> is NULL after invocation.
|
||||||
*/
|
*/
|
||||||
int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset,
|
int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len)
|
||||||
size_t len)
|
|
||||||
{
|
{
|
||||||
struct qc_stream_desc *s = *stream;
|
struct qc_stream_desc *s = *stream;
|
||||||
struct qc_stream_buf *stream_buf;
|
struct qc_stream_buf *stream_buf;
|
||||||
|
struct quic_conn *qc = s->qc;
|
||||||
struct buffer *buf;
|
struct buffer *buf;
|
||||||
size_t diff;
|
size_t diff;
|
||||||
|
|
||||||
@ -115,6 +115,15 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset,
|
|||||||
pool_free(pool_head_quic_conn_stream_buf, stream_buf);
|
pool_free(pool_head_quic_conn_stream_buf, stream_buf);
|
||||||
offer_buffers(NULL, 1);
|
offer_buffers(NULL, 1);
|
||||||
|
|
||||||
|
/* notify MUX about available buffers. */
|
||||||
|
--qc->stream_buf_count;
|
||||||
|
if (qc->mux_state == QC_MUX_READY) {
|
||||||
|
if (qc->qcc->flags & QC_CF_CONN_FULL) {
|
||||||
|
qc->qcc->flags &= ~QC_CF_CONN_FULL;
|
||||||
|
tasklet_wakeup(qc->qcc->wait_event.tasklet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Free stream instance if already released and no buffers left. */
|
/* Free stream instance if already released and no buffers left. */
|
||||||
if (s->release && LIST_ISEMPTY(&s->buf_list)) {
|
if (s->release && LIST_ISEMPTY(&s->buf_list)) {
|
||||||
qc_stream_desc_free(s);
|
qc_stream_desc_free(s);
|
||||||
@ -131,6 +140,7 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset,
|
|||||||
void qc_stream_desc_free(struct qc_stream_desc *stream)
|
void qc_stream_desc_free(struct qc_stream_desc *stream)
|
||||||
{
|
{
|
||||||
struct qc_stream_buf *buf, *buf_back;
|
struct qc_stream_buf *buf, *buf_back;
|
||||||
|
struct quic_conn *qc = stream->qc;
|
||||||
struct eb64_node *frm_node;
|
struct eb64_node *frm_node;
|
||||||
unsigned int free_count = 0;
|
unsigned int free_count = 0;
|
||||||
|
|
||||||
@ -148,9 +158,19 @@ void qc_stream_desc_free(struct qc_stream_desc *stream)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (free_count)
|
if (free_count) {
|
||||||
offer_buffers(NULL, free_count);
|
offer_buffers(NULL, free_count);
|
||||||
|
|
||||||
|
qc->stream_buf_count -= free_count;
|
||||||
|
if (qc->mux_state == QC_MUX_READY) {
|
||||||
|
/* notify MUX about available buffers. */
|
||||||
|
if (qc->qcc->flags & QC_CF_CONN_FULL) {
|
||||||
|
qc->qcc->flags &= ~QC_CF_CONN_FULL;
|
||||||
|
tasklet_wakeup(qc->qcc->wait_event.tasklet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* qc_stream_desc might be freed before having received all its ACKs.
|
/* qc_stream_desc might be freed before having received all its ACKs.
|
||||||
* This is the case if some frames were retransmitted.
|
* This is the case if some frames were retransmitted.
|
||||||
*/
|
*/
|
||||||
@ -183,18 +203,35 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
|
|||||||
return &stream->buf->buf;
|
return &stream->buf->buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Allocate a new current buffer for <stream>. This function is not allowed if
|
/* Check if a new stream buffer can be allocated for the connection <qc>.
|
||||||
* current buffer is not NULL prior to this call. The new buffer represents
|
* Returns a boolean.
|
||||||
* stream payload at offset <offset>.
|
*/
|
||||||
|
int qc_stream_buf_avail(struct quic_conn *qc)
|
||||||
|
{
|
||||||
|
/* TODO use a global tune settings for max */
|
||||||
|
return qc->stream_buf_count < 30;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Allocate a new current buffer for <stream>. The buffer limit count for the
|
||||||
|
* connection is checked first. This function is not allowed if current buffer
|
||||||
|
* is not NULL prior to this call. The new buffer represents stream payload at
|
||||||
|
* offset <offset>.
|
||||||
*
|
*
|
||||||
* Returns the buffer or NULL.
|
* Returns the buffer or NULL.
|
||||||
*/
|
*/
|
||||||
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
||||||
uint64_t offset)
|
uint64_t offset)
|
||||||
{
|
{
|
||||||
|
struct quic_conn *qc = stream->qc;
|
||||||
|
|
||||||
/* current buffer must be released first before allocate a new one. */
|
/* current buffer must be released first before allocate a new one. */
|
||||||
BUG_ON(stream->buf);
|
BUG_ON(stream->buf);
|
||||||
|
|
||||||
|
if (!qc_stream_buf_avail(qc))
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
++qc->stream_buf_count;
|
||||||
|
|
||||||
stream->buf_offset = offset;
|
stream->buf_offset = offset;
|
||||||
stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
|
stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
|
||||||
if (!stream->buf)
|
if (!stream->buf)
|
||||||
|
@ -4057,6 +4057,7 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
|
|||||||
MT_LIST_INIT(&qc->accept_list);
|
MT_LIST_INIT(&qc->accept_list);
|
||||||
|
|
||||||
qc->streams_by_id = EB_ROOT_UNIQUE;
|
qc->streams_by_id = EB_ROOT_UNIQUE;
|
||||||
|
qc->stream_buf_count = 0;
|
||||||
|
|
||||||
TRACE_LEAVE(QUIC_EV_CONN_INIT, qc);
|
TRACE_LEAVE(QUIC_EV_CONN_INIT, qc);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user