diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index a78a9024b..241aa60e4 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -185,7 +185,6 @@ struct qcs { struct session *sess; struct qcc *qcc; struct eb64_node by_id; /* place in qcc's streams_by_id */ - struct eb_root frms; uint64_t id; /* stream ID */ uint32_t flags; /* QC_SF_* */ struct { @@ -194,12 +193,15 @@ struct qcs { uint64_t offset; /* the current offset of received data */ uint64_t bytes; /* number of bytes received */ struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */ + struct eb_root frms; /* received frames ordered by their offsets */ } rx; struct { enum qcs_tx_st st; /* TX state */ uint64_t max_data; /* maximum number of bytes which may be sent */ uint64_t offset; /* the current offset of data to send */ uint64_t bytes; /* number of bytes sent */ + uint64_t ack_offset; /* last acked ordered byte offset */ + struct eb_root acked_frms; /* acked frames ordered by their offsets */ struct buffer buf; /* transmit buffer, always valid (buf_empty or real buffer) */ struct buffer mbuf[QCC_MBUF_CNT]; uint64_t left; /* data currently stored in mbuf waiting for send */ diff --git a/include/haproxy/quic_frame-t.h b/include/haproxy/quic_frame-t.h index c8a396b4f..13b4cbfa0 100644 --- a/include/haproxy/quic_frame-t.h +++ b/include/haproxy/quic_frame-t.h @@ -31,6 +31,8 @@ #include +#include + /* QUIC frame types. */ enum quic_frame_type { QUIC_FT_PADDING = 0x00, @@ -141,7 +143,9 @@ struct quic_new_token { struct quic_stream { uint64_t id; - uint64_t offset; + struct qcs *qcs; + struct buffer *buf; + struct eb64_node offset; uint64_t len; const unsigned char *data; }; diff --git a/include/haproxy/quic_frame.h b/include/haproxy/quic_frame.h index a5c022acc..917b6675e 100644 --- a/include/haproxy/quic_frame.h +++ b/include/haproxy/quic_frame.h @@ -72,7 +72,7 @@ static inline size_t qc_frm_len(struct quic_frame *frm) case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F: { struct quic_stream *f = &frm->stream; len += 1 + quic_int_getsize(f->id) + - ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset) : 0) + + ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset.key) : 0) + ((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) ? quic_int_getsize(f->len) : 0) + f->len; break; } diff --git a/src/mux_quic.c b/src/mux_quic.c index cb710118c..886f3c493 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -970,19 +970,20 @@ struct qcs *bidi_qcs_new(struct qcc *qcc, uint64_t id) qcs->qcc = qcc; qcs->cs = NULL; qcs->id = qcs->by_id.key = id; - qcs->frms = EB_ROOT_UNIQUE; qcs->flags = QC_SF_NONE; qcs->rx.buf = BUF_NULL; qcs->rx.st = QC_RX_SS_IDLE; qcs->rx.bytes = qcs->rx.offset = 0; qcs->rx.max_data = qcc->strms[qcs_type].rx.max_data; - qcs->rx.buf = BUF_NULL; + qcs->rx.frms = EB_ROOT_UNIQUE; + qcs->tx.st = QC_TX_SS_IDLE; - qcs->tx.bytes = qcs->tx.offset = 0; + qcs->tx.bytes = qcs->tx.offset = qcs->tx.ack_offset = 0; + qcs->tx.acked_frms = EB_ROOT_UNIQUE; qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data; - qcs->tx.buf = BUF_NULL; + qcs->tx.buf = BUF_NULL; br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0])); qcs->tx.left = 0; @@ -1041,13 +1042,13 @@ struct qcs *luqs_new(struct qcc *qcc) qcs->qcc = qcc; qcs->cs = NULL; qcs->id = qcs->by_id.key = next_id; - qcs->frms = EB_ROOT_UNIQUE; qcs->flags = QC_SF_NONE; - qcs->tx.st = QC_TX_SS_IDLE; - qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data; - qcs->tx.offset = qcs->tx.bytes = 0; - qcs->tx.buf = BUF_NULL; + qcs->tx.st = QC_TX_SS_IDLE; + qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data; + qcs->tx.offset = qcs->tx.bytes = qcs->tx.ack_offset = 0; + qcs->tx.acked_frms = EB_ROOT_UNIQUE; + qcs->tx.buf = BUF_NULL; br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0])); qcs->tx.left = 0; @@ -1083,13 +1084,13 @@ struct qcs *ruqs_new(struct qcc *qcc, uint64_t id) qcs->qcc = qcc; qcs->id = qcs->by_id.key = id; - qcs->frms = EB_ROOT_UNIQUE; qcs->flags = QC_SF_NONE; qcs->rx.st = QC_RX_SS_IDLE; qcs->rx.max_data = qcc->strms[qcs_type].rx.max_data; qcs->rx.offset = qcs->rx.bytes = 0; qcs->rx.buf = BUF_NULL; + qcs->rx.frms = EB_ROOT_UNIQUE; br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0])); qcs->tx.left = 0; @@ -1396,12 +1397,12 @@ leave: static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset) { struct quic_frame *frm; - struct buffer buf = BUF_NULL; + struct buffer *buf = &qcs->tx.buf; + struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP]; int total = 0; - qc_get_buf(qcs->qcc, &buf); - total = b_xfer(&buf, payload, b_data(payload)); - + qc_get_buf(qcs->qcc, buf); + total = b_force_xfer(buf, payload, QUIC_MIN(b_data(payload), b_room(buf))); frm = pool_zalloc(pool_head_quic_frame); if (!frm) goto err; @@ -1411,16 +1412,16 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT; if (offset) { frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT; - frm->stream.offset = offset; + frm->stream.offset.key = offset; } + frm->stream.qcs = qcs; + frm->stream.buf = buf; frm->stream.id = qcs->by_id.key; if (total) { frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT; frm->stream.len = total; - frm->stream.data = (unsigned char *)b_head(&buf); } - struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP]; MT_LIST_APPEND(&qel->pktns->tx.frms, &frm->mt_list); fprintf(stderr, "%s: total=%d fin=%d offset=%lu\n", __func__, total, fin, offset); return total; diff --git a/src/quic_frame.c b/src/quic_frame.c index 4f0746215..51b073d06 100644 --- a/src/quic_frame.c +++ b/src/quic_frame.c @@ -375,15 +375,29 @@ static int quic_build_stream_frame(unsigned char **buf, const unsigned char *end struct quic_frame *frm, struct quic_conn *conn) { struct quic_stream *stream = &frm->stream; + size_t offset, block1, block2; + struct buffer b; if (!quic_enc_int(buf, end, stream->id) || - ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(buf, end, stream->offset)) || + ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(buf, end, stream->offset.key)) || ((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) && (!quic_enc_int(buf, end, stream->len) || end - *buf < stream->len))) return 0; - memcpy(*buf, stream->data, stream->len); - *buf += stream->len; + /* Buffer copy */ + b = *stream->buf; + offset = (frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? + stream->offset.key & (b_size(stream->buf) - 1): 0; + block1 = b_wrap(&b) - (b_orig(&b) + offset); + if (block1 > stream->len) + block1 = stream->len; + block2 = stream->len - block1; + memcpy(*buf, b_orig(&b) + offset, block1); + *buf += block1; + if (block2) { + memcpy(*buf, b_orig(&b), block2); + *buf += block2; + } return 1; } @@ -401,9 +415,9 @@ static int quic_parse_stream_frame(struct quic_frame *frm, struct quic_conn *qc, /* Offset parsing */ if (!(frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)) { - stream->offset = 0; + stream->offset.key = 0; } - else if (!quic_dec_int(&stream->offset, buf, end)) + else if (!quic_dec_int((uint64_t *)&stream->offset.key, buf, end)) return 0; /* Length parsing */ diff --git a/src/xprt_quic.c b/src/xprt_quic.c index 0fdd45d9c..40346f9bf 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -571,7 +571,7 @@ static void quic_trace(enum trace_level level, uint64_t mask, const struct trace !!(s->id & QUIC_STREAM_FRAME_ID_DIR_BIT), !!(frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (unsigned long long)s->id, - (unsigned long long)s->offset, + (unsigned long long)s->offset.key, (unsigned long long)s->len); } } @@ -1149,13 +1149,56 @@ static int qc_pkt_decrypt(struct quic_rx_packet *pkt, struct quic_tls_ctx *tls_c return 1; } +/* Remove from stream the acknowledged frames. + * Never fails. + */ +static void qcs_try_to_consume(struct qcs *qcs) +{ + struct eb64_node *frm_node; + + frm_node = eb64_first(&qcs->tx.acked_frms); + while (frm_node) { + struct quic_stream *strm; + + strm = eb64_entry(&frm_node->node, struct quic_stream, offset); + if (strm->offset.key != qcs->tx.ack_offset) + break; + + b_del(strm->buf, strm->len); + qcs->tx.ack_offset += strm->len; + frm_node = eb64_next(frm_node); + eb64_delete(&strm->offset); + } +} + /* Treat frame whose packet it is attached to has just been acknowledged. */ static inline void qc_treat_acked_tx_frm(struct quic_frame *frm, struct ssl_sock_ctx *ctx) { + TRACE_PROTO("Removing frame", QUIC_EV_CONN_PRSAFRM, ctx->conn, frm); - LIST_DELETE(&frm->list); - pool_free(pool_head_quic_frame, frm); + switch (frm->type) { + case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F: + { + struct qcs *qcs = frm->stream.qcs; + struct quic_stream *strm = &frm->stream; + + if (qcs->tx.ack_offset == strm->offset.key) { + b_del(strm->buf, strm->len); + qcs->tx.ack_offset += strm->len; + LIST_DELETE(&frm->list); + pool_free(pool_head_quic_frame, frm); + } + else { + eb64_insert(&qcs->tx.acked_frms, &strm->offset); + } + qcs_try_to_consume(qcs); + } + break; + default: + LIST_DELETE(&frm->list); + pool_free(pool_head_quic_frame, frm); + } } /* Remove down to node entries from tree of TX packet, @@ -1582,7 +1625,7 @@ struct quic_rx_strm_frm *new_quic_rx_strm_frm(struct quic_stream *stream_frm, frm = pool_alloc(pool_head_quic_rx_strm_frm); if (frm) { - frm->offset_node.key = stream_frm->offset; + frm->offset_node.key = stream_frm->offset.key; frm->len = stream_frm->len; frm->data = stream_frm->data; frm->pkt = pkt; @@ -1686,7 +1729,7 @@ static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm) try = strm_frm->len; memcpy(b_tail(buf), strm_frm->data, try); strm_frm->len -= try; - strm_frm->offset += try; + strm_frm->offset.key += try; b_add(buf, try); ret += try; } @@ -1715,7 +1758,7 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, } strm = eb64_entry(&strm_node->node, struct qcs, by_id); - frm_node = eb64_lookup(&strm->frms, strm_frm->offset); + frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key); /* FIXME: handle the case where this frame overlap others */ if (frm_node) { TRACE_PROTO("Already existing stream data", @@ -1723,7 +1766,7 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, goto out; } - if (strm_frm->offset == strm->rx.offset) { + if (strm_frm->offset.key == strm->rx.offset) { int ret; if (!qc_get_buf(qc->qcc, &strm->rx.buf)) @@ -1749,7 +1792,7 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt, return 0; } - eb64_insert(&strm->frms, &frm->offset_node); + eb64_insert(&strm->rx.frms, &frm->offset_node); quic_rx_packet_refinc(pkt); out: @@ -1778,7 +1821,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt, } strm = eb64_entry(&strm_node->node, struct qcs, by_id); - frm_node = eb64_lookup(&strm->frms, strm_frm->offset); + frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key); /* FIXME: handle the case where this frame overlap others */ if (frm_node) { TRACE_PROTO("Already existing stream data", @@ -1787,7 +1830,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt, } strm_frm_len = strm_frm->len; - if (strm_frm->offset == strm->rx.offset) { + if (strm_frm->offset.key == strm->rx.offset) { int ret; if (!qc_get_buf(qc->qcc, &strm->rx.buf)) @@ -1806,7 +1849,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt, if (ret) ruqs_notify_recv(strm); - strm_frm->offset += ret; + strm_frm->offset.key += ret; } /* Take this frame into an account for the stream flow control */ strm->rx.offset += strm_frm_len; @@ -1824,7 +1867,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt, return 0; } - eb64_insert(&strm->frms, &frm->offset_node); + eb64_insert(&strm->rx.frms, &frm->offset_node); quic_rx_packet_refinc(pkt); out: @@ -3719,11 +3762,11 @@ static inline int qc_build_frms(struct quic_tx_packet *pkt, * excepting the variable ones. Note that +1 is for the type of this frame. */ hlen = 1 + quic_int_getsize(cf->stream.id) + - ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset) : 0); + ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset.key) : 0); /* Compute the data length of this STREAM frame. */ avail_room = room - hlen - *len; if ((ssize_t)avail_room <= 0) - continue; + break; if (cf->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) { dlen = max_available_room(avail_room, &dlen_sz); @@ -3761,6 +3804,8 @@ static inline int qc_build_frms(struct quic_tx_packet *pkt, } new_cf->type = cf->type; + new_cf->stream.qcs = cf->stream.qcs; + new_cf->stream.buf = cf->stream.buf; new_cf->stream.id = cf->stream.id; if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) new_cf->stream.offset = cf->stream.offset; @@ -3773,7 +3818,7 @@ static inline int qc_build_frms(struct quic_tx_packet *pkt, cf->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT; /* Consume bytes of the current frame. */ cf->stream.len -= dlen; - cf->stream.offset += dlen; + cf->stream.offset.key += dlen; cf->stream.data += dlen; } break;