diff --git a/include/haproxy/listener-t.h b/include/haproxy/listener-t.h
index f45c990a5..1ee17f6d6 100644
--- a/include/haproxy/listener-t.h
+++ b/include/haproxy/listener-t.h
@@ -289,9 +289,9 @@ struct bind_kw_list {
 /* The per-thread accept queue ring, must be a power of two minus 1 */
 #define ACCEPT_QUEUE_SIZE ((1<<10) - 1)
 
+/* head and tail are both 16 bits so that idx can be accessed atomically */
 struct accept_queue_ring {
-	unsigned int head;
-	unsigned int tail;
+	uint32_t idx;             /* (head << 16) | tail */
 	struct tasklet *tasklet;  /* tasklet of the thread owning this ring */
 	struct connection *entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
 };
diff --git a/include/haproxy/listener.h b/include/haproxy/listener.h
index f7599525c..afb4530bf 100644
--- a/include/haproxy/listener.h
+++ b/include/haproxy/listener.h
@@ -212,6 +212,19 @@ extern struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((a
 extern const char* li_status_st[LI_STATE_COUNT];
 enum li_status get_li_status(struct listener *l);
 
+static inline uint accept_queue_ring_len(const struct accept_queue_ring *ring)
+{
+	uint idx, head, tail, len;
+
+	idx  = _HA_ATOMIC_LOAD(&ring->idx);   /* (head << 16) + tail */
+	head = idx >> 16;
+	tail = idx & 0xffff;
+	len  = tail + ACCEPT_QUEUE_SIZE - head;
+	if (len >= ACCEPT_QUEUE_SIZE)
+		len -= ACCEPT_QUEUE_SIZE;
+	return len;
+}
+
 #endif /* _HAPROXY_LISTENER_H */
 
 /*
diff --git a/src/activity.c b/src/activity.c
index c47bd8481..7664d0cb1 100644
--- a/src/activity.c
+++ b/src/activity.c
@@ -1115,7 +1115,7 @@ static int cli_io_handler_show_activity(struct appctx *appctx)
 	chunk_appendf(&trash, "accq_pushed:"); SHOW_TOT(thr, activity[thr].accq_pushed);
 	chunk_appendf(&trash, "accq_full:");   SHOW_TOT(thr, activity[thr].accq_full);
 #ifdef USE_THREAD
-	chunk_appendf(&trash, "accq_ring:");   SHOW_TOT(thr, (accept_queue_rings[thr].tail - accept_queue_rings[thr].head + ACCEPT_QUEUE_SIZE) % ACCEPT_QUEUE_SIZE);
+	chunk_appendf(&trash, "accq_ring:");   SHOW_TOT(thr, accept_queue_ring_len(&accept_queue_rings[thr]));
 	chunk_appendf(&trash, "fd_takeover:"); SHOW_TOT(thr, activity[thr].fd_takeover);
 #endif
 
diff --git a/src/listener.c b/src/listener.c
index eb872e6f3..8215cec63 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -68,10 +68,10 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
 	unsigned int pos, next;
 	struct connection *ptr;
 	struct connection **e;
+	uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx);  /* (head << 16) + tail */
 
-	pos = ring->head;
-
-	if (pos == ring->tail)
+	pos = idx >> 16;
+	if (pos == (uint16_t)idx)
 		return NULL;
 
 	next = pos + 1;
@@ -93,7 +93,10 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
 	*e = NULL;
 	__ha_barrier_store();
 
-	ring->head = next;
+	do {
+		pos = (next << 16) | (idx & 0xffff);
+	} while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, pos) && __ha_cpu_relax()));
+
 	return ptr;
 }
 
@@ -105,15 +108,17 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
 int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
 {
 	unsigned int pos, next;
+	uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx);  /* (head << 16) + tail */
 
-	pos = ring->tail;
 	do {
+		pos = (uint16_t)idx;
 		next = pos + 1;
 		if (next >= ACCEPT_QUEUE_SIZE)
 			next = 0;
-		if (next == ring->head)
+		if (next == (idx >> 16))
 			return 0; // ring full
-	} while (unlikely(!_HA_ATOMIC_CAS(&ring->tail, &pos, next)));
+		next |= (idx & 0xffff0000U);
+	} while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax()));
 
 	ring->entry[pos] = conn;
 	__ha_barrier_store();
@@ -1230,13 +1235,8 @@ void listener_accept(struct listener *l)
 			}
 
 			/* now we have two distinct thread IDs belonging to the mask */
-			q1 = accept_queue_rings[base + t1].tail - accept_queue_rings[base + t1].head + ACCEPT_QUEUE_SIZE;
-			if (q1 >= ACCEPT_QUEUE_SIZE)
-				q1 -= ACCEPT_QUEUE_SIZE;
-
-			q2 = accept_queue_rings[base + t2].tail - accept_queue_rings[base + t2].head + ACCEPT_QUEUE_SIZE;
-			if (q2 >= ACCEPT_QUEUE_SIZE)
-				q2 -= ACCEPT_QUEUE_SIZE;
+			q1 = accept_queue_ring_len(&accept_queue_rings[base + t1]);
+			q2 = accept_queue_ring_len(&accept_queue_rings[base + t2]);
 
 			/* we have 3 possibilities now :
 			 * q1 < q2 : t1 is less loaded than t2, so we pick it
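
Note (not part of the patch): the core technique above is packing a 16-bit head and a 16-bit tail into one 32-bit word, so a single atomic load yields a coherent snapshot of both indexes and a single CAS advances either one without locking. Below is a minimal standalone sketch of that technique using C11 stdatomic instead of HAProxy's _HA_ATOMIC_* macros; the names ring_idx, ring_len() and ring_reserve_tail() are hypothetical, for illustration only.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* must fit in 16 bits, like ACCEPT_QUEUE_SIZE ((1<<10) - 1) */
#define RING_SIZE 1023

/* single 32-bit word holding (head << 16) | tail */
static _Atomic uint32_t ring_idx;

/* lock-free length: one atomic load gives a consistent head/tail pair,
 * mirroring accept_queue_ring_len() in the patch.
 */
static unsigned ring_len(void)
{
	uint32_t idx  = atomic_load_explicit(&ring_idx, memory_order_relaxed);
	unsigned head = idx >> 16;
	unsigned tail = idx & 0xffff;
	unsigned len  = tail + RING_SIZE - head;

	if (len >= RING_SIZE)
		len -= RING_SIZE;
	return len;
}

/* multi-producer reservation of one tail slot, mirroring the CAS loop
 * in accept_queue_push_mp(); returns the reserved slot or -1 if full.
 */
static int ring_reserve_tail(void)
{
	uint32_t idx = atomic_load_explicit(&ring_idx, memory_order_relaxed);
	uint32_t next;
	unsigned pos;

	do {
		pos  = (uint16_t)idx;
		next = pos + 1;
		if (next >= RING_SIZE)
			next = 0;
		if (next == (idx >> 16))
			return -1;             /* ring full */
		next |= idx & 0xffff0000U;     /* keep the current head bits */
		/* on failure the weak CAS reloads idx, so fullness is
		 * re-checked against the head another thread may have moved
		 */
	} while (!atomic_compare_exchange_weak(&ring_idx, &idx, next));

	return (int)pos;
}

int main(void)
{
	int slot = ring_reserve_tail();

	printf("reserved slot %d, len now %u\n", slot, ring_len());
	return 0;
}

One design consequence worth noting: because producers and the consumer now CAS the same 32-bit word, a push retries even when only the head moved, but in exchange the "ring full" test can never act on a stale head/tail pair the way two independent fields could.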