diff --git a/include/haproxy/mux_quic_priv.h b/include/haproxy/mux_quic_priv.h new file mode 100644 index 000000000..25a4cbadd --- /dev/null +++ b/include/haproxy/mux_quic_priv.h @@ -0,0 +1,14 @@ +#ifndef _HAPROXY_MUX_QUIC_PRIV_H +#define _HAPROXY_MUX_QUIC_PRIV_H + +/* This header file should only be used by QUIC-MUX layer internally. */ + +#include + +void qcs_idle_open(struct qcs *qcs); +void qcs_close_local(struct qcs *qcs); +int qcs_is_completed(struct qcs *qcs); + +uint64_t qcs_prep_bytes(const struct qcs *qcs); + +#endif /* _HAPROXY_MUX_QUIC_PRIV_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 806d2e420..1d0b3f4b9 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -1,4 +1,5 @@ #include +#include #include @@ -409,7 +410,7 @@ static void qcc_refresh_timeout(struct qcc *qcc) /* Mark a stream as open if it was idle. This can be used on every * successful emission/reception operation to update the stream state. */ -static void qcs_idle_open(struct qcs *qcs) +void qcs_idle_open(struct qcs *qcs) { /* This operation must not be used if the stream is already closed. */ BUG_ON_HOT(qcs->st == QC_SS_CLO); @@ -421,7 +422,7 @@ static void qcs_idle_open(struct qcs *qcs) } /* Close the local channel of instance. */ -static void qcs_close_local(struct qcs *qcs) +void qcs_close_local(struct qcs *qcs) { TRACE_STATE("closing stream locally", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); @@ -445,7 +446,7 @@ static void qcs_close_local(struct qcs *qcs) } /* Returns true if can be purged. */ -static int qcs_is_completed(struct qcs *qcs) +int qcs_is_completed(struct qcs *qcs) { /* A stream is completed if fully closed and stconn released, or simply * detached and everything already sent. @@ -594,7 +595,7 @@ struct buffer *qcs_tx_buf(struct qcs *qcs) } /* Returns total number of bytes not already sent to quic-conn layer. */ -static uint64_t qcs_prep_bytes(const struct qcs *qcs) +uint64_t qcs_prep_bytes(const struct qcs *qcs) { const struct buffer *out = qcs_tx_buf_const(qcs); uint64_t diff, base_off; diff --git a/src/mux_quic_qstrm.c b/src/mux_quic_qstrm.c index 12f54c396..59a0f6949 100644 --- a/src/mux_quic_qstrm.c +++ b/src/mux_quic_qstrm.c @@ -5,7 +5,10 @@ #include #include #include +#include +#include #include +#include #include #include @@ -148,6 +151,69 @@ int qcc_qstrm_recv(struct qcc *qcc) return -1; } +/* Updates a stream after a successful emission of data of length . */ +static void qstrm_ctrl_send(struct qcs *qcs, uint64_t data) +{ + struct qcc *qcc = qcs->qcc; + struct quic_fctl *fc_conn = &qcc->tx.fc; + struct quic_fctl *fc_strm = &qcs->tx.fc; + + TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); + + qcs_idle_open(qcs); + + /* Ensure real offset never exceeds soft value. */ + BUG_ON(fc_conn->off_real + data > fc_conn->off_soft); + BUG_ON(fc_strm->off_real + data > fc_strm->off_soft); + + /* increase offset on connection */ + if (qfctl_rinc(fc_conn, data)) { + TRACE_STATE("connection flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn); + } + + /* increase offset on stream */ + if (qfctl_rinc(fc_strm, data)) { + TRACE_STATE("stream flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn, qcs); + } + + b_del(&qcs->tx.qstrm_buf, data); + /* Release buffer if everything sent and stream is waiting for room. */ + if (!qcs_prep_bytes(qcs) && (qcs->flags & QC_SF_BLK_MROOM)) { + qcs->flags &= ~QC_SF_BLK_MROOM; + qcs_notify_send(qcs); + } + + /* Add measurement for send rate. This is done at the MUX layer + * to account only for STREAM frames without retransmission. + */ + increment_send_rate(data, 0); + + if (!qcs_prep_bytes(qcs)) { + /* Remove stream from send_list if all was sent. */ + LIST_DEL_INIT(&qcs->el_send); + TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); + + if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) { + /* Close stream locally. */ + qcs_close_local(qcs); + + if (qcs->flags & QC_SF_FIN_STREAM) { + /* Reset flag to not emit multiple FIN STREAM frames. */ + qcs->flags &= ~QC_SF_FIN_STREAM; + } + + if (qcs_is_completed(qcs)) { + TRACE_STATE("add stream in purg_list", QMUX_EV_QCS_SEND, qcc->conn, qcs); + LIST_APPEND(&qcc->purg_list, &qcs->el_send); + } + } + } + + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); +} + /* Sends list of frames for connection. * * Returns 0 if all data are emitted or a positive value if sending should be @@ -211,7 +277,7 @@ int qcc_qstrm_send_frames(struct qcc *qcc, struct list *frms) } if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) - /* TODO notify MUX */ + qstrm_ctrl_send(frm->stream.stream, frm->stream.len); LIST_DEL_INIT(&frm->list); if (split_frm) {