diff --git a/src/peers.c b/src/peers.c index 1d542585e..6e211e159 100644 --- a/src/peers.c +++ b/src/peers.c @@ -667,6 +667,197 @@ static inline int peer_send_updatemsg(struct shared_table *st, struct appctx *ap return peer_send_msg(st, appctx, peer_prepare_updatemsg, ts, updateid, use_identifier, use_timed); } + +/* + * Function used to lookup for recent stick-table updates associated with + * shared stick-table when a lesson must be taught a peer (PEER_F_LEARN_ASSIGN flag set). + */ +static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st) +{ + struct eb32_node *eb; + + eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); + if (!eb) { + eb = eb32_first(&st->table->updates); + if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) { + st->table->commitupdate = st->last_pushed = st->table->localupdate; + return NULL; + } + } + + if ((int)(eb->key - st->table->localupdate) > 0) { + st->table->commitupdate = st->last_pushed = st->table->localupdate; + return NULL; + } + + return eb32_entry(eb, struct stksess, upd); +} + +/* + * Function used to lookup for recent stick-table updates associated with + * shared stick-table during teach state 1 step. + */ +static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st) +{ + struct eb32_node *eb; + + eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); + if (!eb) { + st->flags |= SHTABLE_F_TEACH_STAGE1; + eb = eb32_first(&st->table->updates); + if (eb) + st->last_pushed = eb->key - 1; + return NULL; + } + + return eb32_entry(eb, struct stksess, upd); +} + +/* + * Function used to lookup for recent stick-table updates associated with + * shared stick-table during teach state 2 step. + */ +static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st) +{ + struct eb32_node *eb; + + eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); + if (!eb || eb->key > st->teaching_origin) { + st->flags |= SHTABLE_F_TEACH_STAGE2; + return NULL; + } + + return eb32_entry(eb, struct stksess, upd); +} + +/* + * Generic function to emit update messages for stick-table when a lesson must + * be taught to the peer

. + * must be set to 1 if the shared table is already locked when entering + * this function, 0 if not. + * + * This function temporary unlock/lock when it sends stick-table updates or + * when decrementing its refcount in case of any error when it sends this updates. + * + * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. + * Returns -1 if there was not enough room left to send the message, + * any other negative returned value must be considered as an error with an appcxt st0 + * returned value equal to PEER_SESS_ST_END. + * If it returns 0 or -1, this function leave locked if already locked when entering this function + * unlocked if not already locked when entering this function. + */ +static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, + struct stksess *(*peer_stksess_lookup)(struct shared_table *), + struct shared_table *st, int locked) +{ + int ret, new_pushed, use_timed; + + ret = 1; + use_timed = 0; + if (st != p->last_local_table) { + ret = peer_send_switchmsg(st, appctx); + if (ret <= 0) + return ret; + + p->last_local_table = st; + } + + if (peer_stksess_lookup != peer_teach_process_stksess_lookup) + use_timed = !(p->flags & PEER_F_DWNGRD); + + /* We force new pushed to 1 to force identifier in update message */ + new_pushed = 1; + + if (!locked) + HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); + + while (1) { + struct stksess *ts; + unsigned updateid; + + /* push local updates */ + ts = peer_stksess_lookup(st); + if (!ts) + break; + + updateid = ts->upd.key; + ts->ref_cnt++; + HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); + + ret = peer_send_updatemsg(st, appctx, ts, updateid, new_pushed, use_timed); + if (ret <= 0) { + HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); + ts->ref_cnt--; + if (!locked) + HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); + return ret; + } + + HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); + ts->ref_cnt--; + st->last_pushed = updateid; + + if (peer_stksess_lookup == peer_teach_process_stksess_lookup && + (int)(st->last_pushed - st->table->commitupdate) > 0) + st->table->commitupdate = st->last_pushed; + + /* identifier may not needed in next update message */ + new_pushed = 0; + } + + out: + if (!locked) + HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); + return 1; +} + +/* + * Function to emit update messages for stick-table when a lesson must + * be taught to the peer

(PEER_F_LEARN_ASSIGN flag set). + * + * Note that shared stick-table is locked when calling this function. + * + * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. + * Returns -1 if there was not enough room left to send the message, + * any other negative returned value must be considered as an error with an appcxt st0 + * returned value equal to PEER_SESS_ST_END. + */ +static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p, + struct shared_table *st) +{ + return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st, 1); +} + +/* + * Function to emit update messages for stick-table when a lesson must + * be taught to the peer

during teach state 1 step. + * + * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. + * Returns -1 if there was not enough room left to send the message, + * any other negative returned value must be considered as an error with an appcxt st0 + * returned value equal to PEER_SESS_ST_END. + */ +static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p, + struct shared_table *st) +{ + return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st, 0); +} + +/* + * Function to emit update messages for stick-table when a lesson must + * be taught to the peer

during teach state 1 step. + * + * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. + * Returns -1 if there was not enough room left to send the message, + * any other negative returned value must be considered as an error with an appcxt st0 + * returned value equal to PEER_SESS_ST_END. + */ +static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p, + struct shared_table *st) +{ + return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st, 0); +} + /* * IO Handler to handle message exchance with a peer */ @@ -1530,182 +1721,34 @@ incomplete: HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) && ((int)(st->last_pushed - st->table->localupdate) < 0)) { - struct eb32_node *eb; - int new_pushed; - if (st != curpeer->last_local_table) { - repl = peer_send_switchmsg(st, appctx); - if (repl <= 0) { - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - if (repl == -1) - goto out; - goto switchstate; - } - curpeer->last_local_table = st; - } - - /* We force new pushed to 1 to force identifier in update message */ - new_pushed = 1; - while (1) { - struct stksess *ts; - unsigned updateid; - - /* push local updates */ - eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); - if (!eb) { - eb = eb32_first(&st->table->updates); - if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) { - st->table->commitupdate = st->last_pushed = st->table->localupdate; - break; - } - } - - if ((int)(eb->key - st->table->localupdate) > 0) { - st->table->commitupdate = st->last_pushed = st->table->localupdate; - break; - } - - ts = eb32_entry(eb, struct stksess, upd); - updateid = ts->upd.key; - ts->ref_cnt++; + repl = peer_send_teach_process_msgs(appctx, curpeer, st); + if (repl <= 0) { HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - - repl = peer_send_updatemsg(st, appctx, ts, - updateid, new_pushed, 0); - if (repl <= 0) { - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - ts->ref_cnt--; - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - if (repl == -1) - goto out; - goto switchstate; - } - - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - ts->ref_cnt--; - st->last_pushed = updateid; - if ((int)(st->last_pushed - st->table->commitupdate) > 0) - st->table->commitupdate = st->last_pushed; - /* identifier may not needed in next update message */ - new_pushed = 0; + if (repl == -1) + goto out; + goto switchstate; } } HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); } else { if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) { - struct eb32_node *eb; - int new_pushed; - - if (st != curpeer->last_local_table) { - repl = peer_send_switchmsg(st, appctx); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } - curpeer->last_local_table = st; + repl = peer_send_teach_stage1_msgs(appctx, curpeer, st); + if (repl <= 0) { + if (repl == -1) + goto out; + goto switchstate; } - - /* We force new pushed to 1 to force identifier in update message */ - new_pushed = 1; - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - while (1) { - struct stksess *ts; - int use_timed; - unsigned updateid; - - /* push local updates */ - eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); - if (!eb) { - st->flags |= SHTABLE_F_TEACH_STAGE1; - eb = eb32_first(&st->table->updates); - if (eb) - st->last_pushed = eb->key - 1; - break; - } - - ts = eb32_entry(eb, struct stksess, upd); - updateid = ts->upd.key; - ts->ref_cnt++; - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - - use_timed = !(curpeer->flags & PEER_F_DWNGRD); - repl = peer_send_updatemsg(st, appctx, ts, - updateid, new_pushed, use_timed); - if (repl <= 0) { - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - ts->ref_cnt--; - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - if (repl == -1) - goto out; - goto switchstate; - } - - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - ts->ref_cnt--; - st->last_pushed = updateid; - /* identifier may not needed in next update message */ - new_pushed = 0; - } - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); } if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) { - struct eb32_node *eb; - int new_pushed; - - if (st != curpeer->last_local_table) { - repl = peer_send_switchmsg(st, appctx); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } - curpeer->last_local_table = st; + repl = peer_send_teach_stage2_msgs(appctx, curpeer, st); + if (repl <= 0) { + if (repl == -1) + goto out; + goto switchstate; } - - /* We force new pushed to 1 to force identifier in update message */ - new_pushed = 1; - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - while (1) { - struct stksess *ts; - int use_timed; - unsigned updateid; - - /* push local updates */ - eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); - - /* push local updates */ - if (!eb || eb->key > st->teaching_origin) { - st->flags |= SHTABLE_F_TEACH_STAGE2; - break; - } - - ts = eb32_entry(eb, struct stksess, upd); - updateid = ts->upd.key; - ts->ref_cnt++; - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - - use_timed = !(curpeer->flags & PEER_F_DWNGRD); - repl = peer_send_updatemsg(st, appctx, ts, - updateid, new_pushed, use_timed); - if (repl <= 0) { - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - ts->ref_cnt--; - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - if (repl == -1) - goto out; - goto switchstate; - } - - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - ts->ref_cnt--; - st->last_pushed = updateid; - /* identifier may not needed in next update message */ - new_pushed = 0; - } - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); } }