diff --git a/include/proto/ring.h b/include/proto/ring.h index e32403454..a82f93583 100644 --- a/include/proto/ring.h +++ b/include/proto/ring.h @@ -30,6 +30,9 @@ struct ring *ring_new(size_t size); struct ring *ring_resize(struct ring *ring, size_t size); void ring_free(struct ring *ring); ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg); +int ring_attach_cli(struct ring *ring, struct appctx *appctx); +int cli_io_handler_show_ring(struct appctx *appctx); +void cli_io_release_show_ring(struct appctx *appctx); #endif /* _PROTO_RING_H */ diff --git a/src/ring.c b/src/ring.c index 8ba3868ac..a718cec70 100644 --- a/src/ring.c +++ b/src/ring.c @@ -23,7 +23,10 @@ #include #include #include +#include +#include #include +#include /* Creates and returns a ring buffer of size bytes. Returns NULL on * allocation failure. @@ -189,6 +192,125 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz return sent; } +/* Tries to attach CLI handler as a new reader on ring . This is + * meant to be used when registering a CLI function to dump a buffer, so it + * returns zero on success, or non-zero on failure with a message in the appctx + * CLI context. + */ +int ring_attach_cli(struct ring *ring, struct appctx *appctx) +{ + int users = ring->readers_count; + + do { + if (users >= 1) + return cli_err(appctx, + "Sorry, too many watchers (255) on this ring buffer. " + "What could it have so interesting to attract so many watchers ?"); + + } while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1)); + + appctx->ctx.cli.p0 = ring; + appctx->ctx.cli.p1 = 0; // start from the oldest event + return 0; +} + +/* This function dumps all events from the ring whose pointer is in into + * the appctx's output buffer, and takes from the seek offset into the + * buffer's history (0 for oldest known event). It returns 0 if the output + * buffer is full and it needs to be called again, otherwise non-zero. It is + * meant to be used with cli_release_show_ring() to clean up. + */ +int cli_io_handler_show_ring(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct ring *ring = appctx->ctx.cli.p0; + struct buffer *buf = &ring->buf; + size_t ofs = (unsigned long)appctx->ctx.cli.p1; + uint64_t msg_len; + size_t len, cnt; + int ret; + + if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) + return 1; + + HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); + + /* explanation for the initialization below: it would be better to do + * this in the parsing function but this would occasionally result in + * dropped events because we'd take a reference on the oldest message + * and keep it while being scheduled. Thus instead let's take it the + * first time we enter here so that we have a chance to pass many + * existing messages before grabbing a reference to a location. + */ + if (unlikely(!ofs)) { + HA_ATOMIC_ADD(b_head(buf), 1); + ofs += ring->ofs; + } + + /* we were already there, adjust the offset to be relative to + * the buffer's head and remove us from the counter. + */ + ofs -= ring->ofs; + BUG_ON(ofs >= buf->size); + HA_ATOMIC_SUB(b_peek(buf, ofs), 1); + + /* in this loop, ofs always points to the counter byte that precedes + * the message so that we can take our reference there if we have to + * stop before the end (ret=0). + */ + ret = 1; + while (ofs + 1 < b_data(buf)) { + cnt = 1; + len = b_peek_varint(buf, ofs + cnt, &msg_len); + if (!len) + break; + cnt += len; + BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); + + if (unlikely(msg_len + 1 > b_size(&trash))) { + /* too large a message to ever fit, let's skip it */ + ofs += cnt + msg_len; + continue; + } + + chunk_reset(&trash); + len = b_getblk(buf, trash.area, msg_len, ofs + cnt); + trash.data += len; + trash.area[trash.data++] = '\n'; + + if (ci_putchk(si_ic(si), &trash) == -1) { + si_rx_room_blk(si); + ret = 0; + break; + } + ofs += cnt + msg_len; + } + + HA_ATOMIC_ADD(b_peek(buf, ofs), 1); + ofs += ring->ofs; + appctx->ctx.cli.p1 = (void *)ofs; + HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); + return ret; +} + +/* must be called after cli_io_handler_show_ring() above */ +void cli_io_release_show_ring(struct appctx *appctx) +{ + struct ring *ring = appctx->ctx.cli.p0; + size_t ofs = (unsigned long)appctx->ctx.cli.p1; + + if (!ring) + return; + + HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); + ofs -= ring->ofs; + BUG_ON(ofs >= b_size(&ring->buf)); + HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1); + HA_ATOMIC_SUB(&ring->readers_count, 1); + HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); +} + + /* * Local variables: * c-indent-level: 8