diff --git a/src/ring.c b/src/ring.c index 7998f0905..9f2be10fd 100644 --- a/src/ring.c +++ b/src/ring.c @@ -367,63 +367,70 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs); - /* the list stops on a NULL */ - for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_XCHG(&curr_cell->next, curr_cell)) { - if (unlikely(tail_ofs == new_tail_ofs)) { - /* report that this message was dropped. - * Note: for now this must not happen! + if (likely(tail_ofs != new_tail_ofs)) { + /* the list stops on a NULL */ + for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_LOAD(&curr_cell->next)) { + maxlen = curr_cell->maxlen; + pfx = curr_cell->pfx; + npfx = curr_cell->npfx; + msg = curr_cell->msg; + nmsg = curr_cell->nmsg; + + /* let's write the message size */ + vp_put_varint(&v1, &v2, maxlen); + + /* then write the messages */ + msglen = 0; + for (i = 0; i < npfx; i++) { + size_t len = pfx[i].len; + + if (len + msglen > maxlen) + len = maxlen - msglen; + if (len) + vp_putblk(&v1, &v2, pfx[i].ptr, len); + msglen += len; + } + + for (i = 0; i < nmsg; i++) { + size_t len = msg[i].len; + + if (len + msglen > maxlen) + len = maxlen - msglen; + if (len) + vp_putblk(&v1, &v2, msg[i].ptr, len); + msglen += len; + } + + /* for all but the last message we need to write the + * readers count byte. */ + if (curr_cell->next) + vp_putchr(&v1, &v2, 0); + } + + /* now release */ + for (curr_cell = &cell; curr_cell; curr_cell = next_cell) { + next_cell = HA_ATOMIC_LOAD(&curr_cell->next); + HA_ATOMIC_STORE(&curr_cell->next, curr_cell); + } + + /* unlock the message area */ + HA_ATOMIC_STORE(lock_ptr, readers); + } else { + /* messages were dropped, notify about this and release them */ + for (curr_cell = &cell; curr_cell; curr_cell = next_cell) { + next_cell = HA_ATOMIC_LOAD(&curr_cell->next); HA_ATOMIC_STORE(&curr_cell->to_send_self, 0); - continue; + HA_ATOMIC_STORE(&curr_cell->next, curr_cell); } - - maxlen = curr_cell->maxlen; - pfx = curr_cell->pfx; - npfx = curr_cell->npfx; - msg = curr_cell->msg; - nmsg = curr_cell->nmsg; - - /* let's write the message size */ - vp_put_varint(&v1, &v2, maxlen); - - /* then write the messages */ - msglen = 0; - for (i = 0; i < npfx; i++) { - size_t len = pfx[i].len; - - if (len + msglen > maxlen) - len = maxlen - msglen; - if (len) - vp_putblk(&v1, &v2, pfx[i].ptr, len); - msglen += len; - } - - for (i = 0; i < nmsg; i++) { - size_t len = msg[i].len; - - if (len + msglen > maxlen) - len = maxlen - msglen; - if (len) - vp_putblk(&v1, &v2, msg[i].ptr, len); - msglen += len; - } - - /* for all but the last message we need to write the - * readers count byte. - */ - if (curr_cell->next) - vp_putchr(&v1, &v2, 0); } /* we must not write the trailing read counter, it was already done, - * plus we could ruin the one of the next writer. Let's just unlock - * the front. + * plus we could ruin the one of the next writer. And the front was + * unlocked either at the top if the ring was full, or just above if it + * could be properly filled. */ - /* unlock the message area */ - if (new_tail_ofs != tail_ofs) - HA_ATOMIC_STORE(lock_ptr, readers); - sent = cell.to_send_self; /* notify potential readers */