mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-12-04 01:01:00 +01:00
MAJOR: peers: Remove the update lock by using a mt-list to deal with updates
In this patch, the update tree is replaced by an mt-list. It is a huge patch with several changes. The main ones are in the function sending updates. By using a list instead of a tree, we lose the order between updates and the ability to restart from a given update using its id (the key used to index updates in the tree). However, to use the tree, it had to be locked, and this was a cause of contention between threads, leading the watchdog to kill the process in the worst cases. Because the idea is to split the updates into buckets to divide the contention on updates, the order between updates will be lost anyway. So the tree can be replaced by a list. By using an mt-list, we can also remove the update lock. To be able to use a list instead of a tree, each peer must save its position in the list, so that it can process only new entries at each loop. These markers are "special" sticky sessions of type STKSESS_UPDT_MARKER. Of course, these markers are not in any stick-table but only in the update lists. And only the owner of a marker can move it in the list. Each peer owns two markers for each list (so two markers per shared table). The first one is used as the start point for a loop, and the other one as the stop point. The way these markers are moved in the list is not obvious, especially for the first one. Updates sent during a full resync are now handled exactly the same way as other updates. Only the moment the stop marker is set is different.
This commit is contained in:
parent
8d85f724fd
commit
adf6a167a4
@ -138,12 +138,12 @@ struct shared_table {
|
|||||||
int remote_id;
|
int remote_id;
|
||||||
int flags;
|
int flags;
|
||||||
unsigned int update_id;
|
unsigned int update_id;
|
||||||
|
struct stksess *last;
|
||||||
|
struct stksess *end;
|
||||||
uint64_t remote_data;
|
uint64_t remote_data;
|
||||||
unsigned int remote_data_nbelem[STKTABLE_DATA_TYPES];
|
unsigned int remote_data_nbelem[STKTABLE_DATA_TYPES];
|
||||||
unsigned int last_acked;
|
unsigned int last_acked;
|
||||||
unsigned int last_pushed;
|
|
||||||
unsigned int last_get;
|
unsigned int last_get;
|
||||||
unsigned int teaching_origin;
|
|
||||||
struct shared_table *next; /* next shared table in list */
|
struct shared_table *next; /* next shared table in list */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -160,8 +160,8 @@ struct stksess {
|
|||||||
int shard; /* shard number used by peers */
|
int shard; /* shard number used by peers */
|
||||||
int seen; /* 0 only when no peer has seen this entry yet */
|
int seen; /* 0 only when no peer has seen this entry yet */
|
||||||
struct eb32_node exp; /* ebtree node used to hold the session in expiration tree */
|
struct eb32_node exp; /* ebtree node used to hold the session in expiration tree */
|
||||||
struct eb32_node upd; /* ebtree node used to hold the update sequence tree */
|
struct mt_list upd; /* entry in the table's update sequence list */
|
||||||
struct mt_list pend_updts;/* list of entries to be inserted/moved in the update sequence tree */
|
struct mt_list pend_updts;/* entry in list of pending updates */
|
||||||
unsigned int updt_type; /* One of STKSESS_UPDT_* value */
|
unsigned int updt_type; /* One of STKSESS_UPDT_* value */
|
||||||
struct ebmb_node key; /* ebtree node used to hold the session in table */
|
struct ebmb_node key; /* ebtree node used to hold the session in table */
|
||||||
/* WARNING! do not put anything after <keys>, it's used by the key */
|
/* WARNING! do not put anything after <keys>, it's used by the key */
|
||||||
@ -231,17 +231,11 @@ struct stktable {
|
|||||||
|
|
||||||
THREAD_ALIGN(64);
|
THREAD_ALIGN(64);
|
||||||
|
|
||||||
struct eb_root updates; /* head of sticky updates sequence tree, uses updt_lock */
|
struct mt_list updates; /* list of sticky updates sequence */
|
||||||
struct mt_list *pend_updts; /* list of updates to be added to the update sequence tree, one per thread-group */
|
struct mt_list *pend_updts; /* list of updates to be added to the update sequence list, one per thread-group */
|
||||||
unsigned int update; /* uses updt_lock */
|
|
||||||
unsigned int localupdate; /* uses updt_lock */
|
|
||||||
struct tasklet *updt_task;/* tasklet responsible for pushing the pending updates into the tree */
|
struct tasklet *updt_task;/* tasklet responsible for pushing the pending updates into the tree */
|
||||||
|
|
||||||
THREAD_ALIGN(64);
|
/* rarely used config stuff below */
|
||||||
/* this lock is heavily used and must be on its own cache line */
|
|
||||||
__decl_thread(HA_RWLOCK_T updt_lock); /* lock protecting the updates part */
|
|
||||||
|
|
||||||
/* rarely used config stuff below (should not interfere with updt_lock) */
|
|
||||||
struct proxy *proxies_list; /* The list of proxies which reference this stick-table. */
|
struct proxy *proxies_list; /* The list of proxies which reference this stick-table. */
|
||||||
struct {
|
struct {
|
||||||
const char *file; /* The file where the stick-table is declared (global name). */
|
const char *file; /* The file where the stick-table is declared (global name). */
|
||||||
|
|||||||
417
src/peers.c
417
src/peers.c
@ -52,9 +52,6 @@
|
|||||||
/***********************************/
|
/***********************************/
|
||||||
/* Current shared table sync state */
|
/* Current shared table sync state */
|
||||||
/***********************************/
|
/***********************************/
|
||||||
#define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */
|
|
||||||
#define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */
|
|
||||||
|
|
||||||
|
|
||||||
#define PEER_RESYNC_TIMEOUT 5000 /* 5 seconds */
|
#define PEER_RESYNC_TIMEOUT 5000 /* 5 seconds */
|
||||||
#define PEER_RECONNECT_TIMEOUT 5000 /* 5 seconds */
|
#define PEER_RECONNECT_TIMEOUT 5000 /* 5 seconds */
|
||||||
@ -1519,91 +1516,20 @@ static inline int peer_send_error_protomsg(struct appctx *appctx)
|
|||||||
return peer_send_msg(appctx, peer_prepare_error_msg, &p);
|
return peer_send_msg(appctx, peer_prepare_error_msg, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Function used to lookup for recent stick-table updates associated with
|
|
||||||
* <st> shared stick-table when a lesson must be taught a peer (learn state is not PEER_LR_ST_NOTASSIGNED).
|
|
||||||
*/
|
|
||||||
static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
|
|
||||||
{
|
|
||||||
struct eb32_node *eb;
|
|
||||||
struct stksess *ret;
|
|
||||||
|
|
||||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
|
||||||
if (!eb) {
|
|
||||||
eb = eb32_first(&st->table->updates);
|
|
||||||
if (!eb || (eb->key == st->last_pushed)) {
|
|
||||||
st->last_pushed = st->table->localupdate;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* if distance between the last pushed and the retrieved key
|
|
||||||
* is greater than the distance last_pushed and the local_update
|
|
||||||
* this means we are beyond localupdate.
|
|
||||||
*/
|
|
||||||
if ((eb->key - st->last_pushed) > (st->table->localupdate - st->last_pushed)) {
|
|
||||||
st->last_pushed = st->table->localupdate;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = eb32_entry(eb, struct stksess, upd);
|
|
||||||
if (!_HA_ATOMIC_LOAD(&ret->seen))
|
|
||||||
_HA_ATOMIC_STORE(&ret->seen, 1);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Function used to lookup for recent stick-table updates associated with
|
|
||||||
* <st> shared stick-table during teach state 1 step.
|
|
||||||
*/
|
|
||||||
static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
|
|
||||||
{
|
|
||||||
struct eb32_node *eb;
|
|
||||||
struct stksess *ret;
|
|
||||||
|
|
||||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
|
||||||
if (!eb) {
|
|
||||||
st->flags |= SHTABLE_F_TEACH_STAGE1;
|
|
||||||
eb = eb32_first(&st->table->updates);
|
|
||||||
if (eb)
|
|
||||||
st->last_pushed = eb->key - 1;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = eb32_entry(eb, struct stksess, upd);
|
|
||||||
if (!_HA_ATOMIC_LOAD(&ret->seen))
|
|
||||||
_HA_ATOMIC_STORE(&ret->seen, 1);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Function used to lookup for recent stick-table updates associated with
|
|
||||||
* <st> shared stick-table during teach state 2 step.
|
|
||||||
*/
|
|
||||||
static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
|
|
||||||
{
|
|
||||||
struct eb32_node *eb;
|
|
||||||
struct stksess *ret;
|
|
||||||
|
|
||||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
|
||||||
if (!eb || eb->key > st->teaching_origin) {
|
|
||||||
st->flags |= SHTABLE_F_TEACH_STAGE2;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = eb32_entry(eb, struct stksess, upd);
|
|
||||||
if (!_HA_ATOMIC_LOAD(&ret->seen))
|
|
||||||
_HA_ATOMIC_STORE(&ret->seen, 1);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Generic function to emit update messages for <st> stick-table when a lesson must
|
* Generic function to emit update messages for <st> stick-table when a lesson must
|
||||||
* be taught to the peer <p>.
|
* be taught to the peer <p>.
|
||||||
*
|
*
|
||||||
* This function temporary unlock/lock <st> when it sends stick-table updates or
|
* This function loops on the stick-table update list from the <last> to the
|
||||||
* when decrementing its refcount in case of any error when it sends this updates.
|
* <end> markers of the peer shared table. Before starting the loop, the <end>
|
||||||
* It must be called with the stick-table lock released.
|
* marker, if not in the update list, is appended to the end of the list. If the
|
||||||
|
* loop is interrupted, the <last> maker is inserted before the current sticky
|
||||||
|
* session. It is the restart point for the next time. If the <end> marker is
|
||||||
|
* reached, it means all updated that should be send were send. The <last>
|
||||||
|
* marker is move just after <end> marker and this last one is removed from the
|
||||||
|
* update list.
|
||||||
|
*
|
||||||
|
* When a sticky session is processed, the element is locked.
|
||||||
*
|
*
|
||||||
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
|
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
|
||||||
* Returns -1 if there was not enough room left to send the message,
|
* Returns -1 if there was not enough room left to send the message,
|
||||||
@ -1612,86 +1538,98 @@ static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_tab
|
|||||||
* If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
|
* If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
|
||||||
* unlocked if not already locked when entering this function.
|
* unlocked if not already locked when entering this function.
|
||||||
*/
|
*/
|
||||||
int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
|
int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, struct shared_table *st)
|
||||||
struct stksess *(*peer_stksess_lookup)(struct shared_table *),
|
|
||||||
struct shared_table *st)
|
|
||||||
{
|
{
|
||||||
int ret, new_pushed, use_timed;
|
int ret, new_pushed, use_timed;
|
||||||
int updates_sent = 0;
|
int updates_sent = 0;
|
||||||
int failed_once = 0;
|
struct stksess *ts = NULL;
|
||||||
|
struct mt_list back;
|
||||||
|
|
||||||
TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
|
TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
|
||||||
|
|
||||||
ret = 1;
|
ret = 1;
|
||||||
use_timed = 0;
|
use_timed = 0;
|
||||||
if (st != p->last_local_table) {
|
|
||||||
ret = peer_send_switchmsg(st, appctx);
|
|
||||||
if (ret <= 0)
|
|
||||||
goto out;
|
|
||||||
|
|
||||||
p->last_local_table = st;
|
if (!(p->flags & PEER_F_TEACH_PROCESS))
|
||||||
}
|
|
||||||
|
|
||||||
if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
|
|
||||||
use_timed = !(p->flags & PEER_F_DWNGRD);
|
use_timed = !(p->flags & PEER_F_DWNGRD);
|
||||||
|
|
||||||
/* We force new pushed to 1 to force identifier in update message */
|
/* We force new pushed to 1 to force identifier in update message */
|
||||||
new_pushed = 1;
|
new_pushed = 1;
|
||||||
|
|
||||||
if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
|
MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
|
||||||
/* just don't engage here if there is any contention */
|
if (ts == st->end) {
|
||||||
applet_have_more_data(appctx);
|
|
||||||
ret = -1;
|
|
||||||
goto out_unlocked;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
struct stksess *ts;
|
|
||||||
unsigned last_pushed;
|
|
||||||
|
|
||||||
/* push local updates */
|
|
||||||
ts = peer_stksess_lookup(st);
|
|
||||||
if (!ts) {
|
|
||||||
ret = 1; // done
|
ret = 1; // done
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
last_pushed = ts->upd.key;
|
if (!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL)
|
||||||
if (p->srv->shard && ts->shard != p->srv->shard) {
|
|
||||||
/* Skip this entry */
|
|
||||||
st->last_pushed = last_pushed;
|
|
||||||
new_pushed = 1;
|
|
||||||
continue;
|
continue;
|
||||||
|
else if (ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (st != p->last_local_table) {
|
||||||
|
ret = peer_send_switchmsg(st, appctx);
|
||||||
|
if (ret <= 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
p->last_local_table = st;
|
||||||
}
|
}
|
||||||
|
|
||||||
HA_ATOMIC_INC(&ts->ref_cnt);
|
if (!_HA_ATOMIC_LOAD(&ts->seen))
|
||||||
HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
_HA_ATOMIC_STORE(&ts->seen, 1);
|
||||||
|
|
||||||
ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
|
if (p->srv->shard && ts->shard != p->srv->shard)
|
||||||
|
continue;
|
||||||
|
|
||||||
if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
|
if (updates_sent >= peers_max_updates_at_once) {
|
||||||
if (failed_once) {
|
applet_have_more_data(appctx);
|
||||||
/* we've already faced contention twice in this
|
ret = -1;
|
||||||
* loop, this is getting serious, do not insist
|
}
|
||||||
* anymore and come back later
|
else {
|
||||||
*/
|
HA_ATOMIC_INC(&ts->ref_cnt);
|
||||||
HA_ATOMIC_DEC(&ts->ref_cnt);
|
ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
|
||||||
applet_have_more_data(appctx);
|
HA_ATOMIC_DEC(&ts->ref_cnt);
|
||||||
ret = -1;
|
}
|
||||||
goto out_unlocked;
|
|
||||||
}
|
if (ret <= 0) {
|
||||||
/* OK contention happens, for this one we'll wait on the
|
/* Insert <last> marker before <ts> to process it again
|
||||||
* lock, but only once.
|
* on next iteration. <ts> remains locked
|
||||||
|
*
|
||||||
|
* Before:
|
||||||
|
* <back>: <A> <==============> <C>
|
||||||
|
*
|
||||||
|
* <last> <==> ... <A> <==> x x <==> <C> ...
|
||||||
|
*
|
||||||
|
* x <==> TS <==> x
|
||||||
|
*
|
||||||
|
* <back>.prev == <A> and <back>.prev->next == MT_LIST_BUSY
|
||||||
|
*
|
||||||
|
* Step 1: detach <last> from the list
|
||||||
|
* Step 2: insert <last> after <A>
|
||||||
|
* - <last>.prev = <A> (or <back>.prev)
|
||||||
|
* - <last>.next = MT_LIST_BUSY (or <back>.prev->next)
|
||||||
|
* Step 3: move <A> before <last>
|
||||||
|
* - <A>.next = <last> (or back.prev->next = <last>)
|
||||||
|
*
|
||||||
|
* Step 4: update <back>.prev to point on <last>
|
||||||
|
*
|
||||||
|
* After:
|
||||||
|
* <back>: <last> <==============> <C>
|
||||||
|
*
|
||||||
|
* ... <A> <==> <last> <==> x x <==> <C> ...
|
||||||
|
*
|
||||||
|
* x <==> TS <==> x
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
failed_once++;
|
MT_LIST_DELETE(&st->last->upd);
|
||||||
HA_RWLOCK_RDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
st->last->upd.next = back.prev->next; /* == MT_LIST_BUSY */
|
||||||
|
__ha_barrier_atomic_store();
|
||||||
|
st->last->upd.prev = back.prev;
|
||||||
|
back.prev->next = &st->last->upd;
|
||||||
|
back.prev = &st->last->upd;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
HA_ATOMIC_DEC(&ts->ref_cnt);
|
|
||||||
if (ret <= 0)
|
|
||||||
break;
|
|
||||||
|
|
||||||
st->last_pushed = last_pushed;
|
|
||||||
p->last.table = st;
|
p->last.table = st;
|
||||||
p->last.id = st->update_id;
|
p->last.id = st->update_id;
|
||||||
st->update_id++;
|
st->update_id++;
|
||||||
@ -1699,76 +1637,19 @@ int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
|
|||||||
|
|
||||||
/* identifier may not needed in next update message */
|
/* identifier may not needed in next update message */
|
||||||
new_pushed = 0;
|
new_pushed = 0;
|
||||||
|
|
||||||
updates_sent++;
|
updates_sent++;
|
||||||
if (updates_sent >= peers_max_updates_at_once) {
|
|
||||||
applet_have_more_data(appctx);
|
|
||||||
ret = -1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
out:
|
if (ret == 1) {
|
||||||
HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
MT_LIST_DELETE(&st->last->upd);
|
||||||
out_unlocked:
|
MT_LIST_APPEND(&st->end->upd, &st->last->upd);
|
||||||
|
MT_LIST_DELETE(&st->end->upd);
|
||||||
|
}
|
||||||
|
|
||||||
TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
|
TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Function to emit update messages for <st> stick-table when a lesson must
|
|
||||||
* be taught to the peer <p> (learn state is not PEER_LR_ST_NOTASSIGNED).
|
|
||||||
*
|
|
||||||
* Note that <st> shared stick-table is locked when calling this function, and
|
|
||||||
* the lock is dropped then re-acquired.
|
|
||||||
*
|
|
||||||
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
|
|
||||||
* Returns -1 if there was not enough room left to send the message,
|
|
||||||
* any other negative returned value must be considered as an error with an appcxt st0
|
|
||||||
* returned value equal to PEER_SESS_ST_END.
|
|
||||||
*/
|
|
||||||
static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
|
|
||||||
struct shared_table *st)
|
|
||||||
{
|
|
||||||
TRACE_PROTO("send teach process messages", PEERS_EV_SESS_IO, appctx, p, st);
|
|
||||||
return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Function to emit update messages for <st> stick-table when a lesson must
|
|
||||||
* be taught to the peer <p> during teach state 1 step. It must be called with
|
|
||||||
* the stick-table lock released.
|
|
||||||
*
|
|
||||||
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
|
|
||||||
* Returns -1 if there was not enough room left to send the message,
|
|
||||||
* any other negative returned value must be considered as an error with an appcxt st0
|
|
||||||
* returned value equal to PEER_SESS_ST_END.
|
|
||||||
*/
|
|
||||||
static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
|
|
||||||
struct shared_table *st)
|
|
||||||
{
|
|
||||||
TRACE_PROTO("send teach stage1 messages", PEERS_EV_SESS_IO, appctx, p, st);
|
|
||||||
return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Function to emit update messages for <st> stick-table when a lesson must
|
|
||||||
* be taught to the peer <p> during teach state 1 step. It must be called with
|
|
||||||
* the stick-table lock released.
|
|
||||||
*
|
|
||||||
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
|
|
||||||
* Returns -1 if there was not enough room left to send the message,
|
|
||||||
* any other negative returned value must be considered as an error with an appcxt st0
|
|
||||||
* returned value equal to PEER_SESS_ST_END.
|
|
||||||
*/
|
|
||||||
static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
|
|
||||||
struct shared_table *st)
|
|
||||||
{
|
|
||||||
TRACE_PROTO("send teach stage2 messages", PEERS_EV_SESS_IO, appctx, p, st);
|
|
||||||
return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Function used to parse a stick-table update message after it has been received
|
* Function used to parse a stick-table update message after it has been received
|
||||||
* by <p> peer with <msg_cur> as address of the pointer to the position in the
|
* by <p> peer with <msg_cur> as address of the pointer to the position in the
|
||||||
@ -2578,7 +2459,10 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
|
TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
|
||||||
/* prepare tables for a global push */
|
/* prepare tables for a global push */
|
||||||
for (st = peer->tables; st; st = st->next) {
|
for (st = peer->tables; st; st = st->next) {
|
||||||
st->teaching_origin = st->last_pushed;
|
MT_LIST_DELETE(&st->last->upd);
|
||||||
|
MT_LIST_INSERT(&st->table->updates, &st->last->upd);
|
||||||
|
MT_LIST_DELETE(&st->end->upd);
|
||||||
|
MT_LIST_APPEND(&st->table->updates, &st->end->upd);
|
||||||
st->flags = 0;
|
st->flags = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2622,10 +2506,8 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
peer->flags |= PEER_F_SYNCHED;
|
peer->flags |= PEER_F_SYNCHED;
|
||||||
for (st = peer->tables; st; st = st->next) {
|
for (st = peer->tables; st; st = st->next)
|
||||||
st->last_pushed = st->teaching_origin;
|
MT_LIST_DELETE(&st->end->upd);
|
||||||
st->flags = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* reset teaching flags to 0 */
|
/* reset teaching flags to 0 */
|
||||||
peer->flags &= ~PEER_TEACH_FLAGS;
|
peer->flags &= ~PEER_TEACH_FLAGS;
|
||||||
@ -2745,41 +2627,14 @@ int peer_send_msgs(struct appctx *appctx,
|
|||||||
st->last_acked = st->last_get;
|
st->last_acked = st->last_get;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
|
if ((!(peer->flags & PEER_F_TEACH_PROCESS) && (peer->learnstate == PEER_LR_ST_NOTASSIGNED)) ||
|
||||||
int must_send;
|
(peer->flags & (PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED)) == PEER_F_TEACH_PROCESS) {
|
||||||
|
TRACE_PROTO("send teach messages", PEERS_EV_SESS_IO, appctx, peer, st);
|
||||||
if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock)) {
|
repl = peer_send_teachmsgs(appctx, peer, st);
|
||||||
applet_have_more_data(appctx);
|
if (repl <= 0) {
|
||||||
repl = -1;
|
peer->stop_local_table = peer->last_local_table;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate);
|
|
||||||
HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
|
||||||
|
|
||||||
if (must_send) {
|
|
||||||
repl = peer_send_teach_process_msgs(appctx, peer, st);
|
|
||||||
if (repl <= 0) {
|
|
||||||
peer->stop_local_table = peer->last_local_table;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (!(peer->flags & PEER_F_TEACH_FINISHED)) {
|
|
||||||
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
|
|
||||||
repl = peer_send_teach_stage1_msgs(appctx, peer, st);
|
|
||||||
if (repl <= 0) {
|
|
||||||
peer->stop_local_table = peer->last_local_table;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
|
|
||||||
repl = peer_send_teach_stage2_msgs(appctx, peer, st);
|
|
||||||
if (repl <= 0) {
|
|
||||||
peer->stop_local_table = peer->last_local_table;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (st == last_local_table) {
|
if (st == last_local_table) {
|
||||||
@ -2955,28 +2810,25 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
|
|||||||
TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
|
TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
|
||||||
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||||
|
|
||||||
|
if (peer->local && !(appctx_is_back(peer->appctx))) {
|
||||||
|
/* If the local peer has established the connection (appctx is
|
||||||
|
* on the frontend side), flag it to start to teach lesson.
|
||||||
|
*/
|
||||||
|
peer->flags |= PEER_F_TEACH_PROCESS;
|
||||||
|
peer->flags &= ~PEER_F_SYNCHED;
|
||||||
|
TRACE_STATE("peer elected to teach lesson to lacal peer", PEERS_EV_SESS_NEW, NULL, peer);
|
||||||
|
}
|
||||||
|
|
||||||
/* Init cursors */
|
/* Init cursors */
|
||||||
for (st = peer->tables; st ; st = st->next) {
|
for (st = peer->tables; st ; st = st->next) {
|
||||||
st->last_get = st->last_acked = 0;
|
st->last_get = st->last_acked = 0;
|
||||||
|
|
||||||
HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
|
||||||
/* if st->last_pushed appears to be in future it means
|
|
||||||
* that the last update is very old and we
|
|
||||||
* remain unconnected a too long time to use this
|
|
||||||
* acknowledgement as a reset.
|
|
||||||
* We should update the protocol to be able to
|
|
||||||
* signal the remote peer that it needs a full resync.
|
|
||||||
* Here a partial fix consist to set st->last_pushed at
|
|
||||||
* the max past value.
|
|
||||||
*/
|
|
||||||
if (!(peer->flags & PEER_F_SYNCHED) || (int)(st->table->localupdate - st->last_pushed) < 0) {
|
|
||||||
st->last_pushed = st->table->localupdate + (2147483648U);
|
|
||||||
peer->flags &= ~PEER_F_SYNCHED;
|
|
||||||
}
|
|
||||||
st->teaching_origin = st->last_pushed;
|
|
||||||
st->flags = 0;
|
st->flags = 0;
|
||||||
|
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
|
if (!MT_LIST_INLIST(&st->last->upd) || !(peer->flags & PEER_F_SYNCHED)) {
|
||||||
|
MT_LIST_DELETE(&st->last->upd);
|
||||||
|
MT_LIST_INSERT(&st->table->updates, &st->last->upd);
|
||||||
|
}
|
||||||
|
MT_LIST_DELETE(&st->end->upd);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Awake main task to ack the new peer state */
|
/* Awake main task to ack the new peer state */
|
||||||
@ -2988,15 +2840,6 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
|
|||||||
/* reset teaching flags to 0 */
|
/* reset teaching flags to 0 */
|
||||||
peer->flags &= ~PEER_TEACH_FLAGS;
|
peer->flags &= ~PEER_TEACH_FLAGS;
|
||||||
|
|
||||||
if (peer->local && !(appctx_is_back(peer->appctx))) {
|
|
||||||
/* If the local peer has established the connection (appctx is
|
|
||||||
* on the frontend side), flag it to start to teach lesson.
|
|
||||||
*/
|
|
||||||
peer->flags |= PEER_F_TEACH_PROCESS;
|
|
||||||
peer->flags &= ~PEER_F_SYNCHED;
|
|
||||||
TRACE_STATE("peer elected to teach lesson to lacal peer", PEERS_EV_SESS_NEW, NULL, peer);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Mark the peer as starting and wait the sync task */
|
/* Mark the peer as starting and wait the sync task */
|
||||||
peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
|
peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
|
||||||
peer->appstate = PEER_APP_ST_STARTING;
|
peer->appstate = PEER_APP_ST_STARTING;
|
||||||
@ -3678,9 +3521,20 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
|
|||||||
|
|
||||||
/* Awake session if there is data to push */
|
/* Awake session if there is data to push */
|
||||||
for (st = peer->tables; st ; st = st->next) {
|
for (st = peer->tables; st ; st = st->next) {
|
||||||
if (st->last_pushed != st->table->localupdate) {
|
struct stksess *ts;
|
||||||
|
struct mt_list back;
|
||||||
|
|
||||||
|
// TODO: may be handled via an atomic flags ?
|
||||||
|
MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
|
||||||
|
if (&ts->upd == &st->table->updates)
|
||||||
|
break;
|
||||||
|
if (&ts->upd != &st->table->updates && ts->updt_type == STKSESS_UPDT_LOCAL) {
|
||||||
|
update_to_push = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (update_to_push == 1) {
|
||||||
/* wake up the peer handler to push local updates */
|
/* wake up the peer handler to push local updates */
|
||||||
update_to_push = 1;
|
|
||||||
/* There is no need to send a heartbeat message
|
/* There is no need to send a heartbeat message
|
||||||
* when some updates must be pushed. The remote
|
* when some updates must be pushed. The remote
|
||||||
* peer will consider <peer> peer as alive when it will
|
* peer will consider <peer> peer as alive when it will
|
||||||
@ -3867,7 +3721,17 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
|
|||||||
/* current peer connection is active and established
|
/* current peer connection is active and established
|
||||||
* wake up all peer handlers to push remaining local updates */
|
* wake up all peer handlers to push remaining local updates */
|
||||||
for (st = peer->tables; st ; st = st->next) {
|
for (st = peer->tables; st ; st = st->next) {
|
||||||
if (st->last_pushed != st->table->localupdate) {
|
struct stksess *ts;
|
||||||
|
struct mt_list back;
|
||||||
|
int wakeup = 0;
|
||||||
|
|
||||||
|
MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
|
||||||
|
if (&ts->upd != &st->table->updates && ts->updt_type == STKSESS_UPDT_LOCAL) {
|
||||||
|
wakeup = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (wakeup) {
|
||||||
appctx_wakeup(peer->appctx);
|
appctx_wakeup(peer->appctx);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -4102,6 +3966,14 @@ int peers_register_table(struct peers *peers, struct stktable *table)
|
|||||||
id = curpeer->tables->local_id;
|
id = curpeer->tables->local_id;
|
||||||
st->local_id = id + 1;
|
st->local_id = id + 1;
|
||||||
|
|
||||||
|
st->last = calloc(1, sizeof(*st->last));
|
||||||
|
st->last->updt_type = STKSESS_UPDT_MARKER;
|
||||||
|
MT_LIST_INIT(&st->last->upd);
|
||||||
|
|
||||||
|
st->end = calloc(1, sizeof(*st->end));
|
||||||
|
st->end->updt_type = STKSESS_UPDT_MARKER;
|
||||||
|
MT_LIST_INIT(&st->end->upd);
|
||||||
|
|
||||||
/* If peer is local we inc table
|
/* If peer is local we inc table
|
||||||
* refcnt to protect against flush
|
* refcnt to protect against flush
|
||||||
* until this process pushed all
|
* until this process pushed all
|
||||||
@ -4314,11 +4186,10 @@ static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct pee
|
|||||||
"flags=0x%x remote_data=0x%llx",
|
"flags=0x%x remote_data=0x%llx",
|
||||||
st, st->local_id, st->remote_id,
|
st, st->local_id, st->remote_id,
|
||||||
st->flags, (unsigned long long)st->remote_data);
|
st->flags, (unsigned long long)st->remote_data);
|
||||||
chunk_appendf(&trash, "\n last_acked=%u last_pushed=%u last_get=%u"
|
chunk_appendf(&trash, "\n last_acked=%u last_get=%u",
|
||||||
" teaching_origin=%u",
|
st->last_acked, st->last_get);
|
||||||
st->last_acked, st->last_pushed, st->last_get, st->teaching_origin);
|
chunk_appendf(&trash, "\n table:%p id=%s refcnt=%u",
|
||||||
chunk_appendf(&trash, "\n table:%p id=%s update=%u localupdate=%u refcnt=%u",
|
t, t->id, t->refcnt);
|
||||||
t, t->id, t->update, t->localupdate, t->refcnt);
|
|
||||||
if (flags & PEERS_SHOW_F_DICT) {
|
if (flags & PEERS_SHOW_F_DICT) {
|
||||||
chunk_appendf(&trash, "\n TX dictionary cache:");
|
chunk_appendf(&trash, "\n TX dictionary cache:");
|
||||||
count = 0;
|
count = 0;
|
||||||
|
|||||||
@ -128,7 +128,7 @@ void stksess_free(struct stktable *t, struct stksess *ts)
|
|||||||
*/
|
*/
|
||||||
int __stksess_kill(struct stktable *t, struct stksess *ts)
|
int __stksess_kill(struct stktable *t, struct stksess *ts)
|
||||||
{
|
{
|
||||||
int updt_locked = 0;
|
struct mt_list link;
|
||||||
|
|
||||||
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
||||||
return 0;
|
return 0;
|
||||||
@ -141,20 +141,21 @@ int __stksess_kill(struct stktable *t, struct stksess *ts)
|
|||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
/* ... and that we didn't leave the update list for the tree */
|
/* ... and that we didn't leave the update list for the tree */
|
||||||
if (ts->upd.node.leaf_p) {
|
if (MT_LIST_INLIST(&ts->upd)) {
|
||||||
updt_locked = 1;
|
link = mt_list_lock_full(&ts->upd);
|
||||||
HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
|
if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
|
||||||
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
mt_list_unlock_full(&ts->upd, link);
|
||||||
goto out_unlock;
|
goto out;
|
||||||
|
}
|
||||||
|
mt_list_unlock_link(link);
|
||||||
|
mt_list_unlock_self(&ts->upd);
|
||||||
}
|
}
|
||||||
|
|
||||||
eb32_delete(&ts->exp);
|
eb32_delete(&ts->exp);
|
||||||
eb32_delete(&ts->upd);
|
|
||||||
ebmb_delete(&ts->key);
|
ebmb_delete(&ts->key);
|
||||||
__stksess_free(t, ts);
|
__stksess_free(t, ts);
|
||||||
|
|
||||||
out_unlock:
|
out:
|
||||||
if (updt_locked)
|
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -267,8 +268,8 @@ static struct stksess *__stksess_init(struct stktable *t, struct stksess * ts)
|
|||||||
ts->seen = 0;
|
ts->seen = 0;
|
||||||
ts->key.node.leaf_p = NULL;
|
ts->key.node.leaf_p = NULL;
|
||||||
ts->exp.node.leaf_p = NULL;
|
ts->exp.node.leaf_p = NULL;
|
||||||
ts->upd.node.leaf_p = NULL;
|
|
||||||
ts->updt_type = STKSESS_UPDT_NONE;
|
ts->updt_type = STKSESS_UPDT_NONE;
|
||||||
|
MT_LIST_INIT(&ts->upd);
|
||||||
MT_LIST_INIT(&ts->pend_updts);
|
MT_LIST_INIT(&ts->pend_updts);
|
||||||
ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
|
ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
|
||||||
HA_RWLOCK_INIT(&ts->lock);
|
HA_RWLOCK_INIT(&ts->lock);
|
||||||
@ -285,12 +286,12 @@ int stktable_trash_oldest(struct stktable *t)
|
|||||||
{
|
{
|
||||||
struct stksess *ts;
|
struct stksess *ts;
|
||||||
struct eb32_node *eb;
|
struct eb32_node *eb;
|
||||||
|
struct mt_list link;
|
||||||
int max_search; // no more than 50% misses
|
int max_search; // no more than 50% misses
|
||||||
int max_per_shard;
|
int max_per_shard;
|
||||||
int done_per_shard;
|
int done_per_shard;
|
||||||
int batched = 0;
|
int batched = 0;
|
||||||
int to_batch;
|
int to_batch;
|
||||||
int updt_locked;
|
|
||||||
int failed_once = 0;
|
int failed_once = 0;
|
||||||
int looped;
|
int looped;
|
||||||
int shard;
|
int shard;
|
||||||
@ -307,7 +308,6 @@ int stktable_trash_oldest(struct stktable *t)
|
|||||||
do {
|
do {
|
||||||
done_per_shard = 0;
|
done_per_shard = 0;
|
||||||
looped = 0;
|
looped = 0;
|
||||||
updt_locked = 0;
|
|
||||||
|
|
||||||
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
|
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
|
||||||
if (batched)
|
if (batched)
|
||||||
@ -379,26 +379,28 @@ int stktable_trash_oldest(struct stktable *t)
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if the entry is in the update list, we must be extremely careful
|
|
||||||
* because peers can see it at any moment and start to use it. Peers
|
|
||||||
* will take the table's updt_lock for reading when doing that, and
|
|
||||||
* with that lock held, will grab a ref_cnt before releasing the
|
|
||||||
* lock. So we must take this lock as well and check the ref_cnt.
|
|
||||||
*/
|
|
||||||
if (!updt_locked) {
|
|
||||||
updt_locked = 1;
|
|
||||||
HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
|
|
||||||
}
|
|
||||||
/* now we're locked, new peers can't grab it anymore,
|
|
||||||
* existing ones already have the ref_cnt.
|
|
||||||
*/
|
|
||||||
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
||||||
goto requeue;
|
goto requeue;
|
||||||
|
|
||||||
|
/* if the entry is in the update list, we must be extremely careful
|
||||||
|
* because peers can see it at any moment and start to use it. In this case,
|
||||||
|
* Peers will lock the element. So to the same here to avoid any conflict
|
||||||
|
*/
|
||||||
|
MT_LIST_DELETE(&ts->pend_updts);
|
||||||
|
|
||||||
|
if (MT_LIST_INLIST(&ts->upd)) {
|
||||||
|
link = mt_list_lock_full(&ts->upd);
|
||||||
|
if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
|
||||||
|
mt_list_unlock_full(&ts->upd, link);
|
||||||
|
goto requeue;
|
||||||
|
}
|
||||||
|
mt_list_unlock_link(link);
|
||||||
|
mt_list_unlock_self(&ts->upd);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* session expired, trash it */
|
/* session expired, trash it */
|
||||||
ebmb_delete(&ts->key);
|
ebmb_delete(&ts->key);
|
||||||
MT_LIST_DELETE(&ts->pend_updts);
|
|
||||||
eb32_delete(&ts->upd);
|
|
||||||
__stksess_free(t, ts);
|
__stksess_free(t, ts);
|
||||||
batched++;
|
batched++;
|
||||||
done_per_shard++;
|
done_per_shard++;
|
||||||
@ -408,9 +410,6 @@ int stktable_trash_oldest(struct stktable *t)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (updt_locked)
|
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
|
|
||||||
|
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
|
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
|
||||||
|
|
||||||
shard++;
|
shard++;
|
||||||
@ -605,11 +604,9 @@ struct stksess *stktable_lookup(struct stktable *t, struct stksess *ts)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Update the expiration timer for <ts> but do not touch its expiration node.
|
/* Update the expiration timer for <ts> but do not touch its expiration node.
|
||||||
* The table's expiration timer is updated if set.
|
* The table's expiration timer is updated if set. <ts> will also be inserted
|
||||||
* The node will be also inserted into the update tree if needed, at a position
|
* into the pending update list to be added later in the update list. If
|
||||||
* depending if the update is a local or coming from a remote node.
|
* <decrefcnt> is set, the <ts> entry's ref_cnt will be decremented.
|
||||||
* If <decrefcnt> is set, the ts entry's ref_cnt will be decremented. The table's
|
|
||||||
* updt_lock may be taken for writes.
|
|
||||||
*/
|
*/
|
||||||
void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int local, int expire, int decrefcnt)
|
void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int local, int expire, int decrefcnt)
|
||||||
{
|
{
|
||||||
@ -627,13 +624,13 @@ void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int local,
|
|||||||
/* Check if this entry is not in the tree or not
|
/* Check if this entry is not in the tree or not
|
||||||
* scheduled for at least one peer.
|
* scheduled for at least one peer.
|
||||||
*/
|
*/
|
||||||
if (!ts->upd.node.leaf_p || _HA_ATOMIC_LOAD(&ts->seen)) {
|
if (!MT_LIST_INLIST(&ts->upd) || _HA_ATOMIC_LOAD(&ts->seen)) {
|
||||||
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_LOCAL);
|
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_LOCAL);
|
||||||
did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
|
did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if (!ts->upd.node.leaf_p) {
|
if (!MT_LIST_INLIST(&ts->upd)) {
|
||||||
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_REMOTE);
|
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_REMOTE);
|
||||||
did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
|
did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
|
||||||
}
|
}
|
||||||
@ -805,12 +802,7 @@ struct stksess *stktable_get_entry(struct stktable *table, struct stktable_key *
|
|||||||
struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int state)
|
struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int state)
|
||||||
{
|
{
|
||||||
struct stktable *table = ctx;
|
struct stktable *table = ctx;
|
||||||
struct eb32_node *eb;
|
int i = 0, cur_tgid = tgid - 1, empty_tgid = 0;
|
||||||
int i = 0, is_local, cur_tgid = tgid - 1, empty_tgid = 0;
|
|
||||||
|
|
||||||
/* we really don't want to wait on this one */
|
|
||||||
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &table->updt_lock) != 0)
|
|
||||||
goto leave;
|
|
||||||
|
|
||||||
for (i = 0; i < STKTABLE_MAX_UPDATES_AT_ONCE; i++) {
|
for (i = 0; i < STKTABLE_MAX_UPDATES_AT_ONCE; i++) {
|
||||||
struct stksess *stksess = MT_LIST_POP_LOCKED(&table->pend_updts[cur_tgid], typeof(stksess), pend_updts);
|
struct stksess *stksess = MT_LIST_POP_LOCKED(&table->pend_updts[cur_tgid], typeof(stksess), pend_updts);
|
||||||
@ -829,27 +821,10 @@ struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int s
|
|||||||
empty_tgid = 0;
|
empty_tgid = 0;
|
||||||
if (cur_tgid == global.nbtgroups)
|
if (cur_tgid == global.nbtgroups)
|
||||||
cur_tgid = 0;
|
cur_tgid = 0;
|
||||||
is_local = (stksess->updt_type == STKSESS_UPDT_LOCAL);
|
|
||||||
stksess->seen = 0;
|
stksess->seen = 0;
|
||||||
if (is_local) {
|
MT_LIST_DELETE(&stksess->upd);
|
||||||
stksess->upd.key = ++table->update;
|
MT_LIST_APPEND(&table->updates, &stksess->upd);
|
||||||
table->localupdate = table->update;
|
|
||||||
eb32_delete(&stksess->upd);
|
|
||||||
} else {
|
|
||||||
stksess->upd.key = (++table->update) + (2147483648U);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* even though very unlikely, it seldom happens that the entry
|
|
||||||
* is already in the tree (both for local and remote ones). We
|
|
||||||
* must dequeue it and requeue it at its new position (e.g. it
|
|
||||||
* might already have been seen by some peers).
|
|
||||||
*/
|
|
||||||
eb32_delete(&stksess->upd);
|
|
||||||
eb = eb32_insert(&table->updates, &stksess->upd);
|
|
||||||
if (eb != &stksess->upd) {
|
|
||||||
eb32_delete(eb);
|
|
||||||
eb32_insert(&table->updates, &stksess->upd);
|
|
||||||
}
|
|
||||||
/*
|
/*
|
||||||
* Now that we're done inserting the stksess, unlock it.
|
* Now that we're done inserting the stksess, unlock it.
|
||||||
* It is kept locked here to prevent a race condition
|
* It is kept locked here to prevent a race condition
|
||||||
@ -859,9 +834,6 @@ struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int s
|
|||||||
MT_LIST_INIT(&stksess->pend_updts);
|
MT_LIST_INIT(&stksess->pend_updts);
|
||||||
}
|
}
|
||||||
|
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &table->updt_lock);
|
|
||||||
|
|
||||||
leave:
|
|
||||||
/* There's more to do, let's schedule another session */
|
/* There's more to do, let's schedule another session */
|
||||||
if (empty_tgid < global.nbtgroups)
|
if (empty_tgid < global.nbtgroups)
|
||||||
tasklet_wakeup(table->updt_task);
|
tasklet_wakeup(table->updt_task);
|
||||||
@ -928,7 +900,7 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
|
|||||||
struct stktable *t = context;
|
struct stktable *t = context;
|
||||||
struct stksess *ts;
|
struct stksess *ts;
|
||||||
struct eb32_node *eb;
|
struct eb32_node *eb;
|
||||||
int updt_locked;
|
struct mt_list link;
|
||||||
int to_visit = STKTABLE_MAX_UPDATES_AT_ONCE;
|
int to_visit = STKTABLE_MAX_UPDATES_AT_ONCE;
|
||||||
int looped;
|
int looped;
|
||||||
int exp_next;
|
int exp_next;
|
||||||
@ -942,7 +914,6 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
|
|||||||
/* start from a random shard number to avoid starvation in the last ones */
|
/* start from a random shard number to avoid starvation in the last ones */
|
||||||
shard = init_shard = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS - 1);
|
shard = init_shard = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS - 1);
|
||||||
do {
|
do {
|
||||||
updt_locked = 0;
|
|
||||||
looped = 0;
|
looped = 0;
|
||||||
|
|
||||||
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
|
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
|
||||||
@ -979,8 +950,7 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
|
|||||||
goto out_unlock;
|
goto out_unlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Let's quit earlier if we currently hold the update lock */
|
to_visit--;
|
||||||
to_visit -= 1 + 3 * updt_locked;
|
|
||||||
|
|
||||||
/* timer looks expired, detach it from the queue */
|
/* timer looks expired, detach it from the queue */
|
||||||
ts = eb32_entry(eb, struct stksess, exp);
|
ts = eb32_entry(eb, struct stksess, exp);
|
||||||
@ -1025,26 +995,26 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if the entry is in the update list, we must be extremely careful
|
|
||||||
* because peers can see it at any moment and start to use it. Peers
|
|
||||||
* will take the table's updt_lock for reading when doing that, and
|
|
||||||
* with that lock held, will grab a ref_cnt before releasing the
|
|
||||||
* lock. So we must take this lock as well and check the ref_cnt.
|
|
||||||
*/
|
|
||||||
if (!updt_locked) {
|
|
||||||
updt_locked = 1;
|
|
||||||
HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
|
|
||||||
}
|
|
||||||
/* now we're locked, new peers can't grab it anymore,
|
|
||||||
* existing ones already have the ref_cnt.
|
|
||||||
*/
|
|
||||||
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
|
||||||
goto requeue;
|
goto requeue;
|
||||||
|
|
||||||
|
/* if the entry is in the update list, we must be extremely careful
|
||||||
|
* because peers can see it at any moment and start to use it. In this case,
|
||||||
|
* Peers will lock the element. So to the same here to avoid any conflict
|
||||||
|
*/
|
||||||
|
MT_LIST_DELETE(&ts->pend_updts);
|
||||||
|
if (MT_LIST_INLIST(&ts->upd)) {
|
||||||
|
link = mt_list_lock_full(&ts->upd);
|
||||||
|
if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
|
||||||
|
mt_list_unlock_full(&ts->upd, link);
|
||||||
|
goto requeue;
|
||||||
|
}
|
||||||
|
mt_list_unlock_link(link);
|
||||||
|
mt_list_unlock_self(&ts->upd);
|
||||||
|
}
|
||||||
|
|
||||||
/* session expired, trash it */
|
/* session expired, trash it */
|
||||||
ebmb_delete(&ts->key);
|
ebmb_delete(&ts->key);
|
||||||
MT_LIST_DELETE(&ts->pend_updts);
|
|
||||||
eb32_delete(&ts->upd);
|
|
||||||
__stksess_free(t, ts);
|
__stksess_free(t, ts);
|
||||||
purged++;
|
purged++;
|
||||||
}
|
}
|
||||||
@ -1053,9 +1023,6 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
|
|||||||
exp_next = TICK_ETERNITY;
|
exp_next = TICK_ETERNITY;
|
||||||
|
|
||||||
out_unlock:
|
out_unlock:
|
||||||
if (updt_locked)
|
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
|
|
||||||
|
|
||||||
task_exp = tick_first(task_exp, exp_next);
|
task_exp = tick_first(task_exp, exp_next);
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
|
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
|
||||||
|
|
||||||
@ -1101,7 +1068,7 @@ int stktable_init(struct stktable *t, char **err_msg)
|
|||||||
HA_RWLOCK_INIT(&t->shards[shard].sh_lock);
|
HA_RWLOCK_INIT(&t->shards[shard].sh_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
t->updates = EB_ROOT_UNIQUE;
|
MT_LIST_INIT(&t->updates);
|
||||||
HA_RWLOCK_INIT(&t->lock);
|
HA_RWLOCK_INIT(&t->lock);
|
||||||
|
|
||||||
t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);
|
t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user