diff --git a/src/mux_quic.c b/src/mux_quic.c index 939136cf8..1fd44612d 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -1096,18 +1096,33 @@ static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b) } } -/* Returns the current Rx buffer instance for stream. */ +/* Returns the Rx buffer instance for stream read offset. May be NULL if + * not already allocated. + */ static struct qc_stream_rxbuf *qcs_get_curr_rxbuf(struct qcs *qcs) { struct eb64_node *node; struct qc_stream_rxbuf *buf; node = eb64_first(&qcs->rx.bufs); + if (!node) + return NULL; + buf = container_of(node, struct qc_stream_rxbuf, off_node); + if (qcs->rx.offset < buf->off_node.key) { + /* first buffer allocated for a future offset */ + return NULL; + } + + /* Ensures obsolete buffer are not kept inside QCS */ + BUG_ON(buf->off_end < qcs->rx.offset); return buf; } -/* Returns the amount of data readable at stream current offset. */ +/* Returns the amount of data readable at stream on current buffer. Note + * that this does account for hypothetical contiguous data divided on other + * Rx buffers instances. + */ static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs) { struct qc_stream_rxbuf *b = qcs_get_curr_rxbuf(qcs); @@ -1211,6 +1226,9 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) if (qcc->glitches != prev_glitches) session_add_glitch_ctr(qcc->conn->owner, qcc->glitches - prev_glitches); + /* TODO not enough data in current rxbuf, merging required with next buffer */ + BUG_ON(rxbuf && !ret && qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end); + if (ret < 0) { TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs); goto err; @@ -1232,8 +1250,20 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) if (ret) qcs_consume(qcs, ret, rxbuf); - if (ncb_is_empty(&rxbuf->ncb)) + if (ncb_is_empty(&rxbuf->ncb)) { qcs_free_rxbuf(qcs, rxbuf); + + /* Close QCS remotely if only one Rx buffer remains and + * all data with FIN already stored in it. This is + * necessary to be performed before app_ops rcv_buf to + * ensure FIN is correctly signalled. + */ + if (qcs->flags & QC_SF_SIZE_KNOWN && !eb_is_empty(&qcs->rx.bufs)) { + const ncb_sz_t avail = qcs_rx_avail_data(qcs); + if (qcs->rx.offset + avail == qcs->rx.offset_max) + qcs_close_remote(qcs); + } + } } if (ret || (!b_data(&b) && fin)) @@ -1537,6 +1567,62 @@ int qcc_install_app_ops(struct qcc *qcc, const struct qcc_app_ops *app_ops) return 1; } +/* Retrieves the Rx buffer instance usable to store STREAM data starting at + * . It is dynamically allocated if not already instantiated. + * must contains the size of the STREAM frame. It may be reduced by the + * function if data is too large relative to the buffer starting offset. + * Another buffer instance should be allocated to store the remaining data. + * + * Returns the buffer instance or NULL in case of error. + */ +static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset, + uint64_t *len) +{ + struct qcc *qcc = qcs->qcc; + struct eb64_node *node; + struct qc_stream_rxbuf *buf; + struct ncbuf *ncbuf; + + TRACE_ENTER(QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs); + + node = eb64_lookup_le(&qcs->rx.bufs, offset); + if (node) + buf = container_of(node, struct qc_stream_rxbuf, off_node); + + if (!node || offset >= buf->off_end) { + const uint64_t aligned_off = offset - (offset % qmux_stream_rx_bufsz()); + + TRACE_DEVEL("allocating a new entry", QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs); + buf = pool_alloc(pool_head_qc_stream_rxbuf); + if (!buf) { + TRACE_ERROR("qcs rxbuf alloc error", QMUX_EV_QCC_RECV, qcc->conn, qcs); + goto err; + } + + buf->ncb = NCBUF_NULL; + buf->off_node.key = aligned_off; + buf->off_end = aligned_off + qmux_stream_rx_bufsz(); + eb64_insert(&qcs->rx.bufs, &buf->off_node); + } + + ncbuf = &buf->ncb; + if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) { + TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); + qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0); + goto err; + } + + if (offset + *len > buf->off_end) + *len = buf->off_end - offset; + + TRACE_LEAVE(QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs); + return buf; + + err: + TRACE_DEVEL("leaving on error", QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs); + return NULL; +} + /* Handle a new STREAM frame for stream with id . Payload is pointed by * with length and represents the offset . is set if * the QUIC frame FIN bit is set. @@ -1548,9 +1634,10 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, char fin, char *data) { const int fin_standalone = (!len && fin); - struct ncbuf *ncbuf; struct qcs *qcs; - enum ncb_ret ret; + enum ncb_ret ncb_ret; + uint64_t left; + int ret; TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); @@ -1634,36 +1721,23 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, offset = qcs->rx.offset; } - if (len) { - if (eb_is_empty(&qcs->rx.bufs)) { - struct qc_stream_rxbuf *buf; - buf = pool_alloc(pool_head_qc_stream_rxbuf); - if (!buf) { - TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); - qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0); - goto err; - } + left = len; + while (left) { + struct qc_stream_rxbuf *buf; + ncb_sz_t ncb_off; - buf->ncb = NCBUF_NULL; - buf->off_node.key = qcs->rx.offset; - buf->off_end = qcs->rx.offset + qmux_stream_rx_bufsz(); - eb64_insert(&qcs->rx.bufs, &buf->off_node); - - ncbuf = &buf->ncb; - } - else { - struct qc_stream_rxbuf *buf = qcs_get_curr_rxbuf(qcs); - ncbuf = &buf->ncb; - } - - if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) { - TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); + buf = qcs_get_rxbuf(qcs, offset, &len); + if (!buf) { + TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0); goto err; } - ret = ncb_add(ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE); - switch (ret) { + /* For oldest buffer, ncb_advance() may already have been performed. */ + ncb_off = offset - MAX(qcs->rx.offset, buf->off_node.key); + + ncb_ret = ncb_add(&buf->ncb, ncb_off, data, len, NCB_ADD_COMPARE); + switch (ncb_ret) { case NCB_RET_OK: break; @@ -1688,6 +1762,11 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, qcc->conn, qcs); return 1; } + + offset += len; + data += len; + left -= len; + len = left; } if (fin) @@ -1698,11 +1777,17 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, qcs_close_remote(qcs); } - if ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) || - unlikely(fin_standalone && qcs_is_close_remote(qcs))) { - qcc_decode_qcs(qcc, qcs); + while ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) || + unlikely(fin_standalone && qcs_is_close_remote(qcs))) { + + ret = qcc_decode_qcs(qcc, qcs); LIST_DEL_INIT(&qcs->el_recv); qcc_refresh_timeout(qcc); + + if (ret <= 0) + break; + + BUG_ON_HOT(fin_standalone); /* On fin_standalone should be NULL, which ensures no infinite loop. */ } out: @@ -2762,6 +2847,7 @@ static void qcc_wait_for_hs(struct qcc *qcc) static int qcc_io_recv(struct qcc *qcc) { struct qcs *qcs; + int ret; TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); @@ -2778,8 +2864,14 @@ static int qcc_io_recv(struct qcc *qcc) qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv); /* No need to add an uni local stream in recv_list. */ BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_local(qcc, qcs->id)); - qcc_decode_qcs(qcc, qcs); - LIST_DEL_INIT(&qcs->el_recv); + + while (qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) { + ret = qcc_decode_qcs(qcc, qcs); + LIST_DEL_INIT(&qcs->el_recv); + + if (ret <= 0) + break; + } } TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);