diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 83fccb29e..d096d1dca 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -101,8 +102,9 @@ struct qcs { struct { struct eb_root frms; /* received frames ordered by their offsets */ - uint64_t offset; /* the current offset of received data */ + uint64_t offset; /* absolute current base offset of ncbuf */ struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */ + struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */ struct buffer app_buf; /* receive buffer used by conn_stream layer */ uint64_t msd; /* fctl bytes limit to enforce */ } rx; diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 08ff9a358..72e52f318 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -18,13 +18,14 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type); void qcs_free(struct qcs *qcs); struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr); +struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, - char fin, char *data, struct qcs **out_qcs, size_t *done); + char fin, char *data, struct qcs **out_qcs); int qcc_recv_max_data(struct qcc *qcc, uint64_t max); int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max); int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs); diff --git a/src/h3.c b/src/h3.c index 68fc1025a..488fccfab 100644 --- a/src/h3.c +++ b/src/h3.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -76,9 +77,9 @@ struct h3s { DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s)); /* Simple 
function to duplicate a buffer */ -static inline struct buffer h3_b_dup(struct buffer *b) +static inline struct buffer h3_b_dup(struct ncbuf *b) { - return b_make(b->area, b->size, b->head, b->data); + return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0)); } /* Decode a h3 frame header made of two QUIC varints from buffer. @@ -104,7 +105,7 @@ static inline size_t h3_decode_frm_header(uint64_t *ftype, uint64_t *flen, * * Returns the number of bytes handled or a negative error code. */ -static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, +static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, char fin) { struct buffer htx_buf = BUF_NULL; @@ -119,8 +120,8 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, int hdr_idx; /* TODO support buffer wrapping */ - BUG_ON(b_contig_data(buf, 0) != b_data(buf)); - if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0) + BUG_ON(ncb_head(buf) + len > ncb_wrap(buf)); + if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0) return -1; qc_get_buf(qcs, &htx_buf); @@ -200,12 +201,12 @@ static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, * * Returns the number of bytes handled or a negative error code. 
*/ -static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, +static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len, char fin) { struct buffer *appbuf; struct htx *htx = NULL; - size_t contig = 0, htx_sent = 0; + size_t htx_sent = 0; int htx_space; char *head; @@ -213,12 +214,12 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, BUG_ON(!appbuf); htx = htx_from_buf(appbuf); - if (len > b_data(buf)) { - len = b_data(buf); + if (len > ncb_data(buf, 0)) { + len = ncb_data(buf, 0); fin = 0; } - head = b_head(buf); + head = ncb_head(buf); retry: htx_space = htx_free_data_space(htx); if (!htx_space) { @@ -231,10 +232,10 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, fin = 0; } - contig = b_contig_data(buf, contig); - if (len > contig) { - htx_sent = htx_add_data(htx, ist2(b_head(buf), contig)); - head = b_orig(buf); + if (head + len > ncb_wrap(buf)) { + size_t contig = ncb_wrap(buf) - head; + htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig)); + head = ncb_orig(buf); len -= contig; goto retry; } @@ -256,15 +257,15 @@ static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len, */ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx) { - struct buffer *rxbuf = &qcs->rx.buf; + struct ncbuf *rxbuf = &qcs->rx.ncbuf; struct h3s *h3s = qcs->ctx; ssize_t ret; h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id); - if (!b_data(rxbuf)) + if (!ncb_data(rxbuf, 0)) return 0; - while (b_data(rxbuf) && !(qcs->flags & QC_SF_DEM_FULL)) { + while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) { uint64_t ftype, flen; struct buffer b; char last_stream_frame = 0; @@ -279,16 +280,17 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx) h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n", __func__, ftype, flen); - b_del(rxbuf, hlen); + ncb_advance(rxbuf, hlen); h3s->demux_frame_type = ftype; h3s->demux_frame_len = flen; + 
qcs->rx.offset += hlen; } flen = h3s->demux_frame_len; ftype = h3s->demux_frame_type; - if (flen > b_data(&b) && !b_full(rxbuf)) + if (flen > b_data(&b) && !ncb_is_full(rxbuf)) break; - last_stream_frame = (fin && flen == b_data(rxbuf)); + last_stream_frame = (fin && flen == ncb_total_data(rxbuf)); switch (ftype) { case H3_FT_DATA: @@ -303,20 +305,21 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx) break; case H3_FT_PUSH_PROMISE: /* Not supported */ - ret = MIN(b_data(rxbuf), flen); + ret = MIN(ncb_data(rxbuf, 0), flen); break; default: /* draft-ietf-quic-http34 9. Extensions to HTTP/3 * unknown frame types MUST be ignored */ h3_debug_printf(stderr, "ignore unknown frame type 0x%lx\n", ftype); - ret = MIN(b_data(rxbuf), flen); + ret = MIN(ncb_data(rxbuf, 0), flen); } if (ret) { - b_del(rxbuf, ret); + ncb_advance(rxbuf, ret); BUG_ON(h3s->demux_frame_len < ret); h3s->demux_frame_len -= ret; + qcs->rx.offset += ret; } } @@ -386,7 +389,7 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx) struct buffer b; /* Work on a copy of */ - b = h3_b_dup(rxbuf); + b = b_make(rxbuf->area, rxbuf->size, rxbuf->head, rxbuf->data); hlen = h3_decode_frm_header(&ftype, &flen, &b); if (!hlen) break; diff --git a/src/hq_interop.c b/src/hq_interop.c index dcaba8ef9..db4387e89 100644 --- a/src/hq_interop.c +++ b/src/hq_interop.c @@ -7,19 +7,20 @@ #include #include #include +#include static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx) { - struct buffer *rxbuf = &qcs->rx.buf; + struct ncbuf *rxbuf = &qcs->rx.ncbuf; struct htx *htx; struct htx_sl *sl; struct conn_stream *cs; struct buffer htx_buf = BUF_NULL; struct ist path; - char *ptr = b_head(rxbuf); - char *end = b_wrap(rxbuf); - size_t size = b_size(rxbuf); - size_t data = b_data(rxbuf); + char *ptr = ncb_head(rxbuf); + char *end = ncb_wrap(rxbuf); + size_t size = ncb_size(rxbuf); + size_t data = ncb_data(rxbuf, 0); b_alloc(&htx_buf); htx = htx_from_buf(&htx_buf); @@ -76,7 +77,8 @@ static int 
hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx) return -1; - b_del(rxbuf, b_data(rxbuf)); + qcs->rx.offset += ncb_data(rxbuf, 0); + ncb_advance(rxbuf, ncb_data(rxbuf, 0)); b_free(&htx_buf); if (fin) diff --git a/src/mux_quic.c b/src/mux_quic.c index bc8910c1e..022b1dbf5 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -153,6 +154,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcc->rfctl.msd_bidi_l; qcs->rx.buf = BUF_NULL; + qcs->rx.ncbuf = NCBUF_NULL; qcs->rx.app_buf = BUF_NULL; qcs->rx.offset = 0; qcs->rx.frms = EB_ROOT_UNIQUE; @@ -184,6 +186,16 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) return NULL; } +static void qc_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) +{ + struct buffer buf; + + buf = b_make(ncbuf->area, ncbuf->size, 0, 0); + b_free(&buf); + + *ncbuf = NCBUF_NULL; +} + /* Free a qcs. This function must only be done to remove a stream on allocation * error or connection shutdown. Else use qcs_destroy which handle all the * QUIC connection mechanism. @@ -191,6 +203,7 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) void qcs_free(struct qcs *qcs) { b_free(&qcs->rx.buf); + qc_free_ncbuf(qcs, &qcs->rx.ncbuf); b_free(&qcs->tx.buf); BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams); @@ -215,6 +228,21 @@ struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr) return buf; } +struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) +{ + struct buffer buf = BUF_NULL; + + if (ncb_is_null(ncbuf)) { + b_alloc(&buf); + BUG_ON(b_is_null(&buf)); + + *ncbuf = ncb_make(buf.area, buf.size, 0); + ncb_init(ncbuf, 0); + } + + return ncbuf; +} + int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es) { struct qcc *qcc = qcs->qcc; @@ -344,22 +372,17 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) * . 
In case of success, the caller can immediatly call qcc_decode_qcs * to process the frame content. * - * Returns a code indicating how the frame was handled. - * - 0: frame received completely and can be dropped. - * - 1: frame not received but can be dropped. - * - 2: frame cannot be handled, either partially or not at all. - * indicated the number of bytes handled. The rest should be buffered. + * Returns 0 on success else non-zero. */ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, - char fin, char *data, struct qcs **out_qcs, size_t *done) + char fin, char *data, struct qcs **out_qcs) { struct qcs *qcs; - size_t total, diff; + enum ncb_ret ret; TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); *out_qcs = NULL; - *done = 0; qcs = qcc_get_qcs(qcc, id); if (!qcs) { @@ -375,44 +398,48 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, *out_qcs = qcs; - if (offset > qcs->rx.offset) - return 2; - if (offset + len <= qcs->rx.offset) { TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); return 0; } - /* Last frame already handled for this stream. */ - BUG_ON(qcs->flags & QC_SF_FIN_RECV); + /* TODO if last frame already received, stream size must not change. + * Else send FINAL_SIZE_ERROR. + */ + /* TODO initial max-stream-data overflow. Implement FLOW_CONTROL_ERROR emission. */ BUG_ON(offset + len > qcs->rx.msd); - if (!qc_get_buf(qcs, &qcs->rx.buf) || b_full(&qcs->rx.buf)) { + if (!qc_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) { /* TODO should mark qcs as full */ - return 2; + ABORT_NOW(); + return 1; } TRACE_DEVEL("newly received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); - diff = qcs->rx.offset - offset; + if (offset < qcs->rx.offset) { + /* Drop the already consumed prefix: data must advance along with offset. */ + len -= qcs->rx.offset - offset; + data += qcs->rx.offset - offset; + offset = qcs->rx.offset; + } - len -= diff; - data += diff; - - /* TODO handle STREAM frames larger than RX buffer. 
*/ - BUG_ON(len > b_size(&qcs->rx.buf)); - - total = b_putblk(&qcs->rx.buf, data, len); - qcs->rx.offset += total; - *done = total; + ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE); + if (ret != NCB_RET_OK) { + if (ret == NCB_RET_DATA_REJ) { + /* TODO generate PROTOCOL_VIOLATION error */ + TRACE_DEVEL("leaving on data rejected", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, + qcc->conn, qcs); + } + else if (ret == NCB_RET_GAP_SIZE) { + TRACE_DEVEL("cannot bufferize frame due to gap size limit", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, + qcc->conn, qcs); + } + return 1; + } /* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */ - BUG_ON(qcs->rx.offset == qcs->rx.msd); - - if (total < len) { - TRACE_DEVEL("leaving on partially received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); - return 2; - } + BUG_ON(offset + len == qcs->rx.msd); if (fin) qcs->flags |= QC_SF_FIN_RECV; diff --git a/src/xprt_quic.c b/src/xprt_quic.c index d4f3ef0bf..4f3bcf766 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -2194,105 +2194,19 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, struct quic_stream *strm_frm, struct quic_conn *qc) { - struct quic_rx_strm_frm *frm; - struct eb64_node *frm_node; struct qcs *qcs = NULL; - size_t done, buf_was_full; int ret; ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len, strm_frm->offset.key, strm_frm->fin, - (char *)strm_frm->data, &qcs, &done); + (char *)strm_frm->data, &qcs); - /* invalid frame */ - if (ret == 1) + /* frame rejected - packet must not be acknowledged */ + if (ret) return 0; - /* already fully received offset */ - if (ret == 0 && done == 0) - return 1; - - /* frame not handled (partially or completely) must be buffered */ - if (ret == 2) { - frm = new_quic_rx_strm_frm(strm_frm, pkt); - if (!frm) { - TRACE_PROTO("Could not alloc RX STREAM frame", - QUIC_EV_CONN_PSTRM, qc); - return 0; - } - - /* frame partially handled by the MUX */ - if (done) { - 
BUG_ON(done >= frm->len); /* must never happen */ - frm->len -= done; - frm->data += done; - frm->offset_node.key += done; - } - - eb64_insert(&qcs->rx.frms, &frm->offset_node); - quic_rx_packet_refinc(pkt); - - /* interrupt only if frame was not received at all. */ - if (!done) - return 1; - } - - /* Decode the data if buffer is already full as it's not possible to - * dequeue a frame in this condition. - */ - if (b_full(&qcs->rx.buf)) + if (qcs) qcc_decode_qcs(qc->qcc, qcs); - - retry: - /* Frame received (partially or not) by the mux. - * If there is buffered frame for next offset, it may be possible to - * receive them now. - */ - frm_node = eb64_first(&qcs->rx.frms); - while (frm_node) { - frm = eb64_entry(frm_node, - struct quic_rx_strm_frm, offset_node); - - ret = qcc_recv(qc->qcc, qcs->id, frm->len, - frm->offset_node.key, frm->fin, - (char *)frm->data, &qcs, &done); - - BUG_ON(ret == 1); /* must never happen for buffered frames */ - - /* interrupt the parsing if the frame cannot be handled - * entirely for the moment only. - */ - if (ret == 2) { - if (done) { - BUG_ON(done >= frm->len); /* must never happen */ - frm->len -= done; - frm->data += done; - - eb64_delete(&frm->offset_node); - frm->offset_node.key += done; - eb64_insert(&qcs->rx.frms, &frm->offset_node); - } - break; - } - - /* Remove a newly received frame or an invalid one. */ - frm_node = eb64_next(frm_node); - eb64_delete(&frm->offset_node); - quic_rx_packet_refdec(frm->pkt); - pool_free(pool_head_quic_rx_strm_frm, frm); - } - - buf_was_full = b_full(&qcs->rx.buf); - /* Decode the received data. */ - qcc_decode_qcs(qc->qcc, qcs); - - /* Buffer was full so the reception was stopped. Now the buffer has - * space available thanks to qcc_decode_qcs(). We can now retry to - * handle more data. - */ - if (buf_was_full && !b_full(&qcs->rx.buf)) - goto retry; - return 1; }