mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-04-16 11:13:21 +02:00
MINOR: mux-quic: define Tx connection buffer for QMux
Similarly to reception, a new buffer is defined in QCC connection to handle emission for QMux protocol. This replaces the trash buffer usage in qcc_qstrm_send_frames(). This buffer is necessary to handle partial emission. On retry, the buffer must be completely emitted before starting to send new frames.
This commit is contained in:
parent
621f21f6fd
commit
782894f5b8
@ -81,8 +81,15 @@ struct qcc {
|
||||
struct quic_fctl fc; /* stream flow control applied on sending */
|
||||
uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */
|
||||
struct list frms; /* list of STREAM frames ready for sent */
|
||||
struct quic_pacer pacer; /* engine used to pace emission */
|
||||
int paced_sent_ctr; /* counter for when emission is interrupted due to pacing */
|
||||
union {
|
||||
struct {
|
||||
/* quic */
|
||||
struct quic_pacer pacer; /* engine used to pace emission */
|
||||
int paced_sent_ctr; /* counter for when emission is interrupted due to pacing */
|
||||
};
|
||||
/* qstrm */
|
||||
struct buffer qstrm_buf;
|
||||
};
|
||||
} tx;
|
||||
struct {
|
||||
struct buffer qstrm_buf;
|
||||
|
||||
@ -3760,6 +3760,13 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
|
||||
}
|
||||
|
||||
if (!conn_is_quic(conn)) {
|
||||
qcc->tx.qstrm_buf = BUF_NULL;
|
||||
b_alloc(&qcc->tx.qstrm_buf, DB_MUX_TX);
|
||||
if (!b_size(&qcc->tx.qstrm_buf)) {
|
||||
TRACE_ERROR("tx qstrm buf alloc failure", QMUX_EV_QCC_NEW);
|
||||
goto err;
|
||||
}
|
||||
|
||||
qcc->rx.qstrm_buf = BUF_NULL;
|
||||
b_alloc(&qcc->rx.qstrm_buf, DB_MUX_RX);
|
||||
if (!b_size(&qcc->rx.qstrm_buf)) {
|
||||
|
||||
@ -224,16 +224,32 @@ int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms)
|
||||
struct connection *conn = qcc->conn;
|
||||
struct quic_frame *frm, *frm_old;
|
||||
struct quic_frame *split_frm, *orig_frm;
|
||||
struct buffer *buf = &qcc->tx.qstrm_buf;
|
||||
unsigned char *pos, *old, *end;
|
||||
size_t ret;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
|
||||
|
||||
if (b_data(buf)) {
|
||||
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), NULL, 0, 0);
|
||||
if (!ret) {
|
||||
TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (ret != b_data(buf)) {
|
||||
/* TODO */
|
||||
ABORT_NOW();
|
||||
}
|
||||
}
|
||||
|
||||
b_reset(buf);
|
||||
list_for_each_entry_safe(frm, frm_old, frms, list) {
|
||||
loop:
|
||||
split_frm = NULL;
|
||||
b_reset(&trash);
|
||||
old = pos = (unsigned char *)b_orig(&trash);
|
||||
end = (unsigned char *)b_wrap(&trash);
|
||||
b_reset(buf);
|
||||
old = pos = (unsigned char *)b_orig(buf);
|
||||
end = (unsigned char *)b_wrap(buf);
|
||||
|
||||
BUG_ON(!frm);
|
||||
TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, QMUX_EV_QCC_SEND, qcc->conn, 0, 0, 0,
|
||||
@ -261,9 +277,9 @@ int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms)
|
||||
qc_build_frm(frm, &pos, end, NULL);
|
||||
BUG_ON(pos - old > global.tune.bufsize);
|
||||
BUG_ON(pos == old);
|
||||
b_add(&trash, pos - old);
|
||||
b_add(buf, pos - old);
|
||||
|
||||
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, &trash, b_data(&trash), NULL, 0, 0);
|
||||
ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, b_data(buf), NULL, 0, 0);
|
||||
if (!ret) {
|
||||
TRACE_DEVEL("snd_buf interrupted", QMUX_EV_QCC_SEND, qcc->conn);
|
||||
if (split_frm)
|
||||
@ -271,7 +287,7 @@ int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms)
|
||||
break;
|
||||
}
|
||||
|
||||
if (ret != b_data(&trash)) {
|
||||
if (ret != b_data(buf)) {
|
||||
/* TODO */
|
||||
ABORT_NOW();
|
||||
}
|
||||
@ -286,6 +302,7 @@ int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms)
|
||||
}
|
||||
}
|
||||
|
||||
out:
|
||||
if (conn->flags & CO_FL_ERROR) {
|
||||
/* TODO */
|
||||
//ABORT_NOW();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user