From e6f5ab5afa1e11d509aa92588b64de6e3d3765a8 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Thu, 20 Apr 2023 11:05:28 +0200 Subject: [PATCH] MINOR: listener: make accept_queue index atomic There has always been a race when checking the length of an accept queue to determine which one is more loaded that another, because the head and tail are read at two different moments. This is not required, we can merge them as two 16 bit numbers inside a single 32-bit index that is always accessed atomically. This way we read both values at once and always have a consistent measurement. --- include/haproxy/listener-t.h | 4 ++-- include/haproxy/listener.h | 13 +++++++++++++ src/activity.c | 2 +- src/listener.c | 28 ++++++++++++++-------------- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/include/haproxy/listener-t.h b/include/haproxy/listener-t.h index f45c990a5..1ee17f6d6 100644 --- a/include/haproxy/listener-t.h +++ b/include/haproxy/listener-t.h @@ -289,9 +289,9 @@ struct bind_kw_list { /* The per-thread accept queue ring, must be a power of two minus 1 */ #define ACCEPT_QUEUE_SIZE ((1<<10) - 1) +/* head and tail are both 16 bits so that idx can be accessed atomically */ struct accept_queue_ring { - unsigned int head; - unsigned int tail; + uint32_t idx; /* (head << 16) | tail */ struct tasklet *tasklet; /* tasklet of the thread owning this ring */ struct connection *entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64))); }; diff --git a/include/haproxy/listener.h b/include/haproxy/listener.h index f7599525c..afb4530bf 100644 --- a/include/haproxy/listener.h +++ b/include/haproxy/listener.h @@ -212,6 +212,19 @@ extern struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((a extern const char* li_status_st[LI_STATE_COUNT]; enum li_status get_li_status(struct listener *l); +static inline uint accept_queue_ring_len(const struct accept_queue_ring *ring) +{ + uint idx, head, tail, len; + + idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */ + head = idx >> 16; + tail = idx & 0xffff; + len = tail + ACCEPT_QUEUE_SIZE - head; + if (len >= ACCEPT_QUEUE_SIZE) + len -= ACCEPT_QUEUE_SIZE; + return len; +} + #endif /* _HAPROXY_LISTENER_H */ /* diff --git a/src/activity.c b/src/activity.c index c47bd8481..7664d0cb1 100644 --- a/src/activity.c +++ b/src/activity.c @@ -1115,7 +1115,7 @@ static int cli_io_handler_show_activity(struct appctx *appctx) chunk_appendf(&trash, "accq_pushed:"); SHOW_TOT(thr, activity[thr].accq_pushed); chunk_appendf(&trash, "accq_full:"); SHOW_TOT(thr, activity[thr].accq_full); #ifdef USE_THREAD - chunk_appendf(&trash, "accq_ring:"); SHOW_TOT(thr, (accept_queue_rings[thr].tail - accept_queue_rings[thr].head + ACCEPT_QUEUE_SIZE) % ACCEPT_QUEUE_SIZE); + chunk_appendf(&trash, "accq_ring:"); SHOW_TOT(thr, accept_queue_ring_len(&accept_queue_rings[thr])); chunk_appendf(&trash, "fd_takeover:"); SHOW_TOT(thr, activity[thr].fd_takeover); #endif diff --git a/src/listener.c b/src/listener.c index eb872e6f3..8215cec63 100644 --- a/src/listener.c +++ b/src/listener.c @@ -68,10 +68,10 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring) unsigned int pos, next; struct connection *ptr; struct connection **e; + uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */ - pos = ring->head; - - if (pos == ring->tail) + pos = idx >> 16; + if (pos == (uint16_t)idx) return NULL; next = pos + 1; @@ -93,7 +93,10 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring) *e = NULL; __ha_barrier_store(); - ring->head = next; + do { + pos = (next << 16) | (idx & 0xffff); + } while (unlikely(!HA_ATOMIC_CAS(&ring->idx, &idx, pos) && __ha_cpu_relax())); + return ptr; } @@ -105,15 +108,17 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring) int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn) { unsigned int pos, next; + uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */ - pos = ring->tail; do { + pos = (uint16_t)idx; next = pos + 1; if (next >= ACCEPT_QUEUE_SIZE) next = 0; - if (next == ring->head) + if (next == (idx >> 16)) return 0; // ring full - } while (unlikely(!_HA_ATOMIC_CAS(&ring->tail, &pos, next))); + next |= (idx & 0xffff0000U); + } while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax())); ring->entry[pos] = conn; __ha_barrier_store(); @@ -1230,13 +1235,8 @@ void listener_accept(struct listener *l) } /* now we have two distinct thread IDs belonging to the mask */ - q1 = accept_queue_rings[base + t1].tail - accept_queue_rings[base + t1].head + ACCEPT_QUEUE_SIZE; - if (q1 >= ACCEPT_QUEUE_SIZE) - q1 -= ACCEPT_QUEUE_SIZE; - - q2 = accept_queue_rings[base + t2].tail - accept_queue_rings[base + t2].head + ACCEPT_QUEUE_SIZE; - if (q2 >= ACCEPT_QUEUE_SIZE) - q2 -= ACCEPT_QUEUE_SIZE; + q1 = accept_queue_ring_len(&accept_queue_rings[base + t1]); + q2 = accept_queue_ring_len(&accept_queue_rings[base + t2]); /* we have 3 possibilities now : * q1 < q2 : t1 is less loaded than t2, so we pick it