MAJOR: mux-quic: remove intermediary Tx buffer

Previously, QUIC MUX sending was implemented with data transferred along
two different buffer instances per stream.

The first QCS buffer was used for HTX blocks conversion into H3 (or
other application protocol) during snd_buf stream callback. QCS instance
is then registered for sending via qcc_io_cb().

For each sending QCS, data memcpy is performed from the first to a
secondary buffer. A STREAM frame is produced for each QCS based on the
content of their secondary buffer.

This model is useful for QUIC MUX, which has a major difference with
other muxes: data must be preserved longer, even after being sent to the
lower layer. Data references are shared with the quic-conn layer, which
implements retransmission and data deletion on ACK reception.

This double buffering stage was the first model implemented and remains
active until today. One of its major drawbacks is that it requires
memcpy invocation for every data transferred between the two buffers.
Another important drawback is that the first buffer is allocated by
each QCS individually without restriction. On the other hand, secondary
buffers are accounted for the connection. A bottleneck can appear if
secondary buffer pool is exhausted, causing unnecessary haproxy
buffering.

The purpose of this commit is to completely break this model. The first
buffer instance is removed. Now, application protocols will directly
allocate buffer from qc_stream_desc layer. This removes completely the
memcpy invocation.

This commit has a lot of code modifications. The most obvious one is the
removal of <qcs.tx.buf> field. Now, qcc_get_stream_txbuf() returns a
buffer instance from qc_stream_desc layer. qcs_xfer_data() which was
responsible for the memcpy between the two buffers is also completely
removed. Offset fields of QCS and QCC are now incremented directly by
qcc_send_stream(). These values are used as boundary with flow control
real offset to delimit the STREAM frames built.

As this change has a big impact on the code, this commit is only the
first part to fully support single buffer emission. For the moment, some
limitations are reintroduced and will be fixed in the next patches :

* on snd_buf if the QCS sent buffer in use has room but not enough for the
  application protocol to store its content
* on snd_buf if the QCS sent buffer is NULL and allocation cannot succeed
  due to connection pool exhaustion

One final important aspect is that extra care is now necessary in the
snd_buf callback. The same buffer instance is referenced by both the
stream and quic-conn layers. As such, some operations such as buffer
realignment cannot be done freely anymore.
This commit is contained in:
Amaury Denoyelle 2024-01-16 16:47:57 +01:00
parent 7dd6ed6321
commit 00a3e5f786
5 changed files with 96 additions and 195 deletions

View File

