diff --git a/src/peers.c b/src/peers.c index 701341098..48faa9d85 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1704,6 +1704,110 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee return 1; } + + +/* + * Send any message to peer. + * Returns 1 if succeeded, or -1 or 0 if failed. + * -1 means an internal error occured, 0 is for a peer protocol error leading + * to a peer state change (from the peer I/O handler point of view). + */ +static inline int peer_send_msgs(struct appctx *appctx, struct peer *peer) +{ + int repl; + struct stream_interface *si = appctx->owner; + struct stream *s = si_strm(si); + struct peers *peers = strm_fe(s)->parent; + + /* Need to request a resync */ + if ((peer->flags & PEER_F_LEARN_ASSIGN) && + (peers->flags & PEERS_F_RESYNC_ASSIGN) && + !(peers->flags & PEERS_F_RESYNC_PROCESS)) { + + repl = peer_send_resync_reqmsg(appctx); + if (repl <= 0) + return repl; + + peers->flags |= PEERS_F_RESYNC_PROCESS; + } + + /* Nothing to read, now we start to write */ + if (peer->tables) { + struct shared_table *st; + struct shared_table *last_local_table; + + last_local_table = peer->last_local_table; + if (!last_local_table) + last_local_table = peer->tables; + st = last_local_table->next; + + while (1) { + if (!st) + st = peer->tables; + + /* It remains some updates to ack */ + if (st->last_get != st->last_acked) { + repl = peer_send_ackmsg(st, appctx); + if (repl <= 0) + return repl; + + st->last_acked = st->last_get; + } + + if (!(peer->flags & PEER_F_TEACH_PROCESS)) { + HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); + if (!(peer->flags & PEER_F_LEARN_ASSIGN) && + ((int)(st->last_pushed - st->table->localupdate) < 0)) { + + repl = peer_send_teach_process_msgs(appctx, peer, st); + if (repl <= 0) { + HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); + return repl; + } + } + HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); + } + else { + if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) { + repl = peer_send_teach_stage1_msgs(appctx, peer, st); + if (repl <= 0) + return repl; + } + + if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) { + repl = peer_send_teach_stage2_msgs(appctx, peer, st); + if (repl <= 0) + return repl; + } + } + + if (st == last_local_table) + break; + st = st->next; + } + } + + if ((peer->flags & PEER_F_TEACH_PROCESS) && !(peer->flags & PEER_F_TEACH_FINISHED)) { + repl = peer_send_resync_finishedmsg(appctx, peer); + if (repl <= 0) + return repl; + + /* flag finished message sent */ + peer->flags |= PEER_F_TEACH_FINISHED; + } + + /* Confirm finished or partial messages */ + while (peer->confirm) { + repl = peer_send_resync_confirmsg(appctx); + if (repl <= 0) + return repl; + + peer->confirm--; + } + + return 1; +} + /* * IO Handler to handle message exchance with a peer */ @@ -2042,111 +2146,11 @@ incomplete: goto switchstate; } - - - - /* Need to request a resync */ - if ((curpeer->flags & PEER_F_LEARN_ASSIGN) && - (curpeers->flags & PEERS_F_RESYNC_ASSIGN) && - !(curpeers->flags & PEERS_F_RESYNC_PROCESS)) { - - repl = peer_send_resync_reqmsg(appctx); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } - - curpeers->flags |= PEERS_F_RESYNC_PROCESS; - } - - /* Nothing to read, now we start to write */ - if (curpeer->tables) { - struct shared_table *st; - struct shared_table *last_local_table; - - last_local_table = curpeer->last_local_table; - if (!last_local_table) - last_local_table = curpeer->tables; - st = last_local_table->next; - - while (1) { - if (!st) - st = curpeer->tables; - - /* It remains some updates to ack */ - if (st->last_get != st->last_acked) { - repl = peer_send_ackmsg(st, appctx); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } - st->last_acked = st->last_get; - } - - if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) { - HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); - if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) && - ((int)(st->last_pushed - st->table->localupdate) < 0)) { - - repl = peer_send_teach_process_msgs(appctx, curpeer, st); - if (repl <= 0) { - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - if (repl == -1) - goto out; - goto switchstate; - } - } - HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); - } - else { - if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) { - repl = peer_send_teach_stage1_msgs(appctx, curpeer, st); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } - } - - if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) { - repl = peer_send_teach_stage2_msgs(appctx, curpeer, st); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } - } - } - - if (st == last_local_table) - break; - st = st->next; - } - } - - - if ((curpeer->flags & PEER_F_TEACH_PROCESS) && !(curpeer->flags & PEER_F_TEACH_FINISHED)) { - repl = peer_send_resync_finishedmsg(appctx, curpeer); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } - /* flag finished message sent */ - curpeer->flags |= PEER_F_TEACH_FINISHED; - } - - /* Confirm finished or partial messages */ - while (curpeer->confirm) { - repl = peer_send_resync_confirmsg(appctx); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } - curpeer->confirm--; + repl = peer_send_msgs(appctx, curpeer); + if (repl <= 0) { + if (repl == -1) + goto out; + goto switchstate; } /* noting more to do */