mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-06 15:17:01 +02:00
MEDIUM: ring: move the ring reader code to ring_dispatch_messages()
This new function is made around the loop that scans a ring for new messages and dispatches them to a message handler. It also takes ring flags (WAIT, NEW, etc) and offset pointers that the caller will use to initialize/reuse/update the current processing offset. The caller is still responsible for presetting it to ~0 before the first call if it wants the function to automatically adjust it (or set it to the correct value). The function may also return the last_ofs that was known before releasing the lock so that the caller knows what to compare against and if it needs to restart processing or not. The context remains a void* so that should not necessarily depend on an appctx. The current "show ring" code was ported to this and it continues to work as expected.
This commit is contained in:
parent
ad31e53287
commit
c62a2d540d
@ -42,6 +42,8 @@ int cli_io_handler_show_ring(struct appctx *appctx);
|
|||||||
void cli_io_release_show_ring(struct appctx *appctx);
|
void cli_io_release_show_ring(struct appctx *appctx);
|
||||||
|
|
||||||
size_t ring_max_payload(const struct ring *ring);
|
size_t ring_max_payload(const struct ring *ring);
|
||||||
|
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
|
||||||
|
ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len));
|
||||||
|
|
||||||
#endif /* _HAPROXY_RING_H */
|
#endif /* _HAPROXY_RING_H */
|
||||||
|
|
||||||
|
81
src/ring.c
81
src/ring.c
@ -328,36 +328,24 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function dumps all events from the ring whose pointer is in <p0> into
|
|
||||||
* the appctx's output buffer, and takes from <o0> the seek offset into the
|
/* parses as many messages as possible from ring <ring>, starting at the offset
|
||||||
* buffer's history (0 for oldest known event). It looks at <i0> for boolean
|
* stored at *ofs_ptr, with RING_WF_* flags in <flags>, and passes them to
|
||||||
* options: bit0 means it must wait for new data or any key to be pressed. Bit1
|
* the message handler <msg_handler>. If <last_of_ptr> is not NULL, a copy of
|
||||||
* means it must seek directly to the end to wait for new contents. It returns
|
* the last known tail pointer will be copied there so that the caller may use
|
||||||
* 0 if the output buffer or events are missing is full and it needs to be
|
* this to detect new data have arrived since we left the function. Returns 0
|
||||||
* called again, otherwise non-zero. It is meant to be used with
|
* if it needs to pause, 1 once finished.
|
||||||
* cli_release_show_ring() to clean up.
|
|
||||||
*/
|
*/
|
||||||
int cli_io_handler_show_ring(struct appctx *appctx)
|
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
|
||||||
|
ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len))
|
||||||
{
|
{
|
||||||
struct show_ring_ctx *ctx = appctx->svcctx;
|
|
||||||
struct stconn *sc = appctx_sc(appctx);
|
|
||||||
struct ring *ring = ctx->ring;
|
|
||||||
struct buffer *buf = &ring->buf;
|
struct buffer *buf = &ring->buf;
|
||||||
size_t ofs;
|
|
||||||
size_t last_ofs;
|
|
||||||
uint64_t msg_len;
|
uint64_t msg_len;
|
||||||
size_t len, cnt;
|
|
||||||
ssize_t copied;
|
ssize_t copied;
|
||||||
|
size_t len, cnt;
|
||||||
|
size_t ofs;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
/* FIXME: Don't watch the other side !*/
|
|
||||||
if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE))
|
|
||||||
return 1;
|
|
||||||
|
|
||||||
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
|
|
||||||
LIST_DEL_INIT(&appctx->wait_entry);
|
|
||||||
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
|
|
||||||
|
|
||||||
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
|
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
|
||||||
|
|
||||||
/* explanation for the initialization below: it would be better to do
|
/* explanation for the initialization below: it would be better to do
|
||||||
@ -368,17 +356,17 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
|||||||
* existing messages before grabbing a reference to a location. This
|
* existing messages before grabbing a reference to a location. This
|
||||||
* value cannot be produced after initialization.
|
* value cannot be produced after initialization.
|
||||||
*/
|
*/
|
||||||
if (unlikely(ctx->ofs == ~0)) {
|
if (unlikely(*ofs_ptr == ~0)) {
|
||||||
/* going to the end means looking at tail-1 */
|
/* going to the end means looking at tail-1 */
|
||||||
ctx->ofs = b_peek_ofs(buf, (ctx->flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
|
*ofs_ptr = b_peek_ofs(buf, (flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
|
||||||
HA_ATOMIC_INC(b_orig(buf) + ctx->ofs);
|
HA_ATOMIC_INC(b_orig(buf) + *ofs_ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we were already there, adjust the offset to be relative to
|
/* we were already there, adjust the offset to be relative to
|
||||||
* the buffer's head and remove us from the counter.
|
* the buffer's head and remove us from the counter.
|
||||||
*/
|
*/
|
||||||
ofs = ctx->ofs - b_head_ofs(buf);
|
ofs = *ofs_ptr - b_head_ofs(buf);
|
||||||
if (ctx->ofs < b_head_ofs(buf))
|
if (*ofs_ptr < b_head_ofs(buf))
|
||||||
ofs += b_size(buf);
|
ofs += b_size(buf);
|
||||||
|
|
||||||
BUG_ON(ofs >= buf->size);
|
BUG_ON(ofs >= buf->size);
|
||||||
@ -397,7 +385,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
|||||||
cnt += len;
|
cnt += len;
|
||||||
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
|
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
|
||||||
|
|
||||||
copied = applet_append_line(appctx, buf, ofs + cnt, msg_len);
|
copied = msg_handler(ctx, buf, ofs + cnt, msg_len);
|
||||||
if (copied == -2) {
|
if (copied == -2) {
|
||||||
/* too large a message to ever fit, let's skip it */
|
/* too large a message to ever fit, let's skip it */
|
||||||
ofs += cnt + msg_len;
|
ofs += cnt + msg_len;
|
||||||
@ -412,9 +400,40 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||||
last_ofs = b_tail_ofs(buf);
|
if (last_ofs_ptr)
|
||||||
ctx->ofs = b_peek_ofs(buf, ofs);
|
*last_ofs_ptr = b_tail_ofs(buf);
|
||||||
|
*ofs_ptr = b_peek_ofs(buf, ofs);
|
||||||
HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
|
HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This function dumps all events from the ring whose pointer is in <p0> into
|
||||||
|
* the appctx's output buffer, and takes from <o0> the seek offset into the
|
||||||
|
* buffer's history (0 for oldest known event). It looks at <i0> for boolean
|
||||||
|
* options: bit0 means it must wait for new data or any key to be pressed. Bit1
|
||||||
|
* means it must seek directly to the end to wait for new contents. It returns
|
||||||
|
* 0 if the output buffer or events are missing is full and it needs to be
|
||||||
|
* called again, otherwise non-zero. It is meant to be used with
|
||||||
|
* cli_release_show_ring() to clean up.
|
||||||
|
*/
|
||||||
|
int cli_io_handler_show_ring(struct appctx *appctx)
|
||||||
|
{
|
||||||
|
struct show_ring_ctx *ctx = appctx->svcctx;
|
||||||
|
struct stconn *sc = appctx_sc(appctx);
|
||||||
|
struct ring *ring = ctx->ring;
|
||||||
|
size_t last_ofs;
|
||||||
|
size_t ofs;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
/* FIXME: Don't watch the other side !*/
|
||||||
|
if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
|
||||||
|
LIST_DEL_INIT(&appctx->wait_entry);
|
||||||
|
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
|
||||||
|
|
||||||
|
ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line);
|
||||||
|
|
||||||
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
|
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
|
||||||
/* we've drained everything and are configured to wait for more
|
/* we've drained everything and are configured to wait for more
|
||||||
|
Loading…
Reference in New Issue
Block a user