MINOR: ring: make the reader check the readers count before inc/dec

We'll want to reserve some special values for the readers count to
temporary lock the following message, but for this it will be mandatory
that readers check for them before incrementing/decrementing the counter.
Let'sdo that using a CAS. The readers performance is not as critical as
the writer's anyway so the slight overhead is not a problem.
This commit is contained in:
Willy Tarreau 2024-02-28 09:03:46 +01:00
parent dd8ea5d928
commit d336d71cbb

View File

@ -332,11 +332,17 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
if (ofs != ~0) {
/* reader was still attached */
char *area = ring_area(ring);
uint8_t *area = (uint8_t *)ring_area(ring);
uint8_t readers;
BUG_ON(ofs >= ring_size(ring));
LIST_DEL_INIT(&appctx->wait_entry);
HA_ATOMIC_DEC(area + ofs);
/* dec readers count */
do {
readers = _HA_ATOMIC_LOAD(area + ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax());
}
HA_ATOMIC_DEC(&ring->readers_count);
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
@ -382,14 +388,15 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
{
size_t head_ofs, tail_ofs;
size_t ring_size;
char *ring_area;
uint8_t *ring_area;
struct ist v1, v2;
uint64_t msg_len;
size_t len, cnt;
ssize_t copied;
uint8_t readers;
int ret;
ring_area = ring->storage->area;
ring_area = (uint8_t *)ring->storage->area;
ring_size = ring->storage->size;
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
@ -416,15 +423,22 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
/* make ctx->ofs relative to the beginning of the buffer now */
*ofs_ptr = head_ofs;
/* and reserve our slot here */
HA_ATOMIC_INC(ring_area + head_ofs);
/* and reserve our slot here (inc readers count) */
do {
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
}
/* we have the guarantee we can restart from our own head */
head_ofs = *ofs_ptr;
BUG_ON(head_ofs >= ring_size);
HA_ATOMIC_DEC(ring_area + head_ofs);
/* dec readers count */
do {
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax());
/* in this loop, head_ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
@ -432,7 +446,7 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
* origin, while pos is relative to the ring's head.
*/
ret = 1;
vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs);
while (1) {
if (vp_size(v1, v2) <= 1) {
@ -462,9 +476,14 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
vp_skip(&v1, &v2, cnt + msg_len);
}
vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs);
/* inc readers count */
do {
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
HA_ATOMIC_INC(ring_area + head_ofs);
if (last_ofs_ptr)
*last_ofs_ptr = tail_ofs;
*ofs_ptr = head_ofs;