mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 06:11:32 +02:00
MEDIUM: mux-quic: rationalize tx buffers between qcc/qcs
Remove the tx mux ring buffers in qcs, which should be in the qcc. For the moment, use a simple architecture with 2 simple tx buffers in the qcs. The first buffer is used by the h3 layer to prepare the data. The mux send operation transfer these into the 2nd buffer named xprt_buf. This buffer is only freed when an ACK has been received. This architecture is functional but not optimal for two reasons : - it won't limit the buffer usage by connection - each transfer on a new stream requires an allocation
This commit is contained in:
parent
e1b61090a0
commit
d3d97c6ae7
@ -228,9 +228,8 @@ struct qcs {
|
|||||||
uint64_t bytes; /* number of bytes sent */
|
uint64_t bytes; /* number of bytes sent */
|
||||||
uint64_t ack_offset; /* last acked ordered byte offset */
|
uint64_t ack_offset; /* last acked ordered byte offset */
|
||||||
struct eb_root acked_frms; /* acked frames ordered by their offsets */
|
struct eb_root acked_frms; /* acked frames ordered by their offsets */
|
||||||
struct buffer buf; /* transmit buffer, always valid (buf_empty or real buffer) */
|
struct buffer buf; /* transmit buffer before sending via xprt */
|
||||||
struct buffer mbuf[QCC_MBUF_CNT];
|
struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */
|
||||||
uint64_t left; /* data currently stored in mbuf waiting for send */
|
|
||||||
} tx;
|
} tx;
|
||||||
struct wait_event *subs; /* recv wait_event the conn_stream associated is waiting on (via qc_subscribe) */
|
struct wait_event *subs; /* recv wait_event the conn_stream associated is waiting on (via qc_subscribe) */
|
||||||
struct list list; /* To be used when adding in qcc->send_list or qcc->fctl_lsit */
|
struct list list; /* To be used when adding in qcc->send_list or qcc->fctl_lsit */
|
||||||
|
28
src/h3.c
28
src/h3.c
@ -416,25 +416,15 @@ static int h3_control_send(struct h3_uqs *h3_uqs, void *ctx)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return next empty buffer of mux.
|
/* Returns buffer for data sending.
|
||||||
* TODO to optimize memory consumption, a non-full buffer should be used before
|
* May be NULL if the allocation failed.
|
||||||
* allocating a new one.
|
|
||||||
* TODO put this in mux ??
|
|
||||||
*/
|
*/
|
||||||
static struct buffer *get_mux_next_tx_buf(struct qcs *qcs)
|
static struct buffer *mux_get_buf(struct qcs *qcs)
|
||||||
{
|
{
|
||||||
struct buffer *buf = br_tail(qcs->tx.mbuf);
|
if (!b_size(&qcs->tx.buf))
|
||||||
|
b_alloc(&qcs->tx.buf);
|
||||||
|
|
||||||
if (b_data(buf))
|
return &qcs->tx.buf;
|
||||||
buf = br_tail_add(qcs->tx.mbuf);
|
|
||||||
|
|
||||||
if (!b_size(buf))
|
|
||||||
qc_get_buf(qcs->qcc, buf);
|
|
||||||
|
|
||||||
if (!buf)
|
|
||||||
ABORT_NOW();
|
|
||||||
|
|
||||||
return buf;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
|
static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
|
||||||
@ -484,7 +474,7 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
|
|||||||
|
|
||||||
list[hdr].n = ist("");
|
list[hdr].n = ist("");
|
||||||
|
|
||||||
res = get_mux_next_tx_buf(qcs);
|
res = mux_get_buf(qcs);
|
||||||
|
|
||||||
/* At least 5 bytes to store frame type + length as a varint max size */
|
/* At least 5 bytes to store frame type + length as a varint max size */
|
||||||
if (b_room(res) < 5)
|
if (b_room(res) < 5)
|
||||||
@ -517,7 +507,6 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
|
|||||||
if (!b_quic_enc_int(res, b_data(&headers_buf)))
|
if (!b_quic_enc_int(res, b_data(&headers_buf)))
|
||||||
ABORT_NOW();
|
ABORT_NOW();
|
||||||
b_add(res, b_data(&headers_buf));
|
b_add(res, b_data(&headers_buf));
|
||||||
qcs->tx.left += 1 + frame_length_size + b_data(&headers_buf);
|
|
||||||
|
|
||||||
ret = 0;
|
ret = 0;
|
||||||
blk = htx_get_head_blk(htx);
|
blk = htx_get_head_blk(htx);
|
||||||
@ -563,7 +552,7 @@ static int h3_resp_data_send(struct qcs *qcs, struct buffer *buf, size_t count)
|
|||||||
if (type != HTX_BLK_DATA)
|
if (type != HTX_BLK_DATA)
|
||||||
goto end;
|
goto end;
|
||||||
|
|
||||||
res = get_mux_next_tx_buf(qcs);
|
res = mux_get_buf(qcs);
|
||||||
|
|
||||||
if (fsize > count)
|
if (fsize > count)
|
||||||
fsize = count;
|
fsize = count;
|
||||||
@ -589,7 +578,6 @@ static int h3_resp_data_send(struct qcs *qcs, struct buffer *buf, size_t count)
|
|||||||
htx_cut_data_blk(htx, blk, fsize);
|
htx_cut_data_blk(htx, blk, fsize);
|
||||||
|
|
||||||
b_add(res, b_data(&outbuf));
|
b_add(res, b_data(&outbuf));
|
||||||
qcs->tx.left += b_data(&outbuf);
|
|
||||||
goto new_frame;
|
goto new_frame;
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
@ -905,6 +905,9 @@ static void qcs_destroy(struct qcs *qcs)
|
|||||||
offer_buffers(NULL, 1);
|
offer_buffers(NULL, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b_free(&qcs->tx.buf);
|
||||||
|
b_free(&qcs->tx.xprt_buf);
|
||||||
|
|
||||||
if (qcs->subs)
|
if (qcs->subs)
|
||||||
qcs->subs->events = 0;
|
qcs->subs->events = 0;
|
||||||
|
|
||||||
@ -963,8 +966,7 @@ struct qcs *bidi_qcs_new(struct qcc *qcc, uint64_t id)
|
|||||||
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
|
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
|
||||||
qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
|
qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
|
||||||
qcs->tx.buf = BUF_NULL;
|
qcs->tx.buf = BUF_NULL;
|
||||||
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
|
qcs->tx.xprt_buf = BUF_NULL;
|
||||||
qcs->tx.left = 0;
|
|
||||||
|
|
||||||
eb64_insert(&qcc->streams_by_id, &qcs->by_id);
|
eb64_insert(&qcc->streams_by_id, &qcs->by_id);
|
||||||
qcc->strms[qcs_type].nb_streams++;
|
qcc->strms[qcs_type].nb_streams++;
|
||||||
@ -1028,8 +1030,7 @@ struct qcs *luqs_new(struct qcc *qcc)
|
|||||||
qcs->tx.offset = qcs->tx.bytes = qcs->tx.ack_offset = 0;
|
qcs->tx.offset = qcs->tx.bytes = qcs->tx.ack_offset = 0;
|
||||||
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
|
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
|
||||||
qcs->tx.buf = BUF_NULL;
|
qcs->tx.buf = BUF_NULL;
|
||||||
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
|
qcs->tx.xprt_buf = BUF_NULL;
|
||||||
qcs->tx.left = 0;
|
|
||||||
|
|
||||||
qcs->subs = NULL;
|
qcs->subs = NULL;
|
||||||
LIST_INIT(&qcs->list);
|
LIST_INIT(&qcs->list);
|
||||||
@ -1070,8 +1071,8 @@ struct qcs *ruqs_new(struct qcc *qcc, uint64_t id)
|
|||||||
qcs->rx.offset = qcs->rx.bytes = 0;
|
qcs->rx.offset = qcs->rx.bytes = 0;
|
||||||
qcs->rx.buf = BUF_NULL;
|
qcs->rx.buf = BUF_NULL;
|
||||||
qcs->rx.frms = EB_ROOT_UNIQUE;
|
qcs->rx.frms = EB_ROOT_UNIQUE;
|
||||||
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
|
qcs->tx.buf = BUF_NULL;
|
||||||
qcs->tx.left = 0;
|
qcs->tx.xprt_buf = BUF_NULL;
|
||||||
|
|
||||||
qcs->subs = NULL;
|
qcs->subs = NULL;
|
||||||
LIST_INIT(&qcs->list);
|
LIST_INIT(&qcs->list);
|
||||||
@ -1304,7 +1305,7 @@ static int qc_recv(struct qcc *qcc)
|
|||||||
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
|
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
|
||||||
{
|
{
|
||||||
struct quic_frame *frm;
|
struct quic_frame *frm;
|
||||||
struct buffer *buf = &qcs->tx.buf;
|
struct buffer *buf = &qcs->tx.xprt_buf;
|
||||||
struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
|
struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
|
||||||
int total = 0, to_xfer;
|
int total = 0, to_xfer;
|
||||||
|
|
||||||
@ -1348,7 +1349,6 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint
|
|||||||
*/
|
*/
|
||||||
static int qc_send(struct qcc *qcc)
|
static int qc_send(struct qcc *qcc)
|
||||||
{
|
{
|
||||||
struct qcs *qcs;
|
|
||||||
struct eb64_node *node;
|
struct eb64_node *node;
|
||||||
int ret, done;
|
int ret, done;
|
||||||
|
|
||||||
@ -1363,33 +1363,29 @@ static int qc_send(struct qcc *qcc)
|
|||||||
*/
|
*/
|
||||||
node = eb64_first(&qcc->streams_by_id);
|
node = eb64_first(&qcc->streams_by_id);
|
||||||
while (node) {
|
while (node) {
|
||||||
struct buffer *buf;
|
struct qcs *qcs = container_of(node, struct qcs, by_id);
|
||||||
qcs = container_of(node, struct qcs, by_id);
|
struct buffer *buf = &qcs->tx.buf;
|
||||||
for (buf = br_head(qcs->tx.mbuf); b_size(buf); buf = br_del_head(qcs->tx.mbuf)) {
|
if (b_data(buf)) {
|
||||||
if (b_data(buf)) {
|
char fin = 0;
|
||||||
char fin = 0;
|
|
||||||
|
|
||||||
/* if FIN is activated, ensure the buffer to
|
/* if FIN is activated, ensure the buffer to
|
||||||
* send is the last
|
* send is the last
|
||||||
*/
|
*/
|
||||||
if (qcs->flags & QC_SF_FIN_STREAM) {
|
if (qcs->flags & QC_SF_FIN_STREAM) {
|
||||||
BUG_ON(qcs->tx.left < b_data(buf));
|
BUG_ON(b_data(&qcs->tx.buf) < b_data(buf));
|
||||||
fin = !(qcs->tx.left - b_data(buf));
|
fin = (b_data(&qcs->tx.buf) - b_data(buf) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset);
|
ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
ABORT_NOW();
|
ABORT_NOW();
|
||||||
|
|
||||||
qcs->tx.left -= ret;
|
qcs->tx.offset += ret;
|
||||||
qcs->tx.offset += ret;
|
if (b_data(buf)) {
|
||||||
if (b_data(buf)) {
|
qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
|
||||||
qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
|
SUB_RETRY_SEND, &qcc->wait_event);
|
||||||
SUB_RETRY_SEND, &qcc->wait_event);
|
break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
b_free(buf);
|
|
||||||
}
|
}
|
||||||
node = eb64_next(node);
|
node = eb64_next(node);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user