@ -32,7 +32,7 @@ enum qcs_type {
#define QC_CF_ERRL 0x00000001 /* fatal error detected locally, connection should be closed soon */
#define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */
/* unused 0x00000004 */
#define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */
/* unused 0x00000008 */
#define QC_CF_APP_SHUT 0x00000010 /* Application layer shutdown done. */
#define QC_CF_ERR_CONN 0x00000020 /* fatal error reported by transport layer */
@ -106,7 +106,7 @@ struct qcc {
#define QC_SF_NONE 0x00000000
#define QC_SF_SIZE_KNOWN 0x00000001 /* last frame received for this stream */
#define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */
#define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */
/* unused 0x00000004 */
#define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */
/* unused 0x00000010 */
#define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */
@ -161,7 +161,6 @@ struct qcs {
struct quic_fctl fc; /* stream flow control applied on sending */
uint64_t offset; /* last offset of data ready to be sent */
struct buffer buf; /* transmit buffer before sending via xprt */
} tx;
struct eb64_node by_id;

View File

@ -1715,9 +1715,9 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
/* At least 9 bytes to store frame type + length as a varint max size */
if (b_room(res) < 9) {
/* TODO */
TRACE_STATE("not enough room for trailers frame", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
qcs->flags |= QC_SF_BLK_MROOM;
goto end;
ABORT_NOW();
}
/* Force buffer realignment as size required to encode headers is unknown. */
@ -1727,9 +1727,9 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
headers_buf = b_make(b_peek(res, b_data(res) + 9), b_contig_space(res) - 9, 0, 0);
if (qpack_encode_field_section_line(&headers_buf)) {
/* TODO */
TRACE_STATE("not enough room for trailers section line", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
qcs->flags |= QC_SF_BLK_MROOM;
goto end;
ABORT_NOW();
}
tail = b_tail(&headers_buf);
@ -1749,9 +1749,9 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
}
if (qpack_encode_header(&headers_buf, list[hdr].n, list[hdr].v)) {
/* TODO */
TRACE_STATE("not enough room for all trailers", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
qcs->flags |= QC_SF_BLK_MROOM;
goto end;
ABORT_NOW();
}
}
@ -1868,22 +1868,13 @@ static int h3_resp_data_send(struct qcs *qcs, struct htx *htx,
if (fsize > count)
fsize = count;
while (1) {
b_reset(&outbuf);
outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
if (b_size(&outbuf) > hsize || !b_space_wraps(res))
break;
b_slow_realign(res, trash.area, b_data(res));
}
/* TODO buffer can be realign only if no data waiting for ACK. */
outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
/* Not enough room for headers and at least one data byte, block the
* stream. It is expected that the stream connector layer will subscribe
* on SEND.
*/
if (b_size(&outbuf) <= hsize) {
/* TODO */
TRACE_STATE("not enough room for data frame", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
qcs->flags |= QC_SF_BLK_MROOM;
goto end;
ABORT_NOW();
}
if (b_size(&outbuf) < hsize + fsize)
@ -1934,8 +1925,7 @@ static size_t h3_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count)
htx = htx_from_buf(buf);
while (count && !htx_is_empty(htx) &&
!(qcs->flags & QC_SF_BLK_MROOM) && !h3c->err) {
while (count && !htx_is_empty(htx) && !h3c->err) {
idx = htx_get_head(htx);
blk = htx_get_blk(htx, idx);
@ -2045,18 +2035,14 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count)
/* h3 DATA headers : 1-byte frame type + varint frame length */
hsize = 1 + QUIC_VARINT_MAX_SIZE;
while (1) {
if (b_contig_space(res) >= hsize || !b_space_wraps(res))
break;
b_slow_realign(res, trash.area, b_data(res));
}
/* TODO buffer can be realign only if no data waiting for ACK. */
/* Not enough room for headers and at least one data byte, block the
* stream. It is expected that the stream connector layer will subscribe
* on SEND.
*/
if (b_contig_space(res) <= hsize) {
qcs->flags |= QC_SF_BLK_MROOM;
/* TODO */
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
}

View File

@ -89,19 +89,16 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
size_t count)
{
enum htx_blk_type btype;
struct htx *htx;
struct htx *htx = NULL;
struct htx_blk *blk;
int32_t idx;
uint32_t bsize, fsize;
struct buffer *res, outbuf;
struct buffer *res = NULL;
size_t total = 0;
res = qcc_get_stream_txbuf(qcs);
outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
htx = htx_from_buf(buf);
while (count && !htx_is_empty(htx) && !(qcs->flags & QC_SF_BLK_MROOM)) {
while (count && !htx_is_empty(htx)) {
/* Not implemented : QUIC on backend side */
idx = htx_get_head(htx);
blk = htx_get_blk(htx, idx);
@ -112,6 +109,12 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
switch (btype) {
case HTX_BLK_DATA:
res = qcc_get_stream_txbuf(qcs);
if (!res) {
/* TODO */
ABORT_NOW();
}
if (unlikely(fsize == count &&
!b_data(res) &&
htx_nbblks(htx) == 1 && btype == HTX_BLK_DATA)) {
@ -137,15 +140,15 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
if (fsize > count)
fsize = count;
if (b_room(&outbuf) < fsize)
fsize = b_room(&outbuf);
if (b_contig_space(res) < fsize)
fsize = b_contig_space(res);
if (!fsize) {
qcs->flags |= QC_SF_BLK_MROOM;
goto end;
/* TODO */
ABORT_NOW();
}
b_putblk(&outbuf, htx_get_blk_ptr(htx, blk), fsize);
b_putblk(res, htx_get_blk_ptr(htx, blk), fsize);
total += fsize;
count -= fsize;
@ -168,7 +171,6 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
}
end:
b_add(res, b_data(&outbuf));
htx_to_buf(htx, buf);
return total;
@ -176,12 +178,20 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count)
{
struct buffer *res = qcc_get_stream_txbuf(qcs);
int ret = 0;
struct buffer *res;
if (!b_room(res)) {
qcs->flags |= QC_SF_BLK_MROOM;
res = qcc_get_stream_txbuf(qcs);
if (!res) {
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
/* TODO */
ABORT_NOW();
}
if (!b_room(res)) {
/* TODO */
ABORT_NOW();
}
/* No header required for HTTP/0.9, no need to reserve an offset. */
@ -189,8 +199,9 @@ static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count)
qcs->sd->iobuf.offset = 0;
qcs->sd->iobuf.data = 0;
ret = MIN(count, b_contig_space(res));
end:
return MIN(b_contig_space(res), count);
return ret;
}
static size_t hq_interop_done_ff(struct qcs *qcs)

View File

@ -72,9 +72,8 @@ static void qcs_free(struct qcs *qcs)
/* Release qc_stream_desc buffer from quic-conn layer. */
qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real);
/* Free Rx/Tx buffers. */
/* Free Rx buffer. */
qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
b_free(&qcs->tx.buf);
/* Remove qcs from qcc tree. */
eb64_delete(&qcs->by_id);
@ -138,7 +137,6 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
}
qcs->rx.msd_init = qcs->rx.msd;
qcs->tx.buf = BUF_NULL;
qcs->tx.offset = 0;
qcs->wait_event.tasklet = NULL;
@ -931,7 +929,24 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs)
*/
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs)
{
return b_alloc(&qcs->tx.buf);
struct qcc *qcc = qcs->qcc;
int buf_avail;
struct buffer *out = qc_stream_buf_get(qcs->stream);
if (!out) {
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
&buf_avail);
if (!out)
goto out;
if (!b_alloc(out)) {
TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
goto out;
}
}
out:
return out;
}
/* Wakes up every streams of <qcc> which are currently waiting for sending but
@ -974,6 +989,7 @@ void qcc_reset_stream(struct qcs *qcs, int err)
const int soft_blocked = qfctl_sblocked(&qcc->tx.fc);
qcc->tx.fc.off_soft -= (qcs->tx.fc.off_soft - qcs->tx.fc.off_real);
if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc))
qcc_notify_fctl(qcc);
}
@ -1014,6 +1030,9 @@ void qcc_send_stream(struct qcs *qcs, int urg, int count)
if (count) {
qfctl_sinc(&qcc->tx.fc, count);
qfctl_sinc(&qcs->tx.fc, count);
qcs->tx.offset += count;
qcs->qcc->tx.offsets += count;
}
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
@ -1552,64 +1571,6 @@ static void qcs_destroy(struct qcs *qcs)
TRACE_LEAVE(QMUX_EV_QCS_END, conn);
}
/* Transfer as much as possible data on <qcs> from <in> to <out>.
 *
 * Note: <out> is the qc_stream_desc buffer shared with the quic-conn
 * layer, <in> is the QCS-local transmit buffer filled by snd_buf.
 *
 * Returns the total bytes of transferred data or a negative error code.
 */
static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in)
{
struct qcc *qcc = qcs->qcc;
int to_xfer;
int total = 0;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
/* Lazily allocate <out> storage; failure is reported to the caller. */
if (!b_alloc(out)) {
TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
goto err;
}
/*
 * QCS out buffer diagram
 * head left to_xfer
 * -------------> ----------> ----->
 * --------------------------------------------------
 * |...............|xxxxxxxxxxx|<<<<<
 * --------------------------------------------------
 * ^ ack-off ^ sent-off ^ off
 *
 * STREAM frame
 * ^ ^
 * |xxxxxxxxxxxxxxxxx|
 */
/* Sanity checks: stream/connection offsets must stay ordered
 * (acked <= really sent <= prepared).
 */
BUG_ON_HOT(qcs->tx.fc.off_real < qcs->stream->ack_offset);
BUG_ON_HOT(qcs->tx.offset < qcs->tx.fc.off_real);
BUG_ON_HOT(qcc->tx.offsets < qcc->tx.fc.off_real);
/* Copy is bounded by both input data and output room. */
to_xfer = QUIC_MIN(b_data(in), b_room(out));
if (!to_xfer)
goto out;
total = b_force_xfer(out, in, to_xfer);
out:
{
struct qcs_xfer_data_trace_arg arg = {
.prep = b_data(out), .xfer = total,
};
TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_XFER_DATA,
qcc->conn, qcs, &arg);
}
return total;
err:
TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs);
return -1;
}
/* Prepare a STREAM frame for <qcs> instance using <out> as payload. The frame
* is appended in <frm_list>. Set <fin> if this is supposed to be the last
* stream frame. If <out> is NULL an empty STREAM frame is built : this may be
@ -1720,21 +1681,11 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
return -1;
}
/* Check after transferring data from qcs.tx.buf if FIN must be set on the next
 * STREAM frame for <qcs>.
 *
 * Returns true if FIN must be set else false.
 */
