diff --git a/include/haproxy/listener.h b/include/haproxy/listener.h
index afb4530bf..46d9ee27a 100644
--- a/include/haproxy/listener.h
+++ b/include/haproxy/listener.h
@@ -184,6 +184,14 @@ int default_suspend_listener(struct listener *l);
  */
 int default_resume_listener(struct listener *l);
 
+/* Applies the thread mask, shards etc to the bind_conf. It normally returns 0,
+ * otherwise the number of errors. Upon error it may set error codes (ERR_*) in
+ * err_code. It is supposed to be called only once very late in the boot process
+ * after the bind_conf's thread_set is fixed. The function may emit warnings and
+ * alerts. Extra listeners may be created on the fly.
+ */
+int bind_complete_thread_setup(struct bind_conf *bind_conf, int *err_code);
+
 /*
  * Registers the bind keyword list as a list of valid keywords for next
  * parsing sessions.
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 9855ee1cc..6a9d42d65 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -2930,7 +2930,7 @@ init_proxies_list_stage1:
 
 		/* check and reduce the bind-proc of each listener */
 		list_for_each_entry(bind_conf, &curproxy->conf.bind, by_fe) {
-			struct listener *li;
+			int ret;
 
 			/* HTTP frontends with "h2" as ALPN/NPN will work in
 			 * HTTP/2 and absolutely require buffers 16kB or larger.
@@ -2986,130 +2986,12 @@ init_proxies_list_stage1:
 			} /* HTTP && bufsize < 16384 */
 #endif
 
-			/* detect and address thread affinity inconsistencies */
-			err = NULL;
-			if (thread_resolve_group_mask(&bind_conf->thread_set, (curproxy == global.cli_fe) ? 1 : 0, &err) < 0) {
-				ha_alert("Proxy '%s': %s in 'bind %s' at [%s:%d].\n",
-					 curproxy->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
-				free(err);
-				cfgerr++;
-				continue;
-			}
-
-			/* apply thread masks and groups to all receivers */
-			list_for_each_entry(li, &bind_conf->listeners, by_bind) {
-				struct listener *new_li, *ref;
-				struct thread_set new_ts;
-				int shard, shards, todo, done, grp, dups;
-				ulong mask, gmask, bit;
-
-				shards = bind_conf->settings.shards;
-				todo = thread_set_count(&bind_conf->thread_set);
-
-				/* special values: -1 = "by-thread", -2 = "by-group" */
-				if (shards == -1)
-					shards = todo;
-				else if (shards == -2)
-					shards = my_popcountl(bind_conf->thread_set.grps);
-
-				/* no more shards than total threads */
-				if (shards > todo)
-					shards = todo;
-
-				shard = done = grp = bit = mask = 0;
-				new_li = li;
-
-				while (shard < shards) {
-					memset(&new_ts, 0, sizeof(new_ts));
-					while (grp < global.nbtgroups && done < todo) {
-						/* enlarge mask to cover next bit of bind_thread till we
-						 * have enough bits for one shard. We restart from the
-						 * current grp+bit.
-						 */
-
-						/* first let's find the first non-empty group starting at <grp> */
-						if (!(bind_conf->thread_set.rel[grp] & ha_tgroup_info[grp].threads_enabled & ~mask)) {
-							grp++;
-							mask = 0;
-							continue;
-						}
-
-						/* take next unassigned bit */
-						bit = (bind_conf->thread_set.rel[grp] & ~mask) & -(bind_conf->thread_set.rel[grp] & ~mask);
-						new_ts.rel[grp] |= bit;
-						mask |= bit;
-						new_ts.grps |= 1UL << grp;
-
-						done += shards;
-					};
-
-					BUG_ON(!new_ts.grps); // no more bits left unassigned
-
-					/* Create all required listeners for all bound groups. If more than one group is
-					 * needed, the first receiver serves as a reference, and subsequent ones point to
-					 * it. We already have a listener available in new_li() so we only allocate a new
-					 * one if we're not on the last one. We count the remaining groups by copying their
-					 * mask into <gmask> and dropping the lowest bit at the end of the loop until there
-					 * is no more. Ah yes, it's not pretty :-/
-					 */
-					ref = new_li;
-					gmask = new_ts.grps;
-					for (dups = 0; gmask; dups++) {
-						/* assign the first (and only) thread and group */
-						new_li->rx.bind_thread = thread_set_nth_tmask(&new_ts, dups);
-						new_li->rx.bind_tgroup = thread_set_nth_group(&new_ts, dups);
-
-						if (dups) {
-							/* it has been allocated already in the previous round */
-							shard_info_attach(&new_li->rx, ref->rx.shard_info);
-							new_li->rx.flags |= RX_F_MUST_DUP;
-						}
-
-						gmask &= gmask - 1; // drop lowest bit
-						if (gmask) {
-							/* yet another listener expected in this shard, let's
-							 * chain it.
-							 */
-							struct listener *tmp_li = clone_listener(new_li);
-
-							if (!tmp_li) {
-								ha_alert("Out of memory while trying to allocate extra listener for group %u of shard %d in %s %s\n",
-									 new_li->rx.bind_tgroup, shard, proxy_type_str(curproxy), curproxy->id);
-								cfgerr++;
-								err_code |= ERR_FATAL | ERR_ALERT;
-								goto out;
-							}
-
-							/* if we're forced to create at least two listeners, we have to
-							 * allocate a shared shard_info that's linked to from the reference
-							 * and each other listener, so we'll create it here.
-							 */
-							if (!shard_info_attach(&ref->rx, NULL)) {
-								ha_alert("Out of memory while trying to allocate shard_info for listener for group %u of shard %d in %s %s\n",
-									 new_li->rx.bind_tgroup, shard, proxy_type_str(curproxy), curproxy->id);
-								cfgerr++;
-								err_code |= ERR_FATAL | ERR_ALERT;
-								goto out;
-							}
-							new_li = tmp_li;
-						}
-					}
-					done -= todo;
-
-					shard++;
-					if (shard >= shards)
-						break;
-
-					/* create another listener for new shards */
-					new_li = clone_listener(li);
-					if (!new_li) {
-						ha_alert("Out of memory while trying to allocate extra listener for shard %d in %s %s\n",
-							 shard, proxy_type_str(curproxy), curproxy->id);
-						cfgerr++;
-						err_code |= ERR_FATAL | ERR_ALERT;
-						goto out;
-					}
-				}
+			/* finish the bind setup */
+			ret = bind_complete_thread_setup(bind_conf, &err_code);
+			if (ret != 0) {
+				cfgerr += ret;
+				if (err_code & ERR_FATAL)
+					goto out;
+			}
 		}
 
diff --git a/src/listener.c b/src/listener.c
index 6a0460f32..67b64f69b 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -1636,6 +1636,146 @@ struct task *manage_global_listener_queue(struct task *t, void *context, unsigne
 	return t;
 }
 
+/* Applies the thread mask, shards etc to the bind_conf. It normally returns 0,
+ * otherwise the number of errors. Upon error it may set error codes (ERR_*) in
+ * err_code. It is supposed to be called only once very late in the boot process
+ * after the bind_conf's thread_set is fixed. The function may emit warnings and
+ * alerts. Extra listeners may be created on the fly.
+ */
+int bind_complete_thread_setup(struct bind_conf *bind_conf, int *err_code)
+{
+	struct proxy *fe = bind_conf->frontend;
+	struct listener *li, *new_li, *ref;
+	struct thread_set new_ts;
+	int shard, shards, todo, done, grp, dups;
+	ulong mask, gmask, bit;
+	int cfgerr = 0;
+	char *err;
+
+	err = NULL;
+	if (thread_resolve_group_mask(&bind_conf->thread_set, (fe == global.cli_fe) ? 1 : 0, &err) < 0) {
+		ha_alert("Proxy '%s': %s in 'bind %s' at [%s:%d].\n",
+			 fe->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
+		free(err);
+		cfgerr++;
+		return cfgerr;
+	}
+
+	/* apply thread masks and groups to all receivers */
+	list_for_each_entry(li, &bind_conf->listeners, by_bind) {
+		shards = bind_conf->settings.shards;
+		todo = thread_set_count(&bind_conf->thread_set);
+
+		/* special values: -1 = "by-thread", -2 = "by-group" */
+		if (shards == -1)
+			shards = todo;
+		else if (shards == -2)
+			shards = my_popcountl(bind_conf->thread_set.grps);
+
+		/* no more shards than total threads */
+		if (shards > todo)
+			shards = todo;
+
+		shard = done = grp = bit = mask = 0;
+		new_li = li;
+
+		while (shard < shards) {
+			memset(&new_ts, 0, sizeof(new_ts));
+			while (grp < global.nbtgroups && done < todo) {
+				/* enlarge mask to cover next bit of bind_thread till we
+				 * have enough bits for one shard. We restart from the
+				 * current grp+bit.
+				 */
+
+				/* first let's find the first non-empty group starting at <grp> */
+				if (!(bind_conf->thread_set.rel[grp] & ha_tgroup_info[grp].threads_enabled & ~mask)) {
+					grp++;
+					mask = 0;
+					continue;
+				}
+
+				/* take next unassigned bit */
+				bit = (bind_conf->thread_set.rel[grp] & ~mask) & -(bind_conf->thread_set.rel[grp] & ~mask);
+				new_ts.rel[grp] |= bit;
+				mask |= bit;
+				new_ts.grps |= 1UL << grp;
+
+				done += shards;
+			};
+
+			BUG_ON(!new_ts.grps); // no more bits left unassigned
+
+			/* Create all required listeners for all bound groups. If more than one group is
+			 * needed, the first receiver serves as a reference, and subsequent ones point to
+			 * it. We already have a listener available in new_li() so we only allocate a new
+			 * one if we're not on the last one. We count the remaining groups by copying their
+			 * mask into <gmask> and dropping the lowest bit at the end of the loop until there
+			 * is no more. Ah yes, it's not pretty :-/
+			 */
+			ref = new_li;
+			gmask = new_ts.grps;
+			for (dups = 0; gmask; dups++) {
+				/* assign the first (and only) thread and group */
+				new_li->rx.bind_thread = thread_set_nth_tmask(&new_ts, dups);
+				new_li->rx.bind_tgroup = thread_set_nth_group(&new_ts, dups);
+
+				if (dups) {
+					/* it has been allocated already in the previous round */
+					shard_info_attach(&new_li->rx, ref->rx.shard_info);
+					new_li->rx.flags |= RX_F_MUST_DUP;
+				}
+
+				gmask &= gmask - 1; // drop lowest bit
+				if (gmask) {
+					/* yet another listener expected in this shard, let's
+					 * chain it.
+					 */
+					struct listener *tmp_li = clone_listener(new_li);
+
+					if (!tmp_li) {
+						ha_alert("Out of memory while trying to allocate extra listener for group %u of shard %d in %s %s\n",
+							 new_li->rx.bind_tgroup, shard, proxy_type_str(fe), fe->id);
+						cfgerr++;
+						*err_code |= ERR_FATAL | ERR_ALERT;
+						return cfgerr;
+					}
+
+					/* if we're forced to create at least two listeners, we have to
+					 * allocate a shared shard_info that's linked to from the reference
+					 * and each other listener, so we'll create it here.
+					 */
+					if (!shard_info_attach(&ref->rx, NULL)) {
+						ha_alert("Out of memory while trying to allocate shard_info for listener for group %u of shard %d in %s %s\n",
+							 new_li->rx.bind_tgroup, shard, proxy_type_str(fe), fe->id);
+						cfgerr++;
+						*err_code |= ERR_FATAL | ERR_ALERT;
+						return cfgerr;
+					}
+					new_li = tmp_li;
+				}
+			}
+			done -= todo;
+
+			shard++;
+			if (shard >= shards)
+				break;
+
+			/* create another listener for new shards */
+			new_li = clone_listener(li);
+			if (!new_li) {
+				ha_alert("Out of memory while trying to allocate extra listener for shard %d in %s %s\n",
+					 shard, proxy_type_str(fe), fe->id);
+				cfgerr++;
+				*err_code |= ERR_FATAL | ERR_ALERT;
+				return cfgerr;
+			}
+		}
+	}
+
+	/* success */
+	return cfgerr;
+}
+
 /*
  * Registers the bind keyword list as a list of valid keywords for next
  * parsing sessions.
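Note on the distribution trick used by the inner loop above: it hands out <todo> enabled thread bits to <shards> shards without any division, by crediting "done += shards" for each bit taken and debiting "done -= todo" when a shard is complete, so every shard ends up with floor(todo/shards) or ceil(todo/shards) threads. The standalone sketch below is only an illustration, not HAProxy code: the thread_set is reduced to a plain per-group mask array, and all names (NBGROUPS, rel, new_rel) are invented for the example.

	/* Standalone sketch (not HAProxy code): mimics how the patch spreads
	 * enabled thread bits across shards. Each pass isolates the lowest
	 * unassigned bit with (x & -x), exactly as the patch does, and the
	 * done/shards/todo accounting decides when a shard has enough bits.
	 */
	#include <stdio.h>

	#define NBGROUPS 2

	int main(void)
	{
		/* per-group masks of threads enabled for the bind line */
		unsigned long rel[NBGROUPS] = { 0xFUL, 0x3UL }; /* 4 + 2 threads */
		int todo = 6;   /* total enabled threads (popcount of rel[]) */
		int shards = 4; /* e.g. "shards 4" on the bind line */
		int shard = 0, done = 0, grp = 0;
		unsigned long mask = 0, bit;

		while (shard < shards) {
			unsigned long new_rel[NBGROUPS] = { 0 };

			while (grp < NBGROUPS && done < todo) {
				/* skip groups with no unassigned bits left */
				if (!(rel[grp] & ~mask)) {
					grp++;
					mask = 0;
					continue;
				}
				/* take the lowest unassigned bit of this group */
				bit = (rel[grp] & ~mask) & -(rel[grp] & ~mask);
				new_rel[grp] |= bit;
				mask |= bit;
				done += shards;
			}
			done -= todo;

			printf("shard %d: grp0=%#lx grp1=%#lx\n",
			       shard, new_rel[0], new_rel[1]);
			shard++;
		}
		return 0;
	}

With 6 threads split over groups of 4 and 2 and "shards 4", this prints shards of sizes 2, 1, 2 and 1; the third shard (grp0=0x8 grp1=0x1) spans both groups, which is precisely the case where the patch must clone the listener and attach a shared shard_info to the reference and each duplicate.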