mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 14:21:25 +02:00
MINOR: sink: Remove the tests on the opposite SC state to process messages
The state of the opposite SC is already tested to wait the connection is established before sending messages. So, there is no reason to test it again before looping on the ring buffer.
This commit is contained in:
parent
3d949010bc
commit
4b866959d8
150
src/sink.c
150
src/sink.c
@ -364,51 +364,50 @@ static void sink_forward_io_handler(struct appctx *appctx)
|
|||||||
HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
|
HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* we were already there, adjust the offset to be relative to
|
||||||
|
* the buffer's head and remove us from the counter.
|
||||||
|
*/
|
||||||
|
ofs = sft->ofs - b_head_ofs(buf);
|
||||||
|
if (sft->ofs < b_head_ofs(buf))
|
||||||
|
ofs += b_size(buf);
|
||||||
|
BUG_ON(ofs >= buf->size);
|
||||||
|
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
||||||
|
|
||||||
/* in this loop, ofs always points to the counter byte that precedes
|
/* in this loop, ofs always points to the counter byte that precedes
|
||||||
* the message so that we can take our reference there if we have to
|
* the message so that we can take our reference there if we have to
|
||||||
* stop before the end (ret=0).
|
* stop before the end (ret=0).
|
||||||
*/
|
*/
|
||||||
if (sc_opposite(sc)->state == SC_ST_EST) {
|
ret = 1;
|
||||||
/* we were already there, adjust the offset to be relative to
|
while (ofs + 1 < b_data(buf)) {
|
||||||
* the buffer's head and remove us from the counter.
|
cnt = 1;
|
||||||
*/
|
len = b_peek_varint(buf, ofs + cnt, &msg_len);
|
||||||
ofs = sft->ofs - b_head_ofs(buf);
|
if (!len)
|
||||||
if (sft->ofs < b_head_ofs(buf))
|
break;
|
||||||
ofs += b_size(buf);
|
cnt += len;
|
||||||
BUG_ON(ofs >= buf->size);
|
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
|
||||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
|
||||||
|
|
||||||
ret = 1;
|
if (unlikely(msg_len + 1 > b_size(&trash))) {
|
||||||
while (ofs + 1 < b_data(buf)) {
|
/* too large a message to ever fit, let's skip it */
|
||||||
cnt = 1;
|
|
||||||
len = b_peek_varint(buf, ofs + cnt, &msg_len);
|
|
||||||
if (!len)
|
|
||||||
break;
|
|
||||||
cnt += len;
|
|
||||||
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
|
|
||||||
|
|
||||||
if (unlikely(msg_len + 1 > b_size(&trash))) {
|
|
||||||
/* too large a message to ever fit, let's skip it */
|
|
||||||
ofs += cnt + msg_len;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
chunk_reset(&trash);
|
|
||||||
len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
|
|
||||||
trash.data += len;
|
|
||||||
trash.area[trash.data++] = '\n';
|
|
||||||
|
|
||||||
if (applet_putchk(appctx, &trash) == -1) {
|
|
||||||
ret = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ofs += cnt + msg_len;
|
ofs += cnt + msg_len;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
chunk_reset(&trash);
|
||||||
last_ofs = b_tail_ofs(buf);
|
len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
|
||||||
sft->ofs = b_peek_ofs(buf, ofs);
|
trash.data += len;
|
||||||
|
trash.area[trash.data++] = '\n';
|
||||||
|
|
||||||
|
if (applet_putchk(appctx, &trash) == -1) {
|
||||||
|
ret = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ofs += cnt + msg_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||||
|
last_ofs = b_tail_ofs(buf);
|
||||||
|
sft->ofs = b_peek_ofs(buf, ofs);
|
||||||
|
|
||||||
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
@ -502,54 +501,53 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
|
|||||||
HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
|
HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* we were already there, adjust the offset to be relative to
|
||||||
|
* the buffer's head and remove us from the counter.
|
||||||
|
*/
|
||||||
|
ofs = sft->ofs - b_head_ofs(buf);
|
||||||
|
if (sft->ofs < b_head_ofs(buf))
|
||||||
|
ofs += b_size(buf);
|
||||||
|
BUG_ON(ofs >= buf->size);
|
||||||
|
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
||||||
|
|
||||||
/* in this loop, ofs always points to the counter byte that precedes
|
/* in this loop, ofs always points to the counter byte that precedes
|
||||||
* the message so that we can take our reference there if we have to
|
* the message so that we can take our reference there if we have to
|
||||||
* stop before the end (ret=0).
|
* stop before the end (ret=0).
|
||||||
*/
|
*/
|
||||||
if (sc_opposite(sc)->state == SC_ST_EST) {
|
ret = 1;
|
||||||
/* we were already there, adjust the offset to be relative to
|
while (ofs + 1 < b_data(buf)) {
|
||||||
* the buffer's head and remove us from the counter.
|
cnt = 1;
|
||||||
*/
|
len = b_peek_varint(buf, ofs + cnt, &msg_len);
|
||||||
ofs = sft->ofs - b_head_ofs(buf);
|
if (!len)
|
||||||
if (sft->ofs < b_head_ofs(buf))
|
break;
|
||||||
ofs += b_size(buf);
|
cnt += len;
|
||||||
BUG_ON(ofs >= buf->size);
|
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
|
||||||
HA_ATOMIC_DEC(b_peek(buf, ofs));
|
|
||||||
|
|
||||||
ret = 1;
|
chunk_reset(&trash);
|
||||||
while (ofs + 1 < b_data(buf)) {
|
p = ulltoa(msg_len, trash.area, b_size(&trash));
|
||||||
cnt = 1;
|
if (p) {
|
||||||
len = b_peek_varint(buf, ofs + cnt, &msg_len);
|
trash.data = (p - trash.area) + 1;
|
||||||
if (!len)
|
*p = ' ';
|
||||||
break;
|
|
||||||
cnt += len;
|
|
||||||
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
|
|
||||||
|
|
||||||
chunk_reset(&trash);
|
|
||||||
p = ulltoa(msg_len, trash.area, b_size(&trash));
|
|
||||||
if (p) {
|
|
||||||
trash.data = (p - trash.area) + 1;
|
|
||||||
*p = ' ';
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!p || (trash.data + msg_len > b_size(&trash))) {
|
|
||||||
/* too large a message to ever fit, let's skip it */
|
|
||||||
ofs += cnt + msg_len;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
|
|
||||||
|
|
||||||
if (applet_putchk(appctx, &trash) == -1) {
|
|
||||||
ret = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ofs += cnt + msg_len;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
HA_ATOMIC_INC(b_peek(buf, ofs));
|
if (!p || (trash.data + msg_len > b_size(&trash))) {
|
||||||
sft->ofs = b_peek_ofs(buf, ofs);
|
/* too large a message to ever fit, let's skip it */
|
||||||
|
ofs += cnt + msg_len;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
|
||||||
|
|
||||||
|
if (applet_putchk(appctx, &trash) == -1) {
|
||||||
|
ret = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ofs += cnt + msg_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
HA_ATOMIC_INC(b_peek(buf, ofs));
|
||||||
|
sft->ofs = b_peek_ofs(buf, ofs);
|
||||||
|
|
||||||
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user