static int qcs_stream_fin(struct qcs *qcs)
{
/* FIN only once the application signalled end-of-stream AND the Tx
 * buffer has been fully drained.
 */
return qcs->flags & QC_SF_FIN_STREAM && !b_data(&qcs->tx.buf);
}
/* Return true if <qcs> has data to send in new STREAM frames. */
static forceinline int qcs_need_sending(struct qcs *qcs)
{
return b_data(&qcs->tx.buf) || qcs->tx.fc.off_real < qcs->tx.offset ||
qcs_stream_fin(qcs);
return qcs->tx.fc.off_real < qcs->tx.offset ||
qcs->flags & QC_SF_FIN_STREAM;
}
/* This function must be called by the upper layer to inform about the sending
@ -1795,7 +1746,7 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
increment_send_rate(diff, 0);
}
if (qcs->tx.offset == qcs->tx.fc.off_real && !b_data(&qcs->tx.buf)) {
if (qcs->tx.offset == qcs->tx.fc.off_real) {
/* Remove stream from send_list if all was sent. */
LIST_DEL_INIT(&qcs->el_send);
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
@ -1953,82 +1904,42 @@ static int qcs_send_stop_sending(struct qcs *qcs)
return 0;
}
/* Used internally by qcc_io_send function. Proceed to send for <qcs>. This will
* transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
* is then generated and inserted in <frms> list. Frame length will be
* truncated if greater than <window_conn>. This allows to prepare several
* frames in a loop while respecting connection flow control window.
/* Used internally by qcc_io_send function. Proceed to send for <qcs>. A STREAM
 * frame is generated pointing to QCS stream descriptor content and inserted in
 * <frms> list. Frame length will be truncated if greater than <window_conn>.
 * This allows to prepare several frames in a loop while respecting connection
 * flow control window.
*
* Returns the payload length of the STREAM frame or a negative error code.
*/
static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn)
{
struct qcc *qcc = qcs->qcc;
struct buffer *buf = &qcs->tx.buf;
struct buffer *out = qc_stream_buf_get(qcs->stream);
int xfer = 0, flen = 0, buf_avail;
char fin = 0;
int flen = 0;
const char fin = qcs->flags & QC_SF_FIN_STREAM;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
/* Cannot send STREAM on remote unidirectional streams. */
BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));
if (b_data(buf)) {
/* Allocate <out> buffer if not already done. */
if (!out) {
if (qcc->flags & QC_CF_CONN_FULL)
goto out;
/* This function must not be called if there is nothing to send. */
BUG_ON(!fin && !qcs_need_sending(qcs));
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset,
&buf_avail);
if (!out) {
if (buf_avail) {
TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
goto err;
}
TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
qcc->flags |= QC_CF_CONN_FULL;
goto out;
}
}
/* Transfer data from <buf> to <out>. */
xfer = qcs_xfer_data(qcs, out, buf);
if (xfer < 0)
goto err;
if (xfer > 0) {
qcs_notify_send(qcs);
qcs->flags &= ~QC_SF_BLK_MROOM;
}
qcs->tx.offset += xfer;
qcc->tx.offsets += xfer;
/* If out buffer is empty, QCS offsets must be equal. */
BUG_ON(!b_data(out) && qcs->tx.fc.off_real != qcs->tx.offset);
/* Skip STREAM frame allocation if already subscribed for send.
* Happens on sendto transient error or network congestion.
*/
if (qcc->wait_event.events & SUB_RETRY_SEND) {
TRACE_DEVEL("already subscribed for sending",
QMUX_EV_QCS_SEND, qcc->conn, qcs);
goto err;
}
/* FIN is set if all incoming data were transferred. */
fin = qcs_stream_fin(qcs);
/* Build a new STREAM frame with <out> buffer. */
if (qcs->tx.fc.off_real != qcs->tx.offset || fin) {
/* Skip STREAM frame allocation if already subscribed for send.
* Happens on sendto transient error or network congestion.
*/
if (qcc->wait_event.events & SUB_RETRY_SEND) {
TRACE_DEVEL("already subscribed for sending",
QMUX_EV_QCS_SEND, qcc->conn, qcs);
goto err;
}
flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn);
if (flen < 0)
goto err;
}
flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn);
if (flen < 0)
goto err;
out:
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
@ -2852,7 +2763,8 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
size_t count, int flags)
{
struct qcs *qcs = __sc_mux_strm(sc);
const size_t old_data = b_data(&qcs->tx.buf);
struct buffer *out = qc_stream_buf_get(qcs->stream);
const size_t old_data = out ? b_data(out) : 0;
size_t ret = 0;
char fin;
@ -2895,7 +2807,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
}
if (ret || fin) {
qcc_send_stream(qcs, 0, b_data(&qcs->tx.buf) - old_data);
const size_t data = b_data(qc_stream_buf_get(qcs->stream)) - (old_data);
if (data || fin)
qcc_send_stream(qcs, 0, data);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
}
@ -3007,14 +2921,12 @@ static size_t qmux_strm_done_ff(struct stconn *sc)
data += sd->iobuf.offset;
total = qcs->qcc->app_ops->done_ff(qcs);
qcc_send_stream(qcs, 0, data);
if (data || qcs->flags & QC_SF_FIN_STREAM)
qcc_send_stream(qcs, 0, data);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(qcc->wait_event.tasklet);
end:
if (!b_data(&qcs->tx.buf))
b_free(&qcs->tx.buf);
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
return total;
}

View File

@ -37,10 +37,7 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
/* notify MUX about available buffers. */
--qc->stream_buf_count;
if (qc->mux_state == QC_MUX_READY) {
if (qc->qcc->flags & QC_CF_CONN_FULL) {
qc->qcc->flags &= ~QC_CF_CONN_FULL;
tasklet_wakeup(qc->qcc->wait_event.tasklet);
}
/* TODO notify MUX for available buffer. */
}
}
@ -202,11 +199,7 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
qc->stream_buf_count -= free_count;
if (qc->mux_state == QC_MUX_READY) {
/* notify MUX about available buffers. */
if (qc->qcc->flags & QC_CF_CONN_FULL) {
qc->qcc->flags &= ~QC_CF_CONN_FULL;
tasklet_wakeup(qc->qcc->wait_event.tasklet);
}
/* TODO notify MUX for available buffer. */
}
}