MEDIUM: stktable/peers: "write-to" local table on peer updates

In this patch, we add the possibility to declare on a table definition
("table" in peer section, or "stick-table" in proxy section) that we
want the remote/peer updates on that table to be pushed on a local
haproxy table in addition to the source table.

Consider this example:

  |peers mypeers
  |        peer local 127.0.0.1:3334
  |        peer clust 127.0.0.1:3333
  |        table t1.local type string size 10m store server_id,server_key expire 30s
  |        table t1.clust type string size 10m store server_id,server_key write-to mypeers/t1.local expire 30s

With this setup, we consider haproxy uses t1.local as cache/local table
for read and write operations, and that t1.clust is a remote table
containing datas processed from t1.local and similar tables from other
haproxy peers in a cluster setup. The t1.clust table will be used to
refresh the local/cache one via the "write-to" statement.

What will happen, is that every time haproxy will see entry updates for
the t1.clust table: it will overwrite t1.local table with fresh data and
will update the entry expiration timer. If t1.local entry doesn't exist
yet (key doesn't exist), it will automatically create it. Note that only
types that cannot be used for arithmetic ops will be handled, and this
to prevent processed values from the remote table from interfering with
computations based on values from the local table. (ie: prevent
cumulative counters from growing indefinitely).

"write-to" will only push supported types if they both exist in the source
and the target table. Be careful with server_id and server_key storage
because they are often declared implicitly when referencing a table in
sticking rules but it is required to declare them explicitly for them to
be pushed between a remote and a local table through "write-to" option.

Also note that the "write-to" target table should have the same type as
the source one, and that the key length should be strictly equal,
otherwise haproxy will raise an error due to the tables being
incompatibles. A table that is already being written to cannot be used
as a source table for a "write-to" target.

Thanks to this patch, it will now be possible to use sticking rules in
peer cluster context by using a local table as a local cache which
will be automatically refreshed by one or multiple remote table(s).

This commit depends on:
 - "MINOR: stktable: stktable_init() sets err_msg on error"
 - "MINOR: stktable: check if a type should be used as-is"
This commit is contained in:
Aurelien DARRAGON 2023-10-02 16:40:27 +02:00 committed by Willy Tarreau
parent db0cb54f81
commit 5158c0ff69
4 changed files with 137 additions and 25 deletions

View File

@ -3892,7 +3892,7 @@ shards <shards>
See also "shard" server parameter. See also "shard" server parameter.
table <tablename> type {ip | integer | string [len <length>] | binary [len <length>]} table <tablename> type {ip | integer | string [len <length>] | binary [len <length>]}
size <size> [expire <expire>] [nopurge] [store <data_type>]* size <size> [expire <expire>] [write-to <wtable>] [nopurge] [store <data_type>]*
Configure a stickiness table for the current section. This line is parsed Configure a stickiness table for the current section. This line is parsed
exactly the same way as the "stick-table" keyword in others section, except exactly the same way as the "stick-table" keyword in others section, except
@ -12506,7 +12506,7 @@ stick store-request <pattern> [table <table>] [{if | unless} <condition>]
stick-table type {ip | integer | string [len <length>] | binary [len <length>]} stick-table type {ip | integer | string [len <length>] | binary [len <length>]}
size <size> [expire <expire>] [nopurge] [peers <peersect>] [srvkey <srvkey>] size <size> [expire <expire>] [nopurge] [peers <peersect>] [srvkey <srvkey>]
[store <data_type>]* [write-to <wtable>] [store <data_type>]*
Configure the stickiness table for the current section Configure the stickiness table for the current section
May be used in sections : defaults | frontend | listen | backend May be used in sections : defaults | frontend | listen | backend
no | yes | yes | yes no | yes | yes | yes
@ -12570,6 +12570,23 @@ stick-table type {ip | integer | string [len <length>] | binary [len <length>]}
automatically learned from the local peer (old process) during a automatically learned from the local peer (old process) during a
soft restart. soft restart.
<wtable> is the name of the stick table where peers updates will be
written to in addition to the source table. <wtable> must be of
the same type as the table being defined and must have the same
key length, and source table cannot be used as a target table
itself. Everytime an entry update will be received on the source
table through a peer, haproxy will try to refresh related
<wtable> entry. If the entry doesn't exist yet, it will be
created, else its values will be updated as well as its timer.
Note that only types that are not involved in arithmetic ops such
as server_id, server_key and gpt will be written to <wtable> to
prevent processed values from a remote table from intefering with
arithmetic operations performed on the local target table.
(ie: prevent shared cumulative counter from growing indefinitely)
One common use of this option is to be able to use sticking rules
(for server persistence) in a peers cluster setup, because
matching keys will be learned from remote tables.
<expire> defines the maximum duration of an entry in the table since it <expire> defines the maximum duration of an entry in the table since it
was last created, refreshed using 'track-sc' or matched using was last created, refreshed using 'track-sc' or matched using
'stick match' or 'stick on' rule. The expiration delay is 'stick match' or 'stick on' rule. The expiration delay is

View File

@ -188,6 +188,11 @@ struct stktable {
void *p; void *p;
} data_arg[STKTABLE_DATA_TYPES]; /* optional argument of each data type */ } data_arg[STKTABLE_DATA_TYPES]; /* optional argument of each data type */
struct proxy *proxy; /* The proxy this stick-table is attached to, if any.*/ struct proxy *proxy; /* The proxy this stick-table is attached to, if any.*/
union {
char *name; /* preparsing hint */
struct stktable *t; /* postparsing */
void *ptr; /* generic ptr to check if set or not */
} write_to; /* updates received on the source table will also update write_to */
THREAD_ALIGN(64); THREAD_ALIGN(64);

View File

@ -1708,19 +1708,24 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
char **msg_cur, char *msg_end, int msg_len, int totl) char **msg_cur, char *msg_end, int msg_len, int totl)
{ {
struct shared_table *st = p->remote_table; struct shared_table *st = p->remote_table;
struct stktable *table;
struct stksess *ts, *newts; struct stksess *ts, *newts;
struct stksess *wts = NULL; /* write_to stksess */
uint32_t update; uint32_t update;
int expire; int expire;
unsigned int data_type; unsigned int data_type;
size_t keylen; size_t keylen;
void *data_ptr; void *data_ptr;
char *msg_save;
TRACE_ENTER(PEERS_EV_UPDTMSG, NULL, p); TRACE_ENTER(PEERS_EV_UPDTMSG, NULL, p);
/* Here we have data message */ /* Here we have data message */
if (!st) if (!st)
goto ignore_msg; goto ignore_msg;
expire = MS_TO_TICKS(st->table->expire); table = st->table;
expire = MS_TO_TICKS(table->expire);
if (updt) { if (updt) {
if (msg_len < sizeof(update)) { if (msg_len < sizeof(update)) {
@ -1752,11 +1757,11 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
expire = ntohl(expire); expire = ntohl(expire);
} }
newts = stksess_new(st->table, NULL); newts = stksess_new(table, NULL);
if (!newts) if (!newts)
goto ignore_msg; goto ignore_msg;
if (st->table->type == SMP_T_STR) { if (table->type == SMP_T_STR) {
unsigned int to_read, to_store; unsigned int to_read, to_store;
to_read = intdecode(msg_cur, msg_end); to_read = intdecode(msg_cur, msg_end);
@ -1765,7 +1770,7 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
goto malformed_free_newts; goto malformed_free_newts;
} }
to_store = MIN(to_read, st->table->key_size - 1); to_store = MIN(to_read, table->key_size - 1);
if (*msg_cur + to_store > msg_end) { if (*msg_cur + to_store > msg_end) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
NULL, p, *msg_cur); NULL, p, *msg_cur);
@ -1779,7 +1784,7 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
newts->key.key[keylen] = 0; newts->key.key[keylen] = 0;
*msg_cur += to_read; *msg_cur += to_read;
} }
else if (st->table->type == SMP_T_SINT) { else if (table->type == SMP_T_SINT) {
unsigned int netinteger; unsigned int netinteger;
if (*msg_cur + sizeof(netinteger) > msg_end) { if (*msg_cur + sizeof(netinteger) > msg_end) {
@ -1797,39 +1802,50 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
*msg_cur += keylen; *msg_cur += keylen;
} }
else { else {
if (*msg_cur + st->table->key_size > msg_end) { if (*msg_cur + table->key_size > msg_end) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
NULL, p, *msg_cur); NULL, p, *msg_cur);
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
NULL, p, msg_end, &st->table->key_size); NULL, p, msg_end, &table->key_size);
goto malformed_free_newts; goto malformed_free_newts;
} }
keylen = st->table->key_size; keylen = table->key_size;
memcpy(newts->key.key, *msg_cur, keylen); memcpy(newts->key.key, *msg_cur, keylen);
*msg_cur += keylen; *msg_cur += keylen;
} }
newts->shard = stktable_get_key_shard(st->table, newts->key.key, keylen); newts->shard = stktable_get_key_shard(table, newts->key.key, keylen);
/* lookup for existing entry */ /* lookup for existing entry */
ts = stktable_set_entry(st->table, newts); ts = stktable_set_entry(table, newts);
if (ts != newts) { if (ts != newts) {
stksess_free(st->table, newts); stksess_free(table, newts);
newts = NULL; newts = NULL;
} }
msg_save = *msg_cur;
update_wts:
HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock); HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) { for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
uint64_t decoded_int; uint64_t decoded_int;
unsigned int idx; unsigned int idx;
int ignore; int ignore = 0;
if (!((1ULL << data_type) & st->remote_data)) if (!((1ULL << data_type) & st->remote_data))
continue; continue;
ignore = stktable_data_types[data_type].is_local; /* We shouldn't learn local-only values. Also, when handling the
* write_to table we must ignore types that can be processed
* so we don't interfere with any potential arithmetic logic
* performed on them (ie: cumulative counters).
*/
if (stktable_data_types[data_type].is_local ||
(table != st->table && !stktable_data_types[data_type].as_is))
ignore = 1;
if (stktable_data_types[data_type].is_array) { if (stktable_data_types[data_type].is_array) {
/* in case of array all elements /* in case of array all elements
@ -1847,7 +1863,7 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
goto malformed_unlock; goto malformed_unlock;
} }
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx); data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx);
if (data_ptr && !ignore) if (data_ptr && !ignore)
stktable_data_cast(data_ptr, std_t_sint) = decoded_int; stktable_data_cast(data_ptr, std_t_sint) = decoded_int;
} }
@ -1860,7 +1876,7 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
goto malformed_unlock; goto malformed_unlock;
} }
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx); data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx);
if (data_ptr && !ignore) if (data_ptr && !ignore)
stktable_data_cast(data_ptr, std_t_uint) = decoded_int; stktable_data_cast(data_ptr, std_t_uint) = decoded_int;
} }
@ -1873,7 +1889,7 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
goto malformed_unlock; goto malformed_unlock;
} }
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx); data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx);
if (data_ptr && !ignore) if (data_ptr && !ignore)
stktable_data_cast(data_ptr, std_t_ull) = decoded_int; stktable_data_cast(data_ptr, std_t_ull) = decoded_int;
} }
@ -1907,7 +1923,7 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
goto malformed_unlock; goto malformed_unlock;
} }
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx); data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx);
if (data_ptr && !ignore) if (data_ptr && !ignore)
stktable_data_cast(data_ptr, std_t_frqp) = data; stktable_data_cast(data_ptr, std_t_frqp) = data;
} }
@ -1927,19 +1943,19 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
switch (stktable_data_types[data_type].std_type) { switch (stktable_data_types[data_type].std_type) {
case STD_T_SINT: case STD_T_SINT:
data_ptr = stktable_data_ptr(st->table, ts, data_type); data_ptr = stktable_data_ptr(table, ts, data_type);
if (data_ptr && !ignore) if (data_ptr && !ignore)
stktable_data_cast(data_ptr, std_t_sint) = decoded_int; stktable_data_cast(data_ptr, std_t_sint) = decoded_int;
break; break;
case STD_T_UINT: case STD_T_UINT:
data_ptr = stktable_data_ptr(st->table, ts, data_type); data_ptr = stktable_data_ptr(table, ts, data_type);
if (data_ptr && !ignore) if (data_ptr && !ignore)
stktable_data_cast(data_ptr, std_t_uint) = decoded_int; stktable_data_cast(data_ptr, std_t_uint) = decoded_int;
break; break;
case STD_T_ULL: case STD_T_ULL:
data_ptr = stktable_data_ptr(st->table, ts, data_type); data_ptr = stktable_data_ptr(table, ts, data_type);
if (data_ptr && !ignore) if (data_ptr && !ignore)
stktable_data_cast(data_ptr, std_t_ull) = decoded_int; stktable_data_cast(data_ptr, std_t_ull) = decoded_int;
break; break;
@ -1966,7 +1982,7 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
goto malformed_unlock; goto malformed_unlock;
} }
data_ptr = stktable_data_ptr(st->table, ts, data_type); data_ptr = stktable_data_ptr(table, ts, data_type);
if (data_ptr && !ignore) if (data_ptr && !ignore)
stktable_data_cast(data_ptr, std_t_frqp) = data; stktable_data_cast(data_ptr, std_t_frqp) = data;
break; break;
@ -2035,7 +2051,7 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
dc->rx[id - 1].de = de; dc->rx[id - 1].de = de;
} }
if (de) { if (de) {
data_ptr = stktable_data_ptr(st->table, ts, data_type); data_ptr = stktable_data_ptr(table, ts, data_type);
if (data_ptr && !ignore) { if (data_ptr && !ignore) {
HA_ATOMIC_INC(&de->refcount); HA_ATOMIC_INC(&de->refcount);
stktable_data_cast(data_ptr, std_t_dict) = de; stktable_data_cast(data_ptr, std_t_dict) = de;
@ -2045,11 +2061,38 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
} }
} }
} }
if (st->table->write_to.t && table != st->table->write_to.t) {
struct stktable_key stkey = { .key = ts->key.key, .key_len = keylen };
/* While we're still under the main ts lock, try to get related
* write_to stksess with main ts key
*/
wts = stktable_get_entry(st->table->write_to.t, &stkey);
}
/* Force new expiration */ /* Force new expiration */
ts->expire = tick_add(now_ms, expire); ts->expire = tick_add(now_ms, expire);
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock); HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1); stktable_touch_remote(table, ts, 1);
if (wts) {
/* Start over the message decoding for wts as we got a valid stksess
* for write_to table, so we need to refresh the entry with supported
* values.
*
* We prefer to do the decoding a second time even though it might
* cost a bit more than copying from main ts to wts, but doing so
* enables us to get rid of main ts lock: we only need the wts lock
* since upstream data is still available in msg_cur
*/
ts = wts;
table = st->table->write_to.t;
wts = NULL; /* so we don't get back here */
*msg_cur = msg_save;
goto update_wts;
}
ignore_msg: ignore_msg:
TRACE_LEAVE(PEERS_EV_UPDTMSG, NULL, p); TRACE_LEAVE(PEERS_EV_UPDTMSG, NULL, p);

