mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-06 07:07:04 +02:00
The main CLI I/O handle is responsible to interrupt the processing on shutdown/abort. It is not the responsibility of the I/O handler of CLI commands to take care of it.
786 lines
24 KiB
C
786 lines
24 KiB
C
/*
|
|
* Ring buffer management
|
|
*
|
|
* Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
* License as published by the Free Software Foundation, version 2.1
|
|
* exclusively.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public
|
|
* License along with this library; if not, write to the Free Software
|
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*/
|
|
|
|
#include <stdlib.h>
|
|
#include <haproxy/api.h>
|
|
#include <haproxy/applet.h>
|
|
#include <haproxy/buf.h>
|
|
#include <haproxy/cfgparse.h>
|
|
#include <haproxy/cli.h>
|
|
#include <haproxy/ring.h>
|
|
#include <haproxy/sc_strm.h>
|
|
#include <haproxy/stconn.h>
|
|
#include <haproxy/thread.h>
|
|
#include <haproxy/vecpair.h>
|
|
|
|
/* context used to dump the contents of a ring via "show events" or "show errors" */
|
|
struct show_ring_ctx {
|
|
struct ring *ring; /* ring to be dumped */
|
|
size_t ofs; /* storage offset to restart from; ~0=oldest */
|
|
uint flags; /* set of RING_WF_* */
|
|
};
|
|
|
|
/* Initialize a pre-allocated ring with the buffer area of size <size>.
|
|
* Makes the storage point to the indicated area and adjusts the declared
|
|
* ring size according to the position of the area in the storage. If <reset>
|
|
* is non-zero, the storage area is reset, otherwise it's left intact (except
|
|
* for the area origin pointer which is updated so that the area can come from
|
|
* an mmap()).
|
|
*/
|
|
void ring_init(struct ring *ring, void *area, size_t size, int reset)
|
|
{
|
|
MT_LIST_INIT(&ring->waiters);
|
|
ring->readers_count = 0;
|
|
ring->flags = 0;
|
|
ring->storage = area;
|
|
ring->pending = 0;
|
|
ring->waking = 0;
|
|
memset(&ring->queue, 0, sizeof(ring->queue));
|
|
|
|
if (reset) {
|
|
ring->storage->size = size - sizeof(*ring->storage);
|
|
ring->storage->rsvd = sizeof(*ring->storage);
|
|
ring->storage->head = 0;
|
|
ring->storage->tail = 0;
|
|
|
|
/* write the initial RC byte */
|
|
*ring->storage->area = 0;
|
|
ring->storage->tail = 1;
|
|
}
|
|
}
|
|
|
|
/* Creates a ring and its storage area at address <area> for <size> bytes.
|
|
* If <area> is null, then it's allocated of the requested size. The ring
|
|
* storage struct is part of the area so the usable area is slightly reduced.
|
|
* However the storage is immediately adjacent to the struct so that the ring
|
|
* remains consistent on-disk. ring_free() will ignore such ring stoages and
|
|
* will only release the ring part, so the caller is responsible for releasing
|
|
* them. If <reset> is non-zero, the storage area is reset, otherwise it's left
|
|
* intact.
|
|
*/
|
|
struct ring *ring_make_from_area(void *area, size_t size, int reset)
|
|
{
|
|
struct ring *ring = NULL;
|
|
uint flags = 0;
|
|
|
|
if (size < sizeof(*ring->storage) + 2)
|
|
return NULL;
|
|
|
|
ring = malloc(sizeof(*ring));
|
|
if (!ring)
|
|
goto fail;
|
|
|
|
if (!area)
|
|
area = malloc(size);
|
|
else
|
|
flags |= RING_FL_MAPPED;
|
|
|
|
if (!area)
|
|
goto fail;
|
|
|
|
ring_init(ring, area, size, reset);
|
|
ring->flags |= flags;
|
|
return ring;
|
|
fail:
|
|
free(ring);
|
|
return NULL;
|
|
}
|
|
|
|
/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on
|
|
* allocation failure.
|
|
*/
|
|
struct ring *ring_new(size_t size)
|
|
{
|
|
return ring_make_from_area(NULL, size, 1);
|
|
}
|
|
|
|
/* Resizes existing ring <ring> to <size> which must be larger, without losing
|
|
* its contents. The new size must be at least as large as the previous one or
|
|
* no change will be performed. The pointer to the ring is returned on success,
|
|
* or NULL on allocation failure. This will lock the ring for writes.
|
|
*/
|
|
struct ring *ring_resize(struct ring *ring, size_t size)
|
|
{
|
|
struct ring_storage *old, *new;
|
|
|
|
if (size <= ring_data(ring) + sizeof(*ring->storage))
|
|
return ring;
|
|
|
|
old = ring->storage;
|
|
new = malloc(size);
|
|
if (!new)
|
|
return NULL;
|
|
|
|
thread_isolate();
|
|
|
|
/* recheck the ring's size, it may have changed during the malloc */
|
|
if (size > ring_data(ring) + sizeof(*ring->storage)) {
|
|
/* copy old contents */
|
|
struct ist v1, v2;
|
|
size_t len;
|
|
|
|
vp_ring_to_data(&v1, &v2, old->area, old->size, old->head, old->tail);
|
|
len = vp_size(v1, v2);
|
|
vp_peek_ofs(v1, v2, 0, new->area, len);
|
|
new->size = size - sizeof(*ring->storage);
|
|
new->rsvd = sizeof(*ring->storage);
|
|
new->head = 0;
|
|
new->tail = len;
|
|
new = HA_ATOMIC_XCHG(&ring->storage, new);
|
|
}
|
|
|
|
thread_release();
|
|
|
|
/* free the unused one */
|
|
free(new);
|
|
return ring;
|
|
}
|
|
|
|
/* destroys and frees ring <ring> */
|
|
void ring_free(struct ring *ring)
|
|
{
|
|
if (!ring)
|
|
return;
|
|
|
|
/* make sure it was not allocated by ring_make_from_area */
|
|
if (!(ring->flags & RING_FL_MAPPED))
|
|
free(ring->storage);
|
|
free(ring);
|
|
}
|
|
|
|
/* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>
|
|
* to ring <ring>. The message is sent atomically. It may be truncated to
|
|
* <maxlen> bytes if <maxlen> is non-null. There is no distinction between the
|
|
* two lists, it's just a convenience to help the caller prepend some prefixes
|
|
* when necessary. It takes the ring's write lock to make sure no other thread
|
|
* will touch the buffer during the update. Returns the number of bytes sent,
|
|
* or <=0 on failure.
|
|
*/
|
|
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
|
|
{
|
|
struct ring_wait_cell **ring_queue_ptr = DISGUISE(&ring->queue[ti->ring_queue].ptr);
|
|
struct ring_wait_cell cell, *next_cell, *curr_cell;
|
|
size_t *tail_ptr = &ring->storage->tail;
|
|
size_t head_ofs, tail_ofs, new_tail_ofs;
|
|
size_t ring_size;
|
|
char *ring_area;
|
|
struct ist v1, v2;
|
|
size_t msglen = 0;
|
|
size_t lenlen;
|
|
size_t needed;
|
|
uint64_t dellen;
|
|
int dellenlen;
|
|
uint8_t *lock_ptr;
|
|
uint8_t readers;
|
|
ssize_t sent = 0;
|
|
int i;
|
|
|
|
/* we have to find some room to add our message (the buffer is
|
|
* never empty and at least contains the previous counter) and
|
|
* to update both the buffer contents and heads at the same
|
|
* time (it's doable using atomic ops but not worth the
|
|
* trouble, let's just lock). For this we first need to know
|
|
* the total message's length. We cannot measure it while
|
|
* copying due to the varint encoding of the length.
|
|
*/
|
|
for (i = 0; i < npfx; i++)
|
|
msglen += pfx[i].len;
|
|
for (i = 0; i < nmsg; i++)
|
|
msglen += msg[i].len;
|
|
|
|
if (msglen > maxlen)
|
|
msglen = maxlen;
|
|
|
|
lenlen = varint_bytes(msglen);
|
|
|
|
/* We need:
|
|
* - lenlen bytes for the size encoding
|
|
* - msglen for the message
|
|
* - one byte for the new marker
|
|
*
|
|
* Note that we'll also reserve one extra byte to make sure we never
|
|
* leave a full buffer (the vec-to-ring conversion cannot be done if
|
|
* both areas are of size 0).
|
|
*/
|
|
needed = lenlen + msglen + 1;
|
|
|
|
/* these ones do not change under us (only resize affects them and it
|
|
* must be done under thread isolation).
|
|
*/
|
|
ring_area = ring->storage->area;
|
|
ring_size = ring->storage->size;
|
|
|
|
if (needed + 1 > ring_size)
|
|
goto leave;
|
|
|
|
cell.to_send_self = needed;
|
|
cell.needed_tot = 0; // only when non-zero the cell is considered ready.
|
|
cell.maxlen = msglen;
|
|
cell.pfx = pfx;
|
|
cell.npfx = npfx;
|
|
cell.msg = msg;
|
|
cell.nmsg = nmsg;
|
|
|
|
/* insert our cell into the queue before the previous one. We may have
|
|
* to wait a bit if the queue's leader is attempting an election to win
|
|
* the tail, hence the busy value (should be rare enough).
|
|
*/
|
|
next_cell = HA_ATOMIC_XCHG(ring_queue_ptr, &cell);
|
|
|
|
/* let's add the cumulated size of pending messages to ours */
|
|
cell.next = next_cell;
|
|
if (next_cell) {
|
|
size_t next_needed;
|
|
|
|
while ((next_needed = HA_ATOMIC_LOAD(&next_cell->needed_tot)) == 0)
|
|
__ha_cpu_relax_for_read();
|
|
needed += next_needed;
|
|
}
|
|
|
|
/* now <needed> will represent the size to store *all* messages. The
|
|
* atomic store may unlock a subsequent thread waiting for this one.
|
|
*/
|
|
HA_ATOMIC_STORE(&cell.needed_tot, needed);
|
|
|
|
/* OK now we're the queue leader, it's our job to try to get ownership
|
|
* of the tail, if we succeeded above, we don't even enter the loop. If
|
|
* we failed, we set ourselves at the top the queue, waiting for the
|
|
* tail to be unlocked again. We stop doing that if another thread
|
|
* comes in and becomes the leader in turn.
|
|
*/
|
|
|
|
/* Wait for another thread to take the lead or for the tail to
|
|
* be available again. It's critical to be read-only in this
|
|
* loop so as not to lose time synchronizing cache lines. Also,
|
|
* we must detect a new leader ASAP so that the fewest possible
|
|
* threads check the tail.
|
|
*/
|
|
|
|
while (1) {
|
|
if ((curr_cell = HA_ATOMIC_LOAD(ring_queue_ptr)) != &cell)
|
|
goto wait_for_flush;
|
|
__ha_cpu_relax_for_read();
|
|
|
|
#if !defined(__ARM_FEATURE_ATOMICS)
|
|
/* ARMv8.1-a has a true atomic OR and doesn't need the preliminary read */
|
|
if ((tail_ofs = HA_ATOMIC_LOAD(tail_ptr)) & RING_TAIL_LOCK) {
|
|
__ha_cpu_relax_for_read();
|
|
continue;
|
|
}
|
|
#endif
|
|
/* OK the queue is locked, let's attempt to get the tail lock */
|
|
tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
|
|
|
|
/* did we get it ? */
|
|
if (!(tail_ofs & RING_TAIL_LOCK)) {
|
|
/* Here we own the tail. We can go on if we're still the leader,
|
|
* which we'll confirm by trying to reset the queue. If we're
|
|
* still the leader, we're done.
|
|
*/
|
|
if (HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL))
|
|
break; // Won!
|
|
|
|
/* oops, no, let's give it back to another thread and wait.
|
|
* This does not happen often enough to warrant more complex
|
|
* approaches (tried already).
|
|
*/
|
|
HA_ATOMIC_STORE(tail_ptr, tail_ofs);
|
|
goto wait_for_flush;
|
|
}
|
|
__ha_cpu_relax_for_read();
|
|
}
|
|
|
|
head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
|
|
|
|
/* this is the byte before tail, it contains the users count */
|
|
lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
|
|
|
|
/* Take the lock on the area. We're guaranteed to be the only writer
|
|
* here.
|
|
*/
|
|
readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE);
|
|
|
|
vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
|
|
|
|
while (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
|
|
/* we need to delete the oldest message (from the end),
|
|
* and we have to stop if there's a reader stuck there.
|
|
* Unless there's corruption in the buffer it's guaranteed
|
|
* that we have enough data to find 1 counter byte, a
|
|
* varint-encoded length (1 byte min) and the message
|
|
* payload (0 bytes min).
|
|
*/
|
|
if (*_vp_head(v1, v2))
|
|
break;
|
|
dellenlen = vp_peek_varint_ofs(v1, v2, 1, &dellen);
|
|
if (!dellenlen)
|
|
break;
|
|
BUG_ON_HOT(vp_size(v1, v2) < 1 + dellenlen + dellen);
|
|
vp_skip(&v1, &v2, 1 + dellenlen + dellen);
|
|
}
|
|
|
|
/* now let's update the buffer with the new tail if our message will fit */
|
|
new_tail_ofs = tail_ofs;
|
|
if (vp_size(v1, v2) <= ring_size - needed - 1 - 1) {
|
|
vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
|
|
|
|
/* update the new space in the buffer */
|
|
HA_ATOMIC_STORE(&ring->storage->head, head_ofs);
|
|
|
|
/* calculate next tail pointer */
|
|
new_tail_ofs += needed;
|
|
if (new_tail_ofs >= ring_size)
|
|
new_tail_ofs -= ring_size;
|
|
|
|
/* reset next read counter before releasing writers */
|
|
HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
|
|
}
|
|
else {
|
|
/* release readers right now, before writing the tail, so as
|
|
* not to expose the readers count byte to another writer.
|
|
*/
|
|
HA_ATOMIC_STORE(lock_ptr, readers);
|
|
}
|
|
|
|
/* and release other writers */
|
|
HA_ATOMIC_STORE(tail_ptr, new_tail_ofs);
|
|
|
|
vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
|
|
|
|
if (likely(tail_ofs != new_tail_ofs)) {
|
|
/* the list stops on a NULL */
|
|
for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_LOAD(&curr_cell->next)) {
|
|
maxlen = curr_cell->maxlen;
|
|
pfx = curr_cell->pfx;
|
|
npfx = curr_cell->npfx;
|
|
msg = curr_cell->msg;
|
|
nmsg = curr_cell->nmsg;
|
|
|
|
/* let's write the message size */
|
|
vp_put_varint(&v1, &v2, maxlen);
|
|
|
|
/* then write the messages */
|
|
msglen = 0;
|
|
for (i = 0; i < npfx; i++) {
|
|
size_t len = pfx[i].len;
|
|
|
|
if (len + msglen > maxlen)
|
|
len = maxlen - msglen;
|
|
if (len)
|
|
vp_putblk(&v1, &v2, pfx[i].ptr, len);
|
|
msglen += len;
|
|
}
|
|
|
|
for (i = 0; i < nmsg; i++) {
|
|
size_t len = msg[i].len;
|
|
|
|
if (len + msglen > maxlen)
|
|
len = maxlen - msglen;
|
|
if (len)
|
|
vp_putblk(&v1, &v2, msg[i].ptr, len);
|
|
msglen += len;
|
|
}
|
|
|
|
/* for all but the last message we need to write the
|
|
* readers count byte.
|
|
*/
|
|
if (curr_cell->next)
|
|
vp_putchr(&v1, &v2, 0);
|
|
}
|
|
|
|
/* now release */
|
|
for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
|
|
next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
|
|
_HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
|
|
}
|
|
|
|
/* unlock the message area */
|
|
HA_ATOMIC_STORE(lock_ptr, readers);
|
|
} else {
|
|
/* messages were dropped, notify about this and release them */
|
|
for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
|
|
next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
|
|
HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
|
|
_HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
|
|
}
|
|
}
|
|
|
|
/* we must not write the trailing read counter, it was already done,
|
|
* plus we could ruin the one of the next writer. And the front was
|
|
* unlocked either at the top if the ring was full, or just above if it
|
|
* could be properly filled.
|
|
*/
|
|
|
|
sent = cell.to_send_self;
|
|
|
|
/* notify potential readers */
|
|
if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) {
|
|
HA_ATOMIC_INC(&ring->pending);
|
|
while (HA_ATOMIC_LOAD(&ring->pending) && HA_ATOMIC_XCHG(&ring->waking, 1) == 0) {
|
|
struct mt_list *elt1, elt2;
|
|
struct appctx *appctx;
|
|
|
|
HA_ATOMIC_STORE(&ring->pending, 0);
|
|
mt_list_for_each_entry_safe(appctx, &ring->waiters, wait_entry, elt1, elt2)
|
|
appctx_wakeup(appctx);
|
|
HA_ATOMIC_STORE(&ring->waking, 0);
|
|
}
|
|
}
|
|
|
|
leave:
|
|
return sent;
|
|
|
|
wait_for_flush:
|
|
/* if we arrive here, it means we found another leader */
|
|
|
|
/* The leader will write our own pointer in the cell's next to
|
|
* mark it as released. Let's wait for this.
|
|
*/
|
|
do {
|
|
next_cell = HA_ATOMIC_LOAD(&cell.next);
|
|
} while (next_cell != &cell && __ha_cpu_relax_for_read());
|
|
|
|
/* OK our message was queued. Retrieving the sent size in the ring cell
|
|
* allows another leader thread to zero it if it finally couldn't send
|
|
* it (should only happen when using too small ring buffers to store
|
|
* all competing threads' messages at once).
|
|
*/
|
|
return HA_ATOMIC_LOAD(&cell.to_send_self);
|
|
}
|
|
|
|
/* Tries to attach appctx <appctx> as a new reader on ring <ring>. This is
|
|
* meant to be used by low level appctx code such as CLI or ring forwarding.
|
|
* For higher level functions, please see the relevant parts in appctx or CLI.
|
|
* It returns non-zero on success or zero on failure if too many users are
|
|
* already attached. On success, the caller MUST call ring_detach_appctx()
|
|
* to detach itself, even if it was never woken up.
|
|
*/
|
|
int ring_attach(struct ring *ring)
|
|
{
|
|
int users = ring->readers_count;
|
|
|
|
do {
|
|
if (users >= RING_MAX_READERS)
|
|
return 0;
|
|
} while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1));
|
|
return 1;
|
|
}
|
|
|
|
/* detach an appctx from a ring. The appctx is expected to be waiting at offset
|
|
* <ofs> relative to the beginning of the storage, or ~0 if not waiting yet.
|
|
* Nothing is done if <ring> is NULL.
|
|
*/
|
|
void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
|
|
{
|
|
if (!ring)
|
|
return;
|
|
|
|
HA_ATOMIC_DEC(&ring->readers_count);
|
|
|
|
if (ofs != ~0) {
|
|
/* reader was still attached */
|
|
uint8_t *area = (uint8_t *)ring_area(ring);
|
|
uint8_t readers;
|
|
|
|
BUG_ON(ofs >= ring_size(ring));
|
|
MT_LIST_DELETE(&appctx->wait_entry);
|
|
|
|
/* dec readers count */
|
|
do {
|
|
readers = _HA_ATOMIC_LOAD(area + ofs);
|
|
} while ((readers > RING_MAX_READERS ||
|
|
!_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax());
|
|
}
|
|
}
|
|
|
|
/* Tries to attach CLI handler <appctx> as a new reader on ring <ring>. This is
|
|
* meant to be used when registering a CLI function to dump a buffer, so it
|
|
* returns zero on success, or non-zero on failure with a message in the appctx
|
|
* CLI context. It automatically sets the io_handler and io_release callbacks if
|
|
* they were not set. The <flags> take a combination of RING_WF_*.
|
|
*/
|
|
int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
|
|
{
|
|
struct show_ring_ctx *ctx = applet_reserve_svcctx(appctx, sizeof(*ctx));
|
|
|
|
if (!ring_attach(ring))
|
|
return cli_err(appctx,
|
|
"Sorry, too many watchers (" TOSTR(RING_MAX_READERS) ") on this ring buffer. "
|
|
"What could it have so interesting to attract so many watchers ?");
|
|
|
|
if (!appctx->io_handler)
|
|
appctx->io_handler = cli_io_handler_show_ring;
|
|
if (!appctx->io_release)
|
|
appctx->io_release = cli_io_release_show_ring;
|
|
|
|
memset(ctx, 0, sizeof(*ctx));
|
|
ctx->ring = ring;
|
|
ctx->ofs = ~0; // start from the oldest event
|
|
ctx->flags = flags;
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* parses as many messages as possible from ring <ring>, starting at the offset
|
|
* stored at *ofs_ptr, with RING_WF_* flags in <flags>, and passes them to
|
|
* the message handler <msg_handler>. If <last_of_ptr> is not NULL, a copy of
|
|
* the last known tail pointer will be copied there so that the caller may use
|
|
* this to detect new data have arrived since we left the function. Returns 0
|
|
* if it needs to pause, 1 once finished.
|
|
*/
|
|
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
|
|
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
|
|
{
|
|
size_t head_ofs, tail_ofs, prev_ofs;
|
|
size_t ring_size;
|
|
uint8_t *ring_area;
|
|
struct ist v1, v2;
|
|
uint64_t msg_len;
|
|
size_t len, cnt;
|
|
ssize_t copied;
|
|
uint8_t readers;
|
|
int ret;
|
|
|
|
ring_area = (uint8_t *)ring->storage->area;
|
|
ring_size = ring->storage->size;
|
|
|
|
/* explanation for the initialization below: it would be better to do
|
|
* this in the parsing function but this would occasionally result in
|
|
* dropped events because we'd take a reference on the oldest message
|
|
* and keep it while being scheduled. Thus instead let's take it the
|
|
* first time we enter here so that we have a chance to pass many
|
|
* existing messages before grabbing a reference to a location. This
|
|
* value cannot be produced after initialization. The first offset
|
|
* needs to be taken under isolation as it must not move while we're
|
|
* trying to catch it.
|
|
*/
|
|
if (unlikely(*ofs_ptr == ~0)) {
|
|
thread_isolate();
|
|
|
|
head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
|
|
tail_ofs = ring_tail(ring);
|
|
|
|
if (flags & RING_WF_SEEK_NEW) {
|
|
/* going to the end means looking at tail-1 */
|
|
head_ofs = tail_ofs + ring_size - 1;
|
|
if (head_ofs >= ring_size)
|
|
head_ofs -= ring_size;
|
|
}
|
|
|
|
/* reserve our slot here (inc readers count) */
|
|
do {
|
|
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
|
|
} while ((readers > RING_MAX_READERS ||
|
|
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
|
|
|
|
thread_release();
|
|
|
|
/* store this precious offset in our context, and we're done */
|
|
*ofs_ptr = head_ofs;
|
|
}
|
|
|
|
/* we have the guarantee we can restart from our own head */
|
|
head_ofs = *ofs_ptr;
|
|
BUG_ON(head_ofs >= ring_size);
|
|
|
|
/* the tail will continue to move but we're getting a safe value
|
|
* here that will continue to work.
|
|
*/
|
|
tail_ofs = ring_tail(ring);
|
|
|
|
/* we keep track of where we were and we don't release it before
|
|
* we've protected the next place.
|
|
*/
|
|
prev_ofs = head_ofs;
|
|
|
|
/* in this loop, head_ofs always points to the counter byte that precedes
|
|
* the message so that we can take our reference there if we have to
|
|
* stop before the end (ret=0). The reference is relative to the ring's
|
|
* origin, while pos is relative to the ring's head.
|
|
*/
|
|
ret = 1;
|
|
vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs);
|
|
|
|
while (1) {
|
|
if (vp_size(v1, v2) <= 1) {
|
|
/* no more data */
|
|
break;
|
|
}
|
|
|
|
readers = _HA_ATOMIC_LOAD(_vp_addr(v1, v2, 0));
|
|
if (readers > RING_MAX_READERS) {
|
|
/* we just met a writer which hasn't finished */
|
|
break;
|
|
}
|
|
|
|
cnt = 1;
|
|
len = vp_peek_varint_ofs(v1, v2, cnt, &msg_len);
|
|
if (!len)
|
|
break;
|
|
cnt += len;
|
|
|
|
BUG_ON(msg_len + cnt + 1 > vp_size(v1, v2));
|
|
|
|
copied = msg_handler(ctx, v1, v2, cnt, msg_len);
|
|
if (copied == -2) {
|
|
/* too large a message to ever fit, let's skip it */
|
|
goto skip;
|
|
}
|
|
else if (copied == -1) {
|
|
/* output full */
|
|
ret = 0;
|
|
break;
|
|
}
|
|
skip:
|
|
vp_skip(&v1, &v2, cnt + msg_len);
|
|
}
|
|
|
|
vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs);
|
|
|
|
if (head_ofs != prev_ofs) {
|
|
/* inc readers count on new place */
|
|
do {
|
|
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
|
|
} while ((readers > RING_MAX_READERS ||
|
|
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
|
|
|
|
/* dec readers count on old place */
|
|
do {
|
|
readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs);
|
|
} while ((readers > RING_MAX_READERS ||
|
|
!_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax());
|
|
}
|
|
|
|
if (last_ofs_ptr)
|
|
*last_ofs_ptr = tail_ofs;
|
|
*ofs_ptr = head_ofs;
|
|
return ret;
|
|
}
|
|
|
|
/* This function dumps all events from the ring whose pointer is in <p0> into
|
|
* the appctx's output buffer, and takes from <o0> the seek offset into the
|
|
* buffer's history (0 for oldest known event). It looks at <i0> for boolean
|
|
* options: bit0 means it must wait for new data or any key to be pressed. Bit1
|
|
* means it must seek directly to the end to wait for new contents. It returns
|
|
* 0 if the output buffer or events are missing is full and it needs to be
|
|
* called again, otherwise non-zero. It is meant to be used with
|
|
* cli_release_show_ring() to clean up.
|
|
*/
|
|
int cli_io_handler_show_ring(struct appctx *appctx)
|
|
{
|
|
struct show_ring_ctx *ctx = appctx->svcctx;
|
|
struct stconn *sc = appctx_sc(appctx);
|
|
struct ring *ring = ctx->ring;
|
|
size_t last_ofs;
|
|
size_t ofs;
|
|
int ret;
|
|
|
|
MT_LIST_DELETE(&appctx->wait_entry);
|
|
|
|
ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line);
|
|
|
|
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
|
|
/* we've drained everything and are configured to wait for more
|
|
* data or an event (keypress, close)
|
|
*/
|
|
if (!sc_oc(sc)->output && !(sc->flags & SC_FL_SHUT_DONE)) {
|
|
/* let's be woken up once new data arrive */
|
|
MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
|
|
ofs = ring_tail(ring);
|
|
if (ofs != last_ofs) {
|
|
/* more data was added into the ring between the
|
|
* unlock and the lock, and the writer might not
|
|
* have seen us. We need to reschedule a read.
|
|
*/
|
|
applet_have_more_data(appctx);
|
|
} else
|
|
applet_have_no_more_data(appctx);
|
|
ret = 0;
|
|
}
|
|
/* always drain all the request */
|
|
co_skip(sc_oc(sc), sc_oc(sc)->output);
|
|
}
|
|
|
|
applet_expect_no_data(appctx);
|
|
return ret;
|
|
}
|
|
|
|
/* must be called after cli_io_handler_show_ring() above */
|
|
void cli_io_release_show_ring(struct appctx *appctx)
|
|
{
|
|
struct show_ring_ctx *ctx = appctx->svcctx;
|
|
struct ring *ring = ctx->ring;
|
|
size_t ofs = ctx->ofs;
|
|
|
|
ring_detach_appctx(ring, appctx, ofs);
|
|
}
|
|
|
|
/* Returns the MAXIMUM payload len that could theoretically fit into the ring
|
|
* based on ring buffer size.
|
|
*
|
|
* Computation logic relies on implementation details from 'ring-t.h'.
|
|
*/
|
|
size_t ring_max_payload(const struct ring *ring)
|
|
{
|
|
size_t max;
|
|
|
|
/* initial max = bufsize - 1 (initial RC) - 1 (payload RC) */
|
|
max = ring_size(ring) - 1 - 1;
|
|
|
|
/* subtract payload VI (varint-encoded size) */
|
|
max -= varint_bytes(max);
|
|
return max;
|
|
}
|
|
|
|
/* config parser for global "tune.ring.queues", accepts a number from 0 to RING_WAIT_QUEUES */
|
|
static int cfg_parse_tune_ring_queues(char **args, int section_type, struct proxy *curpx,
|
|
const struct proxy *defpx, const char *file, int line,
|
|
char **err)
|
|
{
|
|
int queues;
|
|
|
|
if (too_many_args(1, args, err, NULL))
|
|
return -1;
|
|
|
|
queues = atoi(args[1]);
|
|
if (queues < 0 || queues > RING_WAIT_QUEUES) {
|
|
memprintf(err, "'%s' expects a number between 0 and %d but got '%s'.", args[0], RING_WAIT_QUEUES, args[1]);
|
|
return -1;
|
|
}
|
|
|
|
global.tune.ring_queues = queues;
|
|
return 0;
|
|
}
|
|
|
|
/* config keyword parsers */
|
|
static struct cfg_kw_list cfg_kws = {ILH, {
|
|
{ CFG_GLOBAL, "tune.ring.queues", cfg_parse_tune_ring_queues },
|
|
{ 0, NULL, NULL }
|
|
}};
|
|
|
|
INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
|
|
|
|
/*
|
|
* Local variables:
|
|
* c-indent-level: 8
|
|
* c-basic-offset: 8
|
|
* End:
|
|
*/
|