MINOR: listener: make accept_queue index atomic

There has always been a race when checking the length of an accept queue
to determine which one is more loaded than another, because the head and
tail are read at two different moments. This is not required: we can merge
them as two 16-bit numbers inside a single 32-bit index that is always
accessed atomically. This way we read both values at once and always get
a consistent measurement.
Author: Willy Tarreau
Date:   2023-04-20 11:05:28 +02:00
Parent: 09b52d1c3d
Commit: e6f5ab5afa
4 changed files with 30 additions and 17 deletions
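
The packing trick itself is easy to reproduce outside HAProxy. Below is a
minimal standalone sketch of the idea using C11 <stdatomic.h> rather than
HAProxy's _HA_ATOMIC_* macros; the ring_idx and pack_idx names are
illustrative, not part of the patch:

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* Both ring cursors live in one 32-bit word: (head << 16) | tail. */
static _Atomic uint32_t ring_idx;

static inline uint32_t pack_idx(uint16_t head, uint16_t tail)
{
	return ((uint32_t)head << 16) | tail;
}

int main(void)
{
	atomic_store(&ring_idx, pack_idx(1020, 2));

	/* One atomic load observes head and tail at the same instant, so a
	 * length derived from them can never pair a stale head with a fresh
	 * tail -- the race described in the commit message above.
	 */
	uint32_t snap = atomic_load(&ring_idx);
	uint16_t head = snap >> 16;
	uint16_t tail = snap & 0xffff;

	printf("head=%u tail=%u\n", (unsigned)head, (unsigned)tail);
	return 0;
}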

include/haproxy/listener-t.h

@@ -289,9 +289,9 @@ struct bind_kw_list {
 /* The per-thread accept queue ring, must be a power of two minus 1 */
 #define ACCEPT_QUEUE_SIZE ((1<<10) - 1)
 
+/* head and tail are both 16 bits so that idx can be accessed atomically */
 struct accept_queue_ring {
-	unsigned int head;
-	unsigned int tail;
+	uint32_t idx;             /* (head << 16) | tail */
 	struct tasklet *tasklet;  /* tasklet of the thread owning this ring */
 	struct connection *entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
 };

include/haproxy/listener.h

@@ -212,6 +212,19 @@ extern struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((a
 extern const char* li_status_st[LI_STATE_COUNT];
 enum li_status get_li_status(struct listener *l);
 
+static inline uint accept_queue_ring_len(const struct accept_queue_ring *ring)
+{
+	uint idx, head, tail, len;
+
+	idx  = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
+	head = idx >> 16;
+	tail = idx & 0xffff;
+	len  = tail + ACCEPT_QUEUE_SIZE - head;
+	if (len >= ACCEPT_QUEUE_SIZE)
+		len -= ACCEPT_QUEUE_SIZE;
+	return len;
+}
+
 #endif /* _HAPROXY_LISTENER_H */
 
 /*
src/activity.c

@@ -1115,7 +1115,7 @@ static int cli_io_handler_show_activity(struct appctx *appctx)
 	chunk_appendf(&trash, "accq_pushed:"); SHOW_TOT(thr, activity[thr].accq_pushed);
 	chunk_appendf(&trash, "accq_full:");   SHOW_TOT(thr, activity[thr].accq_full);
 #ifdef USE_THREAD
-	chunk_appendf(&trash, "accq_ring:");   SHOW_TOT(thr, (accept_queue_rings[thr].tail - accept_queue_rings[thr].head + ACCEPT_QUEUE_SIZE) % ACCEPT_QUEUE_SIZE);
+	chunk_appendf(&trash, "accq_ring:");   SHOW_TOT(thr, accept_queue_ring_len(&accept_queue_rings[thr]));
 	chunk_appendf(&trash, "fd_takeover:"); SHOW_TOT(thr, activity[thr].fd_takeover);
 #endif

src/listener.c

@@ -68,10 +68,10 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
 	unsigned int pos, next;
 	struct connection *ptr;
 	struct connection **e;
+	uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
 
-	pos = ring->head;
-
-	if (pos == ring->tail)
+	pos = idx >> 16;
+	if (pos == (uint16_t)idx)
 		return NULL;
 
 	next = pos + 1;
@@ -93,7 +93,10 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
 	*e = NULL;
 
 	__ha_barrier_store();
-	ring->head = next;
+	do {
+		pos = (next << 16) | (idx & 0xffff);
+	} while (unlikely(!HA_ATOMIC_CAS(&ring->idx, &idx, pos) && __ha_cpu_relax()));
+
 	return ptr;
 }
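
Worth noting: even though accept_queue_pop_sc() has a single consumer per
ring, the head update can no longer be a plain store. Producers concurrently
CAS the tail half of the same 32-bit word, so the consumer must also use a
CAS loop, recombining its new head with whatever tail value the failed CAS
reported back in idx.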
@@ -105,15 +108,17 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
 int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
 {
 	unsigned int pos, next;
+	uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
 
-	pos = ring->tail;
 	do {
+		pos = (uint16_t)idx;
 		next = pos + 1;
 		if (next >= ACCEPT_QUEUE_SIZE)
 			next = 0;
-		if (next == ring->head)
+		if (next == (idx >> 16))
 			return 0; // ring full
-	} while (unlikely(!_HA_ATOMIC_CAS(&ring->tail, &pos, next)));
+		next |= (idx & 0xffff0000U);
+	} while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax()));
 
 	ring->entry[pos] = conn;
 	__ha_barrier_store();
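
The push side is a classic multi-producer slot reservation: each producer
recomputes the candidate tail from its latest snapshot and retries until its
CAS wins. Here is a self-contained sketch of the same loop, again with C11
atomics rather than HAProxy's macros (reserve_slot and RING_SIZE are
illustrative names, not from the patch):

#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE 1023  /* mirrors ACCEPT_QUEUE_SIZE */

/* Try to reserve one slot in a (head << 16) | tail packed index.
 * Returns the reserved position, or -1 if the ring is full.
 */
static int reserve_slot(_Atomic uint32_t *idx)
{
	uint32_t snap = atomic_load(idx);
	uint32_t pos, next;

	do {
		pos  = snap & 0xffff;            /* current tail */
		next = pos + 1;
		if (next >= RING_SIZE)
			next = 0;
		if (next == (snap >> 16))        /* tail would catch up with head */
			return -1;               /* ring full */
		next |= snap & 0xffff0000U;      /* keep the head half unchanged */
	} while (!atomic_compare_exchange_weak(idx, &snap, next));

	return (int)pos;  /* this producer now exclusively owns slot pos */
}

After a successful CAS the winning producer owns entry[pos] and can publish
the connection, just as the patched accept_queue_push_mp() does with
ring->entry[pos] = conn followed by a store barrier.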
@@ -1230,13 +1235,8 @@ void listener_accept(struct listener *l)
 			}
 
 			/* now we have two distinct thread IDs belonging to the mask */
-			q1 = accept_queue_rings[base + t1].tail - accept_queue_rings[base + t1].head + ACCEPT_QUEUE_SIZE;
-			if (q1 >= ACCEPT_QUEUE_SIZE)
-				q1 -= ACCEPT_QUEUE_SIZE;
-
-			q2 = accept_queue_rings[base + t2].tail - accept_queue_rings[base + t2].head + ACCEPT_QUEUE_SIZE;
-			if (q2 >= ACCEPT_QUEUE_SIZE)
-				q2 -= ACCEPT_QUEUE_SIZE;
+			q1 = accept_queue_ring_len(&accept_queue_rings[base + t1]);
+			q2 = accept_queue_ring_len(&accept_queue_rings[base + t2]);
 
 			/* we have 3 possibilities now :
 			 * q1 < q2 : t1 is less loaded than t2, so we pick it