MAJOR: peers: Stop tracking acked updates per shared table

This patch is quite small but the change is really important. Thanks to the
previous patch, we can use PEER_F_SYNCHED flag to know if a peer is
synchronized or not. So instead of tracking last ack messages for each table
to be able to restart at a given point when the peer reconnects, we decided
to restart from the beginning if a peer is not synchronized when a new
connection is established.

So, it is a huge change because, on reconnect, instead of pushing some
missed updates, all local updates are pushed again. Most of the time, it is not
a problem because nowadays, connections are quite stable, especially because
a heartbeat message is sent to keep them active. The only drawback is when a
peer is restarted. In that case, we have no way to know it is synchronized
because it learned the table contents from its old local peer.

This change is mandatory: first, to replace the update tree with an mt-list
and remove the update lock; then, to split this list into buckets to reduce
contention.
This commit is contained in:
Christopher Faulet 2025-09-29 15:48:18 +02:00
parent 9babc988da
commit 1cd4bd8880
2 changed files with 12 additions and 19 deletions

View File

@ -143,7 +143,6 @@ struct shared_table {
unsigned int last_pushed; unsigned int last_pushed;
unsigned int last_get; unsigned int last_get;
unsigned int teaching_origin; unsigned int teaching_origin;
unsigned int update;
struct shared_table *next; /* next shared table in list */ struct shared_table *next; /* next shared table in list */
}; };

View File

@ -2212,7 +2212,6 @@ static inline int peer_treat_ackmsg(struct appctx *appctx, struct peer *p,
/* ack message */ /* ack message */
uint32_t table_id ; uint32_t table_id ;
uint32_t update; uint32_t update;
struct shared_table *st;
int ret = 1; int ret = 1;
TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p); TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p);
@ -2233,19 +2232,13 @@ static inline int peer_treat_ackmsg(struct appctx *appctx, struct peer *p,
memcpy(&update, *msg_cur, sizeof(update)); memcpy(&update, *msg_cur, sizeof(update));
update = ntohl(update); update = ntohl(update);
for (st = p->tables; st; st = st->next) {
if (st->local_id == table_id) {
st->update = update;
break;
}
}
if (table_id == p->last.table->local_id && update == p->last.id) { if (table_id == p->last.table->local_id && update == p->last.id) {
TRACE_STATE("Peer synched again", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p); TRACE_STATE("Peer synched again", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p);
p->flags |= PEER_F_SYNCHED; p->flags |= PEER_F_SYNCHED;
} }
end: end:
TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p, st); TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p);
return ret; return ret;
} }
@ -2582,7 +2575,7 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
/* prepare tables for a global push */ /* prepare tables for a global push */
for (st = peer->tables; st; st = st->next) { for (st = peer->tables; st; st = st->next) {
st->teaching_origin = st->last_pushed = st->update; st->teaching_origin = st->last_pushed;
st->flags = 0; st->flags = 0;
} }
@ -2627,7 +2620,7 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
} }
peer->flags |= PEER_F_SYNCHED; peer->flags |= PEER_F_SYNCHED;
for (st = peer->tables; st; st = st->next) { for (st = peer->tables; st; st = st->next) {
st->update = st->last_pushed = st->teaching_origin; st->last_pushed = st->teaching_origin;
st->flags = 0; st->flags = 0;
} }
@ -2962,21 +2955,22 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
/* Init cursors */ /* Init cursors */
for (st = peer->tables; st ; st = st->next) { for (st = peer->tables; st ; st = st->next) {
st->last_get = st->last_acked = 0; st->last_get = st->last_acked = 0;
HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock); HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
/* if st->update appears to be in future it means /* if st->last_pushed appears to be in future it means
* that the last acked value is very old and we * that the last update is very old and we
* remain unconnected a too long time to use this * remain unconnected a too long time to use this
* acknowledgement as a reset. * acknowledgement as a reset.
* We should update the protocol to be able to * We should update the protocol to be able to
* signal the remote peer that it needs a full resync. * signal the remote peer that it needs a full resync.
* Here a partial fix consist to set st->update at * Here a partial fix consist to set st->last_pushed at
* the max past value. * the max past value.
*/ */
if (!(peer->flags & PEER_F_SYNCHED) || (int)(st->table->localupdate - st->update) < 0) { if (!(peer->flags & PEER_F_SYNCHED) || (int)(st->table->localupdate - st->last_pushed) < 0) {
st->update = st->table->localupdate + (2147483648U); st->last_pushed = st->table->localupdate + (2147483648U);
peer->flags &= ~PEER_F_SYNCHED; peer->flags &= ~PEER_F_SYNCHED;
} }
st->teaching_origin = st->last_pushed = st->update; st->teaching_origin = st->last_pushed;
st->flags = 0; st->flags = 0;
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock); HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
@ -4318,8 +4312,8 @@ static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct pee
st, st->local_id, st->remote_id, st, st->local_id, st->remote_id,
st->flags, (unsigned long long)st->remote_data); st->flags, (unsigned long long)st->remote_data);
chunk_appendf(&trash, "\n last_acked=%u last_pushed=%u last_get=%u" chunk_appendf(&trash, "\n last_acked=%u last_pushed=%u last_get=%u"
" teaching_origin=%u update=%u", " teaching_origin=%u",
st->last_acked, st->last_pushed, st->last_get, st->teaching_origin, st->update); st->last_acked, st->last_pushed, st->last_get, st->teaching_origin);
chunk_appendf(&trash, "\n table:%p id=%s update=%u localupdate=%u refcnt=%u", chunk_appendf(&trash, "\n table:%p id=%s update=%u localupdate=%u refcnt=%u",
t, t->id, t->update, t->localupdate, t->refcnt); t, t->id, t->update, t->localupdate, t->refcnt);
if (flags & PEERS_SHOW_F_DICT) { if (flags & PEERS_SHOW_F_DICT) {