From 6ad99af0a9e857c6cd47a81866a10de83bfa0dea Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Wed, 25 Sep 2024 17:55:10 +0200 Subject: [PATCH] MINOR: quic: refactor MUX send notification For STREAM emission, MUX QUIC generates one or several frames and emit them via qc_send_mux(). Lower layer may use them as-is, or split them to lower chunk to fit in a QUIC packet. It is then responsible to notify the MUX to report the amount of data sent. Previously, this was done via a direct call from quic_conn to MUX using qcc_streams_sent_done(). Modify this to have a better isolation accross layers. Define a send callback handled by the qc_stream_desc instance. This allows the MUX to register each QCS instance individually to the renamved qmux_ctrl_send() which replaces qcc_streams_sent_done(). At quic_conn layer, qc_stream_desc_send() can be used now. This is a wrapper to qc_stream_desc layer to invoke the send callback if registered. This mechanism of qc_stream_desc callback should be extended later to implement other notifications accross the QUIC stack. --- include/haproxy/mux_quic.h | 1 - include/haproxy/quic_stream-t.h | 3 +- include/haproxy/quic_stream.h | 17 +++ src/mux_quic.c | 192 ++++++++++++++++---------------- src/quic_stream.c | 1 + src/quic_tx.c | 15 +-- 6 files changed, 126 insertions(+), 103 deletions(-) diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index fd6c09024..cc5eb1e96 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -40,7 +40,6 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max); int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max); int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size); int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err); -void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset); /* Bit shift to get the stream sub ID for internal use which is obtained * shifting the stream IDs by this value, knowing that the diff --git a/include/haproxy/quic_stream-t.h b/include/haproxy/quic_stream-t.h index 1a5cf47df..8e0dcfd48 100644 --- a/include/haproxy/quic_stream-t.h +++ b/include/haproxy/quic_stream-t.h @@ -46,7 +46,8 @@ struct qc_stream_desc { int flags; /* QC_SD_FL_* values */ - void *ctx; /* MUX specific context */ + void (*notify_send)(struct qc_stream_desc *, uint64_t offset, uint64_t len); + void *ctx; /* notify context */ }; #endif /* USE_QUIC */ diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index 11c487836..fcd843010 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -20,5 +20,22 @@ struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream, struct buffer *qc_stream_buf_realloc(struct qc_stream_desc *stream); void qc_stream_buf_release(struct qc_stream_desc *stream); +/* Reports emission of STREAM frame starting at and of length , + * related to data storage. + */ +static inline void qc_stream_desc_send(struct qc_stream_desc *stream, + uint64_t offset, uint64_t len) +{ + if (stream->notify_send) + stream->notify_send(stream, len, offset); +} + +/* Subscribe for send notification on . */ +static inline void qc_stream_desc_sub_send(struct qc_stream_desc *stream, + void (*cb)(struct qc_stream_desc *s, uint64_t offset, uint64_t len)) +{ + stream->notify_send = cb; +} + #endif /* USE_QUIC */ #endif /* _HAPROXY_QUIC_STREAM_H_ */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 630e98bbc..dc80c88fd 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -32,6 +32,8 @@ DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc)); DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs)); +static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset); + static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) { struct buffer buf; @@ -179,6 +181,8 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) TRACE_ERROR("qc_stream_desc alloc failure", QMUX_EV_QCS_NEW, qcc->conn, qcs); goto err; } + + qc_stream_desc_sub_send(qcs->stream, qmux_ctrl_send); } if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) { @@ -525,6 +529,100 @@ void qcs_notify_send(struct qcs *qcs) } } +/* Returns total number of bytes not already sent to quic-conn layer. */ +static uint64_t qcs_prep_bytes(const struct qcs *qcs) +{ + struct buffer *out = qc_stream_buf_get(qcs->stream); + uint64_t diff, base_off; + + if (!out) + return 0; + + /* if ack_offset < buf_offset, it points to an older buffer. */ + base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset); + diff = qcs->tx.fc.off_real - base_off; + return b_data(out) - diff; +} + +/* Used as a callback for qc_stream_desc layer to notify about emission of a + * STREAM frame of length starting at . + */ +static void qmux_ctrl_send(struct qc_stream_desc *stream, uint64_t data, uint64_t offset) +{ + struct qcs *qcs = stream->ctx; + struct qcc *qcc = qcs->qcc; + uint64_t diff; + + TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); + + /* Real off MUST always be the greatest offset sent. */ + BUG_ON(offset > qcs->tx.fc.off_real); + + /* check if the STREAM frame has already been notified. It can happen + * for retransmission. + */ + if (offset + data < qcs->tx.fc.off_real) { + TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs); + goto out; + } + + qcs_idle_open(qcs); + + diff = offset + data - qcs->tx.fc.off_real; + if (diff) { + struct quic_fctl *fc_conn = &qcc->tx.fc; + struct quic_fctl *fc_strm = &qcs->tx.fc; + + /* Ensure real offset never exceeds soft value. */ + BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft); + BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft); + + /* increase offset sum on connection */ + if (qfctl_rinc(fc_conn, diff)) { + TRACE_STATE("connection flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn); + } + + /* increase offset on stream */ + if (qfctl_rinc(fc_strm, diff)) { + TRACE_STATE("stream flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn, qcs); + } + /* Release buffer if everything sent and buf is full or stream is waiting for room. */ + if (!qcs_prep_bytes(qcs) && + (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) { + qc_stream_buf_release(qcs->stream); + qcs->flags &= ~QC_SF_BLK_MROOM; + qcs_notify_send(qcs); + } + + /* Add measurement for send rate. This is done at the MUX layer + * to account only for STREAM frames without retransmission. + */ + increment_send_rate(diff, 0); + } + + if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) { + /* Remove stream from send_list if all was sent. */ + LIST_DEL_INIT(&qcs->el_send); + TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); + + if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) { + /* Close stream locally. */ + qcs_close_local(qcs); + + if (qcs->flags & QC_SF_FIN_STREAM) { + qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN; + /* Reset flag to not emit multiple FIN STREAM frames. */ + qcs->flags &= ~QC_SF_FIN_STREAM; + } + } + } + + out: + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); +} + /* Returns true if buffer window does not have room for a new buffer. */ static inline int qcc_bufwnd_full(const struct qcc *qcc) { @@ -1123,21 +1221,6 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs) return out && b_size(out) ? out : NULL; } -/* Returns total number of bytes not already sent to quic-conn layer. */ -static uint64_t qcs_prep_bytes(const struct qcs *qcs) -{ - struct buffer *out = qc_stream_buf_get(qcs->stream); - uint64_t diff, base_off; - - if (!out) - return 0; - - /* if ack_offset < buf_offset, it points to an older buffer. */ - base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset); - diff = qcs->tx.fc.off_real - base_off; - return b_data(out) - diff; -} - /* Try to realign buffer for stream. This is done only if there is * no data waiting for ACK. * @@ -1942,85 +2025,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, return -1; } -/* This function must be called by the upper layer to inform about the sending - * of a STREAM frame for instance. The frame is of length and on - * . - */ -void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) -{ - struct qcc *qcc = qcs->qcc; - uint64_t diff; - - TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); - - /* Real off MUST always be the greatest offset sent. */ - BUG_ON(offset > qcs->tx.fc.off_real); - - /* check if the STREAM frame has already been notified. It can happen - * for retransmission. - */ - if (offset + data < qcs->tx.fc.off_real) { - TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs); - goto out; - } - - qcs_idle_open(qcs); - - diff = offset + data - qcs->tx.fc.off_real; - if (diff) { - struct quic_fctl *fc_conn = &qcc->tx.fc; - struct quic_fctl *fc_strm = &qcs->tx.fc; - - /* Ensure real offset never exceeds soft value. */ - BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft); - BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft); - - /* increase offset sum on connection */ - if (qfctl_rinc(fc_conn, diff)) { - TRACE_STATE("connection flow-control reached", - QMUX_EV_QCS_SEND, qcc->conn); - } - - /* increase offset on stream */ - if (qfctl_rinc(fc_strm, diff)) { - TRACE_STATE("stream flow-control reached", - QMUX_EV_QCS_SEND, qcc->conn, qcs); - } - /* Release buffer if everything sent and buf is full or stream is waiting for room. */ - if (!qcs_prep_bytes(qcs) && - (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) { - qc_stream_buf_release(qcs->stream); - qcs->flags &= ~QC_SF_BLK_MROOM; - qcs_notify_send(qcs); - } - - /* Add measurement for send rate. This is done at the MUX layer - * to account only for STREAM frames without retransmission. - */ - increment_send_rate(diff, 0); - } - - if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) { - /* Remove stream from send_list if all was sent. */ - LIST_DEL_INIT(&qcs->el_send); - TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); - - if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) { - /* Close stream locally. */ - qcs_close_local(qcs); - - if (qcs->flags & QC_SF_FIN_STREAM) { - qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN; - /* Reset flag to not emit multiple FIN STREAM frames. */ - qcs->flags &= ~QC_SF_FIN_STREAM; - } - } - } - - out: - TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); -} - /* Returns true if subscribe set, false otherwise. */ static int qcc_subscribe_send(struct qcc *qcc) { diff --git a/src/quic_stream.c b/src/quic_stream.c index 91f657d47..f6adc2e43 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -91,6 +91,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void stream->ack_offset = 0; stream->flags = 0; stream->ctx = ctx; + stream->notify_send = NULL; return stream; } diff --git a/src/quic_tx.c b/src/quic_tx.c index 8ad3d515e..1420e103b 100644 --- a/src/quic_tx.c +++ b/src/quic_tx.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -1664,9 +1665,9 @@ static int qc_build_frms(struct list *outlist, struct list *inlist, /* Do not notify MUX on retransmission. */ if (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT) { - qcc_streams_sent_done(cf->stream.stream->ctx, - cf->stream.len, - cf->stream.offset.key); + qc_stream_desc_send(cf->stream.stream, + cf->stream.offset.key, + cf->stream.len); } } else { @@ -1711,14 +1712,14 @@ static int qc_build_frms(struct list *outlist, struct list *inlist, /* Do not notify MUX on retransmission. */ if (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT) { - qcc_streams_sent_done(new_cf->stream.stream->ctx, - new_cf->stream.len, - new_cf->stream.offset.key); + qc_stream_desc_send(new_cf->stream.stream, + new_cf->stream.offset.key, + new_cf->stream.len); } } /* TODO the MUX is notified about the frame sending via - * previous qcc_streams_sent_done call. However, the + * previous qc_stream_desc_send call. However, the * sending can fail later, for example if the sendto * system call returns an error. As the MUX has been * notified, the transport layer is responsible to