diff --git a/src/ring.c b/src/ring.c index 6abbe2caf..ca9a6c853 100644 --- a/src/ring.c +++ b/src/ring.c @@ -27,6 +27,7 @@ #include #include #include +#include /* context used to dump the contents of a ring via "show events" or "show errors" */ struct show_ring_ctx { @@ -164,6 +165,10 @@ void ring_free(struct ring *ring) ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg) { struct buffer *buf = &ring->storage->buf; + size_t head_ofs, tail_ofs; + size_t ring_size; + char *ring_area; + struct ist v1, v2; struct appctx *appctx; size_t msglen = 0; size_t lenlen; @@ -195,14 +200,29 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz * - lenlen bytes for the size encoding * - msglen for the message * - one byte for the new marker + * + * Note that we'll also reserve one extra byte to make sure we never + * leave a full buffer (the vec-to-ring conversion cannot be done if + * both areas are of size 0). */ needed = lenlen + msglen + 1; - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - if (needed + 1 > b_size(buf)) - goto done_buf; + /* these ones do not change under us (only resize affects them and it + * must be done under thread isolation). + */ + ring_area = b_orig(buf); + ring_size = b_size(buf); - while (b_room(buf) < needed) { + HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); + if (needed + 1 > ring_size) + goto leave; + + head_ofs = b_head_ofs(buf); + tail_ofs = b_tail_ofs(buf); + + vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); + + while (vp_size(v1, v2) > ring_size - needed - 1 - 1) { /* we need to delete the oldest message (from the end), * and we have to stop if there's a reader stuck there. * Unless there's corruption in the buffer it's guaranteed @@ -210,19 +230,28 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz * varint-encoded length (1 byte min) and the message * payload (0 bytes min). */ - if (*b_head(buf)) - goto done_buf; - dellenlen = b_peek_varint(buf, 1, &dellen); + if (*_vp_head(v1, v2)) + break; + dellenlen = vp_peek_varint_ofs(v1, v2, 1, &dellen); if (!dellenlen) - goto done_buf; - BUG_ON(b_data(buf) < 1 + dellenlen + dellen); - - b_del(buf, 1 + dellenlen + dellen); + break; + BUG_ON_HOT(vp_size(v1, v2) < 1 + dellenlen + dellen); + vp_skip(&v1, &v2, 1 + dellenlen + dellen); } - /* OK now we do have room */ - __b_put_varint(buf, msglen); + /* now let's update the buffer with the new head and size */ + vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + if (vp_size(v1, v2) > ring_size - needed - 1 - 1) + goto done_update_buf; + + /* now focus on free room */ + vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); + + /* let's write the message size */ + vp_put_varint(&v1, &v2, msglen); + + /* then write the messages */ msglen = 0; for (i = 0; i < npfx; i++) { size_t len = pfx[i].len; @@ -230,7 +259,7 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz if (len + msglen > maxlen) len = maxlen - msglen; if (len) - __b_putblk(buf, pfx[i].ptr, len); + vp_putblk(&v1, &v2, pfx[i].ptr, len); msglen += len; } @@ -240,19 +269,28 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz if (len + msglen > maxlen) len = maxlen - msglen; if (len) - __b_putblk(buf, msg[i].ptr, len); + vp_putblk(&v1, &v2, msg[i].ptr, len); msglen += len; } - *b_tail(buf) = 0; buf->data++; // new read counter + vp_putchr(&v1, &v2, 0); // new read counter sent = lenlen + msglen + 1; BUG_ON_HOT(sent != needed); - /* notify potential readers */ - list_for_each_entry(appctx, &ring->waiters, wait_entry) - appctx_wakeup(appctx); + vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); - done_buf: + done_update_buf: + /* update the new space in the buffer */ + buf->head = head_ofs; + buf->data = ((tail_ofs >= head_ofs) ? 0 : ring_size) + tail_ofs - head_ofs; + + /* notify potential readers */ + if (sent) { + list_for_each_entry(appctx, &ring->waiters, wait_entry) + appctx_wakeup(appctx); + } + + leave: HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); return sent; }