mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 07:37:02 +02:00
MEDIUM: sink: move the generic ring forwarder code use ring_dispatch_messages()
Now the code is much simpler than the ring forwarding function almost does not need any knowledge of the structure of the ring anymore.
This commit is contained in:
parent
c62a2d540d
commit
c262442b1a
64
src/sink.c
64
src/sink.c
@ -355,10 +355,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
|
|||||||
struct sink_forward_target *sft = appctx->svcctx;
|
struct sink_forward_target *sft = appctx->svcctx;
|
||||||
struct sink *sink = sft->sink;
|
struct sink *sink = sft->sink;
|
||||||
struct ring *ring = sink->ctx.ring;
|
struct ring *ring = sink->ctx.ring;
|
||||||
struct buffer *buf = &ring->buf;
|
size_t ofs, last_ofs;
|
||||||
uint64_t msg_len;
|
|
||||||
size_t len, cnt, ofs, last_ofs;
|
|
||||||
ssize_t copied;
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
|
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
|
||||||
@ -389,68 +386,13 @@ static void sink_forward_io_handler(struct appctx *appctx)
|
|||||||
LIST_DEL_INIT(&appctx->wait_entry);
|
LIST_DEL_INIT(&appctx->wait_entry);
|
||||||
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
|
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
|
||||||
|
|
||||||
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
|
ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line);
|
||||||
|
|
||||||
/* explanation for the initialization below: it would be better to do
|
|
||||||
* this in the parsing function but this would occasionally result in
|
|
||||||
* dropped events because we'd take a reference on the oldest message
|
|
||||||
* and keep it while being scheduled. Thus instead let's take it the
|
|
||||||
* first time we enter here so that we have a chance to pass many
|
|
||||||
* existing messages before grabbing a reference to a location. This
|
|
||||||
* value cannot be produced after initialization.
|
|
||||||
*/
|
|
||||||
if (unlikely(sft->ofs == ~0)) {
|
|
||||||
sft->ofs = b_peek_ofs(buf, 0);
|
|
||||||
HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* we were already there, adjust the offset to be relative to
|
|
||||||
* the buffer's head and remove us from the counter.
|
|
||||||
*/
|
|
||||||
ofs = sft->ofs - b_head_ofs(buf);
|
|
||||||
if (sft->ofs < b_head_ofs(buf))
|
|
||||||
ofs += b_size(buf);
|
|
||||||
BUG_ON(ofs >= buf->size);
|
|
||||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
|
||||||
|
|
||||||
/* in this loop, ofs always points to the counter byte that precedes
|
|
||||||
* the message so that we can take our reference there if we have to
|
|
||||||
* stop before the end (ret=0).
|
|
||||||
*/
|
|
||||||
ret = 1;
|
|
||||||
while (ofs + 1 < b_data(buf)) {
|
|
||||||
cnt = 1;
|
|
||||||
len = b_peek_varint(buf, ofs + cnt, &msg_len);
|
|
||||||
if (!len)
|
|
||||||
break;
|
|
||||||
cnt += len;
|
|
||||||
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
|
|
||||||
|
|
||||||
copied = applet_append_line(appctx, buf, ofs + cnt, msg_len);
|
|
||||||
if (copied == -2) {
|
|
||||||
/* too large a message to ever fit, let's skip it */
|
|
||||||
ofs += cnt + msg_len;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else if (copied == -1) {
|
|
||||||
/* output full */
|
|
||||||
ret = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ofs += cnt + msg_len;
|
|
||||||
}
|
|
||||||
|
|
||||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
|
||||||
last_ofs = b_tail_ofs(buf);
|
|
||||||
sft->ofs = b_peek_ofs(buf, ofs);
|
|
||||||
|
|
||||||
HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
|
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
/* let's be woken up once new data arrive */
|
/* let's be woken up once new data arrive */
|
||||||
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
|
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
|
||||||
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
|
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
|
||||||
ofs = b_tail_ofs(buf);
|
ofs = b_tail_ofs(&ring->buf);
|
||||||
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
|
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
|
||||||
if (ofs != last_ofs) {
|
if (ofs != last_ofs) {
|
||||||
/* more data was added into the ring between the
|
/* more data was added into the ring between the
|
||||||
|
Loading…
Reference in New Issue
Block a user