diff --git a/include/proto/sink.h b/include/proto/sink.h index 0d8dd7dfc..29475364d 100644 --- a/include/proto/sink.h +++ b/include/proto/sink.h @@ -29,7 +29,46 @@ extern struct list sink_list; struct sink *sink_find(const char *name); struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd); -void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg); +ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg); +int sink_announce_dropped(struct sink *sink); + + +/* tries to send message parts (up to 8, ignored above) from message + * array to sink . Formating according to the sink's preference is + * done here. Lost messages are accounted for in the sink's counter. If there + * were lost messages, an attempt is first made to indicate it. + */ +static inline void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg) +{ + ssize_t sent; + + if (unlikely(sink->ctx.dropped > 0)) { + /* We need to take an exclusive lock so that other producers + * don't do the same thing at the same time and above all we + * want to be sure others have finished sending their messages + * so that the dropped event arrives exactly at the right + * position. + */ + HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.lock); + sent = sink_announce_dropped(sink); + HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.lock); + + if (!sent) { + /* we failed, we don't try to send our log as if it + * would pass by chance, we'd get disordered events. + */ + goto fail; + } + } + + HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &sink->ctx.lock); + sent = __sink_write(sink, msg, nmsg); + HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &sink->ctx.lock); + + fail: + if (unlikely(sent <= 0)) + HA_ATOMIC_ADD(&sink->ctx.dropped, 1); +} #endif /* _PROTO_SINK_H */ diff --git a/include/types/sink.h b/include/types/sink.h index 5465ca63d..e36a29653 100644 --- a/include/types/sink.h +++ b/include/types/sink.h @@ -60,9 +60,9 @@ struct sink { uint8_t syslog_minlvl; // used by syslog & short formats uint32_t maxlen; // max message length (truncated above) struct { + __decl_hathreads(HA_RWLOCK_T lock); // shared/excl for dropped struct ring *ring; // used by ring buffer and STRM sender unsigned int dropped; // dropped events since last one. - __decl_hathreads(HA_RWLOCK_T lock); // used by some types int fd; // fd num for FD type sink } ctx; }; diff --git a/src/sink.c b/src/sink.c index 5746c4bc2..8d2ce9183 100644 --- a/src/sink.c +++ b/src/sink.c @@ -145,14 +145,16 @@ struct sink *sink_new_buf(const char *name, const char *desc, enum sink_fmt fmt, /* tries to send message parts (up to 8, ignored above) from message * array to sink . Formating according to the sink's preference is - * done here. Lost messages are accounted for in the sink's counter. + * done here. Lost messages are NOT accounted for. It is preferable to call + * sink_write() instead which will also try to emit the number of dropped + * messages when there are any. It returns >0 if it could write anything, + * <=0 otherwise. */ -void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg) +ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg) { char short_hdr[4]; struct ist pfx[4]; size_t npfx = 0; - size_t sent = 0; if (sink->fmt == SINK_FMT_SHORT) { short_hdr[0] = '<'; @@ -165,17 +167,36 @@ void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg) } if (sink->type == SINK_TYPE_FD) { - sent = fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1); - /* sent > 0 if the message was delivered */ + return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1); } else if (sink->type == SINK_TYPE_BUFFER) { - sent = ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg); - /* sent > 0 if the message was delivered */ + return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg); } + return 0; +} - /* account for errors now */ - if (sent <= 0) - HA_ATOMIC_ADD(&sink->ctx.dropped, 1); +/* Tries to emit a message indicating the number of dropped events. In case of + * success, the amount of drops is reduced by as much. It's supposed to be + * called under an exclusive lock on the sink to avoid multiple produces doing + * the same. On success, >0 is returned, otherwise <=0 on failure. + */ +int sink_announce_dropped(struct sink *sink) +{ + unsigned int dropped; + struct buffer msg; + struct ist msgvec[1]; + char logbuf[64]; + + while (unlikely((dropped = sink->ctx.dropped) > 0)) { + chunk_init(&msg, logbuf, sizeof(logbuf)); + chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : ""); + msgvec[0] = ist2(msg.area, msg.data); + if (__sink_write(sink, msgvec, 1) <= 0) + return 0; + /* success! */ + HA_ATOMIC_SUB(&sink->ctx.dropped, dropped); + } + return 1; } /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */