MINOR: mux-quic: refine API between QMUX/qc-stream-desc layers

This commit is contained in:
Amaury Denoyelle 2025-12-10 11:30:19 +01:00
parent 3417f9b90a
commit 2e3d06e45d
5 changed files with 46 additions and 26 deletions

View File

@ -31,6 +31,7 @@ int qcs_is_close_remote(struct qcs *qcs);
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
void qcs_on_data_sent(struct qcs *qcs, uint64_t data, uint64_t offset);
void qcc_notify_buf(struct qcc *qcc, uint64_t free_size);
struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);

View File

@ -49,8 +49,8 @@ struct qc_stream_desc {
int flags; /* QC_SD_FL_* values */
void (*notify_send)(struct qc_stream_desc *, uint64_t offset, uint64_t len);
void (*notify_room)(struct qc_stream_desc *, uint64_t room);
void (*notify_send)(void *ctx, uint64_t offset, uint64_t len);
void (*notify_room)(void *ctx, uint64_t room, int release);
void *ctx; /* notify context */
};

View File

@ -43,14 +43,14 @@ static inline void qc_stream_desc_send(struct qc_stream_desc *stream,
static inline void qc_stream_desc_sub_send(struct qc_stream_desc *stream,
void (*cb)(void *ctx, uint64_t offset, uint64_t len))
{
stream->notify_send = (void (*)(struct qc_stream_desc *, uint64_t offset, uint64_t len))cb;
stream->notify_send = cb;
}
/* Subscribe for room notification on <stream>. */
static inline void qc_stream_desc_sub_room(struct qc_stream_desc *stream,
void (*cb)(void *ctx, uint64_t offset))
void (*cb)(void *ctx, uint64_t offset, int release))
{
stream->notify_room = (void (*)(struct qc_stream_desc *, uint64_t offset))cb;
stream->notify_room = cb;
}
#endif /* USE_QUIC */

View File

@ -38,7 +38,7 @@ DECLARE_TYPED_POOL(pool_head_qcs, "qcs", struct qcs);
DECLARE_STATIC_TYPED_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", struct qc_stream_rxbuf);
static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset);
static void qmux_ctrl_room(void *ctx, uint64_t room);
static void qmux_ctrl_room(void *ctx, uint64_t room, int release);
int qmux_is_quic(const struct qcc *qcc)
{
@ -611,12 +611,8 @@ static uint64_t qcs_prep_bytes(const struct qcs *qcs)
}
}
/* Used as a callback for qc_stream_desc layer to notify about emission of a
* STREAM frame of <data> length starting at <offset>.
*/
static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset)
void qcs_on_data_sent(struct qcs *qcs, uint64_t data, uint64_t offset)
{
struct qcs *qcs = ((struct qc_stream_desc *)ctx)->ctx;
struct qcc *qcc = qcs->qcc;
uint64_t diff;
@ -660,12 +656,23 @@ static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset)
TRACE_STATE("stream flow-control reached",
QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
/* Release buffer if everything sent and buf is full or stream is waiting for room. */
if (!qcs_prep_bytes(qcs) &&
(b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) {
qc_stream_buf_release(qcs->stream);
qcs->flags &= ~QC_SF_BLK_MROOM;
qcs_notify_send(qcs);
if (qmux_is_quic(qcc)) {
/* Release buffer if everything sent and buf is full or stream is waiting for room. */
if (!qcs_prep_bytes(qcs) &&
(b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) {
qc_stream_buf_release(qcs->stream);
qcs->flags &= ~QC_SF_BLK_MROOM;
qcs_notify_send(qcs);
}
}
else {
b_del(&qcs->qos_buf, diff);
/* Release buffer if everything sent and buf is full or stream is waiting for room. */
if (!qcs_prep_bytes(qcs) && qcs->flags & QC_SF_BLK_MROOM) {
qcs->flags &= ~QC_SF_BLK_MROOM;
qcs_notify_send(qcs);
}
}
/* Add measurement for send rate. This is done at the MUX layer
@ -674,7 +681,8 @@ static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset)
increment_send_rate(diff, 0);
}
if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) {
if ((qmux_is_quic(qcc) && !qc_stream_buf_get(qcs->stream)) ||
!qcs_prep_bytes(qcs)) {
/* Remove stream from send_list if all was sent. */
LIST_DEL_INIT(&qcs->el_send);
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
@ -684,13 +692,15 @@ static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset)
qcs_close_local(qcs);
if (qcs->flags & QC_SF_FIN_STREAM) {
qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN;
if (qmux_is_quic(qcc))
qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN;
/* Reset flag to not emit multiple FIN STREAM frames. */
qcs->flags &= ~QC_SF_FIN_STREAM;
}
/* Unsubscribe from streamdesc when everything sent. */
qc_stream_desc_sub_send(qcs->stream, NULL);
if (qmux_is_quic(qcc))
qc_stream_desc_sub_send(qcs->stream, NULL);
if (qcs_is_completed(qcs)) {
TRACE_STATE("add stream in purg_list", QMUX_EV_QCS_SEND, qcc->conn, qcs);
@ -703,6 +713,15 @@ static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset)
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
/* Callback for notification about emission of a STREAM frame of <data> length
* starting at <offset>.
*/
static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset)
{
struct qcs *qcs = ((struct qc_stream_desc *)ctx)->ctx;
qcs_on_data_sent(qcs, data, offset);
}
/* Returns true if <qcc> buffer window does not have room for a new buffer. */
static inline int qcc_bufwnd_full(const struct qcc *qcc)
{
@ -715,11 +734,11 @@ static inline int qcc_bufwnd_full(const struct qcc *qcc)
}
}
static void qmux_ctrl_room(void *ctx, uint64_t room)
static void qmux_ctrl_room(void *ctx, uint64_t room, int release)
{
struct qc_stream_desc *stream = ctx;
/* Context is different for active and released streams. */
struct qcc *qcc = !(stream->flags & QC_SD_FL_RELEASE) ?
struct qcc *qcc = !release ?
((struct qcs *)stream->ctx)->qcc : stream->ctx;
qcc_notify_buf(qcc, room);
}

View File

@ -53,7 +53,7 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
/* notify MUX about available buffers. */
if (stream->notify_room && room)
stream->notify_room(stream, room);
stream->notify_room(stream, room, stream->flags & QC_SD_FL_RELEASE);
}
/* Allocate a new stream descriptor with id <id>. The caller is responsible to
@ -237,7 +237,7 @@ static int qc_stream_buf_store_ack(struct qc_stream_buf *buf,
buf->room += newly_acked;
if (stream->notify_room && qc_stream_buf_is_released(buf, stream))
stream->notify_room(stream, newly_acked);
stream->notify_room(stream, newly_acked, stream->flags & QC_SD_FL_RELEASE);
end:
return newly_acked;
@ -271,7 +271,7 @@ static struct qc_stream_buf *qc_stream_buf_ack(struct qc_stream_buf *buf,
if (diff >= buf->room) {
diff -= buf->room;
buf->room = 0;
stream->notify_room(stream, diff);
stream->notify_room(stream, diff, stream->flags & QC_SD_FL_RELEASE);
}
else {
buf->room -= diff;
@ -534,7 +534,7 @@ void qc_stream_buf_release(struct qc_stream_desc *stream)
* space plus already stored out-of-order data range as available.
*/
if (stream->notify_room && room)
stream->notify_room(stream, room);
stream->notify_room(stream, room, stream->flags & QC_SD_FL_RELEASE);
}
static int create_sbuf_pool(void)