View File

@ -828,6 +828,36 @@ int stktable_init(struct stktable *t, char **err_msg)
if (t->pool == NULL || peers_retval) if (t->pool == NULL || peers_retval)
goto mem_error; goto mem_error;
} }
if (t->write_to.name) {
struct stktable *table;
/* postresolve write_to table */
table = stktable_find_by_name(t->write_to.name);
if (!table) {
memprintf(err_msg, "write-to: table '%s' doesn't exist", t->write_to.name);
ha_free(&t->write_to.name); /* no longer need this */
return 0;
}
ha_free(&t->write_to.name); /* no longer need this */
if (table->write_to.ptr) {
memprintf(err_msg, "write-to: table '%s' is already used as a source table", table->id);
return 0;
}
if (table->type != t->type) {
memprintf(err_msg, "write-to: cannot mix table types ('%s' has '%s' type and '%s' has '%s' type)",
table->id, stktable_types[table->type].kw,
t->id, stktable_types[t->type].kw);
return 0;
}
if (table->key_size != t->key_size) {
memprintf(err_msg, "write-to: cannot mix key sizes ('%s' has '%ld' key_size and '%s' has '%ld' key_size)",
table->id, (long)table->key_size,
t->id, (long)t->key_size);
return 0;
}
t->write_to.t = table;
}
return 1; return 1;
mem_error: mem_error:
@ -976,6 +1006,7 @@ int parse_stick_table(const char *file, int linenum, char **args,
t->type = (unsigned int)-1; t->type = (unsigned int)-1;
t->conf.file = file; t->conf.file = file;
t->conf.line = linenum; t->conf.line = linenum;
t->write_to.name = NULL;
while (*args[idx]) { while (*args[idx]) {
const char *err; const char *err;
@ -1164,6 +1195,22 @@ int parse_stick_table(const char *file, int linenum, char **args,
} }
idx++; idx++;
} }
else if (strcmp(args[idx], "write-to") == 0) {
char *write_to;
idx++;
write_to = args[idx];
if (!write_to[0]) {
ha_alert("parsing [%s:%d] : %s : write-to requires table name.\n",
file, linenum, args[0]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
ha_free(&t->write_to.name);
t->write_to.name = strdup(write_to);
idx++;
}
else { else {
ha_alert("parsing [%s:%d] : %s: unknown argument '%s'.\n", ha_alert("parsing [%s:%d] : %s: unknown argument '%s'.\n",
file, linenum, args[0], args[idx]); file, linenum, args[0], args[idx]);