mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-06 23:27:04 +02:00
MINOR: quic: refactor STREAM room notification
qc_stream_desc is an intermediary layer between QUIC MUX and quic_conn. It is a facility which permits to store data to emit and keep them for retransmission until acknowledgment. This layer is responsible to notify QUIC MUX each time a buffer is freed. This is necessary as MUX buffer allocation is limited by the underlying congestion window size. Refactor this to use a mechanism similar to send notification. A new callback notify_room can now be registered to qc_stream_desc instance. This is set by QUIC MUX to qmux_ctrl_room(). On MUX QUIC free, special care is now taken to reset notify_room callback to NULL. Thanks to this refactoring, further adjustment have been made to refine the architecture. One of them is the removal of qc_stream_desc QC_SD_FL_OOB_BUF, which is now converted to a MUX layer flag QC_SF_TXBUF_OOB.
This commit is contained in:
parent
d7f4e5abf0
commit
db68f8ed86
@ -243,7 +243,7 @@ static forceinline char *qcc_show_flags(char *buf, size_t len, const char *delim
|
||||
#define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */
|
||||
#define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */
|
||||
#define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */
|
||||
/* unused 0x00000010 */
|
||||
#define QC_SF_TXBUB_OOB 0x00000010 /* stream reserved for metadata out-of-band transmission; txbuf allocation is unrestricted */
|
||||
#define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */
|
||||
#define QC_SF_READ_ABORTED 0x00000040 /* Rx closed using STOP_SENDING*/
|
||||
#define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */
|
||||
|
@ -22,7 +22,6 @@ struct qc_stream_buf {
|
||||
|
||||
#define QC_SD_FL_RELEASE 0x00000001 /* set when MUX has finished to use this stream */
|
||||
#define QC_SD_FL_WAIT_FOR_FIN 0x00000002 /* set if sent FIN is waiting for acknowledgement */
|
||||
#define QC_SD_FL_OOB_BUF 0x00000004 /* buffers not accounted against conn limit */
|
||||
|
||||
/* QUIC STREAM descriptor.
|
||||
*
|
||||
@ -47,6 +46,7 @@ struct qc_stream_desc {
|
||||
int flags; /* QC_SD_FL_* values */
|
||||
|
||||
void (*notify_send)(struct qc_stream_desc *, uint64_t offset, uint64_t len);
|
||||
void (*notify_room)(struct qc_stream_desc *, uint64_t room);
|
||||
void *ctx; /* notify context */
|
||||
};
|
||||
|
||||
|
@ -10,7 +10,8 @@ struct quic_conn;
|
||||
|
||||
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type, void *ctx,
|
||||
struct quic_conn *qc);
|
||||
void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size);
|
||||
void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size,
|
||||
void *new_ctx);
|
||||
int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len, int fin);
|
||||
void qc_stream_desc_free(struct qc_stream_desc *stream, int closing);
|
||||
|
||||
@ -37,5 +38,12 @@ static inline void qc_stream_desc_sub_send(struct qc_stream_desc *stream,
|
||||
stream->notify_send = cb;
|
||||
}
|
||||
|
||||
/* Subscribe for room notification on <stream>. */
|
||||
static inline void qc_stream_desc_sub_room(struct qc_stream_desc *stream,
|
||||
void (*cb)(struct qc_stream_desc *s, uint64_t offset))
|
||||
{
|
||||
stream->notify_room = cb;
|
||||
}
|
||||
|
||||
#endif /* USE_QUIC */
|
||||
#endif /* _HAPROXY_QUIC_STREAM_H_ */
|
||||
|
@ -33,6 +33,7 @@ DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
|
||||
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
|
||||
|
||||
static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
|
||||
static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
|
||||
|
||||
static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
|
||||
{
|
||||
@ -82,7 +83,7 @@ static void qcs_free(struct qcs *qcs)
|
||||
/* Release qc_stream_desc buffer from quic-conn layer. */
|
||||
if (qcs->stream) {
|
||||
qc_stream_desc_sub_send(qcs->stream, NULL);
|
||||
qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real);
|
||||
qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real, qcc);
|
||||
}
|
||||
|
||||
/* Free Rx buffer. */
|
||||
@ -186,6 +187,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
||||
}
|
||||
|
||||
qc_stream_desc_sub_send(qcs->stream, qmux_ctrl_send);
|
||||
qc_stream_desc_sub_room(qcs->stream, qmux_ctrl_room);
|
||||
}
|
||||
|
||||
if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) {
|
||||
@ -634,6 +636,14 @@ static inline int qcc_bufwnd_full(const struct qcc *qcc)
|
||||
return qcc->tx.buf_in_flight >= qc->path->cwnd;
|
||||
}
|
||||
|
||||
static void qmux_ctrl_room(struct qc_stream_desc *stream, uint64_t room)
|
||||
{
|
||||
/* Context is different for active and released streams. */
|
||||
struct qcc *qcc = !(stream->flags & QC_SD_FL_RELEASE) ?
|
||||
((struct qcs *)stream->ctx)->qcc : stream->ctx;
|
||||
qcc_notify_buf(qcc, room);
|
||||
}
|
||||
|
||||
/* Report that one or several stream-desc buffers have been released for <qcc>
|
||||
* connection. <free_size> represent the sum of freed buffers sizes. May also
|
||||
* be used to notify about congestion window increase, in which case
|
||||
@ -841,7 +851,8 @@ void qcs_send_metadata(struct qcs *qcs)
|
||||
/* Cannot use if some data already transferred for this stream. */
|
||||
BUG_ON(qcs->stream->ack_offset || !LIST_ISEMPTY(&qcs->stream->buf_list));
|
||||
|
||||
qcs->stream->flags |= QC_SD_FL_OOB_BUF;
|
||||
qcs->flags |= QC_SF_TXBUB_OOB;
|
||||
qc_stream_desc_sub_room(qcs->stream, NULL);
|
||||
}
|
||||
|
||||
struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin)
|
||||
@ -1149,7 +1160,6 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small)
|
||||
{
|
||||
struct qcc *qcc = qcs->qcc;
|
||||
struct buffer *out = qc_stream_buf_get(qcs->stream);
|
||||
const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF;
|
||||
|
||||
/* Stream must not try to reallocate a buffer if currently waiting for one. */
|
||||
BUG_ON(LIST_INLIST(&qcs->el_buf));
|
||||
@ -1157,7 +1167,7 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small)
|
||||
*err = 0;
|
||||
|
||||
if (!out) {
|
||||
if (likely(!unlimited)) {
|
||||
if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) {
|
||||
if ((qcc->flags & QC_CF_CONN_FULL)) {
|
||||
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
|
||||
tot_time_start(&qcs->timer.buf);
|
||||
@ -1180,7 +1190,7 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small)
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (likely(!unlimited))
|
||||
if (likely(!(qcs->flags & QC_SF_TXBUB_OOB)))
|
||||
qcc->tx.buf_in_flight += b_size(out);
|
||||
}
|
||||
|
||||
@ -1199,12 +1209,11 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs)
|
||||
{
|
||||
struct qcc *qcc = qcs->qcc;
|
||||
struct buffer *out = qc_stream_buf_get(qcs->stream);
|
||||
const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF;
|
||||
|
||||
/* Stream must not try to reallocate a buffer if currently waiting for one. */
|
||||
BUG_ON(LIST_INLIST(&qcs->el_buf));
|
||||
|
||||
if (likely(!unlimited)) {
|
||||
if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) {
|
||||
/* Reduce buffer window. As such there is always some space
|
||||
* left for a new buffer allocation.
|
||||
*/
|
||||
@ -1218,7 +1227,7 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs)
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (likely(!unlimited))
|
||||
if (likely(!(qcs->flags & QC_SF_TXBUB_OOB)))
|
||||
qcc->tx.buf_in_flight += b_size(out);
|
||||
|
||||
out:
|
||||
@ -2617,6 +2626,7 @@ static void qcc_release(struct qcc *qcc)
|
||||
{
|
||||
struct connection *conn = qcc->conn;
|
||||
struct eb64_node *node;
|
||||
struct quic_conn *qc = conn->handle.qc;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCC_END, conn);
|
||||
|
||||
@ -2633,6 +2643,14 @@ static void qcc_release(struct qcc *qcc)
|
||||
qcs_free(qcs);
|
||||
}
|
||||
|
||||
/* unsubscribe from all remaining qc_stream_desc */
|
||||
node = eb64_first(&qc->streams_by_id);
|
||||
while (node) {
|
||||
struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
|
||||
qc_stream_desc_sub_room(stream, NULL);
|
||||
node = eb64_next(node);
|
||||
}
|
||||
|
||||
tasklet_free(qcc->wait_event.tasklet);
|
||||
if (conn && qcc->wait_event.events) {
|
||||
conn->xprt->unsubscribe(conn, conn->xprt_ctx,
|
||||
|
@ -28,7 +28,6 @@ static inline int qc_stream_desc_done(const struct qc_stream_desc *s)
|
||||
static void qc_stream_buf_free(struct qc_stream_desc *stream,
|
||||
struct qc_stream_buf **stream_buf)
|
||||
{
|
||||
struct quic_conn *qc = stream->qc;
|
||||
struct buffer *buf = &(*stream_buf)->buf;
|
||||
uint64_t free_size;
|
||||
|
||||
@ -50,12 +49,8 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
|
||||
*stream_buf = NULL;
|
||||
|
||||
/* notify MUX about available buffers. */
|
||||
if (qc->mux_state == QC_MUX_READY) {
|
||||
if (!(stream->flags & QC_SD_FL_OOB_BUF)) {
|
||||
/* notify MUX about available buffers. */
|
||||
qcc_notify_buf(qc->qcc, free_size);
|
||||
}
|
||||
}
|
||||
if (stream->notify_room)
|
||||
stream->notify_room(stream, free_size);
|
||||
}
|
||||
|
||||
/* Allocate a new stream descriptor with id <id>. The caller is responsible to
|
||||
@ -92,6 +87,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void
|
||||
stream->flags = 0;
|
||||
stream->ctx = ctx;
|
||||
stream->notify_send = NULL;
|
||||
stream->notify_room = NULL;
|
||||
|
||||
return stream;
|
||||
}
|
||||
@ -102,15 +98,19 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void
|
||||
* <final_size> corresponds to the last offset sent for this stream. If there
|
||||
* is unsent data present, they will be remove first to guarantee that buffer
|
||||
* is freed after receiving all acknowledges.
|
||||
*
|
||||
* It is expected that upper layer instance related to <stream> may disappear
|
||||
* after this operation. As such, <new_ctx> must be set to reassociate <stream>
|
||||
* for notifications.
|
||||
*/
|
||||
void qc_stream_desc_release(struct qc_stream_desc *stream,
|
||||
uint64_t final_size)
|
||||
uint64_t final_size, void *new_ctx)
|
||||
{
|
||||
/* A stream can be released only one time. */
|
||||
BUG_ON(stream->flags & QC_SD_FL_RELEASE);
|
||||
|
||||
stream->flags |= QC_SD_FL_RELEASE;
|
||||
stream->ctx = NULL;
|
||||
stream->ctx = new_ctx;
|
||||
|
||||
if (stream->buf) {
|
||||
struct qc_stream_buf *stream_buf = stream->buf;
|
||||
|
@ -124,7 +124,7 @@ void quic_cstream_free(struct quic_cstream *cs)
|
||||
|
||||
quic_free_ncbuf(&cs->rx.ncbuf);
|
||||
|
||||
qc_stream_desc_release(cs->desc, 0);
|
||||
qc_stream_desc_release(cs->desc, 0, NULL);
|
||||
pool_free(pool_head_quic_cstream, cs);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user