MEDIUM: counters: Dynamically allocate per-thread group counters

Instead of statically allocating the per-thread group counters,
based on the max number of thread groups available, allocate
them dynamically, based on the number of thread groups actually
used. That way we can increase the maximum number of thread
groups without using an unreasonable amount of memory.
This commit is contained in:
Olivier Houchard 2026-01-12 04:25:34 +01:00
parent 37057feb80
commit 5495c88441
18 changed files with 113 additions and 51 deletions

View File

@ -88,7 +88,7 @@ static inline int be_usable_srv(struct proxy *be)
/* set the time of last session on the backend */
static inline void be_set_sess_last(struct proxy *be)
{
if (be->be_counters.shared.tg[tgid - 1])
if (be->be_counters.shared.tg && be->be_counters.shared.tg[tgid - 1])
HA_ATOMIC_STORE(&be->be_counters.shared.tg[tgid - 1]->last_sess, ns_to_sec(now_ns));
}

View File

@ -66,7 +66,7 @@ struct counters_shared {
COUNTERS_SHARED;
struct {
COUNTERS_SHARED_TG;
} *tg[MAX_TGROUPS];
} **tg;
};
/*
@ -101,7 +101,7 @@ struct fe_counters_shared_tg {
struct fe_counters_shared {
COUNTERS_SHARED;
struct fe_counters_shared_tg *tg[MAX_TGROUPS];
struct fe_counters_shared_tg **tg;
};
/* counters used by listeners and frontends */
@ -160,7 +160,7 @@ struct be_counters_shared_tg {
struct be_counters_shared {
COUNTERS_SHARED;
struct be_counters_shared_tg *tg[MAX_TGROUPS];
struct be_counters_shared_tg **tg;
};
/* counters used by servers and backends */

View File

@ -166,12 +166,12 @@ static inline int proxy_abrt_close(const struct proxy *px)
/* increase the number of cumulated connections received on the designated frontend */
static inline void proxy_inc_fe_conn_ctr(struct listener *l, struct proxy *fe)
{
if (fe->fe_counters.shared.tg[tgid - 1])
if (fe->fe_counters.shared.tg && fe->fe_counters.shared.tg[tgid - 1]) {
_HA_ATOMIC_INC(&fe->fe_counters.shared.tg[tgid - 1]->cum_conn);
if (l && l->counters && l->counters->shared.tg[tgid - 1])
_HA_ATOMIC_INC(&l->counters->shared.tg[tgid - 1]->cum_conn);
if (fe->fe_counters.shared.tg[tgid - 1])
update_freq_ctr(&fe->fe_counters.shared.tg[tgid - 1]->conn_per_sec, 1);
}
if (l && l->counters && l->counters->shared.tg && l->counters->shared.tg[tgid - 1])
_HA_ATOMIC_INC(&l->counters->shared.tg[tgid - 1]->cum_conn);
HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.cps_max,
update_freq_ctr(&fe->fe_counters._conn_per_sec, 1));
}
@ -179,12 +179,12 @@ static inline void proxy_inc_fe_conn_ctr(struct listener *l, struct proxy *fe)
/* increase the number of cumulated connections accepted by the designated frontend */
static inline void proxy_inc_fe_sess_ctr(struct listener *l, struct proxy *fe)
{
if (fe->fe_counters.shared.tg[tgid - 1])
if (fe->fe_counters.shared.tg && fe->fe_counters.shared.tg[tgid - 1]) {
_HA_ATOMIC_INC(&fe->fe_counters.shared.tg[tgid - 1]->cum_sess);
if (l && l->counters && l->counters->shared.tg[tgid - 1])
_HA_ATOMIC_INC(&l->counters->shared.tg[tgid - 1]->cum_sess);
if (fe->fe_counters.shared.tg[tgid - 1])
update_freq_ctr(&fe->fe_counters.shared.tg[tgid - 1]->sess_per_sec, 1);
}
if (l && l->counters && l->counters->shared.tg && l->counters->shared.tg[tgid - 1])
_HA_ATOMIC_INC(&l->counters->shared.tg[tgid - 1]->cum_sess);
HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.sps_max,
update_freq_ctr(&fe->fe_counters._sess_per_sec, 1));
}
@ -199,19 +199,19 @@ static inline void proxy_inc_fe_cum_sess_ver_ctr(struct listener *l, struct prox
http_ver > sizeof(fe->fe_counters.shared.tg[tgid - 1]->cum_sess_ver) / sizeof(*fe->fe_counters.shared.tg[tgid - 1]->cum_sess_ver))
return;
if (fe->fe_counters.shared.tg[tgid - 1])
if (fe->fe_counters.shared.tg && fe->fe_counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&fe->fe_counters.shared.tg[tgid - 1]->cum_sess_ver[http_ver - 1]);
if (l && l->counters && l->counters->shared.tg[tgid - 1])
if (l && l->counters && l->counters->shared.tg && l->counters->shared.tg[tgid - 1])
_HA_ATOMIC_INC(&l->counters->shared.tg[tgid - 1]->cum_sess_ver[http_ver - 1]);
}
/* increase the number of cumulated streams on the designated backend */
static inline void proxy_inc_be_ctr(struct proxy *be)
{
if (be->be_counters.shared.tg[tgid - 1])
if (be->be_counters.shared.tg && be->be_counters.shared.tg[tgid - 1]) {
_HA_ATOMIC_INC(&be->be_counters.shared.tg[tgid - 1]->cum_sess);
if (be->be_counters.shared.tg[tgid - 1])
update_freq_ctr(&be->be_counters.shared.tg[tgid - 1]->sess_per_sec, 1);
}
HA_ATOMIC_UPDATE_MAX(&be->be_counters.sps_max,
update_freq_ctr(&be->be_counters._sess_per_sec, 1));
}
@ -226,12 +226,12 @@ static inline void proxy_inc_fe_req_ctr(struct listener *l, struct proxy *fe,
if (http_ver >= sizeof(fe->fe_counters.shared.tg[tgid - 1]->p.http.cum_req) / sizeof(*fe->fe_counters.shared.tg[tgid - 1]->p.http.cum_req))
return;
if (fe->fe_counters.shared.tg[tgid - 1])
if (fe->fe_counters.shared.tg && fe->fe_counters.shared.tg[tgid - 1]) {
_HA_ATOMIC_INC(&fe->fe_counters.shared.tg[tgid - 1]->p.http.cum_req[http_ver]);
if (l && l->counters && l->counters->shared.tg[tgid - 1])
_HA_ATOMIC_INC(&l->counters->shared.tg[tgid - 1]->p.http.cum_req[http_ver]);
if (fe->fe_counters.shared.tg[tgid - 1])
update_freq_ctr(&fe->fe_counters.shared.tg[tgid - 1]->req_per_sec, 1);
}
if (l && l->counters && l->counters->shared.tg && l->counters->shared.tg[tgid - 1])
_HA_ATOMIC_INC(&l->counters->shared.tg[tgid - 1]->p.http.cum_req[http_ver]);
HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.p.http.rps_max,
update_freq_ctr(&fe->fe_counters.p.http._req_per_sec, 1));
}

