From eca1f90e16d793fccdf70c3259d3bcfb72c74bbe Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Thu, 18 Sep 2025 14:58:38 +0200 Subject: [PATCH] CLEANUP: ring: rearrange the wait loop in ring_write() The loop is constructed in a complicated way with a single break statement in the middle and many continue statements everywhere, making it hard to better factor between variants. Let's first reorganize it so as to make it easier to escape when the ring tail lock is obtained. The sequence of instrucitons remains the same, it's only better organized. --- src/ring.c | 51 +++++++++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/src/ring.c b/src/ring.c index 4311bddd6..74172ce3a 100644 --- a/src/ring.c +++ b/src/ring.c @@ -277,37 +277,40 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz while (1) { if ((curr_cell = HA_ATOMIC_LOAD(ring_queue_ptr)) != &cell) goto wait_for_flush; - __ha_cpu_relax_for_read(); -#if !defined(__ARM_FEATURE_ATOMICS) + /* OK the queue is locked, let's attempt to get the tail lock. + * we'll atomically OR the lock on the pointer and check if + * someone else had it already, otherwise we own it. + */ + +#if defined(__ARM_FEATURE_ATOMICS) /* ARMv8.1-a has a true atomic OR and doesn't need the preliminary read */ - if ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) { - __ha_cpu_relax_for_read(); - continue; + tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK); + if (!(tail_ofs & RING_TAIL_LOCK)) + break; +#else + tail_ofs = HA_ATOMIC_LOAD(tail_ptr); + if (likely(!(tail_ofs & RING_TAIL_LOCK))) { + if (HA_ATOMIC_CAS(tail_ptr, &tail_ofs, tail_ofs | RING_TAIL_LOCK)) + break; } #endif - /* OK the queue is locked, let's attempt to get the tail lock */ - tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK); - - /* did we get it ? */ - if (!(tail_ofs & RING_TAIL_LOCK)) { - /* Here we own the tail. We can go on if we're still the leader, - * which we'll confirm by trying to reset the queue. If we're - * still the leader, we're done. - */ - if (HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) - break; // Won! - - /* oops, no, let's give it back to another thread and wait. - * This does not happen often enough to warrant more complex - * approaches (tried already). - */ - HA_ATOMIC_STORE(tail_ptr, tail_ofs); - goto wait_for_flush; - } __ha_cpu_relax_for_read(); } + /* Here we own the tail. We can go on if we're still the leader, + * which we'll confirm by trying to reset the queue. If we're + * still the leader, we're done. + */ + if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) { + /* oops, no, let's give it back to another thread and wait. + * This does not happen often enough to warrant more complex + * approaches (tried already). + */ + HA_ATOMIC_STORE(tail_ptr, tail_ofs); + goto wait_for_flush; + } + head_ofs = HA_ATOMIC_LOAD(&ring->storage->head); /* this is the byte before tail, it contains the users count */