mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-21 13:51:26 +02:00
MINOR: ring: count processed messages in ring_dispatch_messages()
ring_dispatch_messages() now takes an optional argument <processed> which must point to a size_t counter when provided. When provided, the value is updated to the number of messages processed by the function.
This commit is contained in:
parent
0821460e3f
commit
47323e64ad
@ -43,7 +43,8 @@ void cli_io_release_show_ring(struct appctx *appctx);
|
|||||||
|
|
||||||
size_t ring_max_payload(const struct ring *ring);
|
size_t ring_max_payload(const struct ring *ring);
|
||||||
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
|
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
|
||||||
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len));
|
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len),
|
||||||
|
size_t *processed);
|
||||||
|
|
||||||
/* returns the ring storage's usable area */
|
/* returns the ring storage's usable area */
|
||||||
static inline void *ring_area(const struct ring *ring)
|
static inline void *ring_area(const struct ring *ring)
|
||||||
|
12
src/ring.c
12
src/ring.c
@ -545,9 +545,13 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
|
|||||||
* the last known tail pointer will be copied there so that the caller may use
|
* the last known tail pointer will be copied there so that the caller may use
|
||||||
* this to detect new data have arrived since we left the function. Returns 0
|
* this to detect new data have arrived since we left the function. Returns 0
|
||||||
* if it needs to pause, 1 once finished.
|
* if it needs to pause, 1 once finished.
|
||||||
|
*
|
||||||
|
* If <processed> is not NULL, it will be set to the number of messages
|
||||||
|
* processed by the function (even when the function returns 0)
|
||||||
*/
|
*/
|
||||||
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
|
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
|
||||||
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
|
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len),
|
||||||
|
size_t *processed)
|
||||||
{
|
{
|
||||||
size_t head_ofs, tail_ofs, prev_ofs;
|
size_t head_ofs, tail_ofs, prev_ofs;
|
||||||
size_t ring_size;
|
size_t ring_size;
|
||||||
@ -555,6 +559,7 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
|
|||||||
struct ist v1, v2;
|
struct ist v1, v2;
|
||||||
uint64_t msg_len;
|
uint64_t msg_len;
|
||||||
size_t len, cnt;
|
size_t len, cnt;
|
||||||
|
size_t msg_count = 0;
|
||||||
ssize_t copied;
|
ssize_t copied;
|
||||||
uint8_t readers;
|
uint8_t readers;
|
||||||
int ret;
|
int ret;
|
||||||
@ -650,6 +655,7 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
skip:
|
skip:
|
||||||
|
msg_count += 1;
|
||||||
vp_skip(&v1, &v2, cnt + msg_len);
|
vp_skip(&v1, &v2, cnt + msg_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -672,6 +678,8 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
|
|||||||
if (last_ofs_ptr)
|
if (last_ofs_ptr)
|
||||||
*last_ofs_ptr = tail_ofs;
|
*last_ofs_ptr = tail_ofs;
|
||||||
*ofs_ptr = head_ofs;
|
*ofs_ptr = head_ofs;
|
||||||
|
if (processed)
|
||||||
|
*processed = msg_count;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -695,7 +703,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
|
|||||||
|
|
||||||
MT_LIST_DELETE(&appctx->wait_entry);
|
MT_LIST_DELETE(&appctx->wait_entry);
|
||||||
|
|
||||||
ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line);
|
ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line, NULL);
|
||||||
|
|
||||||
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
|
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
|
||||||
/* we've drained everything and are configured to wait for more
|
/* we've drained everything and are configured to wait for more
|
||||||
|
@ -382,7 +382,8 @@ static void _sink_forward_io_handler(struct appctx *appctx,
|
|||||||
|
|
||||||
MT_LIST_DELETE(&appctx->wait_entry);
|
MT_LIST_DELETE(&appctx->wait_entry);
|
||||||
|
|
||||||
ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, msg_handler);
|
ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0,
|
||||||
|
msg_handler, NULL);
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
/* let's be woken up once new data arrive */
|
/* let's be woken up once new data arrive */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user