CLEANUP: ring: rearrange the wait loop in ring_write()

The loop is constructed in a complicated way, with a single break
statement in the middle and many continue statements everywhere,
making it hard to better factor the code between variants. Let's
first reorganize it so as to make it easier to escape once the ring
tail lock is obtained. The sequence of instructions remains the
same; it's only better organized.
Willy Tarreau 2025-09-18 14:58:38 +02:00
parent 08c6bbb542
commit eca1f90e16


@@ -277,27 +277,32 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
         while (1) {
                 if ((curr_cell = HA_ATOMIC_LOAD(ring_queue_ptr)) != &cell)
                         goto wait_for_flush;
-                __ha_cpu_relax_for_read();
 
-#if !defined(__ARM_FEATURE_ATOMICS)
+                /* OK the queue is locked, let's attempt to get the tail lock.
+                 * we'll atomically OR the lock on the pointer and check if
+                 * someone else had it already, otherwise we own it.
+                 */
+#if defined(__ARM_FEATURE_ATOMICS)
                 /* ARMv8.1-a has a true atomic OR and doesn't need the preliminary read */
-                if ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) {
-                        __ha_cpu_relax_for_read();
-                        continue;
+                tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
+                if (!(tail_ofs & RING_TAIL_LOCK))
+                        break;
+#else
+                tail_ofs = HA_ATOMIC_LOAD(tail_ptr);
+                if (likely(!(tail_ofs & RING_TAIL_LOCK))) {
+                        if (HA_ATOMIC_CAS(tail_ptr, &tail_ofs, tail_ofs | RING_TAIL_LOCK))
+                                break;
                 }
 #endif
-                /* OK the queue is locked, let's attempt to get the tail lock */
-                tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
+                __ha_cpu_relax_for_read();
+        }
 
-                /* did we get it ? */
-                if (!(tail_ofs & RING_TAIL_LOCK)) {
                         /* Here we own the tail. We can go on if we're still the leader,
                          * which we'll confirm by trying to reset the queue. If we're
                          * still the leader, we're done.
                          */
-                        if (HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL))
-                                break; // Won!
+                        if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) {
 
                         /* oops, no, let's give it back to another thread and wait.
                          * This does not happen often enough to warrant more complex
                          * approaches (tried already).
@@ -305,8 +310,6 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
                         HA_ATOMIC_STORE(tail_ptr, tail_ofs);
                         goto wait_for_flush;
                 }
-                __ha_cpu_relax_for_read();
-        }
 
         head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
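
For readers outside the HAProxy tree, here is a minimal, self-contained sketch of the
reorganized wait loop shown above: spin until the tail lock bit is obtained, break out
of the loop, then run the post-loop leader check exactly once. It uses standard C11
atomics in place of the HA_ATOMIC_* macros and __ha_cpu_relax_for_read(), and the names
(ring_wait_for_tail, tail, ring_queue, the local RING_TAIL_LOCK definition) are
illustrative stand-ins, not the actual ring_write() code.

#include <stdatomic.h>

#define RING_TAIL_LOCK (~(~0UL >> 1))   /* top bit of the tail offset used as a lock */

static _Atomic unsigned long tail;      /* stand-in for *tail_ptr */
static _Atomic(void *) ring_queue;      /* stand-in for *ring_queue_ptr */

/* Spin until we own the tail lock, then return the tail offset (without the
 * lock bit). Returning 0 stands in for the "wait_for_flush" path.
 */
static unsigned long ring_wait_for_tail(void *cell)
{
        unsigned long tail_ofs;

        while (1) {
                /* if another thread took over the queue, stop competing */
                if (atomic_load(&ring_queue) != cell)
                        return 0;

#if defined(__ARM_FEATURE_ATOMICS)
                /* single atomic OR: grab the lock bit and check who held it before */
                tail_ofs = atomic_fetch_or(&tail, RING_TAIL_LOCK);
                if (!(tail_ofs & RING_TAIL_LOCK))
                        break;                  /* we now own the tail lock */
#else
                /* read first, then try to set the lock bit with a CAS */
                tail_ofs = atomic_load(&tail);
                if (!(tail_ofs & RING_TAIL_LOCK) &&
                    atomic_compare_exchange_weak(&tail, &tail_ofs,
                                                 tail_ofs | RING_TAIL_LOCK))
                        break;                  /* we now own the tail lock */
#endif
                /* lock already held or CAS lost the race: relax and retry */
        }

        /* single exit point: the leader confirmation (the CAS on the queue
         * pointer in ring_write()) now runs once, after the loop
         */
        return tail_ofs;
}

This mirrors the intent of the patch: the only break left in the loop is "lock
obtained", every other outcome either leaves the function or retries, and the
code that must run once after acquisition no longer sits inside the loop body.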