mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-05-05 04:56:10 +02:00
MAJOR: peers: Replace the update tree by a list
To be able to split updates by shard, the update tree was replaced by a simple list. Indeed, by splitting the updates by shard, we lose the order of updates, so there is no reason to still use a tree. To make it work properly, instead of using the id of the last update pushed, we use markers: special sticky sessions, only present in the updates list and owned by a peer. There are two markers. The main one, <last>, is used as the restart point to send the new updates. The other one, <end>, is only used as a stop point when a resync was requested by a remote peer. Thanks to these changes, the function responsible for looping on updates is a little simpler. We only loop from <last> to the end of the list, or to <end> if it is found in the list.
This commit is contained in:
parent
96fb2dd3fa
commit
41dfd28da3
@ -138,12 +138,13 @@ struct shared_table {
|
||||
int remote_id;
|
||||
int flags;
|
||||
unsigned int update_id;
|
||||
struct stksess *last;
|
||||
struct stksess *end;
|
||||
uint64_t remote_data;
|
||||
unsigned int remote_data_nbelem[STKTABLE_DATA_TYPES];
|
||||
unsigned int last_update; /* the counter of the last local update sent */
|
||||
unsigned int last_acked;
|
||||
unsigned int last_pushed;
|
||||
unsigned int last_get;
|
||||
unsigned int teaching_origin;
|
||||
struct shared_table *next; /* next shared table in list */
|
||||
};
|
||||
|
||||
|
||||
@ -160,7 +160,7 @@ struct stksess {
|
||||
int shard; /* shard number used by peers */
|
||||
int seen; /* 0 only when no peer has seen this entry yet */
|
||||
struct eb32_node exp; /* ebtree node used to hold the session in expiration tree */
|
||||
struct eb32_node upd; /* ebtree node used to hold the update sequence tree */
|
||||
struct list upd; /* entry in the table's update sequence list */
|
||||
struct mt_list pend_updts;/* list of entries to be inserted/moved in the update sequence tree */
|
||||
unsigned int updt_type; /* One of STKSESS_UPDT_* value */
|
||||
struct ebmb_node key; /* ebtree node used to hold the session in table */
|
||||
@ -232,10 +232,9 @@ struct stktable {
|
||||
unsigned int current; /* number of sticky sessions currently in table */
|
||||
THREAD_ALIGN(64);
|
||||
|
||||
struct eb_root updates; /* head of sticky updates sequence tree, uses updt_lock */
|
||||
unsigned int last_update; /* a counter representing the update inserted in the list (will wrap) */
|
||||
struct list updates; /* head of sticky updates sequence list, uses updt_lock */
|
||||
struct mt_list *pend_updts; /* list of updates to be added to the update sequence tree, one per thread-group */
|
||||
unsigned int update; /* uses updt_lock */
|
||||
unsigned int localupdate; /* uses updt_lock */
|
||||
struct tasklet *updt_task;/* tasklet responsible for pushing the pending updates into the tree */
|
||||
|
||||
THREAD_ALIGN(64);
|
||||
|
||||
368
src/peers.c
368
src/peers.c
@ -52,9 +52,7 @@
|
||||
/***********************************/
|
||||
/* Current shared table sync state */
|
||||
/***********************************/
|
||||
#define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */
|
||||
#define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */
|
||||
|
||||
#define SHTABLE_F_RESET_SYNCHED 0x00000001
|
||||
|
||||
#define PEER_RESYNC_TIMEOUT 5000 /* 5 seconds */
|
||||
#define PEER_RECONNECT_TIMEOUT 5000 /* 5 seconds */
|
||||
@ -1519,84 +1517,6 @@ static inline int peer_send_error_protomsg(struct appctx *appctx)
|
||||
return peer_send_msg(appctx, peer_prepare_error_msg, &p);
|
||||
}
|
||||
|
||||
/*
|
||||
* Function used to lookup for recent stick-table updates associated with
|
||||
* <st> shared stick-table when a lesson must be taught a peer (learn state is not PEER_LR_ST_NOTASSIGNED).
|
||||
*/
|
||||
static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
|
||||
{
|
||||
struct eb32_node *eb;
|
||||
struct stksess *ret;
|
||||
|
||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
||||
if (!eb) {
|
||||
eb = eb32_first(&st->table->updates);
|
||||
if (!eb || (eb->key == st->last_pushed)) {
|
||||
st->last_pushed = st->table->localupdate;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* if distance between the last pushed and the retrieved key
|
||||
* is greater than the distance last_pushed and the local_update
|
||||
* this means we are beyond localupdate.
|
||||
*/
|
||||
if ((eb->key - st->last_pushed) > (st->table->localupdate - st->last_pushed)) {
|
||||
st->last_pushed = st->table->localupdate;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ret = eb32_entry(eb, struct stksess, upd);
|
||||
if (!_HA_ATOMIC_LOAD(&ret->seen))
|
||||
_HA_ATOMIC_STORE(&ret->seen, 1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Function used to lookup for recent stick-table updates associated with
|
||||
* <st> shared stick-table during teach state 1 step.
|
||||
*/
|
||||
static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
|
||||
{
|
||||
struct eb32_node *eb;
|
||||
struct stksess *ret;
|
||||
|
||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
||||
if (!eb) {
|
||||
st->flags |= SHTABLE_F_TEACH_STAGE1;
|
||||
eb = eb32_first(&st->table->updates);
|
||||
if (eb)
|
||||
st->last_pushed = eb->key - 1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ret = eb32_entry(eb, struct stksess, upd);
|
||||
if (!_HA_ATOMIC_LOAD(&ret->seen))
|
||||
_HA_ATOMIC_STORE(&ret->seen, 1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Function used to lookup for recent stick-table updates associated with
|
||||
* <st> shared stick-table during teach state 2 step.
|
||||
*/
|
||||
static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
|
||||
{
|
||||
struct eb32_node *eb;
|
||||
struct stksess *ret;
|
||||
|
||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
||||
if (!eb || eb->key > st->teaching_origin) {
|
||||
st->flags |= SHTABLE_F_TEACH_STAGE2;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ret = eb32_entry(eb, struct stksess, upd);
|
||||
if (!_HA_ATOMIC_LOAD(&ret->seen))
|
||||
_HA_ATOMIC_STORE(&ret->seen, 1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Generic function to emit update messages for <st> stick-table when a lesson must
|
||||
* be taught to the peer <p>.
|
||||
@ -1612,86 +1532,92 @@ static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_tab
|
||||
* If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
|
||||
* unlocked if not already locked when entering this function.
|
||||
*/
|
||||
int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
|
||||
struct stksess *(*peer_stksess_lookup)(struct shared_table *),
|
||||
struct shared_table *st)
|
||||
int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, struct shared_table *st)
|
||||
{
|
||||
int ret, new_pushed, use_timed;
|
||||
int updates_sent = 0;
|
||||
int failed_once = 0;
|
||||
int locked = 0;
|
||||
|
||||
TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
|
||||
|
||||
ret = 1;
|
||||
use_timed = 0;
|
||||
use_timed = !(p->flags & (PEER_F_TEACH_PROCESS|PEER_F_DWNGRD)); // TODO: check that
|
||||
|
||||
if (st != p->last_local_table) {
|
||||
ret = peer_send_switchmsg(st, appctx);
|
||||
if (ret <= 0)
|
||||
goto out;
|
||||
goto out; // FIXME: issue in patch about traces !
|
||||
|
||||
p->last_local_table = st;
|
||||
}
|
||||
|
||||
if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
|
||||
use_timed = !(p->flags & PEER_F_DWNGRD);
|
||||
if (st->flags & SHTABLE_F_RESET_SYNCHED) {
|
||||
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
|
||||
/* just don't engage here if there is any contention */
|
||||
applet_have_more_data(appctx);
|
||||
ret = -1;
|
||||
goto out;
|
||||
}
|
||||
locked = 1;
|
||||
LIST_DEL_INIT(&st->last->upd);
|
||||
LIST_INSERT(&st->table->updates, &st->last->upd);
|
||||
if (p->flags & PEER_F_TEACH_PROCESS) {
|
||||
LIST_DEL_INIT(&st->end->upd);
|
||||
LIST_APPEND(&st->table->updates, &st->end->upd);
|
||||
}
|
||||
st->flags &= ~SHTABLE_F_RESET_SYNCHED;
|
||||
}
|
||||
|
||||
/* We force new pushed to 1 to force identifier in update message */
|
||||
new_pushed = 1;
|
||||
|
||||
if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
|
||||
/* just don't engage here if there is any contention */
|
||||
applet_have_more_data(appctx);
|
||||
ret = -1;
|
||||
goto out_unlocked;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
struct stksess *ts;
|
||||
unsigned last_pushed;
|
||||
|
||||
/* push local updates */
|
||||
ts = peer_stksess_lookup(st);
|
||||
if (!ts) {
|
||||
ret = 1; // done
|
||||
if (locked == 0 && HA_RWLOCK_TRYWRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
|
||||
/* just don't engage here if there is any contention */
|
||||
applet_have_more_data(appctx);
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
locked = 1;
|
||||
|
||||
BUG_ON(!LIST_INLIST(&st->last->upd));
|
||||
|
||||
ts = LIST_NEXT(&st->last->upd, typeof(ts), upd);
|
||||
|
||||
if (&ts->upd == &st->table->updates)
|
||||
break;
|
||||
if (ts == st->end) {
|
||||
LIST_DEL_INIT(&st->end->upd);
|
||||
break;
|
||||
}
|
||||
|
||||
last_pushed = ts->upd.key;
|
||||
if (p->srv->shard && ts->shard != p->srv->shard) {
|
||||
/* Skip this entry */
|
||||
st->last_pushed = last_pushed;
|
||||
new_pushed = 1;
|
||||
LIST_DEL_INIT(&st->last->upd);
|
||||
LIST_INSERT(&ts->upd, &st->last->upd);
|
||||
|
||||
if ((!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL) ||
|
||||
(ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE) ||
|
||||
(p->srv->shard && ts->shard != p->srv->shard) ||
|
||||
ts->updt_type == STKSESS_UPDT_MARKER) {
|
||||
// TODO: INC updates_sent ?
|
||||
continue;
|
||||
}
|
||||
|
||||
HA_ATOMIC_INC(&ts->ref_cnt);
|
||||
HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
||||
|
||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
||||
locked = 0;
|
||||
|
||||
if (!_HA_ATOMIC_LOAD(&ts->seen))
|
||||
_HA_ATOMIC_STORE(&ts->seen, 1);
|
||||
|
||||
ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
|
||||
|
||||
if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
|
||||
if (failed_once) {
|
||||
/* we've already faced contention twice in this
|
||||
* loop, this is getting serious, do not insist
|
||||
* anymore and come back later
|
||||
*/
|
||||
HA_ATOMIC_DEC(&ts->ref_cnt);
|
||||
applet_have_more_data(appctx);
|
||||
ret = -1;
|
||||
goto out_unlocked;
|
||||
}
|
||||
/* OK contention happens, for this one we'll wait on the
|
||||
* lock, but only once.
|
||||
*/
|
||||
failed_once++;
|
||||
HA_RWLOCK_RDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
||||
}
|
||||
|
||||
HA_ATOMIC_DEC(&ts->ref_cnt);
|
||||
if (ret <= 0)
|
||||
break;
|
||||
|
||||
st->last_pushed = last_pushed;
|
||||
p->last.table = st;
|
||||
p->last.id = st->update_id;
|
||||
st->update_id++;
|
||||
@ -1708,67 +1634,16 @@ int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
|
||||
}
|
||||
}
|
||||
|
||||
if (locked) {
|
||||
st->last_update = st->table->last_update;
|
||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
||||
}
|
||||
|
||||
out:
|
||||
HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
||||
out_unlocked:
|
||||
TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Function to emit update messages for <st> stick-table when a lesson must
|
||||
* be taught to the peer <p> (learn state is not PEER_LR_ST_NOTASSIGNED).
|
||||
*
|
||||
* Note that <st> shared stick-table is locked when calling this function, and
|
||||
* the lock is dropped then re-acquired.
|
||||
*
|
||||
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
|
||||
* Returns -1 if there was not enough room left to send the message,
|
||||
* any other negative returned value must be considered as an error with an appcxt st0
|
||||
* returned value equal to PEER_SESS_ST_END.
|
||||
*/
|
||||
static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
|
||||
struct shared_table *st)
|
||||
{
|
||||
TRACE_PROTO("send teach process messages", PEERS_EV_SESS_IO, appctx, p, st);
|
||||
return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st);
|
||||
}
|
||||
|
||||
/*
|
||||
* Function to emit update messages for <st> stick-table when a lesson must
|
||||
* be taught to the peer <p> during teach state 1 step. It must be called with
|
||||
* the stick-table lock released.
|
||||
*
|
||||
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
|
||||
* Returns -1 if there was not enough room left to send the message,
|
||||
* any other negative returned value must be considered as an error with an appcxt st0
|
||||
* returned value equal to PEER_SESS_ST_END.
|
||||
*/
|
||||
static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
|
||||
struct shared_table *st)
|
||||
{
|
||||
TRACE_PROTO("send teach stage1 messages", PEERS_EV_SESS_IO, appctx, p, st);
|
||||
return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st);
|
||||
}
|
||||
|
||||
/*
|
||||
* Function to emit update messages for <st> stick-table when a lesson must
|
||||
* be taught to the peer <p> during teach state 1 step. It must be called with
|
||||
* the stick-table lock released.
|
||||
*
|
||||
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
|
||||
* Returns -1 if there was not enough room left to send the message,
|
||||
* any other negative returned value must be considered as an error with an appcxt st0
|
||||
* returned value equal to PEER_SESS_ST_END.
|
||||
*/
|
||||
static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
|
||||
struct shared_table *st)
|
||||
{
|
||||
TRACE_PROTO("send teach stage2 messages", PEERS_EV_SESS_IO, appctx, p, st);
|
||||
return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Function used to parse a stick-table update message after it has been received
|
||||
* by <p> peer with <msg_cur> as address of the pointer to the position in the
|
||||
@ -2574,16 +2449,16 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
||||
if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
|
||||
if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
|
||||
struct shared_table *st;
|
||||
|
||||
/* Reset message: remote need resync */
|
||||
TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
|
||||
|
||||
/* prepare tables for a global push */
|
||||
for (st = peer->tables; st; st = st->next) {
|
||||
st->teaching_origin = st->last_pushed;
|
||||
st->flags = 0;
|
||||
}
|
||||
for (st = peer->tables; st; st = st->next)
|
||||
st->flags |= SHTABLE_F_RESET_SYNCHED;
|
||||
|
||||
/* reset teaching flags to 0 */
|
||||
peer->flags &= ~PEER_TEACH_FLAGS;
|
||||
peer->flags &= ~(PEER_F_SYNCHED|PEER_TEACH_FLAGS);
|
||||
|
||||
/* flag to start to teach lesson */
|
||||
peer->flags |= (PEER_F_TEACH_PROCESS|PEER_F_DBG_RESYNC_REQUESTED);
|
||||
@ -2610,8 +2485,6 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
||||
peer->confirm++;
|
||||
}
|
||||
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
|
||||
struct shared_table *st;
|
||||
|
||||
TRACE_PROTO("Resync confirm message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
|
||||
/* If stopping state */
|
||||
if (stopping) {
|
||||
@ -2622,10 +2495,6 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
||||
return 0;
|
||||
}
|
||||
peer->flags |= PEER_F_SYNCHED;
|
||||
for (st = peer->tables; st; st = st->next) {
|
||||
st->last_pushed = st->teaching_origin;
|
||||
st->flags = 0;
|
||||
}
|
||||
|
||||
/* reset teaching flags to 0 */
|
||||
peer->flags &= ~PEER_TEACH_FLAGS;
|
||||
@ -2745,41 +2614,15 @@ int peer_send_msgs(struct appctx *appctx,
|
||||
st->last_acked = st->last_get;
|
||||
}
|
||||
|
||||
if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
|
||||
int must_send;
|
||||
|
||||
if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock)) {
|
||||
applet_have_more_data(appctx);
|
||||
repl = -1;
|
||||
goto end;
|
||||
}
|
||||
must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate);
|
||||
HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
||||
|
||||
if (must_send) {
|
||||
repl = peer_send_teach_process_msgs(appctx, peer, st);
|
||||
if (repl <= 0) {
|
||||
peer->stop_local_table = peer->last_local_table;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (!(peer->flags & PEER_F_TEACH_FINISHED)) {
|
||||
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
|
||||
repl = peer_send_teach_stage1_msgs(appctx, peer, st);
|
||||
if (repl <= 0) {
|
||||
peer->stop_local_table = peer->last_local_table;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
|
||||
repl = peer_send_teach_stage2_msgs(appctx, peer, st);
|
||||
if (repl <= 0) {
|
||||
peer->stop_local_table = peer->last_local_table;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
if ((!(peer->flags & PEER_F_TEACH_PROCESS) && (peer->learnstate == PEER_LR_ST_NOTASSIGNED) &&
|
||||
st->last_update != st->table->last_update) ||
|
||||
(peer->flags & (PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED)) == PEER_F_TEACH_PROCESS) {
|
||||
TRACE_PROTO("send teach messages", PEERS_EV_SESS_IO, appctx, peer, st);
|
||||
repl = peer_send_teachmsgs(appctx, peer, st);
|
||||
if (repl <= 0) {
|
||||
peer->stop_local_table = peer->last_local_table;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
if (st == last_local_table) {
|
||||
@ -2955,28 +2798,21 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
|
||||
TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
|
||||
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
|
||||
if (peer->local && !(appctx_is_back(peer->appctx))) {
|
||||
/* If the local peer has established the connection (appctx is
|
||||
* on the frontend side), flag it to start to teach lesson.
|
||||
*/
|
||||
peer->flags |= PEER_F_TEACH_PROCESS;
|
||||
peer->flags &= ~PEER_F_SYNCHED;
|
||||
TRACE_STATE("peer elected to teach lesson to local peer", PEERS_EV_SESS_NEW, NULL, peer);
|
||||
}
|
||||
|
||||
/* Init cursors */
|
||||
for (st = peer->tables; st ; st = st->next) {
|
||||
st->last_get = st->last_acked = 0;
|
||||
|
||||
HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
||||
/* if st->last_pushed appears to be in future it means
|
||||
* that the last update is very old and we
|
||||
* remain unconnected a too long time to use this
|
||||
* acknowledgement as a reset.
|
||||
* We should update the protocol to be able to
|
||||
* signal the remote peer that it needs a full resync.
|
||||
* Here a partial fix consist to set st->last_pushed at
|
||||
* the max past value.
|
||||
*/
|
||||
if (!(peer->flags & PEER_F_SYNCHED) || (int)(st->table->localupdate - st->last_pushed) < 0) {
|
||||
st->last_pushed = st->table->localupdate + (2147483648U);
|
||||
peer->flags &= ~PEER_F_SYNCHED;
|
||||
}
|
||||
st->teaching_origin = st->last_pushed;
|
||||
st->flags = 0;
|
||||
|
||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
||||
if (!(peer->flags & PEER_F_SYNCHED) || (peer->flags & PEER_F_TEACH_PROCESS))
|
||||
st->flags |= SHTABLE_F_RESET_SYNCHED;
|
||||
}
|
||||
|
||||
/* Awake main task to ack the new peer state */
|
||||
@ -2988,15 +2824,6 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
|
||||
/* reset teaching flags to 0 */
|
||||
peer->flags &= ~PEER_TEACH_FLAGS;
|
||||
|
||||
if (peer->local && !(appctx_is_back(peer->appctx))) {
|
||||
/* If the local peer has established the connection (appctx is
|
||||
* on the frontend side), flag it to start to teach lesson.
|
||||
*/
|
||||
peer->flags |= PEER_F_TEACH_PROCESS;
|
||||
peer->flags &= ~PEER_F_SYNCHED;
|
||||
TRACE_STATE("peer elected to teach lesson to lacal peer", PEERS_EV_SESS_NEW, NULL, peer);
|
||||
}
|
||||
|
||||
/* Mark the peer as starting and wait the sync task */
|
||||
peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
|
||||
peer->appstate = PEER_APP_ST_STARTING;
|
||||
@ -3678,9 +3505,10 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
|
||||
|
||||
/* Awake session if there is data to push */
|
||||
for (st = peer->tables; st ; st = st->next) {
|
||||
if (st->last_pushed != st->table->localupdate) {
|
||||
/* wake up the peer handler to push local updates */
|
||||
if (st->last_update != st->table->last_update) {
|
||||
update_to_push = 1;
|
||||
|
||||
/* wake up the peer handler to push local updates */
|
||||
/* There is no need to send a heartbeat message
|
||||
* when some updates must be pushed. The remote
|
||||
* peer will consider <peer> peer as alive when it will
|
||||
@ -3867,10 +3695,10 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
|
||||
/* current peer connection is active and established
|
||||
* wake up all peer handlers to push remaining local updates */
|
||||
for (st = peer->tables; st ; st = st->next) {
|
||||
if (st->last_pushed != st->table->localupdate) {
|
||||
appctx_wakeup(peer->appctx);
|
||||
break;
|
||||
}
|
||||
if (st->last_update != st->table->last_update) {
|
||||
appctx_wakeup(peer->appctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
|
||||
@ -4102,6 +3930,15 @@ int peers_register_table(struct peers *peers, struct stktable *table)
|
||||
id = curpeer->tables->local_id;
|
||||
st->local_id = id + 1;
|
||||
|
||||
st->last = calloc(1, sizeof(*st->last));
|
||||
st->last->updt_type = STKSESS_UPDT_MARKER;
|
||||
LIST_INIT(&st->last->upd);
|
||||
LIST_APPEND(&table->updates, &st->last->upd);
|
||||
|
||||
st->end = calloc(1, sizeof(*st->end));
|
||||
st->end->updt_type = STKSESS_UPDT_MARKER;
|
||||
LIST_INIT(&st->end->upd);
|
||||
|
||||
/* If peer is local we inc table
|
||||
* refcnt to protect against flush
|
||||
* until this process pushed all
|
||||
@ -4314,11 +4151,10 @@ static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct pee
|
||||
"flags=0x%x remote_data=0x%llx",
|
||||
st, st->local_id, st->remote_id,
|
||||
st->flags, (unsigned long long)st->remote_data);
|
||||
chunk_appendf(&trash, "\n last_acked=%u last_pushed=%u last_get=%u"
|
||||
" teaching_origin=%u",
|
||||
st->last_acked, st->last_pushed, st->last_get, st->teaching_origin);
|
||||
chunk_appendf(&trash, "\n table:%p id=%s update=%u localupdate=%u refcnt=%u",
|
||||
t, t->id, t->update, t->localupdate, t->refcnt);
|
||||
chunk_appendf(&trash, "\n last_acked=%u last_get=%u",
|
||||
st->last_acked, st->last_get);
|
||||
chunk_appendf(&trash, "\n table:%p id=%s refcnt=%u",
|
||||
t, t->id, t->refcnt);
|
||||
if (flags & PEERS_SHOW_F_DICT) {
|
||||
chunk_appendf(&trash, "\n TX dictionary cache:");
|
||||
count = 0;
|
||||
|
||||
@ -142,15 +142,15 @@ int __stksess_kill(struct stktable *t, struct stksess *ts)
|
||||
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
||||
return 0;
|
||||
|
||||
/* ... and that we didn't leave the update list for the tree */
|
||||
if (ts->upd.node.leaf_p) {
|
||||
/* ... and that we didn't leave the update list */
|
||||
if (LIST_INLIST(&ts->upd)) {
|
||||
updt_locked = 1;
|
||||
HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
|
||||
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
||||
goto out_unlock;
|
||||
}
|
||||
eb32_delete(&ts->exp);
|
||||
eb32_delete(&ts->upd);
|
||||
LIST_DEL_INIT(&ts->upd);
|
||||
ebmb_delete(&ts->key);
|
||||
__stksess_free(t, ts);
|
||||
|
||||
@ -269,7 +269,7 @@ static struct stksess *__stksess_init(struct stktable *t, struct stksess * ts)
|
||||
ts->seen = 0;
|
||||
ts->key.node.leaf_p = NULL;
|
||||
ts->exp.node.leaf_p = NULL;
|
||||
ts->upd.node.leaf_p = NULL;
|
||||
LIST_INIT(&ts->upd);
|
||||
ts->updt_type = STKSESS_UPDT_NONE;
|
||||
MT_LIST_INIT(&ts->pend_updts);
|
||||
ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
|
||||
@ -400,7 +400,7 @@ int stktable_trash_oldest(struct stktable *t)
|
||||
/* session expired, trash it */
|
||||
ebmb_delete(&ts->key);
|
||||
MT_LIST_DELETE(&ts->pend_updts);
|
||||
eb32_delete(&ts->upd);
|
||||
LIST_DEL_INIT(&ts->upd);
|
||||
__stksess_free(t, ts);
|
||||
batched++;
|
||||
done_per_shard++;
|
||||
@ -618,7 +618,7 @@ struct stksess *stktable_lookup(struct stktable *t, struct stksess *ts)
|
||||
|
||||
/* Update the expiration timer for <ts> but do not touch its expiration node.
|
||||
* The table's expiration timer is updated if set.
|
||||
* The node will be also inserted into the update tree if needed, at a position
|
||||
* The node will be also inserted into the update list if needed, at a position
|
||||
* depending if the update is a local or coming from a remote node.
|
||||
* If <decrefcnt> is set, the ts entry's ref_cnt will be decremented. The table's
|
||||
* updt_lock may be taken for writes.
|
||||
@ -639,13 +639,13 @@ void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int local,
|
||||
/* Check if this entry is not in the tree or not
|
||||
* scheduled for at least one peer.
|
||||
*/
|
||||
if (!ts->upd.node.leaf_p || _HA_ATOMIC_LOAD(&ts->seen)) {
|
||||
if (!LIST_INLIST(&ts->upd) || _HA_ATOMIC_LOAD(&ts->seen)) {
|
||||
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_LOCAL);
|
||||
did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (!ts->upd.node.leaf_p) {
|
||||
if (!LIST_INLIST(&ts->upd)) {
|
||||
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_REMOTE);
|
||||
did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
|
||||
}
|
||||
@ -836,8 +836,7 @@ struct stksess *stktable_get_entry(struct stktable *table, struct stktable_key *
|
||||
struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int state)
|
||||
{
|
||||
struct stktable *table = ctx;
|
||||
struct eb32_node *eb;
|
||||
int i = 0, is_local, cur_tgid = tgid - 1, empty_tgid = 0;
|
||||
int i = 0, cur_tgid = tgid - 1, empty_tgid = 0;
|
||||
|
||||
/* we really don't want to wait on this one */
|
||||
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &table->updt_lock) != 0)
|
||||
@ -860,27 +859,12 @@ struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int s
|
||||
empty_tgid = 0;
|
||||
if (cur_tgid == global.nbtgroups)
|
||||
cur_tgid = 0;
|
||||
is_local = (stksess->updt_type == STKSESS_UPDT_LOCAL);
|
||||
stksess->seen = 0;
|
||||
if (is_local) {
|
||||
stksess->upd.key = ++table->update;
|
||||
table->localupdate = table->update;
|
||||
eb32_delete(&stksess->upd);
|
||||
} else {
|
||||
stksess->upd.key = (++table->update) + (2147483648U);
|
||||
}
|
||||
if (stksess->updt_type == STKSESS_UPDT_LOCAL)
|
||||
table->last_update++;
|
||||
LIST_DEL_INIT(&stksess->upd);
|
||||
LIST_APPEND(&table->updates, &stksess->upd);
|
||||
|
||||
/* even though very unlikely, it seldom happens that the entry
|
||||
* is already in the tree (both for local and remote ones). We
|
||||
* must dequeue it and requeue it at its new position (e.g. it
|
||||
* might already have been seen by some peers).
|
||||
*/
|
||||
eb32_delete(&stksess->upd);
|
||||
eb = eb32_insert(&table->updates, &stksess->upd);
|
||||
if (eb != &stksess->upd) {
|
||||
eb32_delete(eb);
|
||||
eb32_insert(&table->updates, &stksess->upd);
|
||||
}
|
||||
/*
|
||||
* Now that we're done inserting the stksess, unlock it.
|
||||
* It is kept locked here to prevent a race condition
|
||||
@ -1092,7 +1076,7 @@ struct task *process_tables_expire(struct task *task, void *context, unsigned in
|
||||
/* session expired, trash it */
|
||||
ebmb_delete(&ts->key);
|
||||
MT_LIST_DELETE(&ts->pend_updts);
|
||||
eb32_delete(&ts->upd);
|
||||
LIST_DEL_INIT(&ts->upd);
|
||||
__stksess_free(t, ts);
|
||||
}
|
||||
|
||||
@ -1175,7 +1159,7 @@ int stktable_init(struct stktable *t, char **err_msg)
|
||||
MT_LIST_INIT(&t->shards[shard].in_bucket_toadd);
|
||||
}
|
||||
|
||||
t->updates = EB_ROOT_UNIQUE;
|
||||
LIST_INIT(&t->updates);
|
||||
|
||||
t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);
|
||||
|
||||
@ -1222,6 +1206,7 @@ int stktable_init(struct stktable *t, char **err_msg)
|
||||
for (i = 0; i < global.nbtgroups; i++)
|
||||
MT_LIST_INIT(&t->pend_updts[i]);
|
||||
t->updt_task = tasklet_new();
|
||||
t->last_update = 0;
|
||||
if (!t->updt_task)
|
||||
goto mem_error;
|
||||
t->updt_task->context = t;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user