diff --git a/src/ring.c b/src/ring.c
index 4118a645d..c445a23cc 100644
--- a/src/ring.c
+++ b/src/ring.c
@@ -175,6 +175,8 @@ void ring_free(struct ring *ring)
  */
 ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
 {
+	struct ring_wait_cell **ring_queue_ptr = DISGUISE(&ring->queue[ti->ring_queue].ptr);
+	struct ring_wait_cell cell, *next_cell, *curr_cell;
 	size_t *tail_ptr = &ring->storage->tail;
 	size_t head_ofs, tail_ofs, new_tail_ofs;
 	size_t ring_size;
@@ -228,24 +230,78 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 	if (needed + 1 > ring_size)
 		goto leave;
 
-	/* try to get exclusivity on the ring's tail. For this we set the
-	 * tail's highest bit, and the one that gets it wins. Many tests were
-	 * run on this and the approach below is optimal for armv8.1 atomics,
-	 * second-to-optimal with both x86_64 and second-to-optimal on armv8.0.
-	 * x86_64 would benefit slightly more from an xchg() which would
-	 * require the readers to loop during changes, and armv8.0 is slightly
-	 * better there as well (+5%). The CAS is bad for both (requires a
-	 * preload), though it might degrade better on large x86 compared to
-	 * a busy loop that the compiler would implement for the FETCH_OR.
-	 * Alternately we could kill 12 upper bits on a 64-bit tail ofs and
-	 * use XADD. Not tested, and would require to undo or watch for the
-	 * change (use it as a ticket).
+	cell.to_send_self = needed;
+	cell.needed_tot = 0; // the cell is only considered ready once this is non-zero.
+	cell.maxlen = msglen;
+	cell.pfx = pfx;
+	cell.npfx = npfx;
+	cell.msg = msg;
+	cell.nmsg = nmsg;
+
+	/* insert our cell into the queue before the previous one. We may have
+	 * to wait a bit if the queue's leader is attempting an election to win
+	 * the tail, hence the busy value (should be rare enough).
+	 */
+	next_cell = HA_ATOMIC_XCHG(ring_queue_ptr, &cell);
+
+	/* let's add the cumulated size of pending messages to ours */
+	cell.next = next_cell;
+	if (next_cell) {
+		size_t next_needed;
+
+		while ((next_needed = HA_ATOMIC_LOAD(&next_cell->needed_tot)) == 0)
+			__ha_cpu_relax_for_read();
+		needed += next_needed;
+	}
+
+	/* now <needed> will represent the size to store *all* messages. The
+	 * atomic store may unlock a subsequent thread waiting for this one.
+	 */
+	HA_ATOMIC_STORE(&cell.needed_tot, needed);
+
+	/* OK now we're the queue leader, and it's our job to try to get
+	 * ownership of the tail. If the tail is free we'll grab it on the
+	 * first iteration. Otherwise we wait at the top of the queue for the
+	 * tail to be unlocked again, and we stop doing that if another
+	 * thread comes in and becomes the leader in turn.
 	 */
 	while (1) {
+		/* Wait for another thread to take the lead or for the tail to
+		 * be available again. It's critical to be read-only in this
+		 * loop so as not to lose time synchronizing cache lines. Also,
+		 * we must detect a new leader ASAP so that the fewest possible
+		 * threads check the tail.
+		 */
+		while ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) {
+			next_cell = HA_ATOMIC_LOAD(ring_queue_ptr);
+			if (next_cell != &cell)
+				goto wait_for_flush; // another thread arrived, we should go to wait now
+			__ha_cpu_relax_for_read();
+		}
+
+		/* the tail is available again and we're still the leader, try
+		 * again.
+		 */
+		if (HA_ATOMIC_LOAD(ring_queue_ptr) != &cell)
+			goto wait_for_flush; // another thread arrived, we should go to wait now
+
+		/* OK the queue is locked, let's attempt to get the tail lock */
 		tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
-		if (!(tail_ofs & RING_TAIL_LOCK))
+
+		/* did we get it ? */
+		if (!(tail_ofs & RING_TAIL_LOCK)) {
+			/* Yes! Are we still legitimate to get it ? We'll know by
+			 * trying to reset the queue head, still ours, back to NULL.
+			 */
+			curr_cell = &cell;
+			if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) {
+				/* oops, no, let's give it back to another thread and wait */
+				HA_ATOMIC_STORE(tail_ptr, tail_ofs);
+				goto wait_for_flush; // another thread arrived, we should go to wait now
+			}
+			/* Won! */
 			break;
-		pl_wait_unlock_long(tail_ptr, RING_TAIL_LOCK);
+		}
 	}
 
 	head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
@@ -253,9 +309,10 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 	/* this is the byte before tail, it contains the users count */
 	lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
 
-	/* take the lock on the area. Normally we're alone */
+	/* Take the lock on the area. We're guaranteed to be the only writer
+	 * here.
+	 */
 	readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE);
-	BUG_ON_HOT(readers == RING_WRITING_SIZE);
 
 	vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
 
@@ -278,7 +335,7 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
 	/* now let's update the buffer with the new tail if our message will fit */
 	new_tail_ofs = tail_ofs;
-	if (vp_size(v1, v2) <= ring_size - needed - 1) {
+	if (vp_size(v1, v2) <= ring_size - needed - 1 - 1) {
 		vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
 
 		/* update the new space in the buffer */
@@ -292,54 +349,76 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 		/* reset next read counter before releasing writers */
 		HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
 	}
+	else {
+		/* release readers right now, before writing the tail, so as
+		 * not to expose the readers count byte to another writer.
+		 */
+		HA_ATOMIC_STORE(lock_ptr, readers);
+	}
 
 	/* and release other writers */
 	HA_ATOMIC_STORE(tail_ptr, new_tail_ofs);
 
-	if (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
-		/* we had to stop due to readers blocking the head,
-		 * let's give up.
-		 */
-		goto done_update_buf;
-	}
-
-	/* now focus on free room between the old and the new tail */
 	vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
 
-	/* let's write the message size */
-	vp_put_varint(&v1, &v2, msglen);
+	/* the list stops on a NULL */
+	for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_XCHG(&curr_cell->next, curr_cell)) {
+		if (unlikely(tail_ofs == new_tail_ofs)) {
+			/* report that this message was dropped.
+			 * Note: for now this must not happen!
+			 */
+			HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
+			continue;
+		}
-	/* then write the messages */
-	msglen = 0;
-	for (i = 0; i < npfx; i++) {
-		size_t len = pfx[i].len;
+		maxlen = curr_cell->maxlen;
+		pfx = curr_cell->pfx;
+		npfx = curr_cell->npfx;
+		msg = curr_cell->msg;
+		nmsg = curr_cell->nmsg;
 
-		if (len + msglen > maxlen)
-			len = maxlen - msglen;
-		if (len)
-			vp_putblk(&v1, &v2, pfx[i].ptr, len);
-		msglen += len;
+		/* let's write the message size */
+		vp_put_varint(&v1, &v2, maxlen);
+
+		/* then write the messages */
+		msglen = 0;
+		for (i = 0; i < npfx; i++) {
+			size_t len = pfx[i].len;
+
+			if (len + msglen > maxlen)
+				len = maxlen - msglen;
+			if (len)
+				vp_putblk(&v1, &v2, pfx[i].ptr, len);
+			msglen += len;
+		}
+
+		for (i = 0; i < nmsg; i++) {
+			size_t len = msg[i].len;
+
+			if (len + msglen > maxlen)
+				len = maxlen - msglen;
+			if (len)
+				vp_putblk(&v1, &v2, msg[i].ptr, len);
+			msglen += len;
+		}
+
+		/* for all but the last message we need to write the
+		 * readers count byte.
+		 */
+		if (curr_cell->next)
+			vp_putchr(&v1, &v2, 0);
 	}
 
-	for (i = 0; i < nmsg; i++) {
-		size_t len = msg[i].len;
-
-		if (len + msglen > maxlen)
-			len = maxlen - msglen;
-		if (len)
-			vp_putblk(&v1, &v2, msg[i].ptr, len);
-		msglen += len;
-	}
-
-	/* we must not write the read counter, it was already done,
-	 * plus we could ruin the one of the next writer.
+	/* we must not write the trailing read counter, it was already done,
+	 * plus we could ruin the one of the next writer. Let's just unlock
+	 * the front.
 	 */
-	sent = lenlen + msglen + 1;
-	BUG_ON_HOT(sent != needed);
- done_update_buf:
 	/* unlock the message area */
-	HA_ATOMIC_STORE(lock_ptr, readers);
+	if (new_tail_ofs != tail_ofs)
+		HA_ATOMIC_STORE(lock_ptr, readers);
+
+	sent = cell.to_send_self;
 
 	/* notify potential readers */
 	if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) {
@@ -357,6 +436,21 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
  leave:
 	return sent;
+
+ wait_for_flush:
+	/* The leader will write our own pointer into the cell's next to
+	 * mark it as released. Let's wait for this.
+	 */
+	do {
+		next_cell = HA_ATOMIC_LOAD(&cell.next);
+	} while (next_cell != &cell && __ha_cpu_relax_for_read());
+
+	/* OK the leader has flushed our cell. We read the sent size back from
+	 * the cell because the leader zeroes it when it couldn't write our
+	 * message (should only happen when the ring buffer is too small to
+	 * store all competing threads' messages at once).
+	 */
+	return HA_ATOMIC_LOAD(&cell.to_send_self);
 }
 
 /* Tries to attach appctx <appctx> as a new reader on ring <ring>. This is
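
Background sketch (reviewer note, not part of the patch): the change above replaces the per-writer spin on the ring's tail lock with a combining queue. Each writer publishes a stack-allocated ring_wait_cell, the cumulated batch size is propagated through needed_tot, and a single leader takes the tail lock, writes every queued message in one pass, then releases each waiter by storing the cell's own address into its next pointer. The standalone C11 program below illustrates that release convention and the batching idea. All names in it (wait_cell, queue_head, queued_write, flush_one) are invented for the example; it uses plain <stdatomic.h> instead of the HA_ATOMIC_* macros, and it deliberately simplifies leader election (here the first enqueuer of a batch leads, whereas in the patch the thread still at the queue head when the tail lock frees up wins it via a CAS).

/* Illustrative sketch only -- not HAProxy code. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

struct wait_cell {
	const char *msg;
	_Atomic(struct wait_cell *) next;  /* set to the cell's own address once flushed */
};

static struct wait_cell link_pending;              /* sentinel: link not yet published */
static _Atomic(struct wait_cell *) queue_head;     /* most recently queued cell */
static atomic_flag ring_lock = ATOMIC_FLAG_INIT;   /* stands in for the tail lock */

/* stands in for the vp_put_varint()/vp_putblk() calls that fill the ring */
static void flush_one(const struct wait_cell *cell)
{
	printf("flushed: %s\n", cell->msg);
}

static void queued_write(const char *msg)
{
	struct wait_cell cell = { .msg = msg, .next = &link_pending };
	struct wait_cell *prev, *curr;

	/* publish our cell at the head of the queue, then link it to the
	 * previous head (NULL when the queue was empty).
	 */
	prev = atomic_exchange(&queue_head, &cell);
	atomic_store(&cell.next, prev);

	if (prev) {
		/* a leader already exists: wait until it marks our cell as
		 * flushed by writing our own address into ->next. A real
		 * implementation would cpu_relax() in this loop.
		 */
		while (atomic_load(&cell.next) != &cell)
			;
		return;
	}

	/* we are the leader: take the ring lock once for the whole batch */
	while (atomic_flag_test_and_set(&ring_lock))
		;

	/* detach the current batch; later writers will start a new one */
	curr = atomic_exchange(&queue_head, NULL);

	/* walk from newest to oldest, flushing and releasing each cell */
	while (curr) {
		struct wait_cell *next;

		while ((next = atomic_load(&curr->next)) == &link_pending)
			;                          /* link not published yet */
		flush_one(curr);
		atomic_store(&curr->next, curr);   /* release the waiter */
		curr = next;
	}

	atomic_flag_clear(&ring_lock);
}

static void *worker(void *arg)
{
	queued_write((const char *)arg);
	return NULL;
}

int main(void)
{
	pthread_t th[3];
	const char *msgs[3] = { "one", "two", "three" };

	for (int i = 0; i < 3; i++)
		pthread_create(&th[i], NULL, worker, (void *)msgs[i]);
	for (int i = 0; i < 3; i++)
		pthread_join(th[i], NULL);
	return 0;
}

The property this sketch shares with the patch is that waiters never touch the tail lock or the ring area at all: only one thread per batch pays for the lock and the associated cache-line traffic, which is what the "critical to be read-only in this loop" comment in the patch is protecting.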