MINOR: ring: make the ring reader use only absolute offsets

The goal is to remove references to the buffer's head and tail in the
fast path so that we can release the lock during some reads. This means
no more comparisons with b_data() nor operations relative to b_head()
will be possible anymore. As a first step we need to have an absolute
offset in the buffer, and to use b_getblk_ofs() in the applet callbacks
to retrieve the data based on this.
This commit is contained in:
Willy Tarreau 2023-02-23 09:53:38 +01:00
parent 63242a59c4
commit 0f611987da
3 changed files with 24 additions and 21 deletions

View File

@ -726,7 +726,7 @@ int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
}
/* Atomically append a line to applet <ctx>'s output, appending a trailing 'LF'.
* The line is read from <buf> at offset <ofs> relative to the buffer's head,
* The line is read from <buf> at offset <ofs> relative to the buffer's origin,
* for <len> bytes. It returns the number of bytes consumed from the input
* buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
* never be able to (too large msg). The input buffer is not modified. The
@ -743,7 +743,7 @@ ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size
}
chunk_reset(&trash);
b_getblk(buf, trash.area, len, ofs);
b_getblk_ofs(buf, trash.area, len, ofs);
trash.data += len;
trash.area[trash.data++] = '\n';
if (applet_putchk(appctx, &trash) == -1)

View File

@ -4347,7 +4347,7 @@ static struct applet syslog_applet = {
/* Atomically append an event to applet >ctx>'s output, prepending it with its
* size in decimal followed by a space.
* The line is read from <buf> at offset <ofs> relative to the buffer's head,
* The line is read from <buf> at offset <ofs> relative to the buffer's origin,
* for <len> bytes. It returns the number of bytes consumed from the input
* buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
* never be able to (too large msg). The input buffer is not modified. The
@ -4372,7 +4372,7 @@ ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t o
return -2;
/* try to transfer it or report full */
trash.data += b_getblk(buf, trash.area + trash.data, len, ofs);
trash.data += b_getblk_ofs(buf, trash.area + trash.data, len, ofs);
if (applet_putchk(appctx, &trash) == -1)
return -1;

View File

@ -343,7 +343,8 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
uint64_t msg_len;
ssize_t copied;
size_t len, cnt;
size_t ofs;
size_t ofs; /* absolute offset from the buffer's origin */
size_t pos; /* relative position from head (0..data-1) */
int ret;
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
@ -362,47 +363,49 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
HA_ATOMIC_INC(b_orig(buf) + *ofs_ptr);
}
/* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter.
*/
ofs = *ofs_ptr - b_head_ofs(buf);
if (*ofs_ptr < b_head_ofs(buf))
ofs += b_size(buf);
ofs = *ofs_ptr;
BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_peek(buf, ofs));
HA_ATOMIC_DEC(b_orig(buf) + ofs);
/* in this loop, ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
* stop before the end (ret=0).
*/
ret = 1;
while (ofs + 1 < b_data(buf)) {
while (1) {
/* relative position in the buffer */
pos = b_rel_ofs(buf, ofs);
if (pos + 1 >= b_data(buf)) {
/* no more data */
break;
}
cnt = 1;
len = b_peek_varint(buf, ofs + cnt, &msg_len);
len = b_peek_varint(buf, pos + cnt, &msg_len);
if (!len)
break;
cnt += len;
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
BUG_ON(msg_len + pos + cnt + 1 > b_data(buf));
copied = msg_handler(ctx, buf, ofs + cnt, msg_len);
if (copied == -2) {
/* too large a message to ever fit, let's skip it */
ofs += cnt + msg_len;
continue;
goto skip;
}
else if (copied == -1) {
/* output full */
ret = 0;
break;
}
ofs += cnt + msg_len;
skip:
ofs = b_add_ofs(buf, ofs, cnt + msg_len);
}
HA_ATOMIC_INC(b_peek(buf, ofs));
HA_ATOMIC_INC(b_orig(buf) + ofs);
if (last_ofs_ptr)
*last_ofs_ptr = b_tail_ofs(buf);
*ofs_ptr = b_peek_ofs(buf, ofs);
*ofs_ptr = ofs;
HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
return ret;
}