From eb3d5f464d5f90c18c488d5025982e0c59cc5416 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Wed, 28 Feb 2024 09:37:47 +0100 Subject: [PATCH] MEDIUM: ring: use the topmost bit of the tail as a lock We're now locking the tail while looking for some room in the ring. In fact it's still while writing to it, but the goal definitely is to get rid of the lock ASAP. For this we reserve the topmost bit of the tail as a lock, which may have as a possible visible effect that buffers will be limited to 2GB instead of 4GB on 32-bit machines (though in practise, good luck for allocating more than 2GB contiguous on 32-bit), but in practice since the size is read with atol() and some operating systems limit it to LONG_MAX unless passing negative numbers, the limit is already there. For now the impact on x86_64 is significant (drop from 2.35 to 1.4M/s on 48 threads on EPYC 24 cores) but this situation is only temporary so that changes can be reviewable and bisectable. Other approaches were attempted, such as using XCHG instead, which is slightly faster on x86 with low thread counts (but causes more write contention), and forces readers to stall under heavy traffic because they can't access a valid value for the queue anymore. A CAS requires preloading the value and is les good on ARMv8.1. XADD could also be considered with 12-13 upper bits of the offset dedicated to locking, but that looks overkill. --- dev/haring/haring.c | 2 +- include/haproxy/ring-t.h | 3 +++ include/haproxy/ring.h | 10 ++++++---- src/ring.c | 34 +++++++++++++++++++++++++++------- src/sink.c | 6 ++++++ 5 files changed, 43 insertions(+), 12 deletions(-) diff --git a/dev/haring/haring.c b/dev/haring/haring.c index 4e52b0a9e..4dfdafab5 100644 --- a/dev/haring/haring.c +++ b/dev/haring/haring.c @@ -218,7 +218,7 @@ int dump_ring_v2(struct ring_v2 *ring, size_t ofs, int flags) /* Now make our own buffer pointing to that area */ size = ring->size; head = ring->head; - tail = ring->tail; + tail = ring->tail & ~RING_TAIL_LOCK; data = (head <= tail ? 0 : size) + tail - head; buf = b_make((void *)ring + ring->rsvd, size, head, data); return dump_ring_as_buf(buf, ofs, flags); diff --git a/include/haproxy/ring-t.h b/include/haproxy/ring-t.h index 0acbee8e7..b719c9cad 100644 --- a/include/haproxy/ring-t.h +++ b/include/haproxy/ring-t.h @@ -103,6 +103,9 @@ #define RING_WRITING_SIZE 255 /* the next message's size is being written */ #define RING_MAX_READERS 254 /* highest supported value for RC */ +/* mask used to lock the tail */ +#define RING_TAIL_LOCK (1ULL << ((sizeof(size_t) * 8) - 1)) + /* this is the mmapped part */ struct ring_storage { size_t size; // storage size diff --git a/include/haproxy/ring.h b/include/haproxy/ring.h index 53b0b22c2..7fb9bc6b8 100644 --- a/include/haproxy/ring.h +++ b/include/haproxy/ring.h @@ -54,8 +54,10 @@ static inline void *ring_area(const struct ring *ring) /* returns the number of bytes in the ring */ static inline size_t ring_data(const struct ring *ring) { - return ((ring->storage->head <= ring->storage->tail) ? - 0 : ring->storage->size) + ring->storage->tail - ring->storage->head; + size_t tail = HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK; + + return ((ring->storage->head <= tail) ? + 0 : ring->storage->size) + tail - ring->storage->head; } /* returns the allocated size in bytes for the ring */ @@ -70,10 +72,10 @@ static inline size_t ring_head(const struct ring *ring) return ring->storage->head; } -/* returns the tail offset of the ring */ +/* returns the ring's tail offset without the lock bit */ static inline size_t ring_tail(const struct ring *ring) { - return ring->storage->tail; + return HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK; } /* duplicates ring over ring for no more than bytes or no diff --git a/src/ring.c b/src/ring.c index f56c8f168..9af3daaf2 100644 --- a/src/ring.c +++ b/src/ring.c @@ -222,12 +222,32 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz ring_area = ring->storage->area; ring_size = ring->storage->size; - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); if (needed + 1 > ring_size) goto leave; - head_ofs = ring_head(ring); - tail_ofs = ring_tail(ring); + /* try to get exclusivity on the ring's tail. For this we set the + * tail's highest bit, and the one that gets it wins. Many tests were + * run on this and the approach below is optimal for armv8.1 atomics, + * second-to-optimal with both x86_64 and second-to-optimal on armv8.0. + * x86_64 would benefit slightly more from an xchg() which would + * require the readers to loop during changes, and armv8.0 is slightly + * better there as well (+5%). The CAS is bad for both (requires a + * preload), though it might degrade better on large x86 compared to + * a busy loop that the compiler would implement for the FETCH_OR. + * Alternately we could kill 12 upper bits on a 64-bit tail ofs and + * use XADD. Not tested, and would require to undo or watch for the + * change (use it as a ticket). + */ + while (1) { + tail_ofs = HA_ATOMIC_FETCH_OR(&ring->storage->tail, RING_TAIL_LOCK); + if (!(tail_ofs & RING_TAIL_LOCK)) + break; + pl_wait_unlock_long(&ring->storage->tail, RING_TAIL_LOCK); + } + + HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); + + head_ofs = ring->storage->head; /* this is the byte before tail, it contains the users count */ lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1); @@ -305,7 +325,7 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz /* update the new space in the buffer */ ring->storage->head = head_ofs; - ring->storage->tail = tail_ofs; + HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs); /* notify potential readers */ if (sent) { @@ -313,8 +333,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz appctx_wakeup(appctx); } - leave: HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); + leave: return sent; } @@ -417,8 +437,8 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); - head_ofs = ring->storage->head; - tail_ofs = ring->storage->tail; + head_ofs = ring_head(ring); + tail_ofs = ring_tail(ring); /* explanation for the initialization below: it would be better to do * this in the parsing function but this would occasionally result in diff --git a/src/sink.c b/src/sink.c index d43f7fb8a..b5c681ad7 100644 --- a/src/sink.c +++ b/src/sink.c @@ -905,6 +905,12 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) goto err; } + if (size > RING_TAIL_LOCK) { + ha_alert("parsing [%s:%d] : too large size '%llu' for new sink buffer, the limit on this platform is %llu bytes.\n", file, linenum, (ullong)size, (ullong)RING_TAIL_LOCK); + err_code |= ERR_ALERT | ERR_FATAL; + goto err; + } + if (cfg_sink->store) { ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL;