mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-10 09:07:02 +02:00
MINOR: peers: Support for peer shards
Add "shards" new keyword for "peers" section to configure the number of peer shards attached to such secions. This impact all the stick-tables attached to the section. Add "shard" new "server" parameter to configure the peers which participate to all the stick-tables contents distribution. Each peer receive the stick-tables updates only for keys with this shard value as distribution hash. The "shard" value is stored in ->shard new server struct member. cfg_parse_peers() which is the function which is called to parse all the lines of a "peers" section is modified to parse the "shards" parameter stored in ->nb_shards new peers struct member. Add srv_parse_shard() new callback into server.c to pare the "shard" parameter. Implement stksess_getkey_hash() to compute the distribution hash for a stick-table key as the 64-bits xxhash of the key concatenated to the stick-table name. This function is called by stksess_setkey_shard(), itself called by the already implemented function which create a new stick-table key (stksess_new()). Add ->idlen new stktable struct member to store the stick-table name length to not have to compute it each time a stick-table key hash is computed.
This commit is contained in:
parent
7941ead3aa
commit
36d1565640
@ -3428,6 +3428,13 @@ server <peername> [<ip>:<port>] [param*]
|
|||||||
server haproxy2 192.168.0.2:1024
|
server haproxy2 192.168.0.2:1024
|
||||||
server haproxy3 10.2.0.1:1024
|
server haproxy3 10.2.0.1:1024
|
||||||
|
|
||||||
|
shards <shards>
|
||||||
|
|
||||||
|
In some configurations, one would like to distribute the stick-table contents
|
||||||
|
to some peers in place of sending all the stick-table contents to each peer
|
||||||
|
declared in the "peers" section. In such cases, "shards" specifies the
|
||||||
|
number of peer involved in this stick-table contents distribution.
|
||||||
|
See also "shard" server parameter.
|
||||||
|
|
||||||
table <tablename> type {ip | integer | string [len <length>] | binary [len <length>]}
|
table <tablename> type {ip | integer | string [len <length>] | binary [len <length>]}
|
||||||
size <size> [expire <expire>] [nopurge] [store <data_type>]*
|
size <size> [expire <expire>] [nopurge] [store <data_type>]*
|
||||||
@ -15646,6 +15653,25 @@ send-proxy-v2-ssl-cn
|
|||||||
protocol. See also the "no-send-proxy-v2-ssl-cn" option of this section and
|
protocol. See also the "no-send-proxy-v2-ssl-cn" option of this section and
|
||||||
the "send-proxy-v2" option of the "bind" keyword.
|
the "send-proxy-v2" option of the "bind" keyword.
|
||||||
|
|
||||||
|
shard <shard>
|
||||||
|
This parameter in used only in the context of stick-tables synchronisation
|
||||||
|
with peers protocol. The "shard" parameter identifies the peers which will
|
||||||
|
receive all the stick-table updates for keys with this shard as distribution
|
||||||
|
hash. The accepted values are 0 up to "shards" parameter value specified in
|
||||||
|
the "peers" section. 0 value is the default value meaning that the peer will
|
||||||
|
receive all the key updates. Greater values than "shards" will be ignored.
|
||||||
|
This is also the case for any value provided to the local peer.
|
||||||
|
|
||||||
|
Example :
|
||||||
|
|
||||||
|
peers mypeers
|
||||||
|
shards 3
|
||||||
|
peer A 127.0.0.1:40001 # local peer without shard value (0 internally)
|
||||||
|
peer B 127.0.0.1:40002 shard 1
|
||||||
|
peer C 127.0.0.1:40003 shard 2
|
||||||
|
peer D 127.0.0.1:40004 shard 3
|
||||||
|
|
||||||
|
|
||||||
slowstart <start_time_in_ms>
|
slowstart <start_time_in_ms>
|
||||||
The "slowstart" parameter for a server accepts a value in milliseconds which
|
The "slowstart" parameter for a server accepts a value in milliseconds which
|
||||||
indicates after how long a server which has just come back up will run at
|
indicates after how long a server which has just come back up will run at
|
||||||
|
@ -101,6 +101,7 @@ struct peers {
|
|||||||
unsigned int flags; /* current peers section resync state */
|
unsigned int flags; /* current peers section resync state */
|
||||||
unsigned int resync_timeout; /* resync timeout timer */
|
unsigned int resync_timeout; /* resync timeout timer */
|
||||||
int count; /* total of peers */
|
int count; /* total of peers */
|
||||||
|
int nb_shards; /* Number of peer shards */
|
||||||
int disabled; /* peers proxy disabled if >0 */
|
int disabled; /* peers proxy disabled if >0 */
|
||||||
int applet_count[MAX_THREADS]; /* applet count per thread */
|
int applet_count[MAX_THREADS]; /* applet count per thread */
|
||||||
};
|
};
|
||||||
|
@ -264,6 +264,7 @@ struct server {
|
|||||||
unsigned rweight; /* remainder of weight in the current LB tree */
|
unsigned rweight; /* remainder of weight in the current LB tree */
|
||||||
unsigned cumulative_weight; /* weight of servers prior to this one in the same group, for chash balancing */
|
unsigned cumulative_weight; /* weight of servers prior to this one in the same group, for chash balancing */
|
||||||
int maxqueue; /* maximum number of pending connections allowed */
|
int maxqueue; /* maximum number of pending connections allowed */
|
||||||
|
int shard; /* shard (in peers protocol context only) */
|
||||||
|
|
||||||
enum srv_ws_mode ws; /* configure the protocol selection for websocket */
|
enum srv_ws_mode ws; /* configure the protocol selection for websocket */
|
||||||
/* 3 bytes hole here */
|
/* 3 bytes hole here */
|
||||||
|
@ -145,6 +145,7 @@ struct stksess {
|
|||||||
unsigned int expire; /* session expiration date */
|
unsigned int expire; /* session expiration date */
|
||||||
unsigned int ref_cnt; /* reference count, can only purge when zero */
|
unsigned int ref_cnt; /* reference count, can only purge when zero */
|
||||||
__decl_thread(HA_RWLOCK_T lock); /* lock related to the table entry */
|
__decl_thread(HA_RWLOCK_T lock); /* lock related to the table entry */
|
||||||
|
int shard; /* shard */
|
||||||
struct eb32_node exp; /* ebtree node used to hold the session in expiration tree */
|
struct eb32_node exp; /* ebtree node used to hold the session in expiration tree */
|
||||||
struct eb32_node upd; /* ebtree node used to hold the update sequence tree */
|
struct eb32_node upd; /* ebtree node used to hold the update sequence tree */
|
||||||
struct ebmb_node key; /* ebtree node used to hold the session in table */
|
struct ebmb_node key; /* ebtree node used to hold the session in table */
|
||||||
@ -155,6 +156,7 @@ struct stksess {
|
|||||||
/* stick table */
|
/* stick table */
|
||||||
struct stktable {
|
struct stktable {
|
||||||
char *id; /* local table id name. */
|
char *id; /* local table id name. */
|
||||||
|
size_t idlen; /* local table id name length. */
|
||||||
char *nid; /* table id name sent over the network with peers protocol. */
|
char *nid; /* table id name sent over the network with peers protocol. */
|
||||||
struct stktable *next; /* The stick-table may be linked when belonging to
|
struct stktable *next; /* The stick-table may be linked when belonging to
|
||||||
* the same configuration section.
|
* the same configuration section.
|
||||||
|
@ -685,6 +685,7 @@ static struct peer *cfg_peers_add_peer(struct peers *peers,
|
|||||||
int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
|
int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
|
||||||
{
|
{
|
||||||
static struct peers *curpeers = NULL;
|
static struct peers *curpeers = NULL;
|
||||||
|
static int nb_shards = 0;
|
||||||
struct peer *newpeer = NULL;
|
struct peer *newpeer = NULL;
|
||||||
const char *err;
|
const char *err;
|
||||||
struct bind_conf *bind_conf;
|
struct bind_conf *bind_conf;
|
||||||
@ -905,6 +906,13 @@ int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
|
|||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (nb_shards && curpeers->peers_fe->srv->shard > nb_shards) {
|
||||||
|
ha_warning("parsing [%s:%d] : '%s %s' : %d peer shard greater value than %d shards value is ignored.\n",
|
||||||
|
file, linenum, args[0], args[1], curpeers->peers_fe->srv->shard, nb_shards);
|
||||||
|
curpeers->peers_fe->srv->shard = 0;
|
||||||
|
err_code |= ERR_WARN;
|
||||||
|
}
|
||||||
|
|
||||||
if (curpeers->peers_fe->srv->init_addr_methods || curpeers->peers_fe->srv->resolvers_id ||
|
if (curpeers->peers_fe->srv->init_addr_methods || curpeers->peers_fe->srv->resolvers_id ||
|
||||||
curpeers->peers_fe->srv->do_check || curpeers->peers_fe->srv->do_agent) {
|
curpeers->peers_fe->srv->do_check || curpeers->peers_fe->srv->do_agent) {
|
||||||
ha_warning("parsing [%s:%d] : '%s %s' : init_addr, resolvers, check and agent are ignored for peers.\n", file, linenum, args[0], args[1]);
|
ha_warning("parsing [%s:%d] : '%s %s' : init_addr, resolvers, check and agent are ignored for peers.\n", file, linenum, args[0], args[1]);
|
||||||
@ -966,6 +974,32 @@ int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
|
|||||||
l->options |= LI_O_UNLIMITED; /* don't make the peers subject to global limits */
|
l->options |= LI_O_UNLIMITED; /* don't make the peers subject to global limits */
|
||||||
global.maxsock++; /* for the listening socket */
|
global.maxsock++; /* for the listening socket */
|
||||||
}
|
}
|
||||||
|
else if (strcmp(args[0], "shards") == 0) {
|
||||||
|
char *endptr;
|
||||||
|
|
||||||
|
if (!*args[1]) {
|
||||||
|
ha_alert("parsing [%s:%d] : '%s' : missing value\n", file, linenum, args[0]);
|
||||||
|
err_code |= ERR_FATAL;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
|
||||||
|
curpeers->nb_shards = strtol(args[1], &endptr, 10);
|
||||||
|
if (*endptr != '\0') {
|
||||||
|
ha_alert("parsing [%s:%d] : '%s' : expects an integer argument, found '%s'\n",
|
||||||
|
file, linenum, args[0], args[1]);
|
||||||
|
err_code |= ERR_FATAL;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!curpeers->nb_shards) {
|
||||||
|
ha_alert("parsing [%s:%d] : '%s' : expects a strictly positive integer argument\n",
|
||||||
|
file, linenum, args[0]);
|
||||||
|
err_code |= ERR_FATAL;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
|
||||||
|
nb_shards = curpeers->nb_shards;
|
||||||
|
}
|
||||||
else if (strcmp(args[0], "table") == 0) {
|
else if (strcmp(args[0], "table") == 0) {
|
||||||
struct stktable *t, *other;
|
struct stktable *t, *other;
|
||||||
char *id;
|
char *id;
|
||||||
@ -4373,6 +4407,7 @@ int check_config_validity()
|
|||||||
*/
|
*/
|
||||||
last = &cfg_peers;
|
last = &cfg_peers;
|
||||||
while (*last) {
|
while (*last) {
|
||||||
|
struct peer *peer;
|
||||||
struct stktable *t;
|
struct stktable *t;
|
||||||
curpeers = *last;
|
curpeers = *last;
|
||||||
|
|
||||||
@ -4464,6 +4499,26 @@ int check_config_validity()
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
last = &curpeers->next;
|
last = &curpeers->next;
|
||||||
|
|
||||||
|
/* Ignore the peer shard greater than the number of peer shard for this section.
|
||||||
|
* Also ignore the peer shard of the local peer.
|
||||||
|
*/
|
||||||
|
for (peer = curpeers->remote; peer; peer = peer->next) {
|
||||||
|
if (peer == curpeers->local) {
|
||||||
|
if (peer->srv->shard) {
|
||||||
|
ha_warning("Peers section '%s': shard ignored for '%s' local peer\n",
|
||||||
|
curpeers->id, peer->id);
|
||||||
|
peer->srv->shard = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (peer->srv->shard > curpeers->nb_shards) {
|
||||||
|
ha_warning("Peers section '%s': shard ignored for '%s' local peer because "
|
||||||
|
"%d shard value is greater than the section number of shards (%d)\n",
|
||||||
|
curpeers->id, peer->id, peer->srv->shard, curpeers->nb_shards);
|
||||||
|
peer->srv->shard = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1597,6 +1597,13 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
|
|||||||
}
|
}
|
||||||
|
|
||||||
updateid = ts->upd.key;
|
updateid = ts->upd.key;
|
||||||
|
if (p->srv->shard && ts->shard != p->srv->shard) {
|
||||||
|
/* Skip this entry */
|
||||||
|
st->last_pushed = updateid;
|
||||||
|
new_pushed = 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
ts->ref_cnt++;
|
ts->ref_cnt++;
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
|
||||||
|
@ -807,6 +807,14 @@ static int srv_parse_no_send_proxy_v2(char **args, int *cur_arg,
|
|||||||
return srv_disable_pp_flags(newsrv, SRV_PP_V2);
|
return srv_disable_pp_flags(newsrv, SRV_PP_V2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Parse the "shard" server keyword */
|
||||||
|
static int srv_parse_shard(char **args, int *cur_arg,
|
||||||
|
struct proxy *curproxy, struct server *newsrv, char **err)
|
||||||
|
{
|
||||||
|
newsrv->shard = atol(args[*cur_arg + 1]);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* Parse the "no-tfo" server keyword */
|
/* Parse the "no-tfo" server keyword */
|
||||||
static int srv_parse_no_tfo(char **args, int *cur_arg,
|
static int srv_parse_no_tfo(char **args, int *cur_arg,
|
||||||
struct proxy *curproxy, struct server *newsrv, char **err)
|
struct proxy *curproxy, struct server *newsrv, char **err)
|
||||||
@ -1791,6 +1799,7 @@ static struct srv_kw_list srv_kws = { "ALL", { }, {
|
|||||||
{ "resolvers", srv_parse_resolvers, 1, 1, 0 }, /* Configure the resolver to use for name resolution */
|
{ "resolvers", srv_parse_resolvers, 1, 1, 0 }, /* Configure the resolver to use for name resolution */
|
||||||
{ "send-proxy", srv_parse_send_proxy, 0, 1, 1 }, /* Enforce use of PROXY V1 protocol */
|
{ "send-proxy", srv_parse_send_proxy, 0, 1, 1 }, /* Enforce use of PROXY V1 protocol */
|
||||||
{ "send-proxy-v2", srv_parse_send_proxy_v2, 0, 1, 1 }, /* Enforce use of PROXY V2 protocol */
|
{ "send-proxy-v2", srv_parse_send_proxy_v2, 0, 1, 1 }, /* Enforce use of PROXY V2 protocol */
|
||||||
|
{ "shard", srv_parse_shard, 1, 1, 1 }, /* Server shard (only in peers protocol context) */
|
||||||
{ "slowstart", srv_parse_slowstart, 1, 1, 1 }, /* Set the warm-up timer for a previously failed server */
|
{ "slowstart", srv_parse_slowstart, 1, 1, 1 }, /* Set the warm-up timer for a previously failed server */
|
||||||
{ "source", srv_parse_source, -1, 1, 1 }, /* Set the source address to be used to connect to the server */
|
{ "source", srv_parse_source, -1, 1, 1 }, /* Set the source address to be used to connect to the server */
|
||||||
{ "stick", srv_parse_stick, 0, 1, 0 }, /* Enable stick-table persistence */
|
{ "stick", srv_parse_stick, 0, 1, 0 }, /* Enable stick-table persistence */
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
#include <import/ebmbtree.h>
|
#include <import/ebmbtree.h>
|
||||||
#include <import/ebsttree.h>
|
#include <import/ebsttree.h>
|
||||||
#include <import/ebistree.h>
|
#include <import/ebistree.h>
|
||||||
|
#include <import/xxhash.h>
|
||||||
|
|
||||||
#include <haproxy/api.h>
|
#include <haproxy/api.h>
|
||||||
#include <haproxy/applet.h>
|
#include <haproxy/applet.h>
|
||||||
@ -155,6 +156,48 @@ void stksess_setkey(struct stktable *t, struct stksess *ts, struct stktable_key
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Initialize or update the key hash in the sticky session <ts> present in table <t>
|
||||||
|
* from the value present in <key>.
|
||||||
|
*/
|
||||||
|
static unsigned long long stksess_getkey_hash(struct stktable *t,
|
||||||
|
struct stksess *ts,
|
||||||
|
struct stktable_key *key)
|
||||||
|
{
|
||||||
|
struct buffer *buf;
|
||||||
|
size_t keylen;
|
||||||
|
|
||||||
|
/* Copy the stick-table id into <buf> */
|
||||||
|
buf = get_trash_chunk();
|
||||||
|
memcpy(b_tail(buf), t->id, t->idlen);
|
||||||
|
b_add(buf, t->idlen);
|
||||||
|
/* Copy the key into <buf> */
|
||||||
|
if (t->type == SMP_T_STR)
|
||||||
|
keylen = key->key_len;
|
||||||
|
else
|
||||||
|
keylen = t->key_size;
|
||||||
|
memcpy(b_tail(buf), key->key, keylen);
|
||||||
|
b_add(buf, keylen);
|
||||||
|
|
||||||
|
return XXH64(b_head(buf), b_data(buf), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set the shard for <key> key of <ts> sticky session attached to <t> stick table.
|
||||||
|
* Do nothing for stick-table without peers synchronisation.
|
||||||
|
*/
|
||||||
|
static void stksess_setkey_shard(struct stktable *t, struct stksess *ts,
|
||||||
|
struct stktable_key *key)
|
||||||
|
{
|
||||||
|
if (!t->peers.p)
|
||||||
|
/* This stick-table is not attached to any peers section */
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (!t->peers.p->nb_shards)
|
||||||
|
ts->shard = 0;
|
||||||
|
else
|
||||||
|
ts->shard = stksess_getkey_hash(t, ts, key) % t->peers.p->nb_shards + 1;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Init sticky session <ts> of table <t>. The data parts are cleared and <ts>
|
* Init sticky session <ts> of table <t>. The data parts are cleared and <ts>
|
||||||
@ -282,8 +325,10 @@ struct stksess *stksess_new(struct stktable *t, struct stktable_key *key)
|
|||||||
if (ts) {
|
if (ts) {
|
||||||
ts = (void *)ts + round_ptr_size(t->data_size);
|
ts = (void *)ts + round_ptr_size(t->data_size);
|
||||||
__stksess_init(t, ts);
|
__stksess_init(t, ts);
|
||||||
if (key)
|
if (key) {
|
||||||
stksess_setkey(t, ts, key);
|
stksess_setkey(t, ts, key);
|
||||||
|
stksess_setkey_shard(t, ts, key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ts;
|
return ts;
|
||||||
@ -851,6 +896,7 @@ int parse_stick_table(const char *file, int linenum, char **args,
|
|||||||
}
|
}
|
||||||
|
|
||||||
t->id = id;
|
t->id = id;
|
||||||
|
t->idlen = strlen(id);
|
||||||
t->nid = nid;
|
t->nid = nid;
|
||||||
t->type = (unsigned int)-1;
|
t->type = (unsigned int)-1;
|
||||||
t->conf.file = file;
|
t->conf.file = file;
|
||||||
|
Loading…
Reference in New Issue
Block a user