diff --git a/include/haproxy/listener.h b/include/haproxy/listener.h index d7d6bf9e8..f7599525c 100644 --- a/include/haproxy/listener.h +++ b/include/haproxy/listener.h @@ -122,6 +122,8 @@ void unbind_listener(struct listener *listener); */ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss, int portl, int porth, int fd, struct protocol *proto, char **err); +struct shard_info *shard_info_attach(struct receiver *rx, struct shard_info *si); +void shard_info_detach(struct receiver *rx); struct listener *clone_listener(struct listener *src); /* Delete a listener from its protocol's list of listeners. The listener's diff --git a/include/haproxy/receiver-t.h b/include/haproxy/receiver-t.h index 0dfca42dd..07155896e 100644 --- a/include/haproxy/receiver-t.h +++ b/include/haproxy/receiver-t.h @@ -52,6 +52,17 @@ struct rx_settings { int shards; /* number of shards, 0=not set yet, -1="by-thread" */ }; +/* info about a shard that is shared between multiple groups. Receivers that + * are alone in their shard do not have a shard_info. + */ +struct shard_info { + uint nbgroups; /* number of groups in this shard (=#rx); Zero = unused. */ + uint nbthreads; /* number of threads in this shard (>=nbgroups) */ + ulong tgroup_mask; /* bitmask of thread groups having a member here */ + struct receiver *ref; /* first one, reference for FDs to duplicate */ + struct receiver *members[MAX_TGROUPS]; /* all members of the shard (one per thread group) */ +}; + /* This describes a receiver with all its characteristics (address, options, etc) */ struct receiver { int fd; /* handle we receive from (fd only for now) */ @@ -62,6 +73,7 @@ struct receiver { unsigned long bind_thread; /* bitmask of threads allowed on this receiver */ uint bind_tgroup; /* thread group ID: 0=global IDs, non-zero=local IDs */ struct rx_settings *settings; /* points to the settings used by this receiver */ + struct shard_info *shard_info; /* points to info about the owning shard, NULL if single rx */ struct list proto_list; /* list in the protocol header */ #ifdef USE_QUIC struct mt_list rxbuf_list; /* list of buffers to receive and dispatch QUIC datagrams. */ diff --git a/src/listener.c b/src/listener.c index e441eff2e..eb872e6f3 100644 --- a/src/listener.c +++ b/src/listener.c @@ -806,13 +806,84 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss, return 1; } +/* Optionally allocates a new shard info (if si == NULL) for receiver rx and + * assigns it to it, or attaches to an existing one. If the rx already had a + * shard_info, it is simply returned. It is illegal to call this function with + * an rx that's part of a group that is already attached. Attaching means the + * shard_info's thread count and group count are updated so the rx's group is + * added to the shard_info's group mask. The rx are added to the members in the + * attachment order, though it must not matter. It is meant for boot time setup + * and is not thread safe. NULL is returned on allocation failure. + */ +struct shard_info *shard_info_attach(struct receiver *rx, struct shard_info *si) +{ + if (rx->shard_info) + return rx->shard_info; + + if (!si) { + si = calloc(1, sizeof(*si)); + if (!si) + return NULL; + + si->ref = rx; + } + + rx->shard_info = si; + BUG_ON (si->tgroup_mask & 1UL << (rx->bind_tgroup - 1)); + si->tgroup_mask |= 1UL << (rx->bind_tgroup - 1); + si->nbgroups = my_popcountl(si->tgroup_mask); + si->nbthreads += my_popcountl(rx->bind_thread); + si->members[si->nbgroups - 1] = rx; + return si; +} + +/* Detaches the rx from an optional shard_info it may be attached to. If so, + * the thread counts, group masks and refcounts are updated. The members list + * remains contiguous by replacing the current entry with the last one. The + * reference continues to point to the first receiver. If the group count + * reaches zero, the shard_info is automatically released. + */ +void shard_info_detach(struct receiver *rx) +{ + struct shard_info *si = rx->shard_info; + uint gr; + + if (!si) + return; + + rx->shard_info = NULL; + + /* find the member slot this rx was attached to */ + for (gr = 0; gr < MAX_TGROUPS && si->members[gr] != rx; gr++) + ; + + BUG_ON(gr == MAX_TGROUPS); + + si->nbthreads -= my_popcountl(rx->bind_thread); + si->tgroup_mask &= ~(1UL << (rx->bind_tgroup - 1)); + si->nbgroups = my_popcountl(si->tgroup_mask); + + /* replace the member by the last one. If we removed the reference, we + * have to switch to another one. It's always the first entry so we can + * simply enforce it upon every removal. + */ + si->members[gr] = si->members[si->nbgroups]; + si->members[si->nbgroups] = NULL; + si->ref = si->members[0]; + + if (!si->nbgroups) + free(si); +} + /* clones listener and returns the new one. All dynamically allocated * fields are reallocated (name for now). The new listener is inserted before * the original one in the bind_conf and frontend lists. This allows it to be * duplicated while iterating over the current list. The original listener must * only be in the INIT or ASSIGNED states, and the new listener will only be * placed into the INIT state. The counters are always set to NULL. Maxsock is - * updated. Returns NULL on allocation error. + * updated. Returns NULL on allocation error. The shard_info is never taken so + * that the caller can decide what to do with it depending on how it intends to + * clone the listener. */ struct listener *clone_listener(struct listener *src) { @@ -830,6 +901,7 @@ struct listener *clone_listener(struct listener *src) } l->rx.owner = l; + l->rx.shard_info = NULL; l->state = LI_INIT; l->counters = NULL; l->extra_counters = NULL; @@ -865,6 +937,7 @@ void __delete_listener(struct listener *listener) if (listener->state == LI_ASSIGNED) { listener_set_state(listener, LI_INIT); LIST_DELETE(&listener->rx.proto_list); + shard_info_detach(&listener->rx); listener->rx.proto->nb_receivers--; _HA_ATOMIC_DEC(&jobs); _HA_ATOMIC_DEC(&listeners);