mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-05-01 19:20:59 +02:00
OPTIM: quic: rework the QUIC RX code
Use an MPSC ring buffer to hold data for each datagram handler. Holding this data in a per-handler buffer avoids the HoL blocking we experienced when we had per-listener buffers with data from all threads mixed up in them. This also gets rid of the mt_list contention we were suffering before, that was causing some threads to be stuck for a significant amount of time, causing warnings and even crashes in some cases.
This commit is contained in:
parent
57d8f06215
commit
9b5f11cd3d
@ -25,6 +25,8 @@
|
||||
#include <haproxy/api-t.h>
|
||||
#include <haproxy/thread-t.h>
|
||||
|
||||
#define QUIC_MAX_UDP_PAYLOAD_SIZE 2048
|
||||
|
||||
extern struct protocol proto_quic4;
|
||||
extern struct protocol proto_quic6;
|
||||
|
||||
|
||||
@ -2,7 +2,6 @@
|
||||
#define _HAPROXY_RX_T_H
|
||||
|
||||
extern struct pool_head *pool_head_quic_conn_rxbuf;
|
||||
extern struct pool_head *pool_head_quic_dgram;
|
||||
extern struct pool_head *pool_head_quic_rx_packet;
|
||||
|
||||
#include <import/eb64tree.h>
|
||||
|
||||
@ -4,6 +4,7 @@
|
||||
|
||||
#include <haproxy/buf-t.h>
|
||||
#include <haproxy/obj_type-t.h>
|
||||
#include <haproxy/mpring.h>
|
||||
|
||||
/* QUIC socket allocation strategy. */
|
||||
enum quic_sock_mode {
|
||||
@ -17,15 +18,6 @@ struct quic_accept_queue {
|
||||
struct tasklet *tasklet; /* task responsible to call listener_accept */
|
||||
};
|
||||
|
||||
/* Buffer used to receive QUIC datagrams on random thread and redispatch them
|
||||
* to the connection thread.
|
||||
*/
|
||||
struct quic_receiver_buf {
|
||||
struct buffer buf; /* storage for datagrams received. */
|
||||
struct list dgram_list; /* datagrams received with this rxbuf. */
|
||||
struct mt_list rxbuf_el; /* list element into receiver.rxbuf_list. */
|
||||
};
|
||||
|
||||
#define QUIC_DGRAM_FL_REJECT 0x00000001
|
||||
#define QUIC_DGRAM_FL_SEND_RETRY 0x00000002
|
||||
|
||||
@ -41,15 +33,12 @@ struct quic_dgram {
|
||||
struct sockaddr_storage daddr;
|
||||
struct quic_conn *qc;
|
||||
|
||||
struct list recv_list; /* element pointing to quic_receiver_buf <dgram_list>. */
|
||||
struct mt_list handler_list; /* element pointing to quic_dghdlr <dgrams>. */
|
||||
|
||||
int flags; /* QUIC_DGRAM_FL_* values */
|
||||
};
|
||||
|
||||
/* QUIC datagram handler */
|
||||
struct quic_dghdlr {
|
||||
struct mt_list dgrams;
|
||||
struct mpring buf; /* MPSC ring buffer for datagrams. */
|
||||
struct tasklet *task;
|
||||
};
|
||||
|
||||
|
||||
@ -45,6 +45,7 @@ struct connection *quic_sock_accept_conn(struct listener *l, int *status);
|
||||
|
||||
struct task *quic_lstnr_dghdlr(struct task *t, void *ctx, unsigned int state);
|
||||
void quic_lstnr_sock_fd_iocb(int fd);
|
||||
int quic_dgram_requeue(struct quic_dgram *dgram, int cid_tid);
|
||||
int qc_snd_buf(struct quic_conn *qc, const struct buffer *buf, size_t count,
|
||||
int flags, uint16_t gso_size);
|
||||
int qc_rcv_buf(struct quic_conn *qc);
|
||||
|
||||
@ -81,7 +81,6 @@ struct receiver {
|
||||
struct shard_info *shard_info; /* points to info about the owning shard, NULL if single rx */
|
||||
struct list proto_list; /* list in the protocol header */
|
||||
#ifdef USE_QUIC
|
||||
struct mt_list rxbuf_list; /* list of buffers to receive and dispatch QUIC datagrams. */
|
||||
enum quic_sock_mode quic_mode; /* QUIC socket allocation strategy */
|
||||
unsigned int quic_curr_handshake; /* count of active QUIC handshakes */
|
||||
unsigned int quic_curr_accept; /* count of QUIC conns waiting for accept */
|
||||
|
||||
@ -56,11 +56,9 @@ struct quic_dghdlr *quic_dghdlrs;
|
||||
static uint64_t quic_mem_global;
|
||||
THREAD_LOCAL struct cshared quic_mem_diff;
|
||||
|
||||
/* Size of the internal buffer of QUIC RX buffer at the fd level */
|
||||
/* Size of the per-handler QUIC RX buffer at the fd level */
|
||||
#define QUIC_RX_BUFSZ (1UL << 18)
|
||||
|
||||
DECLARE_STATIC_POOL(pool_head_quic_rxbuf, "quic_rxbuf", QUIC_RX_BUFSZ);
|
||||
|
||||
static int quic_bind_listener(struct listener *listener, char *errmsg, int errlen);
|
||||
static int quic_connect_server(struct connection *conn, int flags);
|
||||
static void quic_enable_listener(struct listener *listener);
|
||||
@ -442,44 +440,6 @@ int quic_connect_server(struct connection *conn, int flags)
|
||||
return SF_ERR_NONE; /* connection is OK */
|
||||
}
|
||||
|
||||
/* Allocate the RX buffers for <l> listener.
|
||||
* Return 1 if succeeded, 0 if not.
|
||||
*/
|
||||
static int quic_alloc_rxbufs_listener(struct listener *l)
|
||||
{
|
||||
int i;
|
||||
struct quic_receiver_buf *tmp;
|
||||
|
||||
MT_LIST_INIT(&l->rx.rxbuf_list);
|
||||
for (i = 0; i < my_popcountl(l->rx.bind_thread); i++) {
|
||||
struct quic_receiver_buf *rxbuf;
|
||||
char *buf;
|
||||
|
||||
rxbuf = calloc(1, sizeof(*rxbuf));
|
||||
if (!rxbuf)
|
||||
goto err;
|
||||
|
||||
buf = pool_alloc(pool_head_quic_rxbuf);
|
||||
if (!buf) {
|
||||
free(rxbuf);
|
||||
goto err;
|
||||
}
|
||||
|
||||
rxbuf->buf = b_make(buf, QUIC_RX_BUFSZ, 0, 0);
|
||||
LIST_INIT(&rxbuf->dgram_list);
|
||||
MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
||||
err:
|
||||
while ((tmp = MT_LIST_POP(&l->rx.rxbuf_list, typeof(tmp), rxbuf_el))) {
|
||||
pool_free(pool_head_quic_rxbuf, tmp->buf.area);
|
||||
free(tmp);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* This function tries to bind a QUIC4/6 listener. It may return a warning or
|
||||
* an error message in <errmsg> if the message is at most <errlen> bytes long
|
||||
* (including '\0'). Note that <errmsg> may be NULL if <errlen> is also zero.
|
||||
@ -535,12 +495,6 @@ static int quic_bind_listener(struct listener *listener, char *errmsg, int errle
|
||||
break;
|
||||
}
|
||||
|
||||
if (!quic_alloc_rxbufs_listener(listener)) {
|
||||
msg = "could not initialize tx/rx rings";
|
||||
err |= ERR_WARN;
|
||||
goto udp_return;
|
||||
}
|
||||
|
||||
if (global.tune.frontend_rcvbuf)
|
||||
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &global.tune.frontend_rcvbuf, sizeof(global.tune.frontend_rcvbuf));
|
||||
|
||||
@ -632,6 +586,7 @@ REGISTER_PER_THREAD_INIT(quic_init_mem);
|
||||
|
||||
static int quic_alloc_dghdlrs(void)
|
||||
{
|
||||
char *buf;
|
||||
int i;
|
||||
|
||||
quic_dghdlrs = calloc(global.nbthread, sizeof(*quic_dghdlrs));
|
||||
@ -643,6 +598,13 @@ static int quic_alloc_dghdlrs(void)
|
||||
for (i = 0; i < global.nbthread; i++) {
|
||||
struct quic_dghdlr *dghdlr = &quic_dghdlrs[i];
|
||||
|
||||
buf = malloc(QUIC_RX_BUFSZ);
|
||||
if (!buf) {
|
||||
ha_alert("Failed to allocate the buffer for the quic datagram handler on thread %d.\n", i);
|
||||
return 0;
|
||||
}
|
||||
mpring_init(&dghdlr->buf, buf, QUIC_RX_BUFSZ);
|
||||
|
||||
dghdlr->task = tasklet_new();
|
||||
if (!dghdlr->task) {
|
||||
ha_alert("Failed to allocate the quic datagram handler on thread %d.\n", i);
|
||||
@ -652,8 +614,6 @@ static int quic_alloc_dghdlrs(void)
|
||||
tasklet_set_tid(dghdlr->task, i);
|
||||
dghdlr->task->context = dghdlr;
|
||||
dghdlr->task->process = quic_lstnr_dghdlr;
|
||||
|
||||
MT_LIST_INIT(&dghdlr->dgrams);
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
||||
@ -37,7 +37,6 @@
|
||||
#include <haproxy/trace.h>
|
||||
|
||||
DECLARE_POOL(pool_head_quic_conn_rxbuf, "quic_conn_rxbuf", QUIC_CONN_RX_BUFSZ);
|
||||
DECLARE_TYPED_POOL(pool_head_quic_dgram, "quic_dgram", struct quic_dgram);
|
||||
DECLARE_TYPED_POOL(pool_head_quic_rx_packet, "quic_rx_packet", struct quic_rx_packet);
|
||||
|
||||
/* Decode an expected packet number from <truncated_on> its truncated value,
|
||||
@ -2462,9 +2461,7 @@ int quic_dgram_parse(struct quic_dgram *dgram, struct quic_conn *from_qc,
|
||||
if (!qc) {
|
||||
if (new_tid >= 0) {
|
||||
TRACE_STATE("re-enqueue packet to conn thread", QUIC_EV_CONN_LPKT);
|
||||
MT_LIST_APPEND(&quic_dghdlrs[new_tid].dgrams,
|
||||
&dgram->handler_list);
|
||||
tasklet_wakeup(quic_dghdlrs[new_tid].task);
|
||||
quic_dgram_requeue(dgram, new_tid);
|
||||
pool_free(pool_head_quic_rx_packet, pkt);
|
||||
goto out;
|
||||
}
|
||||
@ -2523,16 +2520,12 @@ int quic_dgram_parse(struct quic_dgram *dgram, struct quic_conn *from_qc,
|
||||
/* This must never happen. */
|
||||
BUG_ON(pos > end);
|
||||
BUG_ON(pos < end || pos > dgram->buf + dgram->len);
|
||||
/* Mark this datagram as consumed */
|
||||
HA_ATOMIC_STORE(&dgram->buf, NULL);
|
||||
|
||||
out:
|
||||
TRACE_LEAVE(QUIC_EV_CONN_LPKT);
|
||||
return 0;
|
||||
|
||||
err:
|
||||
/* Mark this datagram as consumed as maybe at least some packets were parsed. */
|
||||
HA_ATOMIC_STORE(&dgram->buf, NULL);
|
||||
TRACE_LEAVE(QUIC_EV_CONN_LPKT);
|
||||
return -1;
|
||||
}
|
||||
|
||||
359
src/quic_sock.c
359
src/quic_sock.c
@ -192,30 +192,29 @@ struct task *quic_lstnr_dghdlr(struct task *t, void *ctx, unsigned int state)
|
||||
{
|
||||
struct quic_dghdlr *dghdlr = ctx;
|
||||
struct quic_dgram *dgram;
|
||||
size_t len;
|
||||
int max_dgrams = global.tune.maxpollevents;
|
||||
|
||||
TRACE_ENTER(QUIC_EV_CONN_LPKT);
|
||||
|
||||
while ((dgram = MT_LIST_POP(&dghdlr->dgrams, typeof(dgram), handler_list))) {
|
||||
if (quic_dgram_parse(dgram, NULL, dgram->owner)) {
|
||||
/* TODO should we requeue the datagram ? */
|
||||
while ((dgram = mpring_read_begin(&dghdlr->buf, &len))) {
|
||||
/* We ignore the return value of quic_dgram_parse() because
|
||||
* whether it was successful or not, we still need to empty the
|
||||
* ring buffer. Exiting early would leave us with data left to
|
||||
* process, and no guarantee we would get woken up again.
|
||||
*/
|
||||
quic_dgram_parse(dgram, NULL, dgram->owner);
|
||||
mpring_read_end(&dghdlr->buf, len);
|
||||
|
||||
if (--max_dgrams <= 0) {
|
||||
/* too much work done at once, come back here later */
|
||||
tasklet_wakeup((struct tasklet *)t);
|
||||
break;
|
||||
}
|
||||
|
||||
if (--max_dgrams <= 0)
|
||||
goto stop_here;
|
||||
}
|
||||
|
||||
TRACE_LEAVE(QUIC_EV_CONN_LPKT);
|
||||
return t;
|
||||
|
||||
stop_here:
|
||||
/* too much work done at once, come back here later */
|
||||
if (!MT_LIST_ISEMPTY(&dghdlr->dgrams))
|
||||
tasklet_wakeup((struct tasklet *)t);
|
||||
|
||||
TRACE_LEAVE(QUIC_EV_CONN_LPKT);
|
||||
return t;
|
||||
}
|
||||
|
||||
/* Retrieve the DCID from a QUIC datagram or packet at <pos> position,
|
||||
@ -259,40 +258,13 @@ static int quic_get_dgram_dcid(unsigned char *pos, const unsigned char *end,
|
||||
goto leave;
|
||||
}
|
||||
|
||||
|
||||
/* Retrieve the DCID from the datagram found at <pos> position and deliver it to the
|
||||
* correct datagram handler.
|
||||
* Return 1 if a correct datagram could be found, 0 if not.
|
||||
*/
|
||||
static int quic_lstnr_dgram_dispatch(unsigned char *pos, size_t len, void *owner,
|
||||
struct sockaddr_storage *saddr,
|
||||
struct sockaddr_storage *daddr,
|
||||
struct quic_dgram *new_dgram, struct list *dgrams)
|
||||
/* Initialize a QUIC datagram. */
|
||||
static void quic_dgram_init(struct quic_dgram *dgram,
|
||||
unsigned char *pos, size_t len, void *owner,
|
||||
unsigned char *dcid, size_t dcid_len,
|
||||
struct sockaddr_storage *saddr,
|
||||
struct sockaddr_storage *daddr)
|
||||
{
|
||||
struct quic_dgram *dgram;
|
||||
unsigned char *dcid;
|
||||
size_t dcid_len;
|
||||
int cid_tid;
|
||||
|
||||
if (!len || !quic_get_dgram_dcid(pos, pos + len, &dcid, &dcid_len))
|
||||
goto err;
|
||||
|
||||
dgram = new_dgram ? new_dgram : pool_alloc(pool_head_quic_dgram);
|
||||
if (!dgram)
|
||||
goto err;
|
||||
|
||||
if ((cid_tid = quic_get_cid_tid(dcid, dcid_len, saddr, pos, len)) < 0) {
|
||||
/* Use the current thread if CID not found. If a clients opens
|
||||
* a connection with multiple packets, it is possible that
|
||||
* several threads will deal with datagrams sharing the same
|
||||
* CID. For this reason, the CID tree insertion will be
|
||||
* conducted as an atomic operation and the datagram ultimately
|
||||
* redispatch by the late thread.
|
||||
*/
|
||||
cid_tid = tid;
|
||||
}
|
||||
|
||||
/* All the members must be initialized! */
|
||||
dgram->obj_type = OBJ_TYPE_DGRAM;
|
||||
dgram->owner = owner;
|
||||
dgram->buf = pos;
|
||||
@ -303,50 +275,6 @@ static int quic_lstnr_dgram_dispatch(unsigned char *pos, size_t len, void *owner
|
||||
dgram->daddr = *daddr;
|
||||
dgram->qc = NULL;
|
||||
dgram->flags = 0;
|
||||
|
||||
/* Attached datagram to its quic_receiver_buf and quic_dghdlrs. */
|
||||
LIST_APPEND(dgrams, &dgram->recv_list);
|
||||
MT_LIST_APPEND(&quic_dghdlrs[cid_tid].dgrams, &dgram->handler_list);
|
||||
|
||||
/* typically quic_lstnr_dghdlr() */
|
||||
tasklet_wakeup(quic_dghdlrs[cid_tid].task);
|
||||
|
||||
return 1;
|
||||
|
||||
err:
|
||||
pool_free(pool_head_quic_dgram, new_dgram);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* This function is responsible to remove unused datagram attached in front of
|
||||
* <buf>. Each instances will be freed until a not yet consumed datagram is
|
||||
* found or end of the list is hit. The last unused datagram found is not freed
|
||||
* and is instead returned so that the caller can reuse it if needed.
|
||||
*
|
||||
* Returns the last unused datagram or NULL if no occurrence found.
|
||||
*/
|
||||
static struct quic_dgram *quic_rxbuf_purge_dgrams(struct quic_receiver_buf *rbuf)
|
||||
{
|
||||
struct quic_dgram *cur, *prev = NULL;
|
||||
|
||||
while (!LIST_ISEMPTY(&rbuf->dgram_list)) {
|
||||
cur = LIST_ELEM(rbuf->dgram_list.n, struct quic_dgram *, recv_list);
|
||||
|
||||
/* Loop until a not yet consumed datagram is found. */
|
||||
if (HA_ATOMIC_LOAD(&cur->buf))
|
||||
break;
|
||||
|
||||
/* Clear buffer of current unused datagram. */
|
||||
LIST_DELETE(&cur->recv_list);
|
||||
b_del(&rbuf->buf, cur->len);
|
||||
|
||||
/* Free last found unused datagram. */
|
||||
pool_free(pool_head_quic_dgram, prev);
|
||||
prev = cur;
|
||||
}
|
||||
|
||||
/* Return last unused datagram found. */
|
||||
return prev;
|
||||
}
|
||||
|
||||
/* Receive a single message from datagram socket <fd>. Data are placed in <out>
|
||||
@ -463,105 +391,132 @@ static ssize_t quic_recv(int fd, void *out, size_t len,
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Low-level function to write a datagram to the buffer of the handler thread. */
|
||||
static int quic_dgram_write(unsigned char *pos, size_t len, void *owner,
|
||||
unsigned char *dcid, size_t dcid_len,
|
||||
struct sockaddr_storage *saddr,
|
||||
struct sockaddr_storage *daddr,
|
||||
unsigned int cid_tid)
|
||||
{
|
||||
struct mpring *dst;
|
||||
struct quic_dgram *dgram;
|
||||
unsigned char *data;
|
||||
size_t bring_len;
|
||||
void *buf;
|
||||
|
||||
dst = &quic_dghdlrs[cid_tid].buf;
|
||||
|
||||
bring_len = sizeof(struct quic_dgram) + len;
|
||||
buf = mpring_write_reserve(dst, bring_len);
|
||||
if (!buf)
|
||||
return 0;
|
||||
|
||||
dgram = buf;
|
||||
quic_dgram_init(dgram, pos, len, owner, dcid, dcid_len, saddr, daddr);
|
||||
|
||||
data = (unsigned char *)(dgram + 1);
|
||||
memcpy(data, pos, len);
|
||||
|
||||
dgram->dcid = data + (dgram->dcid - dgram->buf);
|
||||
dgram->buf = data;
|
||||
|
||||
mpring_write_commit(dst, buf, bring_len);
|
||||
|
||||
/* typically quic_lstnr_dghdlr() */
|
||||
tasklet_wakeup(quic_dghdlrs[cid_tid].task);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int quic_dgram_requeue(struct quic_dgram *dgram, int cid_tid)
|
||||
{
|
||||
|
||||
return quic_dgram_write(dgram->buf, dgram->len, dgram->owner,
|
||||
dgram->dcid, dgram->dcid_len,
|
||||
&dgram->saddr, &dgram->daddr, cid_tid);
|
||||
}
|
||||
|
||||
/* Attempt to push a datagram to its handler thread.
|
||||
*
|
||||
* Returns 1 if successful, or 0 if the handler thread's buffer is full. If
|
||||
* the datagram could not be pushed, it will be put on the appropriate pending
|
||||
* lists. In all cases, take complete ownership of dgram - it won't be valid
|
||||
* anymore after calling this function.
|
||||
*/
|
||||
static int quic_lstnr_dgram_dispatch(unsigned char *pos, size_t len, struct listener *l,
|
||||
unsigned char *dcid, size_t dcid_len,
|
||||
struct sockaddr_storage *saddr,
|
||||
struct sockaddr_storage *daddr)
|
||||
{
|
||||
struct proxy *px;
|
||||
struct quic_counters *prx_counters;
|
||||
int cid_tid;
|
||||
|
||||
if (!len)
|
||||
goto err;
|
||||
|
||||
if (!dcid && !quic_get_dgram_dcid(pos, pos + len, &dcid, &dcid_len))
|
||||
goto err;
|
||||
|
||||
if ((cid_tid = quic_get_cid_tid(dcid, dcid_len, saddr, pos, len)) < 0) {
|
||||
/* Use the current thread if CID not found. If a clients opens
|
||||
* a connection with multiple packets, it is possible that
|
||||
* several threads will deal with datagrams sharing the same
|
||||
* CID. For this reason, the CID tree insertion will be
|
||||
* conducted as an atomic operation and the datagram ultimately
|
||||
* redispatch by the late thread.
|
||||
*/
|
||||
cid_tid = tid;
|
||||
}
|
||||
|
||||
if (!quic_dgram_write(pos, len, l, dcid, dcid_len, saddr, daddr, cid_tid))
|
||||
goto err;
|
||||
|
||||
return 1;
|
||||
|
||||
err:
|
||||
px = l->bind_conf->frontend;
|
||||
prx_counters = EXTRA_COUNTERS_GET(px->extra_counters_fe, &quic_stats_module);
|
||||
HA_ATOMIC_INC(&prx_counters->rxbuf_full);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Function called on a read event from a listening socket. It tries
|
||||
* to handle as many connections as possible.
|
||||
*/
|
||||
void quic_lstnr_sock_fd_iocb(int fd)
|
||||
{
|
||||
ssize_t ret;
|
||||
struct quic_receiver_buf *rxbuf;
|
||||
struct buffer *buf;
|
||||
unsigned char buf[QUIC_MAX_UDP_PAYLOAD_SIZE];
|
||||
struct listener *l = objt_listener(fdtab[fd].owner);
|
||||
struct quic_transport_params *params;
|
||||
/* Source address */
|
||||
struct sockaddr_storage saddr = {0}, daddr = {0};
|
||||
size_t max_sz, cspace;
|
||||
struct quic_dgram *new_dgram;
|
||||
unsigned char *dgram_buf;
|
||||
size_t max_sz;
|
||||
int max_dgrams;
|
||||
|
||||
BUG_ON(!l);
|
||||
|
||||
new_dgram = NULL;
|
||||
if (!l)
|
||||
return;
|
||||
|
||||
if (!(fdtab[fd].state & FD_POLL_IN) || !fd_recv_ready(fd))
|
||||
return;
|
||||
|
||||
rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), rxbuf_el);
|
||||
if (!rxbuf)
|
||||
goto out;
|
||||
|
||||
buf = &rxbuf->buf;
|
||||
|
||||
max_dgrams = global.tune.maxpollevents;
|
||||
start:
|
||||
/* Try to reuse an existing dgram. Note that there is always at
|
||||
* least one datagram to pick, except the first time we enter
|
||||
* this function for this <rxbuf> buffer.
|
||||
*/
|
||||
new_dgram = quic_rxbuf_purge_dgrams(rxbuf);
|
||||
|
||||
params = &l->bind_conf->quic_params;
|
||||
max_sz = params->max_udp_payload_size;
|
||||
cspace = b_contig_space(buf);
|
||||
if (cspace < max_sz) {
|
||||
struct proxy *px = l->bind_conf->frontend;
|
||||
struct quic_counters *prx_counters = EXTRA_COUNTERS_GET(px->extra_counters_fe, &quic_stats_module);
|
||||
struct quic_dgram *dgram;
|
||||
BUG_ON(max_sz > sizeof(buf));
|
||||
|
||||
/* Do no mark <buf> as full, and do not try to consume it
|
||||
* if the contiguous remaining space is not at the end
|
||||
*/
|
||||
if (b_tail(buf) + cspace < b_wrap(buf)) {
|
||||
HA_ATOMIC_INC(&prx_counters->rxbuf_full);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Allocate a fake datagram, without data to locate
|
||||
* the end of the RX buffer (required during purging).
|
||||
*/
|
||||
dgram = pool_alloc(pool_head_quic_dgram);
|
||||
if (!dgram)
|
||||
goto out;
|
||||
|
||||
/* Initialize only the useful members of this fake datagram. */
|
||||
dgram->buf = NULL;
|
||||
dgram->len = cspace;
|
||||
/* Append this datagram only to the RX buffer list. It will
|
||||
* not be treated by any datagram handler.
|
||||
*/
|
||||
LIST_APPEND(&rxbuf->dgram_list, &dgram->recv_list);
|
||||
|
||||
/* Consume the remaining space */
|
||||
b_add(buf, cspace);
|
||||
if (b_contig_space(buf) < max_sz) {
|
||||
HA_ATOMIC_INC(&prx_counters->rxbuf_full);
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
dgram_buf = (unsigned char *)b_tail(buf);
|
||||
ret = quic_recv(fd, dgram_buf, max_sz,
|
||||
start:
|
||||
ret = quic_recv(fd, buf, sizeof(buf),
|
||||
(struct sockaddr *)&saddr, sizeof(saddr),
|
||||
(struct sockaddr *)&daddr, sizeof(daddr),
|
||||
get_net_port(&l->rx.addr), 1);
|
||||
if (ret <= 0)
|
||||
goto out;
|
||||
return;
|
||||
|
||||
quic_lstnr_dgram_dispatch(buf, ret, l, NULL, 0, &saddr, &daddr);
|
||||
|
||||
b_add(buf, ret);
|
||||
if (!quic_lstnr_dgram_dispatch(dgram_buf, ret, l, &saddr, &daddr,
|
||||
new_dgram, &rxbuf->dgram_list)) {
|
||||
/* If wrong, consume this datagram */
|
||||
b_sub(buf, ret);
|
||||
}
|
||||
new_dgram = NULL;
|
||||
if (--max_dgrams > 0)
|
||||
goto start;
|
||||
out:
|
||||
pool_free(pool_head_quic_dgram, new_dgram);
|
||||
MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
|
||||
}
|
||||
|
||||
/* FD-owned quic-conn socket callback. */
|
||||
@ -864,9 +819,10 @@ int qc_snd_buf(struct quic_conn *qc, const struct buffer *buf, size_t sz,
|
||||
int qc_rcv_buf(struct quic_conn *qc)
|
||||
{
|
||||
struct sockaddr_storage saddr = {0}, daddr = {0};
|
||||
struct quic_dgram *new_dgram = NULL;
|
||||
struct quic_dgram dgram, *new_dgram;
|
||||
struct buffer buf = BUF_NULL;
|
||||
unsigned char *dgram_buf;
|
||||
unsigned char *dgram_buf, *dcid;
|
||||
size_t dcid_len;
|
||||
ssize_t ret = 0;
|
||||
struct listener *l = qc->li;
|
||||
|
||||
@ -875,6 +831,7 @@ int qc_rcv_buf(struct quic_conn *qc)
|
||||
|
||||
TRACE_ENTER(QUIC_EV_CONN_RCV, qc);
|
||||
|
||||
new_dgram = &dgram;
|
||||
do {
|
||||
if (!b_alloc(&buf, DB_MUX_RX))
|
||||
break; /* TODO subscribe for memory again available. */
|
||||
@ -882,10 +839,6 @@ int qc_rcv_buf(struct quic_conn *qc)
|
||||
b_reset(&buf);
|
||||
BUG_ON(b_contig_space(&buf) < qc->max_udp_payload);
|
||||
|
||||
/* Allocate datagram on first loop or after requeuing. */
|
||||
if (!new_dgram && !(new_dgram = pool_alloc(pool_head_quic_dgram)))
|
||||
break; /* TODO subscribe for memory again available. */
|
||||
|
||||
dgram_buf = (unsigned char *)b_tail(&buf);
|
||||
ret = quic_recv(qc->fd, dgram_buf, qc->max_udp_payload,
|
||||
(struct sockaddr *)&saddr, sizeof(saddr),
|
||||
@ -901,90 +854,28 @@ int qc_rcv_buf(struct quic_conn *qc)
|
||||
|
||||
b_add(&buf, ret);
|
||||
|
||||
new_dgram->obj_type = OBJ_TYPE_DGRAM;
|
||||
new_dgram->buf = dgram_buf;
|
||||
new_dgram->len = ret;
|
||||
new_dgram->dcid_len = 0;
|
||||
new_dgram->dcid = NULL;
|
||||
new_dgram->saddr = saddr;
|
||||
new_dgram->daddr = daddr;
|
||||
new_dgram->qc = NULL; /* set later via quic_dgram_parse() */
|
||||
new_dgram->flags = 0;
|
||||
|
||||
TRACE_DEVEL("read datagram", QUIC_EV_CONN_RCV, qc, new_dgram);
|
||||
|
||||
if (!quic_get_dgram_dcid(new_dgram->buf,
|
||||
new_dgram->buf + new_dgram->len,
|
||||
&new_dgram->dcid, &new_dgram->dcid_len)) {
|
||||
if (!quic_get_dgram_dcid(dgram_buf, dgram_buf + ret, &dcid, &dcid_len))
|
||||
continue;
|
||||
}
|
||||
|
||||
if (l && !qc_check_dcid(qc, new_dgram->dcid, new_dgram->dcid_len)) {
|
||||
if (l && !qc_check_dcid(qc, dcid, dcid_len)) {
|
||||
/* Datagram received by error on the connection FD, dispatch it
|
||||
* to its associated quic-conn.
|
||||
*
|
||||
* TODO count redispatch datagrams.
|
||||
*/
|
||||
struct quic_receiver_buf *rxbuf;
|
||||
struct quic_dgram *tmp_dgram;
|
||||
unsigned char *rxbuf_tail;
|
||||
size_t cspace;
|
||||
|
||||
TRACE_STATE("datagram for other connection on quic-conn socket, requeue it", QUIC_EV_CONN_RCV, qc);
|
||||
|
||||
rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), rxbuf_el);
|
||||
ASSUME_NONNULL(rxbuf);
|
||||
cspace = b_contig_space(&rxbuf->buf);
|
||||
|
||||
tmp_dgram = quic_rxbuf_purge_dgrams(rxbuf);
|
||||
pool_free(pool_head_quic_dgram, tmp_dgram);
|
||||
|
||||
/* Insert a fake datagram if space wraps to consume it. */
|
||||
if (cspace < new_dgram->len && b_space_wraps(&rxbuf->buf)) {
|
||||
struct quic_dgram *fake_dgram = pool_alloc(pool_head_quic_dgram);
|
||||
if (!fake_dgram) {
|
||||
/* TODO count lost datagrams */
|
||||
MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
|
||||
continue;
|
||||
}
|
||||
|
||||
fake_dgram->buf = NULL;
|
||||
fake_dgram->len = cspace;
|
||||
LIST_APPEND(&rxbuf->dgram_list, &fake_dgram->recv_list);
|
||||
b_add(&rxbuf->buf, cspace);
|
||||
}
|
||||
|
||||
/* Recheck contig space after fake datagram insert. */
|
||||
if (b_contig_space(&rxbuf->buf) < new_dgram->len) {
|
||||
/* TODO count lost datagrams */
|
||||
MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
|
||||
continue;
|
||||
}
|
||||
|
||||
rxbuf_tail = (unsigned char *)b_tail(&rxbuf->buf);
|
||||
__b_putblk(&rxbuf->buf, (char *)dgram_buf, new_dgram->len);
|
||||
if (!quic_lstnr_dgram_dispatch(rxbuf_tail, ret, l, &saddr, &daddr,
|
||||
new_dgram, &rxbuf->dgram_list)) {
|
||||
/* TODO count lost datagrams. */
|
||||
b_sub(&buf, ret);
|
||||
}
|
||||
else {
|
||||
/* datagram must not be freed as it was requeued. */
|
||||
new_dgram = NULL;
|
||||
}
|
||||
|
||||
MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
|
||||
quic_lstnr_dgram_dispatch(dgram_buf, ret, l, dcid, dcid_len, &saddr, &daddr);
|
||||
continue;
|
||||
}
|
||||
|
||||
quic_dgram_init(new_dgram, dgram_buf, ret, NULL, dcid, dcid_len, &saddr, &daddr);
|
||||
quic_dgram_parse(new_dgram, qc, l ? &l->obj_type :
|
||||
(qc->conn ? &__objt_server(qc->conn->target)->obj_type : NULL));
|
||||
/* A datagram must always be consumed after quic_parse_dgram(). */
|
||||
BUG_ON(new_dgram->buf);
|
||||
} while (ret > 0);
|
||||
|
||||
pool_free(pool_head_quic_dgram, new_dgram);
|
||||
|
||||
if (b_size(&buf)) {
|
||||
b_free(&buf);
|
||||
offer_buffers(NULL, 1);
|
||||
|
||||
@ -10,8 +10,6 @@
|
||||
#include <haproxy/quic_trace.h>
|
||||
#include <haproxy/trace.h>
|
||||
|
||||
#define QUIC_MAX_UDP_PAYLOAD_SIZE 2048
|
||||
|
||||
static int qc_early_tranport_params_validate(struct quic_conn *qc,
|
||||
struct quic_transport_params *p,
|
||||
struct quic_early_transport_params *e);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user