From 2513bd257f2db7fc48fc50f7227a8fa13aeb2680 Mon Sep 17 00:00:00 2001 From: Aurelien DARRAGON Date: Mon, 22 Jul 2024 17:52:18 +0200 Subject: [PATCH] OPTIM: sink: consider threads' current load when rebalancing applets In c454296f0 ("OPTIM: sink: balance applets accross threads"), we already made sure to balance applets accross threads by picking a random thread to spawn the new applet. Also, thanks to the previous commit, we also have the ability to destroy the applet when a certain amount of messages were processed to help distribute the load during runtime. Let's improve that by trying up to 3 different threads in the hope to pick a non-overloaded one in the best scenario, and the least over loaded one in the worst case. This should help to better distribute the load over multiple threads when high loads are expected. Logic was greatly inspired from thread migration logic used by server health checks, but it was simpliflied for sink's use case. --- src/sink.c | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/sink.c b/src/sink.c index 86356ad21..1b09c04ae 100644 --- a/src/sink.c +++ b/src/sink.c @@ -548,11 +548,43 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink { struct appctx *appctx; struct applet *applet = &sink_forward_applet; + uint best_tid, best_load; + int attempts, first; if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) applet = &sink_forward_oc_applet; - appctx = appctx_new_on(applet, NULL, statistical_prng_range(global.nbthread)); + BUG_ON(!global.nbthread); + attempts = MIN(global.nbthread, 3); + first = 1; + + /* to shut gcc warning */ + best_tid = best_load = 0; + + /* to help spread the load over multiple threads, try to find a + * non-overloaded thread by picking a random thread and checking + * its load. If we fail to find a non-overloaded thread after 3 + * attempts, let's pick the least overloaded one. + */ + while (attempts-- > 0) { + uint cur_tid; + uint cur_load; + + cur_tid = statistical_prng_range(global.nbthread); + cur_load = HA_ATOMIC_LOAD(&ha_thread_ctx[cur_tid].rq_total); + + if (first || cur_load < best_load) { + best_tid = cur_tid; + best_load = cur_load; + } + first = 0; + + /* if we already found a non-overloaded thread, stop now */ + if (HA_ATOMIC_LOAD(&ha_thread_ctx[best_tid].rq_total) < 3) + break; + } + + appctx = appctx_new_on(applet, NULL, best_tid); if (!appctx) goto out_close; appctx->svcctx = (void *)sft;