diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 47a3947b1..ef45d664c 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -55,6 +55,7 @@ struct qcc { } rx; struct { uint64_t max_data; /* Maximum number of bytes which may be sent */ + struct list frms; /* list of frames ready to be sent */ } tx; struct eb_root streams_by_id; /* all active streams by their ID */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 7c52ac013..24ba82933 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -9,6 +9,7 @@ #include #include #include +#include DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc)); DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs)); @@ -330,11 +331,11 @@ static void qc_release(struct qcc *qcc) } } -static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset) +static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset, + struct list *frm_list) { struct quic_frame *frm; struct buffer *buf = &qcs->tx.xprt_buf; - struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP]; int total = 0, to_xfer; unsigned char *btail; @@ -370,7 +371,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint frm->stream.len = total; } - LIST_APPEND(&qel->pktns->tx.frms, &frm->list); + LIST_APPEND(frm_list, &frm->list); out: fprintf(stderr, "%s: total=%d fin=%d id=%llu offset=%lu\n", __func__, total, fin, (ull)qcs->by_id.key, offset); @@ -380,37 +381,61 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint return -1; } +/* Wrapper for send on transport layer. Send a list of frames for the + * connection . + * + * Returns 0 if all data sent with success else non-zero. + */ +static int qc_send_frames(struct qcc *qcc, struct list *frms) +{ + if (!LIST_ISEMPTY(frms)) + qc_send_app_pkts(qcc->conn->qc, frms); + + /* TODO Currently, the transport layer is not complete. It might not + * try to send all frames even if the Tx buffer is free. In this case + * it is necessary to retry immediately instead of subscribing. + */ + if (!LIST_ISEMPTY(frms)) { + fprintf(stderr, "%s: remaining frames to send\n", __func__); + qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx, + SUB_RETRY_SEND, &qcc->wait_event); + return 1; + } + + return 0; +} + static int qc_send(struct qcc *qcc) { struct eb64_node *node; - int xprt_wake = 0; int ret = 0; fprintf(stderr, "%s\n", __func__); - /* TODO simple loop through all streams and check if there is frames to - * send + /* loop through all streams, construct STREAM frames if data available. + * TODO optimize the loop to favor streams which are not too heavy. */ node = eb64_first(&qcc->streams_by_id); while (node) { struct qcs *qcs = container_of(node, struct qcs, by_id); struct buffer *buf = &qcs->tx.buf; + if (b_data(buf)) { char fin = qcs->flags & QC_SF_FIN_STREAM; - ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset); + ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset, + &qcc->tx.frms); BUG_ON(ret < 0); /* TODO handle this properly */ if (ret > 0) { qcs_notify_send(qcs); if (qcs->flags & QC_SF_BLK_MROOM) qcs->flags &= ~QC_SF_BLK_MROOM; - - xprt_wake = 1; } fprintf(stderr, "%s ret=%d\n", __func__, ret); qcs->tx.offset += ret; + /* Subscribe if not all data can be send. */ if (b_data(buf)) { qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event); @@ -419,8 +444,8 @@ static int qc_send(struct qcc *qcc) node = eb64_next(node); } - if (xprt_wake) - tasklet_wakeup(((struct ssl_sock_ctx *)(qcc->conn->xprt_ctx))->wait_event.tasklet); + qc_send_frames(qcc, &qcc->tx.frms); + /* TODO adjust ret if not all frames are sent. */ return ret; } @@ -531,6 +556,7 @@ static int qc_init(struct connection *conn, struct proxy *prx, qcc->rx.max_data = lparams->initial_max_data; qcc->tx.max_data = 0; + LIST_INIT(&qcc->tx.frms); /* Client initiated streams must respect the server flow control. */ qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;