MINOR: mux-quic: implement immediate send retry

Complete qc_send function. After having processed each qcs emission, it
will now retry send on qcs where transfer can continue. This is useful
when qc_stream_desc buffer is full and there is still data present in
qcs buf.

To implement this, each eligible qcs is inserted in a new list
<qcc.send_retry_list>. This is done on send notification from the
transport layer through qcc_streams_sent_done(). Retry emission until
send_retry_list is empty or the transport layer cannot accept more
data.

Several send operations are now called in two different places. Thus a
new _qc_send_qcs() function is defined to factorize the code.

This change should maximize the throughput during QUIC transfers.
This commit is contained in:
Amaury Denoyelle 2022-04-15 17:32:04 +02:00
parent d2f80a2e63
commit 1b2dba531d
4 changed files with 102 additions and 64 deletions

View File

@ -10,6 +10,7 @@
#include <haproxy/buf-t.h>
#include <haproxy/connection-t.h>
#include <haproxy/list-t.h>
#include <haproxy/quic_stream-t.h>
#include <haproxy/xprt_quic-t.h>
#include <haproxy/conn_stream-t.h>
@ -70,6 +71,8 @@ struct qcc {
struct eb_root streams_by_id; /* all active streams by their ID */
struct list send_retry_list; /* list of qcs eligible to send retry */
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
struct wait_event *subs;
@ -111,6 +114,8 @@ struct qcs {
uint64_t id;
struct qc_stream_desc *stream;
struct list el; /* element of qcc.send_retry_list */
struct wait_event wait_event;
struct wait_event *subs;
};

View File

@ -14,7 +14,6 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len
void qc_stream_desc_free(struct qc_stream_desc *stream);
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
int qc_stream_buf_avail(struct quic_conn *qc);
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
uint64_t offset);
void qc_stream_buf_release(struct qc_stream_desc *stream);

View File

@ -7,6 +7,7 @@
#include <haproxy/conn_stream.h>
#include <haproxy/dynbuf.h>
#include <haproxy/htx.h>
#include <haproxy/list.h>
#include <haproxy/pool.h>
#include <haproxy/quic_stream.h>
#include <haproxy/sink.h>
@ -722,14 +723,9 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
qc_stream_buf_release(qcs->stream);
/* reschedule send if buffers available */
if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
tasklet_wakeup(qcc->wait_event.tasklet);
}
else {
qcc->flags |= QC_CF_CONN_FULL;
}
/* prepare qcs for immediate send retry if data to send */
if (b_data(&qcs->tx.buf))
LIST_APPEND(&qcc->send_retry_list, &qcs->el);
}
}
@ -760,9 +756,11 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms)
if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving with no frames to send", QMUX_EV_QCC_SEND, qcc->conn);
return 0;
return 1;
}
LIST_INIT(&qcc->send_retry_list);
retry_send:
first_frm = LIST_ELEM(frms->n, struct quic_frame *, list);
if ((first_frm->type & QUIC_FT_STREAM_8) == QUIC_FT_STREAM_8) {
@ -837,6 +835,65 @@ static int qc_send_max_streams(struct qcc *qcc)
return 0;
}
/* Used internally by qc_send function. Proceed to send for <qcs>. This will
 * transfer data from the qcs buffer to its quic_stream counterpart. A STREAM
 * frame is then generated and inserted in <frms> list. <qcc_max_data> is the
 * current flow-control max-data at the connection level which must not be
 * surpassed.
 *
 * Returns the total bytes transferred between qcs and quic_stream buffers.
 * Can be 0 if the <out> buffer cannot be allocated (QC_CF_CONN_FULL is then
 * set on the connection).
 */
