MINOR: sink/api: pass explicit maxlen parameter to sink_write()

sink_write() currently relies on sink->maxlen to know when to stop
writing a given payload.

But it could be useful to pass a smaller, explicit value to sink_write()
to stop before the ring maxlen, for instance if the ring is shared between
multiple feeders.

sink_write() now takes an optional maxlen parameter:
  if maxlen is > 0, then sink_write will stop writing at maxlen if maxlen
  is smaller than ring->maxlen, else only ring->maxlen will be considered.

[for haproxy <= 2.7, patch must be applied by hand: that is:
__sink_write() and sink_write() should be patched to take maxlen into
account and function calls to sink_write() should use 0 as second argument
to keep original behavior]
This commit is contained in:
Aurelien DARRAGON 2023-06-26 16:44:41 +02:00 committed by Christopher Faulet
parent 901f31bc9a
commit b6e2d62fb3
5 changed files with 19 additions and 12 deletions

View File

@ -32,7 +32,8 @@ extern struct proxy *sink_proxies_list;
struct sink *sink_find(const char *name);
struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt, int fd);
ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
ssize_t __sink_write(struct sink *sink, size_t maxlen,
const struct ist msg[], size_t nmsg,
int level, int facility, struct ist * metadata);
int sink_announce_dropped(struct sink *sink, int facility);
@ -44,7 +45,8 @@ int sink_announce_dropped(struct sink *sink, int facility);
* The function returns the number of Bytes effectively sent or announced.
* or <= 0 in other cases.
*/
static inline ssize_t sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
static inline ssize_t sink_write(struct sink *sink, size_t maxlen,
const struct ist msg[], size_t nmsg,
int level, int facility, struct ist *metadata)
{
ssize_t sent;
@ -69,7 +71,7 @@ static inline ssize_t sink_write(struct sink *sink, const struct ist msg[], size
}
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &sink->ctx.lock);
sent = __sink_write(sink, msg, nmsg, level, facility, metadata);
sent = __sink_write(sink, maxlen, msg, nmsg, level, facility, metadata);
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &sink->ctx.lock);
fail:

View File

@ -1689,7 +1689,7 @@ static inline void __do_send_log(struct logsrv *logsrv, int nblogger, int level,
msg = ist2(message, size);
msg = isttrim(msg, logsrv->maxlen);
sent = sink_write(logsrv->sink, &msg, 1, level, facility, metadata);
sent = sink_write(logsrv->sink, 0, &msg, 1, level, facility, metadata);
}
else if (logsrv->addr.ss_family == AF_CUST_EXISTING_FD) {
struct ist msg;

View File

@ -1760,7 +1760,7 @@ static int sample_conv_debug(const struct arg *arg_p, struct sample *smp, void *
done:
line = ist2(buf->area, buf->data);
sink_write(sink, &line, 1, 0, 0, NULL);
sink_write(sink, 0, &line, 1, 0, 0, NULL);
end:
free_trash_chunk(buf);
return 1;

View File

@ -168,10 +168,13 @@ struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt,
* array <msg> to sink <sink>. Formatting according to the sink's preference is
* done here. Lost messages are NOT accounted for. It is preferable to call
* sink_write() instead which will also try to emit the number of dropped
* messages when there are any. It returns >0 if it could write anything,
* <=0 otherwise.
* messages when there are any. It will stop writing at <maxlen> instead of
* sink->maxlen if <maxlen> is positive and inferior to sink->maxlen.
*
* It returns >0 if it could write anything, <=0 otherwise.
*/
ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
ssize_t __sink_write(struct sink *sink, size_t maxlen,
const struct ist msg[], size_t nmsg,
int level, int facility, struct ist *metadata)
{
struct ist *pfx = NULL;
@ -183,11 +186,13 @@ struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt,
pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
send:
if (!maxlen)
maxlen = ~0;
if (sink->type == SINK_TYPE_FD) {
return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
return fd_write_frag_line(sink->ctx.fd, MIN(maxlen, sink->maxlen), pfx, npfx, msg, nmsg, 1);
}
else if (sink->type == SINK_TYPE_BUFFER) {
return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
return ring_write(sink->ctx.ring, MIN(maxlen, sink->maxlen), pfx, npfx, msg, nmsg);
}
return 0;
}
@ -229,7 +234,7 @@ int sink_announce_dropped(struct sink *sink, int facility)
metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
}
if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
if (__sink_write(sink, 0, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
return 0;
/* success! */
HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);

View File

@ -291,7 +291,7 @@ void __trace(enum trace_level level, uint64_t mask, struct trace_source *src,
}
if (src->sink)
sink_write(src->sink, line, words, 0, 0, NULL);
sink_write(src->sink, 0, line, words, 0, 0, NULL);
end:
/* check if we need to stop the trace now */