MINOR: ring: simplify the write loop a little bit

This is mostly a cleanup in that it turns the two-level loop into a
single one, but it also simplifies the code a little bit and brings
some performance savings again, which are mostly noticeable on ARM,
but don't change anything for x86.

commit 4b984c5baa
parent 573bbbe127
Author: Willy Tarreau
Date:   2024-03-17 12:09:30 +01:00

--- a/src/ring.c
+++ b/src/ring.c
@@ -265,49 +265,45 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 	 * tail to be unlocked again. We stop doing that if another thread
 	 * comes in and becomes the leader in turn.
 	 */
-	next_cell = &cell;
+	/* Wait for another thread to take the lead or for the tail to
+	 * be available again. It's critical to be read-only in this
+	 * loop so as not to lose time synchronizing cache lines. Also,
+	 * we must detect a new leader ASAP so that the fewest possible
+	 * threads check the tail.
+	 */
 	while (1) {
-		/* Wait for another thread to take the lead or for the tail to
-		 * be available again. It's critical to be read-only in this
-		 * loop so as not to lose time synchronizing cache lines. Also,
-		 * we must detect a new leader ASAP so that the fewest possible
-		 * threads check the tail.
-		 */
-		while (1) {
-			next_cell = HA_ATOMIC_LOAD(ring_queue_ptr);
-			if (next_cell != &cell)
-				goto wait_for_flush; // FIXME: another thread arrived, we should go to wait now
-			__ha_cpu_relax_for_read();
+		next_cell = HA_ATOMIC_LOAD(ring_queue_ptr);
+		if (next_cell != &cell)
+			goto wait_for_flush; // another thread arrived, we should go to wait now
+		__ha_cpu_relax_for_read();
+		/* the tail is available again and we're still the leader, try
+		 * again.
+		 */
 #if defined(__x86_64__)
-			/* x86 prefers a read first */
-			if (!(HA_ATOMIC_LOAD(tail_ptr) & RING_TAIL_LOCK))
+		/* x86 prefers a read first */
+		if (!(HA_ATOMIC_LOAD(tail_ptr) & RING_TAIL_LOCK))
 #endif
-			{
-				tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
-				if (!(tail_ofs & RING_TAIL_LOCK))
-					break;
-			}
-			__ha_cpu_relax_for_read();
-		}
-		/* OK the queue is locked, let's attempt to get the tail lock */
-		/* did we get it ? */
-		if (!(tail_ofs & RING_TAIL_LOCK)) {
-			/* Yes! Are we still legitimate to get it ? We'll know by
-			 * trying to reset the head and replace it with ourselves.
-			 */
-			curr_cell = &cell;
-			if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) {
-				/* oops, no, let's give it back to another thread and wait */
-				HA_ATOMIC_STORE(tail_ptr, tail_ofs);
-				goto wait_for_flush; // another thread arrived, we should go to wait now
-			}
-			/* Won! */
-			break;
-		}
+		{
+			/* OK the queue is locked, let's attempt to get the tail lock */
+			tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
+			if (!(tail_ofs & RING_TAIL_LOCK)) {
+				/* We got it! Are we still legitimate to get it ?
+				 * We'll know by trying to reset the head and
+				 * replace it with ourselves.
+				 */
+				curr_cell = &cell;
+				if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) {
+					/* oops, no, let's give it back to another thread and wait */
+					HA_ATOMIC_STORE(tail_ptr, tail_ofs);
+					goto wait_for_flush; // another thread arrived, we should go to wait now
+				}
+				/* Won! */
+				break;
+			}
+		}
 		__ha_cpu_relax_for_read();
 	}
 	head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
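
For readers who prefer to see the resulting control flow outside of diff form, here is a minimal, self-contained sketch of the same single-loop pattern written with C11 atomics. It is only an illustration under simplified assumptions: the names (tail, queue, TAIL_LOCK, take_tail_lock, cpu_relax) are invented for this example and are not HAProxy's API, and the real ring_write() does much more around this point (queue cells, head/tail offsets, the wait_for_flush path).

/* Illustrative sketch only -- not HAProxy code. Shows a single loop that
 * stays read-only while waiting, then grabs a lock bit with one atomic OR
 * and confirms leadership with a CAS, instead of using two nested loops.
 */
#include <stdatomic.h>
#include <stdint.h>

#define TAIL_LOCK ((uint64_t)1 << 63)   /* assumed lock bit in the tail word */

struct cell { int dummy; };

/* hypothetical globals standing in for the ring's tail word and the
 * head of the writers' waiting queue
 */
static _Atomic uint64_t tail;
static _Atomic(struct cell *) queue;

static inline void cpu_relax(void)
{
#if defined(__x86_64__)
	__asm__ volatile("pause");
#else
	__asm__ volatile("" ::: "memory");  /* plain compiler barrier elsewhere */
#endif
}

/* Try to take the tail lock while remaining the queue's leader. Returns
 * the previous (unlocked) tail value on success, or -1 if another thread
 * became the leader, in which case the caller should go wait instead.
 */
static int64_t take_tail_lock(struct cell *my_cell)
{
	uint64_t tail_ofs;
	struct cell *curr;

	while (1) {
		/* read-only test first: are we still the queue's leader ? */
		if (atomic_load_explicit(&queue, memory_order_acquire) != my_cell)
			return -1;
		cpu_relax();

#if defined(__x86_64__)
		/* x86 prefers a read before the atomic OR */
		if (!(atomic_load_explicit(&tail, memory_order_relaxed) & TAIL_LOCK))
#endif
		{
			/* attempt to grab the lock bit with a single atomic OR */
			tail_ofs = atomic_fetch_or_explicit(&tail, TAIL_LOCK,
							    memory_order_acquire);
			if (!(tail_ofs & TAIL_LOCK)) {
				/* got it; confirm we are still the leader by
				 * swapping our cell out of the queue head
				 */
				curr = my_cell;
				if (!atomic_compare_exchange_strong(&queue, &curr, NULL)) {
					/* lost the lead: give the lock back and go wait */
					atomic_store_explicit(&tail, tail_ofs,
							      memory_order_release);
					return -1;
				}
				return (int64_t)tail_ofs;	/* won */
			}
		}
		cpu_relax();
	}
}

In this sketch, a writer that loses the lead (return value -1) would then go wait for the current leader to flush the queued messages, which is what the wait_for_flush label stands for in the real function.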