diff --git a/include/haproxy/sink-t.h b/include/haproxy/sink-t.h index 79a0ddade..d5e1cec57 100644 --- a/include/haproxy/sink-t.h +++ b/include/haproxy/sink-t.h @@ -60,9 +60,8 @@ struct sink { struct sig_handler *forward_sighandler; /* signal handler */ struct { struct ring *ring; // used by ring buffer and STRM sender - unsigned int dropped; // dropped events since last one. + unsigned int dropped; // 2*dropped events since last one + 1 for purge in progress. int fd; // fd num for FD type sink - __decl_thread(HA_RWLOCK_T lock); // shared/excl for dropped } ctx; }; diff --git a/include/haproxy/sink.h b/include/haproxy/sink.h index 3b428a154..bdc84475b 100644 --- a/include/haproxy/sink.h +++ b/include/haproxy/sink.h @@ -52,18 +52,10 @@ int sink_announce_dropped(struct sink *sink, struct log_header hdr); static inline ssize_t sink_write(struct sink *sink, struct log_header hdr, size_t maxlen, const struct ist msg[], size_t nmsg) { - ssize_t sent; + ssize_t sent = 0; - if (unlikely(sink->ctx.dropped > 0)) { - /* We need to take an exclusive lock so that other producers - * don't do the same thing at the same time and above all we - * want to be sure others have finished sending their messages - * so that the dropped event arrives exactly at the right - * position. - */ - HA_RWLOCK_WRLOCK(RING_LOCK, &sink->ctx.lock); + if (unlikely(HA_ATOMIC_LOAD(&sink->ctx.dropped) > 0)) { sent = sink_announce_dropped(sink, hdr); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &sink->ctx.lock); if (!sent) { /* we failed, we don't try to send our log as if it @@ -73,13 +65,11 @@ static inline ssize_t sink_write(struct sink *sink, struct log_header hdr, } } - HA_RWLOCK_RDLOCK(RING_LOCK, &sink->ctx.lock); sent = __sink_write(sink, hdr, maxlen, msg, nmsg); - HA_RWLOCK_RDUNLOCK(RING_LOCK, &sink->ctx.lock); fail: if (unlikely(sent <= 0)) - HA_ATOMIC_INC(&sink->ctx.dropped); + HA_ATOMIC_ADD(&sink->ctx.dropped, 2); return sent; } diff --git a/src/sink.c b/src/sink.c index 636805a02..f5347269a 100644 --- a/src/sink.c +++ b/src/sink.c @@ -87,7 +87,6 @@ static struct sink *__sink_new(const char *name, const char *desc, int fmt) /* address will be filled by the caller if needed */ sink->ctx.fd = -1; sink->ctx.dropped = 0; - HA_RWLOCK_INIT(&sink->ctx.lock); LIST_APPEND(&sink_list, &sink->sink_list); end: return sink; @@ -206,9 +205,15 @@ send: * here with the only difference that we override the log level. This is * possible since the announce message will be sent from the same context. * - * In case of success, the amount of drops is reduced by as much. It's supposed - * to be called under an exclusive lock on the sink to avoid multiple producers - * doing the same. On success, >0 is returned, otherwise <=0 on failure. + * In case of success, the amount of drops is reduced by as much. + * The function ensures that a single thread will do that work at once, other + * ones will only report a failure if a thread is dumping, so that no thread + * waits. A pair od atomic OR and AND is performed around the code so the + * caller would be advised to only call this function AFTER having verified + * that sink->ctx.dropped is not zero in order to avoid a memory write. On + * success, >0 is returned, otherwise <=0 on failure, indicating that it could + * not eliminate the pending drop counter. It may loop up to 10 times trying + * to catch up with failing competing threads. */ int sink_announce_dropped(struct sink *sink, struct log_header hdr) { @@ -217,10 +222,26 @@ int sink_announce_dropped(struct sink *sink, struct log_header hdr) uint dropped, last_dropped; struct ist msgvec[1]; uint retries = 10; + int ret = 0; + + /* Explanation. ctx.dropped is made of: + * bit0 = 1 if dropped dump in progress + * bit1..31 = dropped counter + * If non-zero there have been some drops. If not &1, it means + * nobody's taking care of them and we'll have to, otherwise + * another thread is already on them and we can just pass and + * count another drop (hence add 2). + */ + dropped = HA_ATOMIC_FETCH_OR(&sink->ctx.dropped, 1); + if (dropped & 1) { + /* another thread was already on it */ + goto leave; + } last_dropped = 0; + dropped >>= 1; while (1) { - while (unlikely((dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped)) > last_dropped) && retries-- > 0) { + while (unlikely(dropped > last_dropped) && retries-- > 0) { /* try to aggregate multiple messages if other threads arrive while * we're producing the dropped message. */ @@ -234,7 +255,9 @@ int sink_announce_dropped(struct sink *sink, struct log_header hdr) msglen = msg_dropped2 + sizeof(msg_dropped2) - msg; } msgvec[0] = ist2(msg, msglen); + dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped) >> 1; } + if (!dropped) break; @@ -242,11 +265,19 @@ int sink_announce_dropped(struct sink *sink, struct log_header hdr) hdr.level = LOG_NOTICE; /* override level but keep original log header data */ if (__sink_write(sink, hdr, 0, msgvec, 1) <= 0) - return 0; + goto done; + /* success! */ - HA_ATOMIC_SUB(&sink->ctx.dropped, dropped); + HA_ATOMIC_SUB(&sink->ctx.dropped, dropped << 1); } - return 1; + + /* done! */ + ret = 1; +done: + /* unlock the counter */ + HA_ATOMIC_AND(&sink->ctx.dropped, ~1); +leave: + return ret; } /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */