diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index b32d4f966..9a69b7638 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -102,6 +102,9 @@ struct qcc { #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ #define QC_SF_READ_ABORTED 0x00000040 /* stream rejected by app layer */ +/* Maximum size of stream Rx buffer. */ +#define QC_S_RX_BUF_SZ (global.tune.bufsize - NCB_RESERVED_SZ) + struct qcs { struct qcc *qcc; struct sedesc *sd; @@ -137,7 +140,7 @@ struct qcs { struct qcc_app_ops { int (*init)(struct qcc *qcc); int (*attach)(struct qcs *qcs, void *conn_ctx); - int (*decode_qcs)(struct qcs *qcs, int fin); + int (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin); size_t (*snd_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags); void (*detach)(struct qcs *qcs); int (*finalize)(void *ctx); diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 3c713a3a6..0846ca7b7 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -23,7 +23,6 @@ struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); -void qcs_consume(struct qcs *qcs, uint64_t bytes); void qcc_emit_cc_app(struct qcc *qcc, int err); int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, diff --git a/src/h3.c b/src/h3.c index b9a9fed36..655bf26a0 100644 --- a/src/h3.c +++ b/src/h3.c @@ -26,7 +26,6 @@ #include #include #include -#include #include #include #include @@ -144,22 +143,15 @@ struct h3s { DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s)); -/* Simple function to duplicate a buffer */ -static inline struct buffer h3_b_dup(const struct ncbuf *b) -{ - return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0)); -} - -/* Initialize an uni-stream by reading its type from . +/* Initialize an uni-stream by reading its type from . * * Returns 0 on success else non-zero. */ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, - struct ncbuf *rxbuf) + struct buffer *b) { /* decode unidirectional stream type */ struct h3s *h3s = qcs->ctx; - struct buffer b; uint64_t type; size_t len = 0, ret; @@ -168,8 +160,7 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, BUG_ON_HOT(!quic_stream_is_uni(qcs->id) || h3s->flags & H3_SF_UNI_INIT); - b = h3_b_dup(rxbuf); - ret = b_quic_dec_int(&type, &b, &len); + ret = b_quic_dec_int(&type, b, &len); if (!ret) { ABORT_NOW(); } @@ -220,7 +211,6 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, }; h3s->flags |= H3_SF_UNI_INIT; - qcs_consume(qcs, len); TRACE_LEAVE(H3_EV_H3S_NEW, qcs->qcc->conn, qcs); return 0; @@ -231,7 +221,7 @@ static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs, * * Returns 0 on success else non-zero. */ -static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf) +static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct buffer *b) { struct h3s *h3s = qcs->ctx; @@ -263,14 +253,13 @@ static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf) * consumed. */ static inline size_t h3_decode_frm_header(uint64_t *ftype, uint64_t *flen, - struct ncbuf *rxbuf) + struct buffer *b) { size_t hlen; - struct buffer b = h3_b_dup(rxbuf); hlen = 0; - if (!b_quic_dec_int(ftype, &b, &hlen) || - !b_quic_dec_int(flen, &b, &hlen)) { + if (!b_quic_dec_int(ftype, b, &hlen) || + !b_quic_dec_int(flen, b, &hlen)) { return 0; } @@ -333,8 +322,8 @@ static int h3_is_frame_valid(struct h3c *h3c, struct qcs *qcs, uint64_t ftype) * * Returns the number of bytes handled or a negative error code. */ -static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, - char fin) +static int h3_headers_to_htx(struct qcs *qcs, const struct buffer *buf, + uint64_t len, char fin) { struct buffer htx_buf = BUF_NULL; struct buffer *tmp = get_trash_chunk(); @@ -350,8 +339,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_HDR, qcs->qcc->conn, qcs); /* TODO support buffer wrapping */ - BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf)); - if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0) + BUG_ON(b_head(buf) + len >= b_wrap(buf)); + if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0) return -1; qc_get_buf(qcs, &htx_buf); @@ -431,8 +420,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, * * Returns the number of bytes handled or a negative error code. */ -static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, - char fin) +static int h3_data_to_htx(struct qcs *qcs, const struct buffer *buf, + uint64_t len, char fin) { struct buffer *appbuf; struct htx *htx = NULL; @@ -446,12 +435,12 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, BUG_ON(!appbuf); htx = htx_from_buf(appbuf); - if (len > ncb_data(buf, 0)) { - len = ncb_data(buf, 0); + if (len > b_data(buf)) { + len = b_data(buf); fin = 0; } - head = ncb_head(buf); + head = b_head(buf); retry: htx_space = htx_free_data_space(htx); if (!htx_space) { @@ -464,16 +453,16 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, fin = 0; } - if (head + len > ncb_wrap(buf)) { - size_t contig = ncb_wrap(buf) - head; - htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig)); + if (head + len > b_wrap(buf)) { + size_t contig = b_wrap(buf) - head; + htx_sent = htx_add_data(htx, ist2(b_head(buf), contig)); if (htx_sent < contig) { qcs->flags |= QC_SF_DEM_FULL; goto out; } len -= contig; - head = ncb_orig(buf); + head = b_orig(buf); goto retry; } @@ -493,11 +482,11 @@ static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, return htx_sent; } -/* Parse a SETTINGS frame of length of payload . +/* Parse a SETTINGS frame of length of payload . * * Returns the number of bytes handled or a negative error code. */ -static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, +static size_t h3_parse_settings_frm(struct h3c *h3c, const struct buffer *buf, size_t len) { struct buffer b; @@ -507,8 +496,11 @@ static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_SETTINGS, h3c->qcc->conn); - b = h3_b_dup(rxbuf); - b_set_data(&b, len); + /* Work on a copy of . */ + b = b_make(b_orig(buf), b_size(buf), b_head_ofs(buf), b_data(buf)); + + /* TODO handle incomplete SETTINGS frame */ + BUG_ON(len < b_data(&b)); while (b_data(&b)) { if (!b_quic_dec_int(&id, &b, &ret) || !b_quic_dec_int(&value, &b, &ret)) { @@ -576,36 +568,35 @@ static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, * * Returns 0 on success else non-zero. */ -static int h3_decode_qcs(struct qcs *qcs, int fin) +static int h3_decode_qcs(struct qcs *qcs, struct buffer *b, int fin) { - struct ncbuf *rxbuf = &qcs->rx.ncbuf; struct h3s *h3s = qcs->ctx; struct h3c *h3c = h3s->h3c; ssize_t ret; h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id); - if (!ncb_data(rxbuf, 0)) + if (!b_data(b)) return 0; if (quic_stream_is_uni(qcs->id) && !(h3s->flags & H3_SF_UNI_INIT)) { - if (h3_init_uni_stream(h3c, qcs, rxbuf)) + if (h3_init_uni_stream(h3c, qcs, b)) return 1; } if (quic_stream_is_uni(qcs->id) && (h3s->flags & H3_SF_UNI_NO_H3)) { /* For non-h3 STREAM, parse it and return immediately. */ - if (h3_parse_uni_stream_no_h3(qcs, rxbuf)) + if (h3_parse_uni_stream_no_h3(qcs, b)) return 1; return 0; } - while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) { + while (b_data(b) && !(qcs->flags & QC_SF_DEM_FULL)) { uint64_t ftype, flen; char last_stream_frame = 0; /* Work on a copy of */ if (!h3s->demux_frame_len) { - size_t hlen = h3_decode_frm_header(&ftype, &flen, rxbuf); + size_t hlen = h3_decode_frm_header(&ftype, &flen, b); if (!hlen) break; @@ -620,8 +611,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin) return 1; } - qcs_consume(qcs, hlen); - if (!ncb_data(rxbuf, 0)) + if (!b_data(b)) break; } @@ -631,31 +621,31 @@ static int h3_decode_qcs(struct qcs *qcs, int fin) /* Do not demux incomplete frames except H3 DATA which can be * fragmented in multiple HTX blocks. */ - if (flen > ncb_data(rxbuf, 0) && ftype != H3_FT_DATA) { + if (flen > b_data(b) && ftype != H3_FT_DATA) { /* Reject frames bigger than bufsize. * * TODO HEADERS should in complement be limited with H3 * SETTINGS_MAX_FIELD_SECTION_SIZE parameter to prevent * excessive decompressed size. */ - if (flen > ncb_size(rxbuf)) { + if (flen > QC_S_RX_BUF_SZ) { qcc_emit_cc_app(qcs->qcc, H3_EXCESSIVE_LOAD); return 1; } break; } - last_stream_frame = (fin && flen == ncb_total_data(rxbuf)); + last_stream_frame = (fin && flen == b_data(b)); h3_inc_frame_type_cnt(h3c->prx_counters, ftype); switch (ftype) { case H3_FT_DATA: - ret = h3_data_to_htx(qcs, rxbuf, flen, last_stream_frame); + ret = h3_data_to_htx(qcs, b, flen, last_stream_frame); /* TODO handle error reporting. Stream closure required. */ if (ret < 0) { ABORT_NOW(); } break; case H3_FT_HEADERS: - ret = h3_headers_to_htx(qcs, rxbuf, flen, last_stream_frame); + ret = h3_headers_to_htx(qcs, b, flen, last_stream_frame); /* TODO handle error reporting. Stream closure required. */ if (ret < 0) { ABORT_NOW(); } break; @@ -667,7 +657,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin) ret = flen; break; case H3_FT_SETTINGS: - ret = h3_parse_settings_frm(qcs->qcc->ctx, rxbuf, flen); + ret = h3_parse_settings_frm(qcs->qcc->ctx, b, flen); if (ret < 0) { qcc_emit_cc_app(qcs->qcc, h3c->err); return 1; @@ -688,7 +678,7 @@ static int h3_decode_qcs(struct qcs *qcs, int fin) if (ret) { BUG_ON(h3s->demux_frame_len < ret); h3s->demux_frame_len -= ret; - qcs_consume(qcs, ret); + b_del(b, ret); } } diff --git a/src/hq_interop.c b/src/hq_interop.c index f5c0e79cd..4b4b5222d 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -7,20 +7,18 @@ #include #include #include -#include -static int hq_interop_decode_qcs(struct qcs *qcs, int fin) +static int hq_interop_decode_qcs(struct qcs *qcs, struct buffer *b, int fin) { - struct ncbuf *rxbuf = &qcs->rx.ncbuf; struct htx *htx; struct htx_sl *sl; struct stconn *sc; struct buffer htx_buf = BUF_NULL; struct ist path; - char *ptr = ncb_head(rxbuf); - char *end = ncb_wrap(rxbuf); - size_t size = ncb_size(rxbuf); - size_t data = ncb_data(rxbuf, 0); + char *ptr = b_head(b); + char *end = b_wrap(b); + size_t size = b_size(b); + size_t data = b_data(b); b_alloc(&htx_buf); htx = htx_from_buf(&htx_buf); @@ -76,7 +74,7 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin) if (!sc) return 1; - qcs_consume(qcs, ncb_data(rxbuf, 0)); + b_reset(b); b_free(&htx_buf); if (fin) diff --git a/src/mux_quic.c b/src/mux_quic.c index 84237c3e6..28a1896a5 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -286,57 +286,6 @@ void qcs_notify_send(struct qcs *qcs) } } -/* Remove from Rx buffer. This must be called by transcoders - * after STREAM parsing. Flow-control for received offsets may be allocated for - * the peer if needed. - */ -void qcs_consume(struct qcs *qcs, uint64_t bytes) -{ - struct qcc *qcc = qcs->qcc; - struct quic_frame *frm; - struct ncbuf *buf = &qcs->rx.ncbuf; - enum ncb_ret ret; - - ret = ncb_advance(buf, bytes); - if (ret) { - ABORT_NOW(); /* should not happens because removal only in data */ - } - - if (ncb_is_empty(buf)) - qc_free_ncbuf(qcs, buf); - - qcs->rx.offset += bytes; - if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) { - frm = pool_zalloc(pool_head_quic_frame); - BUG_ON(!frm); /* TODO handle this properly */ - - qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init; - - LIST_INIT(&frm->reflist); - frm->type = QUIC_FT_MAX_STREAM_DATA; - frm->max_stream_data.id = qcs->id; - frm->max_stream_data.max_stream_data = qcs->rx.msd; - - LIST_APPEND(&qcc->lfctl.frms, &frm->list); - tasklet_wakeup(qcc->wait_event.tasklet); - } - - qcc->lfctl.offsets_consume += bytes; - if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) { - frm = pool_zalloc(pool_head_quic_frame); - BUG_ON(!frm); /* TODO handle this properly */ - - qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init; - - LIST_INIT(&frm->reflist); - frm->type = QUIC_FT_MAX_DATA; - frm->max_data.max_data = qcc->lfctl.md; - - LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list); - tasklet_wakeup(qcs->qcc->wait_event.tasklet); - } -} - /* Retrieve as an ebtree node the stream with as ID, possibly allocates * several streams, depending on the already open ones. * Return this node if succeeded, NULL if not. @@ -425,6 +374,63 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) return NULL; } +/* Simple function to duplicate a buffer */ +static inline struct buffer qcs_b_dup(const struct ncbuf *b) +{ + return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0)); +} + +/* Remove from Rx buffer. This must be called by transcoders + * after STREAM parsing. Flow-control for received offsets may be allocated for + * the peer if needed. + */ +static void qcs_consume(struct qcs *qcs, uint64_t bytes) +{ + struct qcc *qcc = qcs->qcc; + struct quic_frame *frm; + struct ncbuf *buf = &qcs->rx.ncbuf; + enum ncb_ret ret; + + ret = ncb_advance(buf, bytes); + if (ret) { + ABORT_NOW(); /* should not happens because removal only in data */ + } + + if (ncb_is_empty(buf)) + qc_free_ncbuf(qcs, buf); + + qcs->rx.offset += bytes; + if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) { + frm = pool_zalloc(pool_head_quic_frame); + BUG_ON(!frm); /* TODO handle this properly */ + + qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init; + + LIST_INIT(&frm->reflist); + frm->type = QUIC_FT_MAX_STREAM_DATA; + frm->max_stream_data.id = qcs->id; + frm->max_stream_data.max_stream_data = qcs->rx.msd; + + LIST_APPEND(&qcc->lfctl.frms, &frm->list); + tasklet_wakeup(qcc->wait_event.tasklet); + } + + qcc->lfctl.offsets_consume += bytes; + if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) { + frm = pool_zalloc(pool_head_quic_frame); + BUG_ON(!frm); /* TODO handle this properly */ + + qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init; + + LIST_INIT(&frm->reflist); + frm->type = QUIC_FT_MAX_DATA; + frm->max_data.max_data = qcc->lfctl.md; + + LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list); + tasklet_wakeup(qcs->qcc->wait_event.tasklet); + } +} + /* Decode the content of STREAM frames already received on the stream instance * . * @@ -432,14 +438,27 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) */ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) { + struct buffer b; + size_t data, done; + int ret; + TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); - if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV)) { + b = qcs_b_dup(&qcs->rx.ncbuf); + data = b_data(&b); + + ret = qcc->app_ops->decode_qcs(qcs, &b, qcs->flags & QC_SF_FIN_RECV); + if (ret) { TRACE_DEVEL("leaving on decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs); return 1; } - qcs_notify_recv(qcs); + BUG_ON_HOT(data < b_data(&b)); + done = data - b_data(&b); + if (done) { + qcs_consume(qcs, done); + qcs_notify_recv(qcs); + } TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);