diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index e805c40bb..ac623be2a 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -31,6 +31,7 @@ int qcs_is_close_remote(struct qcs *qcs); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); +void qcs_on_data_sent(struct qcs *qcs, uint64_t data, uint64_t offset); void qcc_notify_buf(struct qcc *qcc, uint64_t free_size); struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs); diff --git a/include/haproxy/quic_stream-t.h b/include/haproxy/quic_stream-t.h index a9738d2e9..598c09f72 100644 --- a/include/haproxy/quic_stream-t.h +++ b/include/haproxy/quic_stream-t.h @@ -49,8 +49,8 @@ struct qc_stream_desc { int flags; /* QC_SD_FL_* values */ - void (*notify_send)(struct qc_stream_desc *, uint64_t offset, uint64_t len); - void (*notify_room)(struct qc_stream_desc *, uint64_t room); + void (*notify_send)(void *ctx, uint64_t offset, uint64_t len); + void (*notify_room)(void *ctx, uint64_t room, int release); void *ctx; /* notify context */ }; diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index b5dc596a4..aeae72ad1 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -43,14 +43,14 @@ static inline void qc_stream_desc_send(struct qc_stream_desc *stream, static inline void qc_stream_desc_sub_send(struct qc_stream_desc *stream, void (*cb)(void *ctx, uint64_t offset, uint64_t len)) { - stream->notify_send = (void (*)(struct qc_stream_desc *, uint64_t offset, uint64_t len))cb; + stream->notify_send = cb; } /* Subscribe for room notification on . */ static inline void qc_stream_desc_sub_room(struct qc_stream_desc *stream, - void (*cb)(void *ctx, uint64_t offset)) + void (*cb)(void *ctx, uint64_t offset, int release)) { - stream->notify_room = (void (*)(struct qc_stream_desc *, uint64_t offset))cb; + stream->notify_room = cb; } #endif /* USE_QUIC */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 4995ffb37..4a203b2c8 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -38,7 +38,7 @@ DECLARE_TYPED_POOL(pool_head_qcs, "qcs", struct qcs); DECLARE_STATIC_TYPED_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", struct qc_stream_rxbuf); static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset); -static void qmux_ctrl_room(void *ctx, uint64_t room); +static void qmux_ctrl_room(void *ctx, uint64_t room, int release); int qmux_is_quic(const struct qcc *qcc) { @@ -611,12 +611,8 @@ static uint64_t qcs_prep_bytes(const struct qcs *qcs) } } -/* Used as a callback for qc_stream_desc layer to notify about emission of a - * STREAM frame of length starting at . - */ -static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset) +void qcs_on_data_sent(struct qcs *qcs, uint64_t data, uint64_t offset) { - struct qcs *qcs = ((struct qc_stream_desc *)ctx)->ctx; struct qcc *qcc = qcs->qcc; uint64_t diff; @@ -660,12 +656,23 @@ static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset) TRACE_STATE("stream flow-control reached", QMUX_EV_QCS_SEND, qcc->conn, qcs); } - /* Release buffer if everything sent and buf is full or stream is waiting for room. */ - if (!qcs_prep_bytes(qcs) && - (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) { - qc_stream_buf_release(qcs->stream); - qcs->flags &= ~QC_SF_BLK_MROOM; - qcs_notify_send(qcs); + + if (qmux_is_quic(qcc)) { + /* Release buffer if everything sent and buf is full or stream is waiting for room. */ + if (!qcs_prep_bytes(qcs) && + (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) { + qc_stream_buf_release(qcs->stream); + qcs->flags &= ~QC_SF_BLK_MROOM; + qcs_notify_send(qcs); + } + } + else { + b_del(&qcs->qos_buf, diff); + /* Release buffer if everything sent and buf is full or stream is waiting for room. */ + if (!qcs_prep_bytes(qcs) && qcs->flags & QC_SF_BLK_MROOM) { + qcs->flags &= ~QC_SF_BLK_MROOM; + qcs_notify_send(qcs); + } } /* Add measurement for send rate. This is done at the MUX layer @@ -674,7 +681,8 @@ static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset) increment_send_rate(diff, 0); } - if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) { + if ((qmux_is_quic(qcc) && !qc_stream_buf_get(qcs->stream)) || + !qcs_prep_bytes(qcs)) { /* Remove stream from send_list if all was sent. */ LIST_DEL_INIT(&qcs->el_send); TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -684,13 +692,15 @@ static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset) qcs_close_local(qcs); if (qcs->flags & QC_SF_FIN_STREAM) { - qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN; + if (qmux_is_quic(qcc)) + qcs->stream->flags |= QC_SD_FL_WAIT_FOR_FIN; /* Reset flag to not emit multiple FIN STREAM frames. */ qcs->flags &= ~QC_SF_FIN_STREAM; } /* Unsubscribe from streamdesc when everything sent. */ - qc_stream_desc_sub_send(qcs->stream, NULL); + if (qmux_is_quic(qcc)) + qc_stream_desc_sub_send(qcs->stream, NULL); if (qcs_is_completed(qcs)) { TRACE_STATE("add stream in purg_list", QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -703,6 +713,15 @@ static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset) TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); } +/* Callback for notification about emission of a STREAM frame of length + * starting at . + */ +static void qmux_ctrl_send(void *ctx, uint64_t data, uint64_t offset) +{ + struct qcs *qcs = ((struct qc_stream_desc *)ctx)->ctx; + qcs_on_data_sent(qcs, data, offset); +} + /* Returns true if buffer window does not have room for a new buffer. */ static inline int qcc_bufwnd_full(const struct qcc *qcc) { @@ -715,11 +734,11 @@ static inline int qcc_bufwnd_full(const struct qcc *qcc) } } -static void qmux_ctrl_room(void *ctx, uint64_t room) +static void qmux_ctrl_room(void *ctx, uint64_t room, int release) { struct qc_stream_desc *stream = ctx; /* Context is different for active and released streams. */ - struct qcc *qcc = !(stream->flags & QC_SD_FL_RELEASE) ? + struct qcc *qcc = !release ? ((struct qcs *)stream->ctx)->qcc : stream->ctx; qcc_notify_buf(qcc, room); } diff --git a/src/quic_stream.c b/src/quic_stream.c index 364030477..10083d066 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -53,7 +53,7 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream, /* notify MUX about available buffers. */ if (stream->notify_room && room) - stream->notify_room(stream, room); + stream->notify_room(stream, room, stream->flags & QC_SD_FL_RELEASE); } /* Allocate a new stream descriptor with id . The caller is responsible to @@ -237,7 +237,7 @@ static int qc_stream_buf_store_ack(struct qc_stream_buf *buf, buf->room += newly_acked; if (stream->notify_room && qc_stream_buf_is_released(buf, stream)) - stream->notify_room(stream, newly_acked); + stream->notify_room(stream, newly_acked, stream->flags & QC_SD_FL_RELEASE); end: return newly_acked; @@ -271,7 +271,7 @@ static struct qc_stream_buf *qc_stream_buf_ack(struct qc_stream_buf *buf, if (diff >= buf->room) { diff -= buf->room; buf->room = 0; - stream->notify_room(stream, diff); + stream->notify_room(stream, diff, stream->flags & QC_SD_FL_RELEASE); } else { buf->room -= diff; @@ -534,7 +534,7 @@ void qc_stream_buf_release(struct qc_stream_desc *stream) * space plus already stored out-of-order data range as available. */ if (stream->notify_room && room) - stream->notify_room(stream, room); + stream->notify_room(stream, room, stream->flags & QC_SD_FL_RELEASE); } static int create_sbuf_pool(void)