OPTIM: sink: drop the sink lock used to count drops

The sink lock was introduced to prevent event producers from proceeding while
other threads were trying to print a "dropped" message, in order to
guarantee the absence of reordering. However, it has a serious impact:
every thread has to take the read lock when producing a regular trace,
even when there is no reader.

This patch takes a different approach. The drop counter is shifted left
by one so that its lowest bit indicates that one thread is already taking
care of dumping the counter. Threads normally only read this value, and
only try to change it when it is non-zero; in that case they first check
whether they are the first one trying to dump it, otherwise they simply
count another drop and leave. This brings several benefits. First, it
avoids the locking that causes stalls as soon as a slow reader is present.
Second, it avoids any write on the fast path as long as there is no drop.
And it remains very lightweight, since the only operations needed are
adding 2 or subtracting 2*dropped, while still guaranteeing that the
sink_write() has succeeded before the counter is unlocked.
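
To illustrate the principle outside of haproxy, here is a minimal standalone
sketch of the same low-bit "dump in progress" encoding using plain C11
atomics; the names (drops, count_drop, announce_drops, emit) are invented
for the example and do not exist in the haproxy sources:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    /* illustrative stand-in for sink->ctx.dropped:
     *   bit0     = 1 while one thread is dumping the "dropped" message
     *   bit1..31 = number of dropped events
     */
    static _Atomic unsigned int drops;

    /* fast path: a producer failed to emit its event, count one drop (+2) */
    static void count_drop(void)
    {
        atomic_fetch_add(&drops, 2);
    }

    /* slow path: try to announce pending drops; emit() stands in for the
     * actual message write and may fail. Returns 1 on success, 0 otherwise.
     */
    static int announce_drops(bool (*emit)(unsigned int count))
    {
        unsigned int d = atomic_fetch_or(&drops, 1);
        int ret = 0;

        if (d & 1)
            return 0; /* another thread already owns the dump */

        d >>= 1; /* drops observed when taking ownership */
        if (!d || emit(d)) {
            /* nothing pending, or the announcement was written:
             * consume exactly what was reported.
             */
            atomic_fetch_sub(&drops, d << 1);
            ret = 1;
        }
        atomic_fetch_and(&drops, ~1u); /* release the low-bit flag */
        return ret;
    }

    static bool print_drops(unsigned int count)
    {
        return printf("dropped %u events\n", count) > 0;
    }

    int main(void)
    {
        count_drop();
        count_drop();
        return announce_drops(print_drops) ? 0 : 1;
    }

The real code below additionally retries up to 10 times so as to aggregate
drops that keep arriving while the message is being prepared.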

While a reader previously limited the traffic to 11k RPS under 4C/8T, we
now reach 36k RPS with a reader versus 14k without one, so readers no
longer slow the traffic down; they even speed it up by avoiding the
contention further down the chain in the ring. The locking cost dropped
from ~75% to ~60%, and is now located in ring_write.

commit 758cb450a2
parent eb7b2ec83a
Author: Willy Tarreau
Date:   2024-03-01 19:22:04 +01:00

3 changed files with 43 additions and 23 deletions

@@ -60,9 +60,8 @@ struct sink {
 	struct sig_handler *forward_sighandler; /* signal handler */
 	struct {
 		struct ring *ring;    // used by ring buffer and STRM sender
-		unsigned int dropped; // dropped events since last one.
+		unsigned int dropped; // 2*dropped events since last one + 1 for purge in progress.
 		int fd;               // fd num for FD type sink
-		__decl_thread(HA_RWLOCK_T lock); // shared/excl for dropped
 	} ctx;
 };

@@ -52,18 +52,10 @@ int sink_announce_dropped(struct sink *sink, struct log_header hdr);
 static inline ssize_t sink_write(struct sink *sink, struct log_header hdr,
                                  size_t maxlen, const struct ist msg[], size_t nmsg)
 {
-	ssize_t sent;
+	ssize_t sent = 0;
 
-	if (unlikely(sink->ctx.dropped > 0)) {
-		/* We need to take an exclusive lock so that other producers
-		 * don't do the same thing at the same time and above all we
-		 * want to be sure others have finished sending their messages
-		 * so that the dropped event arrives exactly at the right
-		 * position.
-		 */
-		HA_RWLOCK_WRLOCK(RING_LOCK, &sink->ctx.lock);
+	if (unlikely(HA_ATOMIC_LOAD(&sink->ctx.dropped) > 0)) {
 		sent = sink_announce_dropped(sink, hdr);
-		HA_RWLOCK_WRUNLOCK(RING_LOCK, &sink->ctx.lock);
 
 		if (!sent) {
 			/* we failed, we don't try to send our log as if it
@@ -73,13 +65,11 @@ static inline ssize_t sink_write(struct sink *sink, struct log_header hdr,
 		}
 	}
 
-	HA_RWLOCK_RDLOCK(RING_LOCK, &sink->ctx.lock);
 	sent = __sink_write(sink, hdr, maxlen, msg, nmsg);
-	HA_RWLOCK_RDUNLOCK(RING_LOCK, &sink->ctx.lock);
 
  fail:
 	if (unlikely(sent <= 0))
-		HA_ATOMIC_INC(&sink->ctx.dropped);
+		HA_ATOMIC_ADD(&sink->ctx.dropped, 2);
 
 	return sent;
 }

@@ -87,7 +87,6 @@ static struct sink *__sink_new(const char *name, const char *desc, int fmt)
 	/* address will be filled by the caller if needed */
 	sink->ctx.fd = -1;
 	sink->ctx.dropped = 0;
-	HA_RWLOCK_INIT(&sink->ctx.lock);
 	LIST_APPEND(&sink_list, &sink->sink_list);
  end:
 	return sink;
@@ -206,9 +205,15 @@ send:
  * here with the only difference that we override the log level. This is
  * possible since the announce message will be sent from the same context.
  *
- * In case of success, the amount of drops is reduced by as much. It's supposed
- * to be called under an exclusive lock on the sink to avoid multiple producers
- * doing the same. On success, >0 is returned, otherwise <=0 on failure.
+ * In case of success, the amount of drops is reduced by as much.
+ * The function ensures that a single thread will do that work at once, other
+ * ones will only report a failure if a thread is dumping, so that no thread
+ * waits. A pair of atomic OR and AND is performed around the code so the
+ * caller would be advised to only call this function AFTER having verified
+ * that sink->ctx.dropped is not zero in order to avoid a memory write. On
+ * success, >0 is returned, otherwise <=0 on failure, indicating that it could
+ * not eliminate the pending drop counter. It may loop up to 10 times trying
+ * to catch up with failing competing threads.
  */
 int sink_announce_dropped(struct sink *sink, struct log_header hdr)
 {
@@ -217,10 +222,26 @@ int sink_announce_dropped(struct sink *sink, struct log_header hdr)
 	uint dropped, last_dropped;
 	struct ist msgvec[1];
 	uint retries = 10;
+	int ret = 0;
 
+	/* Explanation. ctx.dropped is made of:
+	 *   bit0     = 1 if dropped dump in progress
+	 *   bit1..31 = dropped counter
+	 * If non-zero there have been some drops. If not &1, it means
+	 * nobody's taking care of them and we'll have to, otherwise
+	 * another thread is already on them and we can just pass and
+	 * count another drop (hence add 2).
+	 */
+	dropped = HA_ATOMIC_FETCH_OR(&sink->ctx.dropped, 1);
+	if (dropped & 1) {
+		/* another thread was already on it */
+		goto leave;
+	}
+
 	last_dropped = 0;
+	dropped >>= 1;
 	while (1) {
-		while (unlikely((dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped)) > last_dropped) && retries-- > 0) {
+		while (unlikely(dropped > last_dropped) && retries-- > 0) {
 			/* try to aggregate multiple messages if other threads arrive while
 			 * we're producing the dropped message.
 			 */
@@ -234,7 +255,9 @@ int sink_announce_dropped(struct sink *sink, struct log_header hdr)
 				msglen = msg_dropped2 + sizeof(msg_dropped2) - msg;
 			}
 			msgvec[0] = ist2(msg, msglen);
+
+			dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped) >> 1;
 		}
 
 		if (!dropped)
 			break;
@@ -242,11 +265,19 @@ int sink_announce_dropped(struct sink *sink, struct log_header hdr)
 		hdr.level = LOG_NOTICE; /* override level but keep original log header data */
 		if (__sink_write(sink, hdr, 0, msgvec, 1) <= 0)
-			return 0;
+			goto done;
 		/* success! */
-		HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
+		HA_ATOMIC_SUB(&sink->ctx.dropped, dropped << 1);
 	}
-	return 1;
+
+	/* done! */
+	ret = 1;
+ done:
+	/* unlock the counter */
+	HA_ATOMIC_AND(&sink->ctx.dropped, ~1);
+ leave:
+	return ret;
 }
 
 /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */