MEDIUM: listener/config: make the "thread" parser rely on thread_sets

Instead of reading and storing a single group and a single mask for a
"thread" directive on a bind line, we now store the complete range in
a thread set that's stored in the bind_conf. The bind_parse_thread()
function now just calls parse_thread_set() to complete the current set,
which starts empty, and thread_resolve_group_mask() was updated to
support retrieving thread group numbers or absolute thread numbers
directly from the pre-filled thread_set, and continue to feed bind_tgroup
and bind_thread. The CLI parsers which were pre-initialized to set the
bind_tgroup to 1 cannot do it anymore as it would prevent one from
restricting the thread set. Instead check_config_validity() now detects
the CLI frontend and passes the info down to thread_resolve_group_mask()
that will automatically use only the group 1's threads for these
listeners. The same is done for the peers listeners for now.

At this step it's already possible to start with all previous valid
configs as well as extended ones supporting comma-delimited thread
sets. In addition the parser already accepts large ranges spanning
multiple groups, but since the underlying listeners infrastructure
is not ready, for now we're maintaining a specific check against this
at the higher level of the config validity check.

The patch is a bit large because thread resolution is performed in
multiple steps, so we need to adjust all of them at once to preserve
functional and technical consistency.
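
For readers who want to see the absolute-to-relative remapping in isolation, here is a minimal standalone sketch. It is not the code from this patch: struct group_info, low_bits() and abs_to_rel() are hypothetical names made up for the illustration, and the caller is assumed to provide an absolute bitmap large enough to cover every group. It only shows why a group of at most LONGBITS threads touches at most two words of the absolute bitmap.

```c
#include <assert.h>
#include <limits.h>

#define LONGBITS ((unsigned)(sizeof(unsigned long) * CHAR_BIT))

/* Hypothetical stand-in for the per-group information used by the patch */
struct group_info {
	unsigned base;   /* 0-based absolute index of the group's first thread */
	unsigned count;  /* number of threads in the group, at most LONGBITS */
};

/* Returns a mask with the <n> lowest bits set, without shifting by LONGBITS */
static unsigned long low_bits(unsigned n)
{
	return n >= LONGBITS ? ~0UL : (1UL << n) - 1;
}

/* Splits the absolute bitmap <abs_map> (bit 0 of word 0 = thread 1) into one
 * group-relative mask per group in <rel>. Returns the number of groups that
 * ended up with at least one thread.
 */
static unsigned abs_to_rel(const unsigned long *abs_map,
                           const struct group_info *grp, unsigned nbgrp,
                           unsigned long *rel)
{
	unsigned g, used = 0;

	for (g = 0; g < nbgrp; g++) {
		unsigned block = grp[g].base / LONGBITS;
		unsigned shift = grp[g].base % LONGBITS;
		unsigned long mask;

		assert(grp[g].count <= LONGBITS);

		/* low part from the first word, high part from the next one
		 * when the group straddles a word boundary.
		 */
		mask = abs_map[block] >> shift;
		if (shift && grp[g].count > LONGBITS - shift)
			mask |= abs_map[block + 1] << (LONGBITS - shift);
		mask &= low_bits(grp[g].count);

		rel[g] = mask;
		if (mask)
			used++;
	}
	return used;
}
```

With four groups of three threads each, absolute threads 4-6 and 10-12 end up as the first three threads of groups 2 and 4, which matches the "2/1-3,4/1-3" example given in the function's comment.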
Willy Tarreau 2023-01-31 19:31:27 +01:00
parent bef43dfa60
commit f0de8cacc4
7 changed files with 109 additions and 115 deletions


@@ -15063,33 +15063,51 @@ tfo
   need to build HAProxy with USE_TFO=1 if your libc doesn't define
   TCP_FASTOPEN.

-thread [<thread-group>/]<thread-set>
+thread [<thread-group>/]<thread-set>[,...]
   This restricts the list of threads on which this listener is allowed to run.
   It does not enforce any of them but eliminates those which do not match. It
   limits the threads allowed to process incoming connections for this listener.

   There are two numbering schemes. By default, thread numbers are absolute in
   the process, comprised between 1 and the value specified in global.nbthread.
-  When thread groups are enabled, the number of a single desired thread group
-  (starting at 1) may be specified before a slash ('/') before the thread
-  range. In this case, the thread numbers in the range are relative to the
-  thread group instead, and start at 1 for each thread group. Absolute and
-  relative thread numbers may be used interchangeably but they must not be
-  mixed on a single "bind" line, as those not set will be resolved at the end
-  of the parsing.
+  It is also possible to designate a thread number using its relative number
+  inside its thread group, by specifying the thread group number first, then a
+  slash ('/') and the relative thread number(s). In this case thread numbers
+  also start at 1 and end at 32 or 64 depending on the platform. When absolute
+  thread numbers are specified, they will be automatically translated to
+  relative numbers once thread groups are known. Usually, absolute numbers are
+  preferred for simple configurations, and relative ones are preferred for
+  complex configurations where CPU arrangement matters for performance.

-  For the unlikely case where several ranges are needed, this directive may be
-  repeated. It is not permitted to use different thread groups even when using
-  multiple directives. The <thread-set> specification must use the format:
+  After the optional thread group number, the "thread-set" specification must
+  use the following format:

-      all | odd | even | number[-[number]]
+      "all" | "odd" | "even" | [number][-[number]]

-  Ranges can be partially defined. The higher bound can be omitted. In such a
-  case, it is replaced by the corresponding maximum value. The main purpose is
-  to have multiple bind lines sharing the same IP:port but not the same thread
-  in a listener, so that the system can distribute the incoming connections
-  into multiple queues, bypassing haproxy's internal queue load balancing.
-  Currently Linux 3.9 and above is known for supporting this.
+  As their names imply, "all" validates all threads within the set (either all
+  of the group's when a group is specified, or all of the process' threads),
+  "odd" validates all odd-numbered threads (every other thread starting at 1)
+  either for the process or the group, and "even" validates all even-numbered
+  threads (every other thread starting at 2). If instead thread number ranges
+  are used, then all threads included in the range from the first to the last
+  thread number are validated. The numbers are either relative to the group
+  or absolute depending on the presence of a thread group number. If the first
+  thread number is omitted, "1" is used, representing either the first thread
+  of the group or the first thread of the process. If the last thread number is
+  omitted, either the last thread number of the group (32 or 64) is used, or
+  the last thread number of the process (global.nbthread).
+
+  These ranges may be repeated and delimited by a comma, so that non-contiguous
+  thread sets can be specified, and the group, if present, must be specified
+  again for each new range. Note that it is not permitted to mix group-relative
+  and absolute specifications because the whole "bind" line must use either
+  an absolute notation or a relative one, as those not set will be resolved at
+  the end of the parsing.
+
+  The main purpose is to have multiple bind lines sharing the same IP:port but
+  not the same thread in a listener, so that the system can distribute the
+  incoming connections into multiple queues, bypassing haproxy's internal queue
+  load balancing. Currently Linux 3.9 and above is known for supporting this.

 tls-ticket-keys <keyfile>
   Sets the TLS ticket keys file to load the keys from. The keys need to be 48
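
To make the "[<thread-group>/]<thread-set>[,...]" syntax documented above more concrete, the following is a simplified standalone parser sketch. It is not HAProxy's parse_thread_set(): every sk_-prefixed name is hypothetical, the sketch assumes at most 64 threads and 16 groups, and it stores absolute threads in a single 64-bit word. It only illustrates the grammar and the rule that absolute and group-relative notations cannot be mixed.

```c
#include <ctype.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SK_MAX_THREADS 64  /* assumption: one 64-bit mask per set */
#define SK_MAX_GROUPS  16  /* assumption: arbitrary limit for the sketch */

struct sk_thread_set {
	uint64_t abs;                /* absolute threads, bit 0 = thread 1 */
	uint64_t rel[SK_MAX_GROUPS]; /* group-relative threads, per group */
	unsigned nbgrp;              /* number of groups explicitly used */
};

/* Parses "all", "odd", "even" or "[number][-[number]]" into <mask>.
 * Returns 0 on success, -1 on error.
 */
static int sk_parse_item(const char *s, size_t len, uint64_t *mask)
{
	char buf[32], *p = buf;
	unsigned long lo = 1, hi = SK_MAX_THREADS;

	if (len == 0 || len >= sizeof(buf))
		return -1;
	memcpy(buf, s, len);
	buf[len] = '\0';

	if (strcmp(buf, "all") == 0)  { *mask = ~UINT64_C(0); return 0; }
	if (strcmp(buf, "odd") == 0)  { *mask = UINT64_C(0x5555555555555555); return 0; }
	if (strcmp(buf, "even") == 0) { *mask = UINT64_C(0xAAAAAAAAAAAAAAAA); return 0; }

	if (isdigit((unsigned char)*p))
		lo = hi = strtoul(p, &p, 10);   /* single number unless a '-' follows */
	if (*p == '-') {
		p++;
		hi = SK_MAX_THREADS;            /* omitted upper bound means "up to the last" */
		if (isdigit((unsigned char)*p))
			hi = strtoul(p, &p, 10);
	}
	if (*p != '\0' || lo < 1 || hi > SK_MAX_THREADS || lo > hi)
		return -1;

	*mask = (~UINT64_C(0) >> (SK_MAX_THREADS - hi)) & (~UINT64_C(0) << (lo - 1));
	return 0;
}

/* Adds the comma-delimited sets found in <arg> to <ts>, which must be zeroed
 * before the first call. Returns 0 on success, -1 on error.
 */
static int sk_parse_thread_set(const char *arg, struct sk_thread_set *ts)
{
	while (*arg) {
		size_t len = strcspn(arg, ",");
		const char *slash = memchr(arg, '/', len);
		uint64_t mask;

		if (slash) {
			unsigned long grp;
			char *end;

			if (!isdigit((unsigned char)*arg))
				return -1;
			grp = strtoul(arg, &end, 10);
			if (end != slash || grp < 1 || grp > SK_MAX_GROUPS || ts->abs)
				return -1;              /* bad group, or mixed with absolute */
			if (sk_parse_item(slash + 1, len - (size_t)(slash + 1 - arg), &mask) < 0)
				return -1;
			if (!ts->rel[grp - 1])
				ts->nbgrp++;
			ts->rel[grp - 1] |= mask;
		} else {
			if (ts->nbgrp)
				return -1;              /* mixed with group-relative */
			if (sk_parse_item(arg, len, &mask) < 0)
				return -1;
			ts->abs |= mask;
		}
		arg += len;
		if (*arg == ',')
			arg++;
	}
	return 0;
}
```

Because the set is only ever completed and never reset, calling sk_parse_thread_set() several times on the same structure accumulates ranges, which is the same property the patch relies on to keep chained "thread" directives working.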


@@ -204,6 +204,7 @@ struct bind_conf {
 	char *file; /* file where the section appears */
 	int line; /* line where the section appears */
 	__decl_thread(HA_RWLOCK_T sni_lock); /* lock the SNI trees during add/del operations */
+	struct thread_set thread_set; /* entire set of the allowed threads (0=no restriction) */
 	unsigned long bind_thread; /* bitmask of threads allowed on this bind_conf */
 	uint bind_tgroup; /* thread group ID: 0=global IDs, non-zero=local IDs */
 	struct rx_settings settings; /* all the settings needed for the listening socket */


@@ -44,7 +44,7 @@ void ha_tkill(unsigned int thr, int sig);
 void ha_tkillall(int sig);
 void ha_thread_relax(void);
 int thread_map_to_groups();
-int thread_resolve_group_mask(struct thread_set *ts, uint *ogid, ulong *omask, char **err);
+int thread_resolve_group_mask(struct thread_set *ts, int defgrp, uint *ogid, ulong *omask, char **err);
 int parse_thread_set(const char *arg, struct thread_set *ts, char **err);
 extern int thread_cpus_enabled_at_boot;


@@ -2954,13 +2954,18 @@ init_proxies_list_stage1:
 		/* detect and address thread affinity inconsistencies */
 		err = NULL;
-		if (thread_resolve_group_mask(bind_conf->bind_tgroup, bind_conf->bind_thread,
+		if (thread_resolve_group_mask(&bind_conf->thread_set, 1,
 		                              &bind_conf->bind_tgroup, &bind_conf->bind_thread, &err) < 0) {
 			ha_alert("Proxy '%s': %s in 'bind %s' at [%s:%d].\n",
 			         curproxy->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
 			free(err);
 			cfgerr++;
 		}
+		else if (bind_conf->thread_set.nbgrp > 1) {
+			ha_alert("Proxy '%s': 'thread' spans more than one group in 'bind %s' at [%s:%d].\n",
+			         curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line);
+			cfgerr++;
+		}

 		/* apply thread masks and groups to all receivers */
 		list_for_each_entry(li, &bind_conf->listeners, by_bind) {
@@ -4431,13 +4436,18 @@ init_proxies_list_stage2:
 		}

 		err = NULL;
-		if (thread_resolve_group_mask(bind_conf->bind_tgroup, bind_conf->bind_thread,
+		if (thread_resolve_group_mask(&bind_conf->thread_set, (curproxy == global.cli_fe) ? 1 : 0,
 		                              &bind_conf->bind_tgroup, &bind_conf->bind_thread, &err) < 0) {
 			ha_alert("Peers section '%s': %s in 'bind %s' at [%s:%d].\n",
 			         curpeers->peers_fe->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
 			free(err);
 			cfgerr++;
 		}
+		else if (bind_conf->thread_set.nbgrp > 1) {
+			ha_alert("Peers section '%s': 'thread' spans more than one group in 'bind %s' at [%s:%d].\n",
+			         curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line);
+			cfgerr++;
+		}

 		/* apply thread masks and groups to all receivers */
 		list_for_each_entry(li, &bind_conf->listeners, by_bind) {


@@ -494,7 +494,6 @@ static int cli_parse_global(char **args, int section_type, struct proxy *curpx,
 		}
 		bind_conf->level &= ~ACCESS_LVL_MASK;
 		bind_conf->level |= ACCESS_LVL_OPER; /* default access level */
-		bind_conf->bind_tgroup = 1; // bind to a single group in any case

 		if (!str2listener(args[2], global.cli_fe, bind_conf, file, line, err)) {
 			memprintf(err, "parsing [%s:%d] : '%s %s' : %s\n",
@@ -2999,7 +2998,6 @@ struct bind_conf *mworker_cli_proxy_new_listener(char *line)
 	bind_conf->level &= ~ACCESS_LVL_MASK;
 	bind_conf->level |= ACCESS_LVL_ADMIN;
 	bind_conf->level |= ACCESS_MASTER | ACCESS_MASTER_ONLY;
-	bind_conf->bind_tgroup = 1; // bind to a single group in any case

 	if (!str2listener(args[0], mworker_proxy, bind_conf, "master-socket", 0, &err)) {
 		ha_alert("Cannot create the listener of the master CLI\n");
@@ -3096,7 +3094,6 @@ int mworker_cli_sockpair_new(struct mworker_proc *mworker_proc, int proc)
 	bind_conf->level &= ~ACCESS_LVL_MASK;
 	bind_conf->level |= ACCESS_LVL_ADMIN; /* TODO: need to lower the rights with a CLI keyword*/
 	bind_conf->level |= ACCESS_FD_LISTENERS;
-	bind_conf->bind_tgroup = 1; // bind to a single group in any case

 	if (!memprintf(&path, "sockpair@%d", mworker_proc->ipc_fd[1])) {
 		ha_alert("Cannot allocate listener.\n");


@@ -1801,41 +1801,15 @@ static int bind_parse_shards(char **args, int cur_arg, struct proxy *px, struct
 	return 0;
 }

-/* parse the "thread" bind keyword */
+/* parse the "thread" bind keyword. This will replace any preset thread_set */
 static int bind_parse_thread(char **args, int cur_arg, struct proxy *px, struct bind_conf *conf, char **err)
 {
-	char *sep = NULL;
-	ulong thread = 0;
-	long tgroup = 0;
-
-	tgroup = strtol(args[cur_arg + 1], &sep, 10);
-	if (*sep == '/') {
-		/* a thread group was present */
-		if (tgroup < 1 || tgroup > MAX_TGROUPS) {
-			memprintf(err, "'%s' thread-group number must be between 1 and %d (was %ld)", args[cur_arg + 1], MAX_TGROUPS, tgroup);
+	/* note that the thread set is zeroed before first call, and we don't
+	 * want to reset it so that it remains possible to chain multiple
+	 * "thread" directives.
+	 */
+	if (parse_thread_set(args[cur_arg+1], &conf->thread_set, err) < 0)
 		return ERR_ALERT | ERR_FATAL;
-		}
-		sep++;
-	}
-	else {
-		/* no thread group */
-		tgroup = 0;
-		sep = args[cur_arg + 1];
-	}
-
-	if ((conf->bind_tgroup || conf->bind_thread) &&
-	    conf->bind_tgroup != tgroup) {
-		memprintf(err, "'%s' multiple thread-groups are not supported", args[cur_arg + 1]);
-		return ERR_ALERT | ERR_FATAL;
-	}
-
-	if (parse_process_number(sep, &thread, LONGBITS, NULL, err)) {
-		memprintf(err, "'%s' : %s", sep, *err);
-		return ERR_ALERT | ERR_FATAL;
-	}
-
-	conf->bind_thread |= thread;
-	conf->bind_tgroup = tgroup;
 	return 0;
 }


@@ -1221,79 +1221,73 @@ int thread_map_to_groups()
 	return 0;
 }

-/* converts a configuration thread num or group+mask to a global group+mask
- * depending on the configured thread group id. This is essentially for use
- * with the "thread" directive on "bind" lines, where "thread 4-6" might be
- * turned to "2/1-3". It cannot be used before the thread mapping above was
- * completed and the thread group number configured. Possible options:
- * - igid == 0: imask represents global IDs. We have to check that all
- * configured threads in the mask belong to the same group. If imask is zero
- * it means everything, so for now we only support this with a single group.
- * - igid > 0, imask = 0: convert global values to local values for this thread
- * - igid > 0, imask > 0: convert global values to local values
- * Note that the output mask is always local to the group.
+/* Converts a configuration thread set based on either absolute or relative
+ * thread numbers into a global group+mask. This is essentially for use with
+ * the "thread" directive on "bind" lines, where "thread 4-6,10-12" might be
+ * turned to "2/1-3,4/1-3". It cannot be used before the thread mapping above
+ * was completed and the thread group numbers configured. The thread_set is
+ * replaced by the resolved group-based one. It is possible to force a single
+ * default group for unspecified sets instead of enabling all groups by passing
+ * this group's non-zero value to defgrp. The output ogid and omask are set,
+ * respectively, to the first non-empty group and its mask. They're used only
+ * for the transition to the new model.
  *
  * Returns <0 on failure, >=0 on success.
  */
-int thread_resolve_group_mask(uint igid, ulong imask, uint *ogid, ulong *omask, char **err)
+int thread_resolve_group_mask(struct thread_set *ts, int defgrp, uint *ogid, ulong *omask, char **err)
 {
-	ulong mask;
-	uint t;
+	struct thread_set new_ts = { 0 };
+	ulong mask, imask;
+	uint g;

-	if (igid == 0) {
+	if (!ts->nbgrp) {
 		/* unspecified group, IDs are global */
-		if (!imask) {
-			/* all threads of all groups */
-			if (global.nbtgroups > 1) {
-				memprintf(err, "'thread' directive spans multiple groups");
-				return -1;
-			}
-			*ogid = 1; // first and only group
-			*omask = ha_tgroup_info[0].threads_enabled;
-			return 0;
-		} else {
-			/* some global threads */
-			for (t = 0; t < global.nbthread; t++) {
-				if (imask & (1UL << t)) {
-					if (ha_thread_info[t].tgid != igid) {
-						if (!igid)
-							igid = ha_thread_info[t].tgid;
-						else {
-							memprintf(err, "'thread' directive spans multiple groups (at least %u and %u)", igid, ha_thread_info[t].tgid);
-							return -1;
-						}
-					}
-				}
-			}
-			if (!igid) {
-				memprintf(err, "'thread' directive contains threads that belong to no group");
-				return -1;
-			}
-			/* we have a valid group, convert this to global thread IDs */
-			*ogid = igid;
-			imask = imask >> ha_tgroup_info[igid - 1].base;
-			imask &= ha_tgroup_info[igid - 1].threads_enabled;
-			*omask = imask;
-			return 0;
+		if (thread_set_is_empty(ts)) {
+			/* all threads of all groups, unless defgrp is set and
+			 * we then set it as the only group.
+			 */
+			for (g = defgrp ? defgrp-1 : 0; g < (defgrp ? defgrp : global.nbtgroups); g++) {
+				new_ts.rel[g] = ha_tgroup_info[g].threads_enabled;
+				new_ts.nbgrp++;
 			}
 		} else {
-		/* group was specified */
-		if (igid > global.nbtgroups) {
-			memprintf(err, "'thread' directive references non-existing thread group %u", igid);
+			/* some absolute threads are set, we must remap them to
+			 * relative ones. Each group cannot have more than
+			 * LONGBITS threads, thus it spans at most two absolute
+			 * blocks.
+			 */
+			for (g = 0; g < global.nbtgroups; g++) {
+				uint block = ha_tgroup_info[g].base / LONGBITS;
+				uint base = ha_tgroup_info[g].base % LONGBITS;
+				mask = ts->abs[block] >> base;
+				if (base && ha_tgroup_info[g].count > (LONGBITS - base))
+					mask |= ts->abs[block + 1] << (LONGBITS - base);
+				mask &= nbits(ha_tgroup_info[g].count);
+				mask &= ha_tgroup_info[g].threads_enabled;
+				/* now the mask exactly matches the threads to be enabled
+				 * in this group.
+				 */
+				if (!new_ts.rel[g] && mask)
+					new_ts.nbgrp++;
+				new_ts.rel[g] |= mask;
+			}
+		}
+	} else {
+		/* groups were specified */
+		for (g = 0; g < MAX_TGROUPS; g++) {
+			imask = ts->rel[g];
+			if (!imask)
+				continue;
+			if (g >= global.nbtgroups) {
+				memprintf(err, "'thread' directive references non-existing thread group %u", g+1);
 				return -1;
 			}
-		if (!imask) {
-			/* all threads of this groups. Let's make a mask from their count and base. */
-			*ogid = igid;
-			*omask = nbits(ha_tgroup_info[igid - 1].count);
-			return 0;
-		} else {
-			/* some local threads. Keep only existing ones for this group */
-			mask = nbits(ha_tgroup_info[igid - 1].count);
+			/* some relative threads are set. Keep only existing ones for this group */
+			mask = nbits(ha_tgroup_info[g].count);
 			if (!(mask & imask)) {
 				/* no intersection between the thread group's