MINOR: quic: refactor MUX send notification

For STREAM emission, MUX QUIC generates one or several frames and emit
them via qc_send_mux(). Lower layer may use them as-is, or split them to
lower chunk to fit in a QUIC packet. It is then responsible to notify
the MUX to report the amount of data sent.

Previously, this was done via a direct call from quic_conn to MUX using
qcc_streams_sent_done(). Modify this to have a better isolation accross
layers. Define a send callback handled by the qc_stream_desc instance.
This allows the MUX to register each QCS instance individually to the
renamved qmux_ctrl_send() which replaces qcc_streams_sent_done().

At quic_conn layer, qc_stream_desc_send() can be used now. This is a
wrapper to qc_stream_desc layer to invoke the send callback if
registered.

This mechanism of qc_stream_desc callback should be extended later to
implement other notifications accross the QUIC stack.
This commit is contained in:
Amaury Denoyelle 2024-09-25 17:55:10 +02:00
parent 4859d8e71d
commit 6ad99af0a9
6 changed files with 126 additions and 103 deletions

View File

@ -40,7 +40,6 @@ int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max); int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max);
int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size); int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size);
int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err); int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err);
void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset);
/* Bit shift to get the stream sub ID for internal use which is obtained /* Bit shift to get the stream sub ID for internal use which is obtained
* shifting the stream IDs by this value, knowing that the * shifting the stream IDs by this value, knowing that the

View File

@ -46,7 +46,8 @@ struct qc_stream_desc {
int flags; /* QC_SD_FL_* values */ int flags; /* QC_SD_FL_* values */
void *ctx; /* MUX specific context */ void (*notify_send)(struct qc_stream_desc *, uint64_t offset, uint64_t len);
void *ctx; /* notify context */
}; };
#endif /* USE_QUIC */ #endif /* USE_QUIC */

View File

@ -20,5 +20,22 @@ struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
struct buffer *qc_stream_buf_realloc(struct qc_stream_desc *stream); struct buffer *qc_stream_buf_realloc(struct qc_stream_desc *stream);
void qc_stream_buf_release(struct qc_stream_desc *stream); void qc_stream_buf_release(struct qc_stream_desc *stream);
/* Reports emission of STREAM frame starting at <offset> and of length <len>,
* related to <stream> data storage.
*/
static inline void qc_stream_desc_send(struct qc_stream_desc *stream,
uint64_t offset, uint64_t len)
{
if (stream->notify_send)
stream->notify_send(stream, len, offset);
}
/* Subscribe for send notification on <stream>. */
static inline void qc_stream_desc_sub_send(struct qc_stream_desc *stream,
void (*cb)(struct qc_stream_desc *s, uint64_t offset, uint64_t len))
{
stream->notify_send = cb;
}
#endif /* USE_QUIC */ #endif /* USE_QUIC */
#endif /* _HAPROXY_QUIC_STREAM_H_ */ #endif /* _HAPROXY_QUIC_STREAM_H_ */

View File

