From 9b5f11cd3db37f0739b89d695b87c1373e623f50 Mon Sep 17 00:00:00 2001 From: Maxime Henrion Date: Fri, 30 Jan 2026 11:39:04 -0500 Subject: [PATCH] OPTIM: quic: rework the QUIC RX code Use an MPSC ring buffer to hold data for each datagram handler. Holding this data in a per-handler buffer avoids the HoL blocking we experienced when we had per-listener buffers with data from all threads mixed up in them. This also gets rid of the mt_list contention we were suffering before, that was causing some threads to be stuck for a significant amount of time, causing warnings and even crashes in some cases. --- include/haproxy/proto_quic.h | 2 + include/haproxy/quic_rx-t.h | 1 - include/haproxy/quic_sock-t.h | 15 +- include/haproxy/quic_sock.h | 1 + include/haproxy/receiver-t.h | 1 - src/proto_quic.c | 58 +----- src/quic_rx.c | 9 +- src/quic_sock.c | 359 ++++++++++++---------------------- src/quic_tp.c | 2 - 9 files changed, 140 insertions(+), 308 deletions(-) diff --git a/include/haproxy/proto_quic.h b/include/haproxy/proto_quic.h index 1da8da8fe..58bc720b5 100644 --- a/include/haproxy/proto_quic.h +++ b/include/haproxy/proto_quic.h @@ -25,6 +25,8 @@ #include #include +#define QUIC_MAX_UDP_PAYLOAD_SIZE 2048 + extern struct protocol proto_quic4; extern struct protocol proto_quic6; diff --git a/include/haproxy/quic_rx-t.h b/include/haproxy/quic_rx-t.h index 6b5a0c4c9..686e6fff1 100644 --- a/include/haproxy/quic_rx-t.h +++ b/include/haproxy/quic_rx-t.h @@ -2,7 +2,6 @@ #define _HAPROXY_RX_T_H extern struct pool_head *pool_head_quic_conn_rxbuf; -extern struct pool_head *pool_head_quic_dgram; extern struct pool_head *pool_head_quic_rx_packet; #include diff --git a/include/haproxy/quic_sock-t.h b/include/haproxy/quic_sock-t.h index 69f52b45f..eebafba52 100644 --- a/include/haproxy/quic_sock-t.h +++ b/include/haproxy/quic_sock-t.h @@ -4,6 +4,7 @@ #include #include +#include /* QUIC socket allocation strategy. 
*/ enum quic_sock_mode { @@ -17,15 +18,6 @@ struct quic_accept_queue { struct tasklet *tasklet; /* task responsible to call listener_accept */ }; -/* Buffer used to receive QUIC datagrams on random thread and redispatch them - * to the connection thread. - */ -struct quic_receiver_buf { - struct buffer buf; /* storage for datagrams received. */ - struct list dgram_list; /* datagrams received with this rxbuf. */ - struct mt_list rxbuf_el; /* list element into receiver.rxbuf_list. */ -}; - #define QUIC_DGRAM_FL_REJECT 0x00000001 #define QUIC_DGRAM_FL_SEND_RETRY 0x00000002 @@ -41,15 +33,12 @@ struct quic_dgram { struct sockaddr_storage daddr; struct quic_conn *qc; - struct list recv_list; /* element pointing to quic_receiver_buf . */ - struct mt_list handler_list; /* element pointing to quic_dghdlr . */ - int flags; /* QUIC_DGRAM_FL_* values */ }; /* QUIC datagram handler */ struct quic_dghdlr { - struct mt_list dgrams; + struct mpring buf; /* MPSC ring buffer for datagrams. */ struct tasklet *task; }; diff --git a/include/haproxy/quic_sock.h b/include/haproxy/quic_sock.h index 26a2efdc1..e398d384a 100644 --- a/include/haproxy/quic_sock.h +++ b/include/haproxy/quic_sock.h @@ -45,6 +45,7 @@ struct connection *quic_sock_accept_conn(struct listener *l, int *status); struct task *quic_lstnr_dghdlr(struct task *t, void *ctx, unsigned int state); void quic_lstnr_sock_fd_iocb(int fd); +int quic_dgram_requeue(struct quic_dgram *dgram, int cid_tid); int qc_snd_buf(struct quic_conn *qc, const struct buffer *buf, size_t count, int flags, uint16_t gso_size); int qc_rcv_buf(struct quic_conn *qc); diff --git a/include/haproxy/receiver-t.h b/include/haproxy/receiver-t.h index 976cc70c7..320e04b47 100644 --- a/include/haproxy/receiver-t.h +++ b/include/haproxy/receiver-t.h @@ -81,7 +81,6 @@ struct receiver { struct shard_info *shard_info; /* points to info about the owning shard, NULL if single rx */ struct list proto_list; /* list in the protocol header */ #ifdef USE_QUIC - struct 
mt_list rxbuf_list; /* list of buffers to receive and dispatch QUIC datagrams. */ enum quic_sock_mode quic_mode; /* QUIC socket allocation strategy */ unsigned int quic_curr_handshake; /* count of active QUIC handshakes */ unsigned int quic_curr_accept; /* count of QUIC conns waiting for accept */ diff --git a/src/proto_quic.c b/src/proto_quic.c index 50b231b2a..fb606e323 100644 --- a/src/proto_quic.c +++ b/src/proto_quic.c @@ -56,11 +56,9 @@ struct quic_dghdlr *quic_dghdlrs; static uint64_t quic_mem_global; THREAD_LOCAL struct cshared quic_mem_diff; -/* Size of the internal buffer of QUIC RX buffer at the fd level */ +/* Size of the per-handler QUIC RX buffer at the fd level */ #define QUIC_RX_BUFSZ (1UL << 18) -DECLARE_STATIC_POOL(pool_head_quic_rxbuf, "quic_rxbuf", QUIC_RX_BUFSZ); - static int quic_bind_listener(struct listener *listener, char *errmsg, int errlen); static int quic_connect_server(struct connection *conn, int flags); static void quic_enable_listener(struct listener *listener); @@ -442,44 +440,6 @@ int quic_connect_server(struct connection *conn, int flags) return SF_ERR_NONE; /* connection is OK */ } -/* Allocate the RX buffers for listener. - * Return 1 if succeeded, 0 if not. - */ -static int quic_alloc_rxbufs_listener(struct listener *l) -{ - int i; - struct quic_receiver_buf *tmp; - - MT_LIST_INIT(&l->rx.rxbuf_list); - for (i = 0; i < my_popcountl(l->rx.bind_thread); i++) { - struct quic_receiver_buf *rxbuf; - char *buf; - - rxbuf = calloc(1, sizeof(*rxbuf)); - if (!rxbuf) - goto err; - - buf = pool_alloc(pool_head_quic_rxbuf); - if (!buf) { - free(rxbuf); - goto err; - } - - rxbuf->buf = b_make(buf, QUIC_RX_BUFSZ, 0, 0); - LIST_INIT(&rxbuf->dgram_list); - MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el); - } - - return 1; - - err: - while ((tmp = MT_LIST_POP(&l->rx.rxbuf_list, typeof(tmp), rxbuf_el))) { - pool_free(pool_head_quic_rxbuf, tmp->buf.area); - free(tmp); - } - return 0; -} - /* This function tries to bind a QUIC4/6 listener. 
It may return a warning or * an error message in if the message is at most bytes long * (including '\0'). Note that may be NULL if is also zero. @@ -535,12 +495,6 @@ static int quic_bind_listener(struct listener *listener, char *errmsg, int errle break; } - if (!quic_alloc_rxbufs_listener(listener)) { - msg = "could not initialize tx/rx rings"; - err |= ERR_WARN; - goto udp_return; - } - if (global.tune.frontend_rcvbuf) setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &global.tune.frontend_rcvbuf, sizeof(global.tune.frontend_rcvbuf)); @@ -632,6 +586,7 @@ REGISTER_PER_THREAD_INIT(quic_init_mem); static int quic_alloc_dghdlrs(void) { + char *buf; int i; quic_dghdlrs = calloc(global.nbthread, sizeof(*quic_dghdlrs)); @@ -643,6 +598,13 @@ static int quic_alloc_dghdlrs(void) for (i = 0; i < global.nbthread; i++) { struct quic_dghdlr *dghdlr = &quic_dghdlrs[i]; + buf = malloc(QUIC_RX_BUFSZ); + if (!buf) { + ha_alert("Failed to allocate the buffer for the quic datagram handler on thread %d.\n", i); + return 0; + } + mpring_init(&dghdlr->buf, buf, QUIC_RX_BUFSZ); + dghdlr->task = tasklet_new(); if (!dghdlr->task) { ha_alert("Failed to allocate the quic datagram handler on thread %d.\n", i); @@ -652,8 +614,6 @@ static int quic_alloc_dghdlrs(void) tasklet_set_tid(dghdlr->task, i); dghdlr->task->context = dghdlr; dghdlr->task->process = quic_lstnr_dghdlr; - - MT_LIST_INIT(&dghdlr->dgrams); } return 1; diff --git a/src/quic_rx.c b/src/quic_rx.c index f4c39b40e..576ecdcc8 100644 --- a/src/quic_rx.c +++ b/src/quic_rx.c @@ -37,7 +37,6 @@ #include DECLARE_POOL(pool_head_quic_conn_rxbuf, "quic_conn_rxbuf", QUIC_CONN_RX_BUFSZ); -DECLARE_TYPED_POOL(pool_head_quic_dgram, "quic_dgram", struct quic_dgram); DECLARE_TYPED_POOL(pool_head_quic_rx_packet, "quic_rx_packet", struct quic_rx_packet); /* Decode an expected packet number from its truncated value, @@ -2462,9 +2461,7 @@ int quic_dgram_parse(struct quic_dgram *dgram, struct quic_conn *from_qc, if (!qc) { if (new_tid >= 0) { 
TRACE_STATE("re-enqueue packet to conn thread", QUIC_EV_CONN_LPKT); - MT_LIST_APPEND(&quic_dghdlrs[new_tid].dgrams, - &dgram->handler_list); - tasklet_wakeup(quic_dghdlrs[new_tid].task); + quic_dgram_requeue(dgram, new_tid); pool_free(pool_head_quic_rx_packet, pkt); goto out; } @@ -2523,16 +2520,12 @@ int quic_dgram_parse(struct quic_dgram *dgram, struct quic_conn *from_qc, /* This must never happen. */ BUG_ON(pos > end); BUG_ON(pos < end || pos > dgram->buf + dgram->len); - /* Mark this datagram as consumed */ - HA_ATOMIC_STORE(&dgram->buf, NULL); out: TRACE_LEAVE(QUIC_EV_CONN_LPKT); return 0; err: - /* Mark this datagram as consumed as maybe at least some packets were parsed. */ - HA_ATOMIC_STORE(&dgram->buf, NULL); TRACE_LEAVE(QUIC_EV_CONN_LPKT); return -1; } diff --git a/src/quic_sock.c b/src/quic_sock.c index 38210b2a8..72a07f1b7 100644 --- a/src/quic_sock.c +++ b/src/quic_sock.c @@ -192,30 +192,29 @@ struct task *quic_lstnr_dghdlr(struct task *t, void *ctx, unsigned int state) { struct quic_dghdlr *dghdlr = ctx; struct quic_dgram *dgram; + size_t len; int max_dgrams = global.tune.maxpollevents; TRACE_ENTER(QUIC_EV_CONN_LPKT); - while ((dgram = MT_LIST_POP(&dghdlr->dgrams, typeof(dgram), handler_list))) { - if (quic_dgram_parse(dgram, NULL, dgram->owner)) { - /* TODO should we requeue the datagram ? */ + while ((dgram = mpring_read_begin(&dghdlr->buf, &len))) { + /* We ignore the return value of quic_dgram_parse() because + * whether it was successful or not, we still need to empty the + * ring buffer. Exiting early would leave us with data left to + * process, and no guarantee we would get woken up again. 
+ */ + quic_dgram_parse(dgram, NULL, dgram->owner); + mpring_read_end(&dghdlr->buf, len); + + if (--max_dgrams <= 0) { + /* too much work done at once, come back here later */ + tasklet_wakeup((struct tasklet *)t); break; } - - if (--max_dgrams <= 0) - goto stop_here; } TRACE_LEAVE(QUIC_EV_CONN_LPKT); return t; - - stop_here: - /* too much work done at once, come back here later */ - if (!MT_LIST_ISEMPTY(&dghdlr->dgrams)) - tasklet_wakeup((struct tasklet *)t); - - TRACE_LEAVE(QUIC_EV_CONN_LPKT); - return t; } /* Retrieve the DCID from a QUIC datagram or packet at position, @@ -259,40 +258,13 @@ static int quic_get_dgram_dcid(unsigned char *pos, const unsigned char *end, goto leave; } - -/* Retrieve the DCID from the datagram found at position and deliver it to the - * correct datagram handler. - * Return 1 if a correct datagram could be found, 0 if not. - */ -static int quic_lstnr_dgram_dispatch(unsigned char *pos, size_t len, void *owner, - struct sockaddr_storage *saddr, - struct sockaddr_storage *daddr, - struct quic_dgram *new_dgram, struct list *dgrams) +/* Initialize a QUIC datagram. */ +static void quic_dgram_init(struct quic_dgram *dgram, + unsigned char *pos, size_t len, void *owner, + unsigned char *dcid, size_t dcid_len, + struct sockaddr_storage *saddr, + struct sockaddr_storage *daddr) { - struct quic_dgram *dgram; - unsigned char *dcid; - size_t dcid_len; - int cid_tid; - - if (!len || !quic_get_dgram_dcid(pos, pos + len, &dcid, &dcid_len)) - goto err; - - dgram = new_dgram ? new_dgram : pool_alloc(pool_head_quic_dgram); - if (!dgram) - goto err; - - if ((cid_tid = quic_get_cid_tid(dcid, dcid_len, saddr, pos, len)) < 0) { - /* Use the current thread if CID not found. If a clients opens - * a connection with multiple packets, it is possible that - * several threads will deal with datagrams sharing the same - * CID. 
For this reason, the CID tree insertion will be - * conducted as an atomic operation and the datagram ultimately - * redispatch by the late thread. - */ - cid_tid = tid; - } - - /* All the members must be initialized! */ dgram->obj_type = OBJ_TYPE_DGRAM; dgram->owner = owner; dgram->buf = pos; @@ -303,50 +275,6 @@ static int quic_lstnr_dgram_dispatch(unsigned char *pos, size_t len, void *owner dgram->daddr = *daddr; dgram->qc = NULL; dgram->flags = 0; - - /* Attached datagram to its quic_receiver_buf and quic_dghdlrs. */ - LIST_APPEND(dgrams, &dgram->recv_list); - MT_LIST_APPEND(&quic_dghdlrs[cid_tid].dgrams, &dgram->handler_list); - - /* typically quic_lstnr_dghdlr() */ - tasklet_wakeup(quic_dghdlrs[cid_tid].task); - - return 1; - - err: - pool_free(pool_head_quic_dgram, new_dgram); - return 0; -} - -/* This function is responsible to remove unused datagram attached in front of - * . Each instances will be freed until a not yet consumed datagram is - * found or end of the list is hit. The last unused datagram found is not freed - * and is instead returned so that the caller can reuse it if needed. - * - * Returns the last unused datagram or NULL if no occurrence found. - */ -static struct quic_dgram *quic_rxbuf_purge_dgrams(struct quic_receiver_buf *rbuf) -{ - struct quic_dgram *cur, *prev = NULL; - - while (!LIST_ISEMPTY(&rbuf->dgram_list)) { - cur = LIST_ELEM(rbuf->dgram_list.n, struct quic_dgram *, recv_list); - - /* Loop until a not yet consumed datagram is found. */ - if (HA_ATOMIC_LOAD(&cur->buf)) - break; - - /* Clear buffer of current unused datagram. */ - LIST_DELETE(&cur->recv_list); - b_del(&rbuf->buf, cur->len); - - /* Free last found unused datagram. */ - pool_free(pool_head_quic_dgram, prev); - prev = cur; - } - - /* Return last unused datagram found. */ - return prev; } /* Receive a single message from datagram socket . 
Data are placed in @@ -463,105 +391,132 @@ static ssize_t quic_recv(int fd, void *out, size_t len, return ret; } +/* Low-level function to write a datagram to the buffer of the handler thread. */ +static int quic_dgram_write(unsigned char *pos, size_t len, void *owner, + unsigned char *dcid, size_t dcid_len, + struct sockaddr_storage *saddr, + struct sockaddr_storage *daddr, + unsigned int cid_tid) +{ + struct mpring *dst; + struct quic_dgram *dgram; + unsigned char *data; + size_t bring_len; + void *buf; + + dst = &quic_dghdlrs[cid_tid].buf; + + bring_len = sizeof(struct quic_dgram) + len; + buf = mpring_write_reserve(dst, bring_len); + if (!buf) + return 0; + + dgram = buf; + quic_dgram_init(dgram, pos, len, owner, dcid, dcid_len, saddr, daddr); + + data = (unsigned char *)(dgram + 1); + memcpy(data, pos, len); + + dgram->dcid = data + (dgram->dcid - dgram->buf); + dgram->buf = data; + + mpring_write_commit(dst, buf, bring_len); + + /* typically quic_lstnr_dghdlr() */ + tasklet_wakeup(quic_dghdlrs[cid_tid].task); + + return 1; +} + +int quic_dgram_requeue(struct quic_dgram *dgram, int cid_tid) +{ + + return quic_dgram_write(dgram->buf, dgram->len, dgram->owner, + dgram->dcid, dgram->dcid_len, + &dgram->saddr, &dgram->daddr, cid_tid); +} + +/* Attempt to push a datagram to its handler thread. + * + * Returns 1 if successful, or 0 if the datagram is empty, if its DCID cannot + * be retrieved, or if the reservation in the handler thread's ring buffer + * fails. On failure nothing is queued and the frontend's rxbuf_full counter is + * incremented. The payload is copied, so the caller keeps ownership of pos. 
+ */ +static int quic_lstnr_dgram_dispatch(unsigned char *pos, size_t len, struct listener *l, + unsigned char *dcid, size_t dcid_len, + struct sockaddr_storage *saddr, + struct sockaddr_storage *daddr) +{ + struct proxy *px; + struct quic_counters *prx_counters; + int cid_tid; + + if (!len) + goto err; + + if (!dcid && !quic_get_dgram_dcid(pos, pos + len, &dcid, &dcid_len)) + goto err; + + if ((cid_tid = quic_get_cid_tid(dcid, dcid_len, saddr, pos, len)) < 0) { + /* Use the current thread if CID not found. If a client opens + * a connection with multiple packets, it is possible that + * several threads will deal with datagrams sharing the same + * CID. For this reason, the CID tree insertion will be + * conducted as an atomic operation and the datagram ultimately + * redispatched by the late thread. + */ + cid_tid = tid; + } + + if (!quic_dgram_write(pos, len, l, dcid, dcid_len, saddr, daddr, cid_tid)) + goto err; + + return 1; + + err: + px = l->bind_conf->frontend; + prx_counters = EXTRA_COUNTERS_GET(px->extra_counters_fe, &quic_stats_module); + HA_ATOMIC_INC(&prx_counters->rxbuf_full); + return 0; +} + /* Function called on a read event from a listening socket. It tries * to handle as many connections as possible. 
*/ void quic_lstnr_sock_fd_iocb(int fd) { ssize_t ret; - struct quic_receiver_buf *rxbuf; - struct buffer *buf; + unsigned char buf[QUIC_MAX_UDP_PAYLOAD_SIZE]; struct listener *l = objt_listener(fdtab[fd].owner); struct quic_transport_params *params; /* Source address */ struct sockaddr_storage saddr = {0}, daddr = {0}; - size_t max_sz, cspace; - struct quic_dgram *new_dgram; - unsigned char *dgram_buf; + size_t max_sz; int max_dgrams; BUG_ON(!l); - new_dgram = NULL; - if (!l) - return; - if (!(fdtab[fd].state & FD_POLL_IN) || !fd_recv_ready(fd)) return; - rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), rxbuf_el); - if (!rxbuf) - goto out; - - buf = &rxbuf->buf; - max_dgrams = global.tune.maxpollevents; - start: - /* Try to reuse an existing dgram. Note that there is always at - * least one datagram to pick, except the first time we enter - * this function for this buffer. - */ - new_dgram = quic_rxbuf_purge_dgrams(rxbuf); - params = &l->bind_conf->quic_params; max_sz = params->max_udp_payload_size; - cspace = b_contig_space(buf); - if (cspace < max_sz) { - struct proxy *px = l->bind_conf->frontend; - struct quic_counters *prx_counters = EXTRA_COUNTERS_GET(px->extra_counters_fe, &quic_stats_module); - struct quic_dgram *dgram; + BUG_ON(max_sz > sizeof(buf)); - /* Do no mark as full, and do not try to consume it - * if the contiguous remaining space is not at the end - */ - if (b_tail(buf) + cspace < b_wrap(buf)) { - HA_ATOMIC_INC(&prx_counters->rxbuf_full); - goto out; - } - - /* Allocate a fake datagram, without data to locate - * the end of the RX buffer (required during purging). - */ - dgram = pool_alloc(pool_head_quic_dgram); - if (!dgram) - goto out; - - /* Initialize only the useful members of this fake datagram. */ - dgram->buf = NULL; - dgram->len = cspace; - /* Append this datagram only to the RX buffer list. It will - * not be treated by any datagram handler. 
- */ - LIST_APPEND(&rxbuf->dgram_list, &dgram->recv_list); - - /* Consume the remaining space */ - b_add(buf, cspace); - if (b_contig_space(buf) < max_sz) { - HA_ATOMIC_INC(&prx_counters->rxbuf_full); - goto out; - } - } - - dgram_buf = (unsigned char *)b_tail(buf); - ret = quic_recv(fd, dgram_buf, max_sz, + start: + ret = quic_recv(fd, buf, sizeof(buf), (struct sockaddr *)&saddr, sizeof(saddr), (struct sockaddr *)&daddr, sizeof(daddr), get_net_port(&l->rx.addr), 1); if (ret <= 0) - goto out; + return; + + quic_lstnr_dgram_dispatch(buf, ret, l, NULL, 0, &saddr, &daddr); - b_add(buf, ret); - if (!quic_lstnr_dgram_dispatch(dgram_buf, ret, l, &saddr, &daddr, - new_dgram, &rxbuf->dgram_list)) { - /* If wrong, consume this datagram */ - b_sub(buf, ret); - } - new_dgram = NULL; if (--max_dgrams > 0) goto start; - out: - pool_free(pool_head_quic_dgram, new_dgram); - MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el); } /* FD-owned quic-conn socket callback. */ @@ -864,9 +819,10 @@ int qc_snd_buf(struct quic_conn *qc, const struct buffer *buf, size_t sz, int qc_rcv_buf(struct quic_conn *qc) { struct sockaddr_storage saddr = {0}, daddr = {0}; - struct quic_dgram *new_dgram = NULL; + struct quic_dgram dgram, *new_dgram; struct buffer buf = BUF_NULL; - unsigned char *dgram_buf; + unsigned char *dgram_buf, *dcid; + size_t dcid_len; ssize_t ret = 0; struct listener *l = qc->li; @@ -875,6 +831,7 @@ int qc_rcv_buf(struct quic_conn *qc) TRACE_ENTER(QUIC_EV_CONN_RCV, qc); + new_dgram = &dgram; do { if (!b_alloc(&buf, DB_MUX_RX)) break; /* TODO subscribe for memory again available. */ @@ -882,10 +839,6 @@ int qc_rcv_buf(struct quic_conn *qc) b_reset(&buf); BUG_ON(b_contig_space(&buf) < qc->max_udp_payload); - /* Allocate datagram on first loop or after requeuing. */ - if (!new_dgram && !(new_dgram = pool_alloc(pool_head_quic_dgram))) - break; /* TODO subscribe for memory again available. 
*/ - dgram_buf = (unsigned char *)b_tail(&buf); ret = quic_recv(qc->fd, dgram_buf, qc->max_udp_payload, (struct sockaddr *)&saddr, sizeof(saddr), @@ -901,90 +854,28 @@ int qc_rcv_buf(struct quic_conn *qc) b_add(&buf, ret); - new_dgram->obj_type = OBJ_TYPE_DGRAM; - new_dgram->buf = dgram_buf; - new_dgram->len = ret; - new_dgram->dcid_len = 0; - new_dgram->dcid = NULL; - new_dgram->saddr = saddr; - new_dgram->daddr = daddr; - new_dgram->qc = NULL; /* set later via quic_dgram_parse() */ - new_dgram->flags = 0; - TRACE_DEVEL("read datagram", QUIC_EV_CONN_RCV, qc, new_dgram); - if (!quic_get_dgram_dcid(new_dgram->buf, - new_dgram->buf + new_dgram->len, - &new_dgram->dcid, &new_dgram->dcid_len)) { + if (!quic_get_dgram_dcid(dgram_buf, dgram_buf + ret, &dcid, &dcid_len)) continue; - } - if (l && !qc_check_dcid(qc, new_dgram->dcid, new_dgram->dcid_len)) { + if (l && !qc_check_dcid(qc, dcid, dcid_len)) { /* Datagram received by error on the connection FD, dispatch it * to its associated quic-conn. * * TODO count redispatch datagrams. */ - struct quic_receiver_buf *rxbuf; - struct quic_dgram *tmp_dgram; - unsigned char *rxbuf_tail; - size_t cspace; - TRACE_STATE("datagram for other connection on quic-conn socket, requeue it", QUIC_EV_CONN_RCV, qc); - rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), rxbuf_el); - ASSUME_NONNULL(rxbuf); - cspace = b_contig_space(&rxbuf->buf); - - tmp_dgram = quic_rxbuf_purge_dgrams(rxbuf); - pool_free(pool_head_quic_dgram, tmp_dgram); - - /* Insert a fake datagram if space wraps to consume it. 
*/ - if (cspace < new_dgram->len && b_space_wraps(&rxbuf->buf)) { - struct quic_dgram *fake_dgram = pool_alloc(pool_head_quic_dgram); - if (!fake_dgram) { - /* TODO count lost datagrams */ - MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el); - continue; - } - - fake_dgram->buf = NULL; - fake_dgram->len = cspace; - LIST_APPEND(&rxbuf->dgram_list, &fake_dgram->recv_list); - b_add(&rxbuf->buf, cspace); - } - - /* Recheck contig space after fake datagram insert. */ - if (b_contig_space(&rxbuf->buf) < new_dgram->len) { - /* TODO count lost datagrams */ - MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el); - continue; - } - - rxbuf_tail = (unsigned char *)b_tail(&rxbuf->buf); - __b_putblk(&rxbuf->buf, (char *)dgram_buf, new_dgram->len); - if (!quic_lstnr_dgram_dispatch(rxbuf_tail, ret, l, &saddr, &daddr, - new_dgram, &rxbuf->dgram_list)) { - /* TODO count lost datagrams. */ - b_sub(&buf, ret); - } - else { - /* datagram must not be freed as it was requeued. */ - new_dgram = NULL; - } - - MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el); + quic_lstnr_dgram_dispatch(dgram_buf, ret, l, dcid, dcid_len, &saddr, &daddr); continue; } + quic_dgram_init(new_dgram, dgram_buf, ret, NULL, dcid, dcid_len, &saddr, &daddr); quic_dgram_parse(new_dgram, qc, l ? &l->obj_type : (qc->conn ? &__objt_server(qc->conn->target)->obj_type : NULL)); - /* A datagram must always be consumed after quic_parse_dgram(). */ - BUG_ON(new_dgram->buf); } while (ret > 0); - pool_free(pool_head_quic_dgram, new_dgram); - if (b_size(&buf)) { b_free(&buf); offer_buffers(NULL, 1); diff --git a/src/quic_tp.c b/src/quic_tp.c index ea27efcc7..5f75de30b 100644 --- a/src/quic_tp.c +++ b/src/quic_tp.c @@ -10,8 +10,6 @@ #include #include -#define QUIC_MAX_UDP_PAYLOAD_SIZE 2048 - static int qc_early_tranport_params_validate(struct quic_conn *qc, struct quic_transport_params *p, struct quic_early_transport_params *e);