diff --git a/src/sink.c b/src/sink.c index 86356ad21..1b09c04ae 100644 --- a/src/sink.c +++ b/src/sink.c @@ -548,11 +548,43 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink { struct appctx *appctx; struct applet *applet = &sink_forward_applet; + uint best_tid, best_load; + int attempts, first; if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) applet = &sink_forward_oc_applet; - appctx = appctx_new_on(applet, NULL, statistical_prng_range(global.nbthread)); + BUG_ON(!global.nbthread); + attempts = MIN(global.nbthread, 3); + first = 1; + + /* to shut gcc warning */ + best_tid = best_load = 0; + + /* to help spread the load over multiple threads, try to find a + * non-overloaded thread by picking a random thread and checking + * its load. If we fail to find a non-overloaded thread after 3 + * attempts, let's pick the least overloaded one. + */ + while (attempts-- > 0) { + uint cur_tid; + uint cur_load; + + cur_tid = statistical_prng_range(global.nbthread); + cur_load = HA_ATOMIC_LOAD(&ha_thread_ctx[cur_tid].rq_total); + + if (first || cur_load < best_load) { + best_tid = cur_tid; + best_load = cur_load; + } + first = 0; + + /* if we already found a non-overloaded thread, stop now */ + if (HA_ATOMIC_LOAD(&ha_thread_ctx[best_tid].rq_total) < 3) + break; + } + + appctx = appctx_new_on(applet, NULL, best_tid); if (!appctx) goto out_close; appctx->svcctx = (void *)sft;