diff --git a/src/ring.c b/src/ring.c index 13d75dfb2..fb19efc46 100644 --- a/src/ring.c +++ b/src/ring.c @@ -332,11 +332,17 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs) HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); if (ofs != ~0) { /* reader was still attached */ - char *area = ring_area(ring); + uint8_t *area = (uint8_t *)ring_area(ring); + uint8_t readers; BUG_ON(ofs >= ring_size(ring)); LIST_DEL_INIT(&appctx->wait_entry); - HA_ATOMIC_DEC(area + ofs); + + /* dec readers count */ + do { + readers = _HA_ATOMIC_LOAD(area + ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax()); } HA_ATOMIC_DEC(&ring->readers_count); HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); @@ -382,14 +388,15 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t { size_t head_ofs, tail_ofs; size_t ring_size; - char *ring_area; + uint8_t *ring_area; struct ist v1, v2; uint64_t msg_len; size_t len, cnt; ssize_t copied; + uint8_t readers; int ret; - ring_area = ring->storage->area; + ring_area = (uint8_t *)ring->storage->area; ring_size = ring->storage->size; HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); @@ -416,15 +423,22 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t /* make ctx->ofs relative to the beginning of the buffer now */ *ofs_ptr = head_ofs; - /* and reserve our slot here */ - HA_ATOMIC_INC(ring_area + head_ofs); + /* and reserve our slot here (inc readers count) */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); } /* we have the guarantee we can restart from our own head */ head_ofs = *ofs_ptr; BUG_ON(head_ofs >= ring_size); - HA_ATOMIC_DEC(ring_area + head_ofs); + /* dec readers count */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax()); /* in this loop, head_ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to @@ -432,7 +446,7 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t * origin, while pos is relative to the ring's head. */ ret = 1; - vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); + vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs); while (1) { if (vp_size(v1, v2) <= 1) { @@ -462,9 +476,14 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t vp_skip(&v1, &v2, cnt + msg_len); } - vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs); + + /* inc readers count */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); - HA_ATOMIC_INC(ring_area + head_ofs); if (last_ofs_ptr) *last_ofs_ptr = tail_ofs; *ofs_ptr = head_ofs;