diff --git a/src/sink.c b/src/sink.c index 9cfe6eafc..44d2d9111 100644 --- a/src/sink.c +++ b/src/sink.c @@ -364,51 +364,50 @@ static void sink_forward_io_handler(struct appctx *appctx) HA_ATOMIC_INC(b_orig(buf) + sft->ofs); } + /* we were already there, adjust the offset to be relative to + * the buffer's head and remove us from the counter. + */ + ofs = sft->ofs - b_head_ofs(buf); + if (sft->ofs < b_head_ofs(buf)) + ofs += b_size(buf); + BUG_ON(ofs >= buf->size); + HA_ATOMIC_DEC(b_peek(buf, ofs)); + /* in this loop, ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to * stop before the end (ret=0). */ - if (sc_opposite(sc)->state == SC_ST_EST) { - /* we were already there, adjust the offset to be relative to - * the buffer's head and remove us from the counter. - */ - ofs = sft->ofs - b_head_ofs(buf); - if (sft->ofs < b_head_ofs(buf)) - ofs += b_size(buf); - BUG_ON(ofs >= buf->size); - HA_ATOMIC_DEC(b_peek(buf, ofs)); + ret = 1; + while (ofs + 1 < b_data(buf)) { + cnt = 1; + len = b_peek_varint(buf, ofs + cnt, &msg_len); + if (!len) + break; + cnt += len; + BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - ret = 1; - while (ofs + 1 < b_data(buf)) { - cnt = 1; - len = b_peek_varint(buf, ofs + cnt, &msg_len); - if (!len) - break; - cnt += len; - BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - - if (unlikely(msg_len + 1 > b_size(&trash))) { - /* too large a message to ever fit, let's skip it */ - ofs += cnt + msg_len; - continue; - } - - chunk_reset(&trash); - len = b_getblk(buf, trash.area, msg_len, ofs + cnt); - trash.data += len; - trash.area[trash.data++] = '\n'; - - if (applet_putchk(appctx, &trash) == -1) { - ret = 0; - break; - } + if (unlikely(msg_len + 1 > b_size(&trash))) { + /* too large a message to ever fit, let's skip it */ ofs += cnt + msg_len; + continue; } - HA_ATOMIC_INC(b_peek(buf, ofs)); - last_ofs = b_tail_ofs(buf); - sft->ofs = b_peek_ofs(buf, ofs); + chunk_reset(&trash); + len = b_getblk(buf, trash.area, msg_len, ofs + cnt); + trash.data += len; + trash.area[trash.data++] = '\n'; + + if (applet_putchk(appctx, &trash) == -1) { + ret = 0; + break; + } + ofs += cnt + msg_len; } + + HA_ATOMIC_INC(b_peek(buf, ofs)); + last_ofs = b_tail_ofs(buf); + sft->ofs = b_peek_ofs(buf, ofs); + HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); if (ret) { @@ -502,54 +501,53 @@ static void sink_forward_oc_io_handler(struct appctx *appctx) HA_ATOMIC_INC(b_orig(buf) + sft->ofs); } + /* we were already there, adjust the offset to be relative to + * the buffer's head and remove us from the counter. + */ + ofs = sft->ofs - b_head_ofs(buf); + if (sft->ofs < b_head_ofs(buf)) + ofs += b_size(buf); + BUG_ON(ofs >= buf->size); + HA_ATOMIC_DEC(b_peek(buf, ofs)); + /* in this loop, ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to * stop before the end (ret=0). */ - if (sc_opposite(sc)->state == SC_ST_EST) { - /* we were already there, adjust the offset to be relative to - * the buffer's head and remove us from the counter. - */ - ofs = sft->ofs - b_head_ofs(buf); - if (sft->ofs < b_head_ofs(buf)) - ofs += b_size(buf); - BUG_ON(ofs >= buf->size); - HA_ATOMIC_DEC(b_peek(buf, ofs)); + ret = 1; + while (ofs + 1 < b_data(buf)) { + cnt = 1; + len = b_peek_varint(buf, ofs + cnt, &msg_len); + if (!len) + break; + cnt += len; + BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - ret = 1; - while (ofs + 1 < b_data(buf)) { - cnt = 1; - len = b_peek_varint(buf, ofs + cnt, &msg_len); - if (!len) - break; - cnt += len; - BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - - chunk_reset(&trash); - p = ulltoa(msg_len, trash.area, b_size(&trash)); - if (p) { - trash.data = (p - trash.area) + 1; - *p = ' '; - } - - if (!p || (trash.data + msg_len > b_size(&trash))) { - /* too large a message to ever fit, let's skip it */ - ofs += cnt + msg_len; - continue; - } - - trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt); - - if (applet_putchk(appctx, &trash) == -1) { - ret = 0; - break; - } - ofs += cnt + msg_len; + chunk_reset(&trash); + p = ulltoa(msg_len, trash.area, b_size(&trash)); + if (p) { + trash.data = (p - trash.area) + 1; + *p = ' '; } - HA_ATOMIC_INC(b_peek(buf, ofs)); - sft->ofs = b_peek_ofs(buf, ofs); + if (!p || (trash.data + msg_len > b_size(&trash))) { + /* too large a message to ever fit, let's skip it */ + ofs += cnt + msg_len; + continue; + } + + trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt); + + if (applet_putchk(appctx, &trash) == -1) { + ret = 0; + break; + } + ofs += cnt + msg_len; } + + HA_ATOMIC_INC(b_peek(buf, ofs)); + sft->ofs = b_peek_ofs(buf, ofs); + HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); if (ret) {