static int _qc_send_qcs(struct qcs *qcs, struct list *frms,
                        uint64_t qcc_max_data)
{
	struct qcc *qcc = qcs->qcc;
	/* <buf> is the mux-side input buffer, <out> the xprt-side output one. */
	struct buffer *buf = &qcs->tx.buf;
	struct buffer *out = qc_stream_buf_get(qcs->stream);
	int xfer = 0;

	/* Allocate <out> buffer if necessary. Give up immediately if a previous
	 * allocation already failed for this connection.
	 */
	if (!out) {
		if (qcc->flags & QC_CF_CONN_FULL)
			return 0;

		out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
		if (!out) {
			/* Flag the connection so sibling streams skip doomed
			 * allocation attempts until buffers are released.
			 */
			qcc->flags |= QC_CF_CONN_FULL;
			return 0;
		}
	}

	/* Transfer data from <buf> to <out>. */
	if (b_data(buf)) {
		xfer = qcs_xfer_data(qcs, out, buf, qcc_max_data);
		BUG_ON(xfer < 0); /* TODO handle this properly */

		if (xfer > 0) {
			/* Room was freed in <buf>: wake the stream layer and
			 * clear its "blocked on mux room" condition.
			 */
			qcs_notify_send(qcs);
			qcs->flags &= ~QC_SF_BLK_MROOM;
		}

		qcs->tx.offset += xfer;
	}

	/* out buffer cannot be emptied if qcs offsets differ. */
	BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);

	/* Build a new STREAM frame with <out> buffer if there is unsent data. */
	if (qcs->tx.sent_offset != qcs->tx.offset) {
		int ret;
		char fin = !!(qcs->flags & QC_SF_FIN_STREAM);

		/* FIN is set if all incoming data were transferred. */
		fin = !!(fin && !b_data(buf));

		ret = qcs_build_stream_frm(qcs, out, fin, frms);
		BUG_ON(ret < 0); /* TODO handle this properly */
	}

	return xfer;
}
/* Proceed to sending. Loop through all available streams for the <qcc>
* instance and try to send as much as possible.
*
@ -846,7 +903,8 @@ static int qc_send(struct qcc *qcc)
{
struct list frms = LIST_HEAD_INIT(frms);
struct eb64_node *node;
int total = 0;
struct qcs *qcs, *qcs_tmp;
int total = 0, tmp_total = 0;
TRACE_ENTER(QMUX_EV_QCC_SEND);
@ -867,9 +925,8 @@ static int qc_send(struct qcc *qcc)
*/
node = eb64_first(&qcc->streams_by_id);
while (node) {
struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
struct buffer *buf = &qcs->tx.buf;
struct buffer *out = qc_stream_buf_get(qcs->stream);
int ret;
qcs = eb64_entry(node, struct qcs, by_id);
/* TODO
* for the moment, unidirectional streams have their own
@ -886,63 +943,38 @@ static int qc_send(struct qcc *qcc)
continue;
}
if (!b_data(buf) && !out) {
if (!b_data(&qcs->tx.buf) && !qc_stream_buf_get(qcs->stream)) {
node = eb64_next(node);
continue;
}
if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
node = eb64_next(node);
continue;
}
if (!out) {
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
if (!out) {
qcc->flags |= QC_CF_CONN_FULL;
node = eb64_next(node);
continue;
}
}
/* Prepare <out> buffer with data from <buf>. */
if (b_data(buf)) {
int ret = qcs_xfer_data(qcs, out, buf,
qcc->tx.sent_offsets + total);
BUG_ON(ret < 0); /* TODO handle this properly */
if (ret > 0) {
qcs_notify_send(qcs);
if (qcs->flags & QC_SF_BLK_MROOM)
qcs->flags &= ~QC_SF_BLK_MROOM;
}
qcs->tx.offset += ret;
total += ret;
}
/* Subscribe if not all data can be transfered. */
if (b_data(buf)) {
qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
SUB_RETRY_SEND, &qcc->wait_event);
}
/* Build a new STREAM frame with <out> buffer. */
if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
int ret;
char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
/* FIN is set if all incoming data were transfered. */
fin = !!(fin && !b_data(buf));
ret = qcs_build_stream_frm(qcs, out, fin, &frms);
BUG_ON(ret < 0); /* TODO handle this properly */
}
ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + total);
total += ret;
node = eb64_next(node);
}
qc_send_frames(qcc, &frms);
if (qc_send_frames(qcc, &frms)) {
/* data rejected by transport layer, do not retry. */
goto out;
}
retry:
tmp_total = 0;
list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_retry_list, el) {
int ret;
BUG_ON(!b_data(&qcs->tx.buf));
BUG_ON(qc_stream_buf_get(qcs->stream));
ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + tmp_total);
tmp_total += ret;
LIST_DELETE(&qcs->el);
}
total += tmp_total;
if (!qc_send_frames(qcc, &frms) && !LIST_ISEMPTY(&qcc->send_retry_list))
goto retry;
out:
TRACE_LEAVE(QMUX_EV_QCC_SEND);
return total;
@ -1105,6 +1137,8 @@ static int qc_init(struct connection *conn, struct proxy *prx,
if (!qcc->wait_event.tasklet)
goto fail_no_tasklet;
LIST_INIT(&qcc->send_retry_list);
qcc->subs = NULL;
qcc->wait_event.tasklet->process = qc_io_cb;
qcc->wait_event.tasklet->context = qcc;

View File

@ -206,7 +206,7 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
/* Check if a new stream buffer can be allocated for the connection <qc>.
* Returns a boolean.
*/
int qc_stream_buf_avail(struct quic_conn *qc)
static int qc_stream_buf_avail(struct quic_conn *qc)
{
/* TODO use a global tune settings for max */
return qc->stream_buf_count < 30;