@ -32,6 +32,8 @@
DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc)); DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs)); DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
{ {
struct buffer buf; struct buffer buf;
@ -179,6 +181,8 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
TRACE_ERROR("qc_stream_desc alloc failure", QMUX_EV_QCS_NEW, qcc->conn, qcs); TRACE_ERROR("qc_stream_desc alloc failure", QMUX_EV_QCS_NEW, qcc->conn, qcs);
goto err; goto err;
} }
qc_stream_desc_sub_send(qcs->stream, qmux_ctrl_send);
} }
if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) { if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) {
@ -525,6 +529,100 @@ void qcs_notify_send(struct qcs *qcs)
} }
} }
/* Returns total number of bytes not already sent to quic-conn layer. */
static uint64_t qcs_prep_bytes(const struct qcs *qcs)
{
struct buffer *out = qc_stream_buf_get(qcs->stream);
uint64_t diff, base_off;
if (!out)
return 0;
/* if ack_offset < buf_offset, it points to an older buffer. */
base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
diff = qcs->tx.fc.off_real - base_off;
return b_data(out) - diff;
}
/* Used as a callback for qc_stream_desc layer to notify about emission of a
* STREAM frame of <data> length starting at <offset>.
*/
static void qmux_ctrl_send(struct qc_stream_desc *stream, uint64_t data, uint64_t offset)
{
struct qcs *qcs = stream->ctx;
struct qcc *qcc = qcs->qcc;
uint64_t diff;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
/* Real off MUST always be the greatest offset sent. */
BUG_ON(offset > qcs->tx.fc.off_real);
/* check if the STREAM frame has already been notified. It can happen
* for retransmission.
*/
if (offset + data < qcs->tx.fc.off_real) {
TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs);
goto out;
}
qcs_idle_open(qcs);
diff = offset + data - qcs->tx.fc.off_real;
if (diff) {
struct quic_fctl *fc_conn = &qcc->tx.fc;
struct quic_fctl *fc_strm = &qcs->tx.fc;
/* Ensure real offset never exceeds soft value. */
BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft);
BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
/* increase offset sum on connection */
if (qfctl_rinc(fc_conn, diff)) {
TRACE_STATE("connection flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn);
}
/* increase offset on stream */
if (qfctl_rinc(fc_strm, diff)) {
TRACE_STATE("stream flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
/* Release buffer if everything sent and buf is full or stream is waiting for room. */
if (!qcs_prep_bytes(qcs) &&
(b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) {
qc_stream_buf_release(qcs->stream);
qcs->flags &= ~QC_SF_BLK_MROOM;
qcs_notify_send(qcs);
}
/* Add measurement for send rate. This is done at the MUX layer
* to account only for STREAM frames without retransmission.
*/
increment_send_rate(diff, 0);
}
if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) {
/* Remove stream from send_list if all was sent. */
LIST_DEL_INIT(&qcs->el_send);
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
/* Close stream locally. */
qcs_close_local(qcs);
if (qcs->flags & QC_SF_FIN_STREAM) {
qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN;
/* Reset flag to not emit multiple FIN STREAM frames. */
qcs->flags &= ~QC_SF_FIN_STREAM;
}
}
}
out:
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
/* Returns true if <qcc> buffer window does not have room for a new buffer. */ /* Returns true if <qcc> buffer window does not have room for a new buffer. */
static inline int qcc_bufwnd_full(const struct qcc *qcc) static inline int qcc_bufwnd_full(const struct qcc *qcc)
{ {
@ -1123,21 +1221,6 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs)
return out && b_size(out) ? out : NULL; return out && b_size(out) ? out : NULL;
} }
/* Returns total number of bytes not already sent to quic-conn layer. */
static uint64_t qcs_prep_bytes(const struct qcs *qcs)
{
struct buffer *out = qc_stream_buf_get(qcs->stream);
uint64_t diff, base_off;
if (!out)
return 0;
/* if ack_offset < buf_offset, it points to an older buffer. */
base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
diff = qcs->tx.fc.off_real - base_off;
return b_data(out) - diff;
}
/* Try to realign <out> buffer for <qcs> stream. This is done only if there is /* Try to realign <out> buffer for <qcs> stream. This is done only if there is
* no data waiting for ACK. * no data waiting for ACK.
* *
@ -1942,85 +2025,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
return -1; return -1;
} }
/* This function must be called by the upper layer to inform about the sending
* of a STREAM frame for <qcs> instance. The frame is of <data> length and on
* <offset>.
*/
void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
{
struct qcc *qcc = qcs->qcc;
uint64_t diff;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
/* Real off MUST always be the greatest offset sent. */
BUG_ON(offset > qcs->tx.fc.off_real);
/* check if the STREAM frame has already been notified. It can happen
* for retransmission.
*/
if (offset + data < qcs->tx.fc.off_real) {
TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs);
goto out;
}
qcs_idle_open(qcs);
diff = offset + data - qcs->tx.fc.off_real;
if (diff) {
struct quic_fctl *fc_conn = &qcc->tx.fc;
struct quic_fctl *fc_strm = &qcs->tx.fc;
/* Ensure real offset never exceeds soft value. */
BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft);
BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
/* increase offset sum on connection */
if (qfctl_rinc(fc_conn, diff)) {
TRACE_STATE("connection flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn);
}
/* increase offset on stream */
if (qfctl_rinc(fc_strm, diff)) {
TRACE_STATE("stream flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
/* Release buffer if everything sent and buf is full or stream is waiting for room. */
if (!qcs_prep_bytes(qcs) &&
(b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) {
qc_stream_buf_release(qcs->stream);
qcs->flags &= ~QC_SF_BLK_MROOM;
qcs_notify_send(qcs);
}
/* Add measurement for send rate. This is done at the MUX layer
* to account only for STREAM frames without retransmission.
*/
increment_send_rate(diff, 0);
}
if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) {
/* Remove stream from send_list if all was sent. */
LIST_DEL_INIT(&qcs->el_send);
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
/* Close stream locally. */
qcs_close_local(qcs);
if (qcs->flags & QC_SF_FIN_STREAM) {
qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN;
/* Reset flag to not emit multiple FIN STREAM frames. */
qcs->flags &= ~QC_SF_FIN_STREAM;
}
}
}
out:
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
/* Returns true if subscribe set, false otherwise. */ /* Returns true if subscribe set, false otherwise. */
static int qcc_subscribe_send(struct qcc *qcc) static int qcc_subscribe_send(struct qcc *qcc)
{ {

View File

@ -91,6 +91,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void
stream->ack_offset = 0; stream->ack_offset = 0;
stream->flags = 0; stream->flags = 0;
stream->ctx = ctx; stream->ctx = ctx;
stream->notify_send = NULL;
return stream; return stream;
} }

View File

@ -23,6 +23,7 @@
#include <haproxy/quic_retransmit.h> #include <haproxy/quic_retransmit.h>
#include <haproxy/quic_retry.h> #include <haproxy/quic_retry.h>
#include <haproxy/quic_sock.h> #include <haproxy/quic_sock.h>
#include <haproxy/quic_stream.h>
#include <haproxy/quic_tls.h> #include <haproxy/quic_tls.h>
#include <haproxy/quic_trace.h> #include <haproxy/quic_trace.h>
#include <haproxy/ssl_sock-t.h> #include <haproxy/ssl_sock-t.h>
@ -1664,9 +1665,9 @@ static int qc_build_frms(struct list *outlist, struct list *inlist,
/* Do not notify MUX on retransmission. */ /* Do not notify MUX on retransmission. */
if (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT) { if (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT) {
qcc_streams_sent_done(cf->stream.stream->ctx, qc_stream_desc_send(cf->stream.stream,
cf->stream.len, cf->stream.offset.key,
cf->stream.offset.key); cf->stream.len);
} }
} }
else { else {
@ -1711,14 +1712,14 @@ static int qc_build_frms(struct list *outlist, struct list *inlist,
/* Do not notify MUX on retransmission. */ /* Do not notify MUX on retransmission. */
if (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT) { if (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT) {
qcc_streams_sent_done(new_cf->stream.stream->ctx, qc_stream_desc_send(new_cf->stream.stream,
new_cf->stream.len, new_cf->stream.offset.key,
new_cf->stream.offset.key); new_cf->stream.len);
} }
} }
/* TODO the MUX is notified about the frame sending via /* TODO the MUX is notified about the frame sending via
* previous qcc_streams_sent_done call. However, the * previous qc_stream_desc_send call. However, the
* sending can fail later, for example if the sendto * sending can fail later, for example if the sendto
* system call returns an error. As the MUX has been * system call returns an error. As the MUX has been
* notified, the transport layer is responsible to * notified, the transport layer is responsible to