From 2c71fe58f03a5d6e8c40be972569fb2c7dac0ade Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Wed, 9 Feb 2022 18:16:49 +0100 Subject: [PATCH] MEDIUM: mux-quic: use direct send transport API for STREAMs Modify the STREAM emission in qc_send. Use the new transport function qc_send_app_pkts to directly send the list of constructed frames. This allows to remove the tasklet wakeup on the quic_conn and should reduce the latency. If not all frames are send after the transport call, subscribe the MUX on the lower layer to be able to retry. Currently there is a bug because the transport layer does not retry to send frames in excess after a successful sendto. This might cause the transfer to be interrupted. --- include/haproxy/mux_quic-t.h | 1 + src/mux_quic.c | 48 +++++++++++++++++++++++++++--------- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 47a3947b1..ef45d664c 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -55,6 +55,7 @@ struct qcc { } rx; struct { uint64_t max_data; /* Maximum number of bytes which may be sent */ + struct list frms; /* list of frames ready to be sent */ } tx; struct eb_root streams_by_id; /* all active streams by their ID */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 7c52ac013..24ba82933 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -9,6 +9,7 @@ #include #include #include +#include DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc)); DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs)); @@ -330,11 +331,11 @@ static void qc_release(struct qcc *qcc) } } -static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset) +static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset, + struct list *frm_list) { struct quic_frame *frm; struct buffer *buf = &qcs->tx.xprt_buf; - struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP]; int total = 0, to_xfer; unsigned char *btail; @@ -370,7 +371,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint frm->stream.len = total; } - LIST_APPEND(&qel->pktns->tx.frms, &frm->list); + LIST_APPEND(frm_list, &frm->list); out: fprintf(stderr, "%s: total=%d fin=%d id=%llu offset=%lu\n", __func__, total, fin, (ull)qcs->by_id.key, offset); @@ -380,37 +381,61 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint return -1; } +/* Wrapper for send on transport layer. Send a list of frames for the + * connection . + * + * Returns 0 if all data sent with success else non-zero. + */ +static int qc_send_frames(struct qcc *qcc, struct list *frms) +{ + if (!LIST_ISEMPTY(frms)) + qc_send_app_pkts(qcc->conn->qc, frms); + + /* TODO Currently, the transport layer is not complete. It might not + * try to send all frames even if the Tx buffer is free. In this case + * it is necessary to retry immediately instead of subscribing. + */ + if (!LIST_ISEMPTY(frms)) { + fprintf(stderr, "%s: remaining frames to send\n", __func__); + qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx, + SUB_RETRY_SEND, &qcc->wait_event); + return 1; + } + + return 0; +} + static int qc_send(struct qcc *qcc) { struct eb64_node *node; - int xprt_wake = 0; int ret = 0; fprintf(stderr, "%s\n", __func__); - /* TODO simple loop through all streams and check if there is frames to - * send + /* loop through all streams, construct STREAM frames if data available. + * TODO optimize the loop to favor streams which are not too heavy. */ node = eb64_first(&qcc->streams_by_id); while (node) { struct qcs *qcs = container_of(node, struct qcs, by_id); struct buffer *buf = &qcs->tx.buf; + if (b_data(buf)) { char fin = qcs->flags & QC_SF_FIN_STREAM; - ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset); + ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset, + &qcc->tx.frms); BUG_ON(ret < 0); /* TODO handle this properly */ if (ret > 0) { qcs_notify_send(qcs); if (qcs->flags & QC_SF_BLK_MROOM) qcs->flags &= ~QC_SF_BLK_MROOM; - - xprt_wake = 1; } fprintf(stderr, "%s ret=%d\n", __func__, ret); qcs->tx.offset += ret; + /* Subscribe if not all data can be send. */ if (b_data(buf)) { qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event); @@ -419,8 +444,8 @@ static int qc_send(struct qcc *qcc) node = eb64_next(node); } - if (xprt_wake) - tasklet_wakeup(((struct ssl_sock_ctx *)(qcc->conn->xprt_ctx))->wait_event.tasklet); + qc_send_frames(qcc, &qcc->tx.frms); + /* TODO adjust ret if not all frames are sent. */ return ret; } @@ -531,6 +556,7 @@ static int qc_init(struct connection *conn, struct proxy *prx, qcc->rx.max_data = lparams->initial_max_data; qcc->tx.max_data = 0; + LIST_INIT(&qcc->tx.frms); /* Client initiated streams must respect the server flow control. */ qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;