diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index a98234253..92b8159ab 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -32,7 +32,7 @@ enum qcs_type { #define QC_CF_ERRL 0x00000001 /* fatal error detected locally, connection should be closed soon */ #define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */ /* unused 0x00000004 */ -/* unused 0x00000008 */ +#define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */ #define QC_CF_APP_SHUT 0x00000010 /* Application layer shutdown done. */ #define QC_CF_ERR_CONN 0x00000020 /* fatal error reported by transport layer */ @@ -84,6 +84,7 @@ struct qcc { struct list send_retry_list; /* list of qcs eligible to send retry */ struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */ struct list fctl_list; /* list of sending qcs blocked on conn flow control */ + struct list buf_wait_list; /* list of qcs blocked on stream desc buf */ struct wait_event wait_event; /* To be used if we're waiting for I/Os */ @@ -167,6 +168,7 @@ struct qcs { struct list el_send; /* element of qcc.send_list */ struct list el_opening; /* element of qcc.opening_list */ struct list el_fctl; /* element of qcc.fctl_list */ + struct list el_buf; /* element of qcc.buf_wait_list */ struct wait_event wait_event; struct wait_event *subs; diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index a35dddc9b..e2437c351 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -21,9 +21,10 @@ int qcs_is_close_remote(struct qcs *qcs); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); +int qcc_notify_buf(struct qcc *qcc); struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs); -struct buffer *qcc_get_stream_txbuf(struct qcs *qcs); +struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err); int qcc_release_stream_txbuf(struct qcs *qcs); int qcc_stream_can_send(const struct qcs *qcs); void qcc_reset_stream(struct qcs *qcs, int err); diff --git a/src/h3.c b/src/h3.c index 431922f01..69bbd8e27 100644 --- a/src/h3.c +++ b/src/h3.c @@ -1428,6 +1428,7 @@ static ssize_t h3_rcv_buf(struct qcs *qcs, struct buffer *b, int fin) */ static int h3_control_send(struct qcs *qcs, void *ctx) { + int err; int ret; struct h3c *h3c = ctx; unsigned char data[(2 + 3) * 2 * QUIC_VARINT_MAX_SIZE]; /* enough for 3 settings */ @@ -1468,7 +1469,8 @@ static int h3_control_send(struct qcs *qcs, void *ctx) goto err; } - if (!(res = qcc_get_stream_txbuf(qcs))) { + if (!(res = qcc_get_stream_txbuf(qcs, &err))) { + /* Consider alloc failure fatal for control stream even on conn buf limit. */ TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs); goto err; } @@ -1496,6 +1498,7 @@ static int h3_control_send(struct qcs *qcs, void *ctx) static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx) { + int err; struct h3s *h3s = qcs->ctx; struct h3c *h3c = h3s->h3c; struct buffer outbuf; @@ -1550,10 +1553,15 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx) list[hdr].n = ist(""); - if (!(res = qcc_get_stream_txbuf(qcs))) { - TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); - h3c->err = H3_INTERNAL_ERROR; - goto err; + if (!(res = qcc_get_stream_txbuf(qcs, &err))) { + if (err) { + TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); + h3c->err = H3_INTERNAL_ERROR; + goto err; + } + + TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); + goto end; } /* At least 5 bytes to store frame type + length as a varint max size */ @@ -1626,6 +1634,7 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx) break; } + end: TRACE_LEAVE(H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); return ret; @@ -1647,6 +1656,7 @@ static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx) */ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) { + int err; struct h3s *h3s = qcs->ctx; struct h3c *h3c = h3s->h3c; struct buffer headers_buf = BUF_NULL; @@ -1708,10 +1718,15 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) list[hdr].n = ist(""); start: - if (!(res = qcc_get_stream_txbuf(qcs))) { - TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); - h3c->err = H3_INTERNAL_ERROR; - goto err; + if (!(res = qcc_get_stream_txbuf(qcs, &err))) { + if (err) { + TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); + h3c->err = H3_INTERNAL_ERROR; + goto err; + } + + TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); + goto end; } /* At least 9 bytes to store frame type + length as a varint max size */ @@ -1815,6 +1830,7 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx) static int h3_resp_data_send(struct qcs *qcs, struct htx *htx, struct buffer *buf, size_t count) { + int err; struct h3s *h3s = qcs->ctx; struct h3c *h3c = h3s->h3c; struct buffer outbuf; @@ -1840,10 +1856,16 @@ static int h3_resp_data_send(struct qcs *qcs, struct htx *htx, if (type != HTX_BLK_DATA) goto end; - if (!(res = qcc_get_stream_txbuf(qcs))) { - TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs); - h3c->err = H3_INTERNAL_ERROR; - goto err; + if (!(res = qcc_get_stream_txbuf(qcs, &err))) { + if (err) { + TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs); + h3c->err = H3_INTERNAL_ERROR; + goto err; + } + + /* Connection buf limit reached, stconn will subscribe on SEND. */ + TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs); + goto end; } /* If HTX contains only one DATA block, try to exchange it with MUX @@ -2040,6 +2062,7 @@ static size_t h3_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count) static size_t h3_nego_ff(struct qcs *qcs, size_t count) { + int err; struct buffer *res; int hsize; size_t sz, ret = 0; @@ -2047,8 +2070,13 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count) TRACE_ENTER(H3_EV_STRM_SEND, qcs->qcc->conn, qcs); start: - if (!(res = qcc_get_stream_txbuf(qcs))) { - qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF; + if (!(res = qcc_get_stream_txbuf(qcs, &err))) { + if (err) { + qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF; + goto end; + } + + qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; goto end; } @@ -2224,6 +2252,7 @@ static void h3_detach(struct qcs *qcs) */ static int h3_send_goaway(struct h3c *h3c) { + int err; struct qcs *qcs = h3c->ctrl_strm; struct buffer pos, *res; unsigned char data[3 * QUIC_VARINT_MAX_SIZE]; @@ -2243,10 +2272,10 @@ static int h3_send_goaway(struct h3c *h3c) b_quic_enc_int(&pos, frm_len, 0); b_quic_enc_int(&pos, h3c->id_goaway, 0); - res = qcc_get_stream_txbuf(qcs); + res = qcc_get_stream_txbuf(qcs, &err); if (!res || b_room(res) < b_data(&pos) || qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&h3c->qcc->tx.fc)) { - /* Do not try forcefully to emit GOAWAY if no space left. */ + /* Do not try forcefully to emit GOAWAY if no buffer available or not enough space left. */ TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs); goto err; } diff --git a/src/hq_interop.c b/src/hq_interop.c index 0d0e47f59..02ef12626 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -95,6 +95,7 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, uint32_t bsize, fsize; struct buffer *res = NULL; size_t total = 0; + int err; htx = htx_from_buf(buf); @@ -109,10 +110,11 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, switch (btype) { case HTX_BLK_DATA: - res = qcc_get_stream_txbuf(qcs); + res = qcc_get_stream_txbuf(qcs, &err); if (!res) { - /* TODO */ - ABORT_NOW(); + if (err) + ABORT_NOW(); + goto end; } if (unlikely(fsize == count && @@ -179,16 +181,16 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf, static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count) { - int ret = 0; + int err, ret = 0; struct buffer *res; start: - res = qcc_get_stream_txbuf(qcs); + res = qcc_get_stream_txbuf(qcs, &err); if (!res) { + if (err) + ABORT_NOW(); qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; goto end; - /* TODO */ - ABORT_NOW(); } if (!b_room(res)) { diff --git a/src/mux_quic.c b/src/mux_quic.c index 408e20c01..9df45b491 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -60,6 +60,7 @@ static void qcs_free(struct qcs *qcs) LIST_DEL_INIT(&qcs->el_opening); LIST_DEL_INIT(&qcs->el_send); LIST_DEL_INIT(&qcs->el_fctl); + LIST_DEL_INIT(&qcs->el_buf); /* Release stream endpoint descriptor. */ BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN)); @@ -109,6 +110,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) LIST_INIT(&qcs->el_opening); LIST_INIT(&qcs->el_send); LIST_INIT(&qcs->el_fctl); + LIST_INIT(&qcs->el_buf); qcs->start = TICK_ETERNITY; /* store transport layer stream descriptor in qcc tree */ @@ -496,6 +498,35 @@ void qcs_notify_send(struct qcs *qcs) } } +/* Notify on a new stream-desc buffer available for connection. + * + * Returns true if a stream was woken up. If false is returned, this indicates + * to the caller that it's currently unnecessary to notify for the rest of the + * available buffers. + */ +int qcc_notify_buf(struct qcc *qcc) +{ + struct qcs *qcs; + int ret = 0; + + TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + + if (qcc->flags & QC_CF_CONN_FULL) { + TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn); + qcc->flags &= ~QC_CF_CONN_FULL; + } + + if (!LIST_ISEMPTY(&qcc->buf_wait_list)) { + qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf); + LIST_DEL_INIT(&qcs->el_buf); + qcs_notify_send(qcs); + ret = 1; + } + + TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); + return ret; +} + /* A fatal error is detected locally for connection. It should be closed * with a CONNECTION_CLOSE using code. Set to true to indicate that * the code must be considered as an application level error. This function @@ -923,22 +954,48 @@ struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs) /* Allocate if needed and retrieve stream buffer for data emission. * - * Returns buffer pointer. May be NULL on allocation failure. + * is an output argument which is useful to differentiate the failure + * cause when the buffer cannot be allocated. It is set to 0 if the connection + * buffer limit is reached. For fatal errors, its value is non-zero. + * + * Returns buffer pointer. May be NULL on allocation failure, in which case + * will refer to the cause. */ -struct buffer *qcc_get_stream_txbuf(struct qcs *qcs) +struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err) { struct qcc *qcc = qcs->qcc; int buf_avail; struct buffer *out = qc_stream_buf_get(qcs->stream); + /* Stream must not try to reallocate a buffer if currently waiting for one. */ + BUG_ON(LIST_INLIST(&qcs->el_buf)); + + *err = 0; + if (!out) { + if (qcc->flags & QC_CF_CONN_FULL) { + LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); + goto out; + } + out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real, &buf_avail); - if (!out) + if (!out) { + if (buf_avail) { + TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); + *err = 1; + goto out; + } + + TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs); + LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); + qcc->flags |= QC_CF_CONN_FULL; goto out; + } if (!b_alloc(out)) { TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); + *err = 1; goto out; } } @@ -988,7 +1045,7 @@ int qcc_release_stream_txbuf(struct qcs *qcs) /* Returns true if stream layer can proceed to emission via . */ int qcc_stream_can_send(const struct qcs *qcs) { - return !(qcs->flags & QC_SF_BLK_MROOM); + return !(qcs->flags & QC_SF_BLK_MROOM) && !LIST_INLIST(&qcs->el_buf); } /* Wakes up every streams of which are currently waiting for sending but @@ -1014,6 +1071,10 @@ void qcc_reset_stream(struct qcs *qcs, int err) if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs)) return; + /* TODO if QCS waiting for buffer, it could be removed from + * if sending is closed now. + */ + TRACE_STATE("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs); qcs->flags |= QC_SF_TO_RESET; qcs->err = err; @@ -2575,6 +2636,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, LIST_INIT(&qcc->send_list); LIST_INIT(&qcc->fctl_list); + LIST_INIT(&qcc->buf_wait_list); qcc->wait_event.tasklet->process = qcc_io_cb; qcc->wait_event.tasklet->context = qcc; @@ -2790,6 +2852,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + /* Stream must not be woken up if already waiting for conn buffer. */ + BUG_ON(LIST_INLIST(&qcs->el_buf)); + /* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */ BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)); @@ -2849,6 +2914,9 @@ static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input, TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + /* Stream must not be woken up if already waiting for conn buffer. */ + BUG_ON(LIST_INLIST(&qcs->el_buf)); + /* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */ BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)); diff --git a/src/quic_stream.c b/src/quic_stream.c index aa8895ae7..e153660db 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include #include @@ -37,7 +37,13 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream, /* notify MUX about available buffers. */ --qc->stream_buf_count; if (qc->mux_state == QC_MUX_READY) { - /* TODO notify MUX for available buffer. */ + /* notify MUX about available buffers. + * + * TODO several streams may be woken up even if a single buffer + * is available for now. + */ + while (qcc_notify_buf(qc->qcc)) + ; } } @@ -199,7 +205,13 @@ void qc_stream_desc_free(struct qc_stream_desc *stream, int closing) qc->stream_buf_count -= free_count; if (qc->mux_state == QC_MUX_READY) { - /* TODO notify MUX for available buffer. */ + /* notify MUX about available buffers. + * + * TODO several streams may be woken up even if a single buffer + * is available for now. + */ + while (qcc_notify_buf(qc->qcc)) + ; } }