MEDIUM: mux-quic: use direct send transport API for STREAMs

Modify the STREAM emission in qc_send(): use the new transport function
qc_send_app_pkts() to send the list of constructed frames directly. This
makes it possible to remove the tasklet wakeup on the quic_conn and
should reduce latency.

If not all frames are sent after the transport call, subscribe the MUX
on the lower layer so that it can retry later. Note that there is
currently a bug: the transport layer does not retry to send the
remaining frames after a successful sendto, which might cause the
transfer to be interrupted.
Amaury Denoyelle 2022-02-09 18:16:49 +01:00
parent 414df7684a
commit 2c71fe58f0
2 changed files with 38 additions and 11 deletions

@@ -55,6 +55,7 @@ struct qcc {
 	} rx;
 	struct {
 		uint64_t max_data; /* Maximum number of bytes which may be sent */
+		struct list frms; /* list of frames ready to be sent */
 	} tx;

 	struct eb_root streams_by_id; /* all active streams by their ID */

@@ -9,6 +9,7 @@
 #include <haproxy/htx.h>
 #include <haproxy/pool.h>
 #include <haproxy/ssl_sock-t.h>
+#include <haproxy/xprt_quic.h>

 DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
 DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
@@ -330,11 +331,11 @@ static void qc_release(struct qcc *qcc)
 	}
 }

-static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
+static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset,
+                          struct list *frm_list)
 {
 	struct quic_frame *frm;
 	struct buffer *buf = &qcs->tx.xprt_buf;
-	struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
 	int total = 0, to_xfer;
 	unsigned char *btail;
@@ -370,7 +371,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint
 		frm->stream.len = total;
 	}

-	LIST_APPEND(&qel->pktns->tx.frms, &frm->list);
+	LIST_APPEND(frm_list, &frm->list);

 out:
 	fprintf(stderr, "%s: total=%d fin=%d id=%llu offset=%lu\n",
 	        __func__, total, fin, (ull)qcs->by_id.key, offset);
@@ -380,37 +381,61 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint
 	return -1;
 }

+/* Wrapper for send on transport layer. Send a list of frames <frms> for the
+ * connection <qcc>.
+ *
+ * Returns 0 if all data sent with success else non-zero.
+ */
+static int qc_send_frames(struct qcc *qcc, struct list *frms)
+{
+	if (!LIST_ISEMPTY(frms))
+		qc_send_app_pkts(qcc->conn->qc, frms);
+
+	/* TODO Currently, the transport layer is not complete. It might not
+	 * try to send all frames even if the Tx buffer is free. In this case
+	 * it is necessary to retry immediately instead of subscribing.
+	 */
+	if (!LIST_ISEMPTY(frms)) {
+		fprintf(stderr, "%s: remaining frames to send\n", __func__);
+		qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
+		                           SUB_RETRY_SEND, &qcc->wait_event);
+		return 1;
+	}
+
+	return 0;
+}
+
 static int qc_send(struct qcc *qcc)
 {
 	struct eb64_node *node;
-	int xprt_wake = 0;
 	int ret = 0;

 	fprintf(stderr, "%s\n", __func__);

-	/* TODO simple loop through all streams and check if there is frames to
-	 * send
+	/* loop through all streams, construct STREAM frames if data available.
+	 * TODO optimize the loop to favor streams which are not too heavy.
 	 */
 	node = eb64_first(&qcc->streams_by_id);
 	while (node) {
 		struct qcs *qcs = container_of(node, struct qcs, by_id);
 		struct buffer *buf = &qcs->tx.buf;
 		if (b_data(buf)) {
 			char fin = qcs->flags & QC_SF_FIN_STREAM;
-			ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset);
+			ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset,
+			                     &qcc->tx.frms);
 			BUG_ON(ret < 0); /* TODO handle this properly */

 			if (ret > 0) {
 				qcs_notify_send(qcs);
 				if (qcs->flags & QC_SF_BLK_MROOM)
 					qcs->flags &= ~QC_SF_BLK_MROOM;
-
-				xprt_wake = 1;
 			}

 			fprintf(stderr, "%s ret=%d\n", __func__, ret);
 			qcs->tx.offset += ret;

+			/* Subscribe if not all data can be send. */
 			if (b_data(buf)) {
 				qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
 				                           SUB_RETRY_SEND, &qcc->wait_event);
@@ -419,8 +444,8 @@ static int qc_send(struct qcc *qcc)
 		node = eb64_next(node);
 	}

-	if (xprt_wake)
-		tasklet_wakeup(((struct ssl_sock_ctx *)(qcc->conn->xprt_ctx))->wait_event.tasklet);
+	qc_send_frames(qcc, &qcc->tx.frms);
+	/* TODO adjust ret if not all frames are sent. */

 	return ret;
 }
@@ -531,6 +556,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
 	qcc->rx.max_data = lparams->initial_max_data;
 	qcc->tx.max_data = 0;
+	LIST_INIT(&qcc->tx.frms);

 	/* Client initiated streams must respect the server flow control. */
 	qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;