View File

@ -207,7 +207,7 @@ static inline void server_index_id(struct proxy *px, struct server *srv)
/* increase the number of cumulated streams on the designated server */
static inline void srv_inc_sess_ctr(struct server *s)
{
if (s->counters.shared.tg[tgid - 1]) {
if (s->counters.shared.tg && s->counters.shared.tg[tgid - 1]) {
_HA_ATOMIC_INC(&s->counters.shared.tg[tgid - 1]->cum_sess);
update_freq_ctr(&s->counters.shared.tg[tgid - 1]->sess_per_sec, 1);
}
@ -218,7 +218,7 @@ static inline void srv_inc_sess_ctr(struct server *s)
/* set the time of last session on the designated server */
static inline void srv_set_sess_last(struct server *s)
{
if (s->counters.shared.tg[tgid - 1])
if (s->counters.shared.tg && s->counters.shared.tg[tgid - 1])
HA_ATOMIC_STORE(&s->counters.shared.tg[tgid - 1]->last_sess, ns_to_sec(now_ns));
}

View File

@ -823,7 +823,7 @@ int assign_server(struct stream *s)
else if (srv != prev_srv) {
if (s->be_tgcounters)
_HA_ATOMIC_INC(&s->be_tgcounters->cum_lbconn);
if (srv->counters.shared.tg[tgid - 1])
if (srv->counters.shared.tg && srv->counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&srv->counters.shared.tg[tgid - 1]->cum_lbconn);
}
stream_set_srv_target(s, srv);
@ -998,12 +998,12 @@ int assign_server_and_queue(struct stream *s)
s->txn->flags |= TX_CK_DOWN;
}
s->flags |= SF_REDISP;
if (prev_srv->counters.shared.tg[tgid - 1])
if (prev_srv->counters.shared.tg && prev_srv->counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&prev_srv->counters.shared.tg[tgid - 1]->redispatches);
if (s->be_tgcounters)
_HA_ATOMIC_INC(&s->be_tgcounters->redispatches);
} else {
if (prev_srv->counters.shared.tg[tgid - 1])
if (prev_srv->counters.shared.tg && prev_srv->counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&prev_srv->counters.shared.tg[tgid - 1]->retries);
if (s->be_tgcounters)
_HA_ATOMIC_INC(&s->be_tgcounters->retries);

View File

@ -2133,11 +2133,11 @@ enum act_return http_action_req_cache_use(struct act_rule *rule, struct proxy *p
return ACT_RET_CONT;
if (px == strm_fe(s)) {
if (px->fe_counters.shared.tg[tgid - 1])
if (px->fe_counters.shared.tg && px->fe_counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&px->fe_counters.shared.tg[tgid - 1]->p.http.cache_lookups);
}
else {
if (px->be_counters.shared.tg[tgid - 1])
if (px->be_counters.shared.tg && px->be_counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&px->be_counters.shared.tg[tgid - 1]->p.http.cache_lookups);
}
@ -2226,11 +2226,11 @@ enum act_return http_action_req_cache_use(struct act_rule *rule, struct proxy *p
should_send_notmodified_response(cache, htxbuf(&s->req.buf), res);
if (px == strm_fe(s)) {
if (px->fe_counters.shared.tg[tgid - 1])
if (px->fe_counters.shared.tg && px->fe_counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&px->fe_counters.shared.tg[tgid - 1]->p.http.cache_hits);
}
else {
if (px->be_counters.shared.tg[tgid - 1])
if (px->be_counters.shared.tg && px->be_counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&px->be_counters.shared.tg[tgid - 1]->p.http.cache_hits);
}
return ACT_RET_CONT;

View File

@ -513,7 +513,7 @@ void set_server_check_status(struct check *check, short status, const char *desc
if ((!(check->state & CHK_ST_AGENT) ||
(check->status >= HCHK_STATUS_L57DATA)) &&
(check->health > 0)) {
if (s->counters.shared.tg[tgid - 1])
if (s->counters.shared.tg && s->counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&s->counters.shared.tg[tgid - 1]->failed_checks);
report = 1;
check->health--;
@ -741,7 +741,7 @@ void __health_adjust(struct server *s, short status)
HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
HA_ATOMIC_STORE(&s->consecutive_errors, 0);
if (s->counters.shared.tg[tgid - 1])
if (s->counters.shared.tg && s->counters.shared.tg[tgid - 1])
_HA_ATOMIC_INC(&s->counters.shared.tg[tgid - 1]->failed_hana);
if (s->check.fastinter) {

View File

@ -3372,8 +3372,14 @@ read_again:
target_pid = s->pcli_next_pid;
/* we can connect now */
s->target = pcli_pid_to_server(target_pid);
if (objt_server(s->target))
s->sv_tgcounters = __objt_server(s->target)->counters.shared.tg[tgid - 1];
if (objt_server(s->target)) {
struct server *srv = __objt_server(s->target);
if (srv->counters.shared.tg)
s->sv_tgcounters = srv->counters.shared.tg[tgid - 1];
else
s->sv_tgcounters = NULL;
}
if (!s->target)
goto server_disconnect;

View File

@ -37,7 +37,7 @@ static void _counters_shared_drop(void *counters)
if (!shared)
return;
while (it < global.nbtgroups && shared->tg[it]) {
while (it < global.nbtgroups && shared->tg && shared->tg[it]) {
if (shared->flags & COUNTERS_SHARED_F_LOCAL) {
/* memory was allocated using calloc(), simply free it */
free(shared->tg[it]);
@ -53,6 +53,7 @@ static void _counters_shared_drop(void *counters)
}
it += 1;
}
free(shared->tg);
}
/* release a shared fe counters struct */
@ -86,6 +87,14 @@ static int _counters_shared_prepare(struct counters_shared *shared,
if (!guid->key || !shm_stats_file_hdr)
shared->flags |= COUNTERS_SHARED_F_LOCAL;
if (!shared->tg) {
shared->tg = calloc(global.nbtgroups, sizeof(*shared->tg));
if (!shared->tg) {
memprintf(errmsg, "couldn't allocate memory for shared counters");
return 0;
}
}
while (it < global.nbtgroups) {
if (shared->flags & COUNTERS_SHARED_F_LOCAL) {
size_t tg_size;

View File

@ -951,8 +951,14 @@ int httpclient_applet_init(struct appctx *appctx)
s = appctx_strm(appctx);
s->target = target;
if (objt_server(s->target))
s->sv_tgcounters = __objt_server(s->target)->counters.shared.tg[tgid - 1];
if (objt_server(s->target)) {
struct server *srv = __objt_server(s->target);
if (srv->counters.shared.tg)
s->sv_tgcounters = __objt_server(s->target)->counters.shared.tg[tgid - 1];
else
s->sv_tgcounters = NULL;
}
/* set the "timeout server" */
s->scb->ioto = hc->timeout_server;

View File

@ -1117,7 +1117,7 @@ void listener_accept(struct listener *l)
int max = 0;
int it;
for (it = 0; (it < global.nbtgroups && p->fe_counters.shared.tg[it]); it++)
for (it = 0; (it < global.nbtgroups && p->fe_counters.shared.tg && p->fe_counters.shared.tg[it]); it++)
max += freq_ctr_remain(&p->fe_counters.shared.tg[it]->sess_per_sec, p->fe_sps_lim, 0);
if (unlikely(!max)) {

View File

@ -2578,7 +2578,11 @@ int stream_set_backend(struct stream *s, struct proxy *be)
return 0;
s->be = be;
s->be_tgcounters = be->be_counters.shared.tg[tgid - 1];
if (be->be_counters.shared.tg)
s->be_tgcounters = be->be_counters.shared.tg[tgid - 1];
else
s->be_tgcounters = NULL;
HA_ATOMIC_UPDATE_MAX(&be->be_counters.conn_max,
HA_ATOMIC_ADD_FETCH(&be->beconn, 1));
proxy_inc_be_ctr(be);

View File

@ -7159,7 +7159,7 @@ static void srv_update_status(struct server *s, int type, int cause)
}
else if (s->cur_state == SRV_ST_STOPPED) {
/* server was up and is currently down */
if (s->counters.shared.tg[tgid - 1])
if (s->counters.shared.tg && s->counters.shared.tg[tgid - 1])
HA_ATOMIC_INC(&s->counters.shared.tg[tgid - 1]->down_trans);
_srv_event_hdl_publish(EVENT_HDL_SUB_SERVER_DOWN, cb_data.common, s);
}
@ -7174,7 +7174,7 @@ static void srv_update_status(struct server *s, int type, int cause)
}
s->last_change = ns_to_sec(now_ns);
if (s->counters.shared.tg[tgid - 1])
if (s->counters.shared.tg && s->counters.shared.tg[tgid - 1])
HA_ATOMIC_STORE(&s->counters.shared.tg[tgid - 1]->last_state_change, s->last_change);
/* publish the state change */
@ -7195,7 +7195,7 @@ static void srv_update_status(struct server *s, int type, int cause)
if (last_change < ns_to_sec(now_ns)) // ignore negative times
s->proxy->down_time += ns_to_sec(now_ns) - last_change;
s->proxy->last_change = ns_to_sec(now_ns);
if (s->proxy->be_counters.shared.tg[tgid - 1])
if (s->proxy->be_counters.shared.tg && s->proxy->be_counters.shared.tg[tgid - 1])
HA_ATOMIC_STORE(&s->proxy->be_counters.shared.tg[tgid - 1]->last_state_change, s->proxy->last_change);
}
}

View File

@ -322,7 +322,7 @@ static void srv_state_srv_update(struct server *srv, int version, char **params)
}
srv->last_change = ns_to_sec(now_ns) - srv_last_time_change;
if (srv->counters.shared.tg[0])
if (srv->counters.shared.tg && srv->counters.shared.tg[0])
HA_ATOMIC_STORE(&srv->counters.shared.tg[0]->last_state_change, srv->last_change);
srv->check.status = srv_check_status;
srv->check.result = srv_check_result;

View File

@ -99,8 +99,12 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
sess->flags = SESS_FL_NONE;
sess->src = NULL;
sess->dst = NULL;
sess->fe_tgcounters = sess->fe->fe_counters.shared.tg[tgid - 1];
if (sess->listener && sess->listener->counters)
if (sess->fe->fe_counters.shared.tg)
sess->fe_tgcounters = sess->fe->fe_counters.shared.tg[tgid - 1];
else
sess->fe_tgcounters = NULL;
if (sess->listener && sess->listener->counters && sess->listener->counters->shared.tg)
sess->li_tgcounters = sess->listener->counters->shared.tg[tgid - 1];
else
sess->li_tgcounters = NULL;

View File

@ -293,6 +293,9 @@ static int parse_stat_line(struct ist line,
if (!(px->cap & PR_CAP_FE))
return 0; /* silently ignored fe/be mismatch */
if (!px->fe_counters.shared.tg)
return 0;
base_off_shared = (char *)px->fe_counters.shared.tg[0];
if (!base_off_shared)
return 0; // not allocated
@ -305,6 +308,9 @@ static int parse_stat_line(struct ist line,
if (!(px->cap & PR_CAP_BE))
return 0; /* silently ignored fe/be mismatch */
if (!px->be_counters.shared.tg)
return 0;
base_off_shared = (char *)px->be_counters.shared.tg[0];
if (!base_off_shared)
return 0; // not allocated
@ -328,6 +334,9 @@ static int parse_stat_line(struct ist line,
if (!li->counters)
return 0;
if (!li->counters->shared.tg)
return 0;
base_off_shared = (char *)li->counters->shared.tg[0];
if (!base_off_shared)
return 0; // not allocated
@ -342,6 +351,9 @@ static int parse_stat_line(struct ist line,
goto err;
srv = __objt_server(node->obj_type);
if (!srv->counters.shared.tg)
return 0;
base_off_shared = (char *)srv->counters.shared.tg[0];
if (!base_off_shared)
return 0; // not allocated
@ -765,8 +777,13 @@ static void shm_stats_file_preload(void)
BUG_ON(curr_obj->type != SHM_STATS_FILE_OBJECT_TYPE_FE);
li = __objt_listener(node->obj_type);
// counters are optional for listeners
if (li->counters)
if (li->counters) {
if (!li->counters->shared.tg)
li->counters->shared.tg = calloc(global.nbtgroups, sizeof(*li->counters->shared.tg));
if (li->counters->shared.tg == NULL)
goto release;
li->counters->shared.tg[obj_tgid - 1] = &curr_obj->data.fe;
}
break;
}
case OBJ_TYPE_SERVER:
@ -775,6 +792,10 @@ static void shm_stats_file_preload(void)
BUG_ON(curr_obj->type != SHM_STATS_FILE_OBJECT_TYPE_BE);
sv = __objt_server(node->obj_type);
if (!sv->counters.shared.tg)
sv->counters.shared.tg = calloc(global.nbtgroups, sizeof(*sv->counters.shared.tg));
if (sv->counters.shared.tg == NULL)
goto release;
sv->counters.shared.tg[obj_tgid - 1] = &curr_obj->data.be;
break;
}
@ -783,11 +804,19 @@ static void shm_stats_file_preload(void)
struct proxy *px;
px = __objt_proxy(node->obj_type);
if (curr_obj->type == SHM_STATS_FILE_OBJECT_TYPE_FE)
if (curr_obj->type == SHM_STATS_FILE_OBJECT_TYPE_FE) {
if (!px->fe_counters.shared.tg)
px->fe_counters.shared.tg = calloc(global.nbtgroups, sizeof(*px->fe_counters.shared.tg));
if (px->fe_counters.shared.tg == NULL)
goto release;
px->fe_counters.shared.tg[obj_tgid - 1] = &curr_obj->data.fe;
else if (curr_obj->type == SHM_STATS_FILE_OBJECT_TYPE_BE)
} else if (curr_obj->type == SHM_STATS_FILE_OBJECT_TYPE_BE) {
if (!px->be_counters.shared.tg)
px->be_counters.shared.tg = calloc(global.nbtgroups, sizeof(*px->be_counters.shared.tg));
if (px->be_counters.shared.tg == NULL)
goto release;
px->be_counters.shared.tg[obj_tgid - 1] = &curr_obj->data.be;
else
} else
goto release; // not supported
break;
}

View File

@ -290,7 +290,7 @@ static struct field me_generate_field(const struct stat_col *col,
case STATS_PX_CAP_FE:
case STATS_PX_CAP_LI:
if (col->flags & STAT_COL_FL_SHARED) {
counter = (char *)&((struct fe_counters *)counters)->shared.tg;
counter = ((struct fe_counters *)counters)->shared.tg;
offset = col->metric.offset[0];
}
else
@ -301,7 +301,7 @@ static struct field me_generate_field(const struct stat_col *col,
case STATS_PX_CAP_BE:
case STATS_PX_CAP_SRV:
if (col->flags & STAT_COL_FL_SHARED) {
counter = (char *)&((struct be_counters *)counters)->shared.tg;
counter = ((struct be_counters *)counters)->shared.tg;
offset = col->metric.offset[1];
}
else

View File

@ -451,7 +451,11 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer
* when the default backend is assigned.
*/
s->be = sess->fe;
s->be_tgcounters = sess->fe->be_counters.shared.tg[tgid - 1];
if (sess->fe->be_counters.shared.tg)
s->be_tgcounters = sess->fe->be_counters.shared.tg[tgid - 1];
else
s->be_tgcounters = NULL;
s->sv_tgcounters = NULL; // default value
s->req_cap = NULL;