diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h index 1c88c128d..b1ac7df7a 100644 --- a/include/haproxy/applet.h +++ b/include/haproxy/applet.h @@ -58,7 +58,7 @@ size_t appctx_raw_snd_buf(struct appctx *appctx, struct buffer *buf, size_t coun size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags); int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags); -ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size_t len); +ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len); static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc) { diff --git a/include/haproxy/log.h b/include/haproxy/log.h index 591be3b4b..c61089661 100644 --- a/include/haproxy/log.h +++ b/include/haproxy/log.h @@ -87,7 +87,7 @@ void app_log(struct list *loggers, struct buffer *tag, int level, const char *fo */ int add_to_logformat_list(char *start, char *end, int type, struct list *list_format, char **err); -ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t ofs, size_t len); +ssize_t syslog_applet_append_event(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len); /* * Parse the log_format string and fill a linked list. diff --git a/include/haproxy/ring.h b/include/haproxy/ring.h index 9e8970164..057693c58 100644 --- a/include/haproxy/ring.h +++ b/include/haproxy/ring.h @@ -42,7 +42,7 @@ void cli_io_release_show_ring(struct appctx *appctx); size_t ring_max_payload(const struct ring *ring); int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags, - ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len)); + ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)); /* returns the ring storage's area */ static inline void *ring_area(const struct ring *ring) diff --git a/src/applet.c b/src/applet.c index afd76f205..59838648f 100644 --- a/src/applet.c +++ b/src/applet.c @@ -24,6 +24,7 @@ #include #include #include +#include #include unsigned int nb_applets = 0; @@ -725,15 +726,15 @@ int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) return ret; } -/* Atomically append a line to applet 's output, appending a trailing 'LF'. - * The line is read from at offset relative to the buffer's origin, - * for bytes. It returns the number of bytes consumed from the input - * buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will - * never be able to (too large msg). The input buffer is not modified. The - * caller is responsible for making sure that there are at least ofs+len bytes - * in the input buffer. +/* Atomically append a line to applet 's output, appending a trailing LF. + * The line is read from vectors and at offset relative to the + * area's origin, for bytes. It returns the number of bytes consumed from + * the input vectors on success, -1 if it temporarily cannot (buffer full), -2 + * if it will never be able to (too large msg). The vectors are not modified. + * The caller is responsible for making sure that there are at least ofs+len + * bytes in the input vectors. */ -ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size_t len) +ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len) { struct appctx *appctx = ctx; @@ -743,7 +744,7 @@ ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size } chunk_reset(&trash); - b_getblk_ofs(buf, trash.area, len, ofs); + vp_peek_ofs(v1, v2, ofs, trash.area, len); trash.data += len; trash.area[trash.data++] = '\n'; if (applet_putchk(appctx, &trash) == -1) diff --git a/src/log.c b/src/log.c index d2c0f80dc..a25a5cfd9 100644 --- a/src/log.c +++ b/src/log.c @@ -45,6 +45,7 @@ #include #include #include +#include /* global recv logs counter */ int cum_log_messages; @@ -4346,15 +4347,14 @@ static struct applet syslog_applet = { }; /* Atomically append an event to applet >ctx>'s output, prepending it with its - * size in decimal followed by a space. - * The line is read from at offset relative to the buffer's origin, - * for bytes. It returns the number of bytes consumed from the input - * buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will - * never be able to (too large msg). The input buffer is not modified. The - * caller is responsible for making sure that there are at least ofs+len bytes - * in the input buffer. + * size in decimal followed by a space. The line is read from vectors and + * at offset relative to the area's origin, for bytes. It + * returns the number of bytes consumed from the input vectors on success, -1 + * if it temporarily cannot (buffer full), -2 if it will never be able to (too + * large msg). The input vectors are not modified. The caller is responsible for + * making sure that there are at least ofs+len bytes in the input buffer. */ -ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t ofs, size_t len) +ssize_t syslog_applet_append_event(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len) { struct appctx *appctx = ctx; char *p; @@ -4372,7 +4372,7 @@ ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t o return -2; /* try to transfer it or report full */ - trash.data += b_getblk_ofs(buf, trash.area + trash.data, len, ofs); + trash.data += vp_peek_ofs(v1, v2, ofs, trash.area, len); if (applet_putchk(appctx, &trash) == -1) return -1; diff --git a/src/ring.c b/src/ring.c index ca9a6c853..442e98aef 100644 --- a/src/ring.c +++ b/src/ring.c @@ -325,14 +325,11 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs) HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); if (ofs != ~0) { /* reader was still attached */ - if (ofs < ring_head(ring)) - ofs += ring_size(ring) - ring_head(ring); - else - ofs -= ring_head(ring); + char *area = ring_area(ring); BUG_ON(ofs >= ring_size(ring)); LIST_DEL_INIT(&appctx->wait_entry); - HA_ATOMIC_DEC(b_peek(&ring->storage->buf, ofs)); + HA_ATOMIC_DEC(area + ofs); } HA_ATOMIC_DEC(&ring->readers_count); HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); @@ -374,18 +371,26 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags) * if it needs to pause, 1 once finished. */ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags, - ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len)) + ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)) { struct buffer *buf = &ring->storage->buf; + size_t head_ofs, tail_ofs; + size_t ring_size; + char *ring_area; + struct ist v1, v2; uint64_t msg_len; - ssize_t copied; size_t len, cnt; - size_t ofs; /* absolute offset from the buffer's origin */ - size_t pos; /* relative position from head (0..data-1) */ + ssize_t copied; int ret; + ring_area = b_orig(buf); + ring_size = b_size(buf); + HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); + head_ofs = b_head_ofs(buf); + tail_ofs = b_tail_ofs(buf); + /* explanation for the initialization below: it would be better to do * this in the parsing function but this would occasionally result in * dropped events because we'd take a reference on the oldest message @@ -395,37 +400,49 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t * value cannot be produced after initialization. */ if (unlikely(*ofs_ptr == ~0)) { - /* going to the end means looking at tail-1 */ - *ofs_ptr = b_peek_ofs(buf, (flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0); - HA_ATOMIC_INC(b_orig(buf) + *ofs_ptr); + if (flags & RING_WF_SEEK_NEW) { + /* going to the end means looking at tail-1 */ + head_ofs = tail_ofs + ring_size - 1; + if (head_ofs >= ring_size) + head_ofs -= ring_size; + } + + /* make ctx->ofs relative to the beginning of the buffer now */ + *ofs_ptr = head_ofs; + + /* and reserve our slot here */ + HA_ATOMIC_INC(ring_area + head_ofs); } - ofs = *ofs_ptr; - BUG_ON(ofs >= buf->size); - HA_ATOMIC_DEC(b_orig(buf) + ofs); + /* we have the guarantee we can restart from our own head */ + head_ofs = *ofs_ptr; + BUG_ON(head_ofs >= ring_size); - /* in this loop, ofs always points to the counter byte that precedes + HA_ATOMIC_DEC(ring_area + head_ofs); + + /* in this loop, head_ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to - * stop before the end (ret=0). + * stop before the end (ret=0). The reference is relative to the ring's + * origin, while pos is relative to the ring's head. */ ret = 1; - while (1) { - /* relative position in the buffer */ - pos = b_rel_ofs(buf, ofs); + vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); - if (pos + 1 >= b_data(buf)) { + while (1) { + if (vp_size(v1, v2) <= 1) { /* no more data */ break; } cnt = 1; - len = b_peek_varint(buf, pos + cnt, &msg_len); + len = vp_peek_varint_ofs(v1, v2, cnt, &msg_len); if (!len) break; cnt += len; - BUG_ON(msg_len + pos + cnt + 1 > b_data(buf)); - copied = msg_handler(ctx, buf, ofs + cnt, msg_len); + BUG_ON(msg_len + cnt + 1 > vp_size(v1, v2)); + + copied = msg_handler(ctx, v1, v2, cnt, msg_len); if (copied == -2) { /* too large a message to ever fit, let's skip it */ goto skip; @@ -436,13 +453,15 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t break; } skip: - ofs = b_add_ofs(buf, ofs, cnt + msg_len); + vp_skip(&v1, &v2, cnt + msg_len); } - HA_ATOMIC_INC(b_orig(buf) + ofs); + vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + + HA_ATOMIC_INC(ring_area + head_ofs); if (last_ofs_ptr) - *last_ofs_ptr = b_tail_ofs(buf); - *ofs_ptr = ofs; + *last_ofs_ptr = tail_ofs; + *ofs_ptr = head_ofs; HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock); return ret; }