diff --git a/src/ring.c b/src/ring.c index 4311bddd6..74172ce3a 100644 --- a/src/ring.c +++ b/src/ring.c @@ -277,37 +277,40 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz while (1) { if ((curr_cell = HA_ATOMIC_LOAD(ring_queue_ptr)) != &cell) goto wait_for_flush; - __ha_cpu_relax_for_read(); -#if !defined(__ARM_FEATURE_ATOMICS) + /* OK the queue is locked, let's attempt to get the tail lock. + * we'll atomically OR the lock on the pointer and check if + * someone else had it already, otherwise we own it. + */ + +#if defined(__ARM_FEATURE_ATOMICS) /* ARMv8.1-a has a true atomic OR and doesn't need the preliminary read */ - if ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) { - __ha_cpu_relax_for_read(); - continue; + tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK); + if (!(tail_ofs & RING_TAIL_LOCK)) + break; +#else + tail_ofs = HA_ATOMIC_LOAD(tail_ptr); + if (likely(!(tail_ofs & RING_TAIL_LOCK))) { + if (HA_ATOMIC_CAS(tail_ptr, &tail_ofs, tail_ofs | RING_TAIL_LOCK)) + break; } #endif - /* OK the queue is locked, let's attempt to get the tail lock */ - tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK); - - /* did we get it ? */ - if (!(tail_ofs & RING_TAIL_LOCK)) { - /* Here we own the tail. We can go on if we're still the leader, - * which we'll confirm by trying to reset the queue. If we're - * still the leader, we're done. - */ - if (HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) - break; // Won! - - /* oops, no, let's give it back to another thread and wait. - * This does not happen often enough to warrant more complex - * approaches (tried already). - */ - HA_ATOMIC_STORE(tail_ptr, tail_ofs); - goto wait_for_flush; - } __ha_cpu_relax_for_read(); } + /* Here we own the tail. We can go on if we're still the leader, + * which we'll confirm by trying to reset the queue. If we're + * still the leader, we're done. + */ + if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) { + /* oops, no, let's give it back to another thread and wait. + * This does not happen often enough to warrant more complex + * approaches (tried already). + */ + HA_ATOMIC_STORE(tail_ptr, tail_ofs); + goto wait_for_flush; + } + head_ofs = HA_ATOMIC_LOAD(&ring->storage->head); /* this is the byte before tail, it contains the users count */