diff --git a/dev/haring/haring.c b/dev/haring/haring.c
index 4e52b0a9e..4dfdafab5 100644
--- a/dev/haring/haring.c
+++ b/dev/haring/haring.c
@@ -218,7 +218,7 @@ int dump_ring_v2(struct ring_v2 *ring, size_t ofs, int flags)
 	/* Now make our own buffer pointing to that area */
 	size = ring->size;
 	head = ring->head;
-	tail = ring->tail;
+	tail = ring->tail & ~RING_TAIL_LOCK;
 	data = (head <= tail ? 0 : size) + tail - head;
 	buf = b_make((void *)ring + ring->rsvd, size, head, data);
 	return dump_ring_as_buf(buf, ofs, flags);
diff --git a/include/haproxy/ring-t.h b/include/haproxy/ring-t.h
index 0acbee8e7..b719c9cad 100644
--- a/include/haproxy/ring-t.h
+++ b/include/haproxy/ring-t.h
@@ -103,6 +103,9 @@
 #define RING_WRITING_SIZE 255  /* the next message's size is being written */
 #define RING_MAX_READERS  254  /* highest supported value for RC */
 
+/* mask used to lock the tail */
+#define RING_TAIL_LOCK (1ULL << ((sizeof(size_t) * 8) - 1))
+
 /* this is the mmapped part */
 struct ring_storage {
 	size_t size;       // storage size
diff --git a/include/haproxy/ring.h b/include/haproxy/ring.h
index 53b0b22c2..7fb9bc6b8 100644
--- a/include/haproxy/ring.h
+++ b/include/haproxy/ring.h
@@ -54,8 +54,10 @@ static inline void *ring_area(const struct ring *ring)
 /* returns the number of bytes in the ring */
 static inline size_t ring_data(const struct ring *ring)
 {
-	return ((ring->storage->head <= ring->storage->tail) ?
-		0 : ring->storage->size) + ring->storage->tail - ring->storage->head;
+	size_t tail = HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK;
+
+	return ((ring->storage->head <= tail) ?
+		0 : ring->storage->size) + tail - ring->storage->head;
 }
 
 /* returns the allocated size in bytes for the ring */
@@ -70,10 +72,10 @@ static inline size_t ring_head(const struct ring *ring)
 	return ring->storage->head;
 }
 
-/* returns the tail offset of the ring */
+/* returns the ring's tail offset without the lock bit */
 static inline size_t ring_tail(const struct ring *ring)
 {
-	return ring->storage->tail;
+	return HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK;
 }
 
 /* duplicates ring <src> over ring <dst> for no more than <max> bytes or no
diff --git a/src/ring.c b/src/ring.c
index f56c8f168..9af3daaf2 100644
--- a/src/ring.c
+++ b/src/ring.c
@@ -222,12 +222,32 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 	ring_area = ring->storage->area;
 	ring_size = ring->storage->size;
 
-	HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
 	if (needed + 1 > ring_size)
 		goto leave;
 
-	head_ofs = ring_head(ring);
-	tail_ofs = ring_tail(ring);
+	/* try to get exclusivity on the ring's tail. For this we set the
+	 * tail's highest bit, and the one that gets it wins. Many tests were
+	 * run on this and the approach below is optimal for armv8.1 atomics,
+	 * second-to-optimal with both x86_64 and second-to-optimal on armv8.0.
+	 * x86_64 would benefit slightly more from an xchg() which would
+	 * require the readers to loop during changes, and armv8.0 is slightly
+	 * better there as well (+5%). The CAS is bad for both (requires a
+	 * preload), though it might degrade better on large x86 compared to
+	 * a busy loop that the compiler would implement for the FETCH_OR.
+	 * Alternately we could kill 12 upper bits on a 64-bit tail ofs and
+	 * use XADD. Not tested, and would require to undo or watch for the
+	 * change (use it as a ticket).
+	 */
+	while (1) {
+		tail_ofs = HA_ATOMIC_FETCH_OR(&ring->storage->tail, RING_TAIL_LOCK);
+		if (!(tail_ofs & RING_TAIL_LOCK))
+			break;
+		pl_wait_unlock_long(&ring->storage->tail, RING_TAIL_LOCK);
+	}
+
+	HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
+
+	head_ofs = ring->storage->head;
 
 	/* this is the byte before tail, it contains the users count */
 	lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
@@ -305,7 +325,7 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
 	/* update the new space in the buffer */
 	ring->storage->head = head_ofs;
-	ring->storage->tail = tail_ofs;
+	HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs);
 
 	/* notify potential readers */
 	if (sent) {
@@ -313,8 +333,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 		appctx_wakeup(appctx);
 	}
 
- leave:
 	HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+ leave:
 	return sent;
 }
 
@@ -417,8 +437,8 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
 
 	HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
 
-	head_ofs = ring->storage->head;
-	tail_ofs = ring->storage->tail;
+	head_ofs = ring_head(ring);
+	tail_ofs = ring_tail(ring);
 
 	/* explanation for the initialization below: it would be better to do
 	 * this in the parsing function but this would occasionally result in
diff --git a/src/sink.c b/src/sink.c
index d43f7fb8a..b5c681ad7 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -905,6 +905,12 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
 		goto err;
 	}
 
+	if (size > RING_TAIL_LOCK) {
+		ha_alert("parsing [%s:%d] : too large size '%llu' for new sink buffer, the limit on this platform is %llu bytes.\n", file, linenum, (ullong)size, (ullong)RING_TAIL_LOCK);
+		err_code |= ERR_ALERT | ERR_FATAL;
+		goto err;
+	}
+
 	if (cfg_sink->store) {
 		ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum);
 		err_code |= ERR_ALERT | ERR_FATAL;