diff --git a/src/peers.c b/src/peers.c index 04cd0460a..701341098 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1139,8 +1139,8 @@ static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer * update ID. * is the length of the stick-table update message computed upon receipt. */ -static int peer_recv_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp, - char **msg_cur, char *msg_end, int msg_len, int totl) +static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp, + char **msg_cur, char *msg_end, int msg_len, int totl) { struct stream_interface *si = appctx->owner; struct shared_table *st = p->remote_table; @@ -1603,6 +1603,107 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms return 0; } + +/* + * Treat the awaited message with as header.* + * Return 1 if succeeded, 0 if not. + */ +static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *peer, unsigned char *msg_head, + char **msg_cur, char *msg_end, int msg_len, int totl) +{ + struct stream_interface *si = appctx->owner; + struct stream *s = si_strm(si); + struct peers *peers = strm_fe(s)->parent; + + if (msg_head[0] == PEER_MSG_CLASS_CONTROL) { + if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) { + struct shared_table *st; + /* Reset message: remote need resync */ + + /* prepare tables fot a global push */ + for (st = peer->tables; st; st = st->next) { + st->teaching_origin = st->last_pushed = st->table->update; + st->flags = 0; + } + + /* reset teaching flags to 0 */ + peer->flags &= PEER_TEACH_RESET; + + /* flag to start to teach lesson */ + peer->flags |= PEER_F_TEACH_PROCESS; + } + else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) { + if (peer->flags & PEER_F_LEARN_ASSIGN) { + peer->flags &= ~PEER_F_LEARN_ASSIGN; + peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); + peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE); + } + peer->confirm++; + } + else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) { + if (peer->flags & PEER_F_LEARN_ASSIGN) { + peer->flags &= ~PEER_F_LEARN_ASSIGN; + peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); + + peer->flags |= PEER_F_LEARN_NOTUP2DATE; + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); + } + peer->confirm++; + } + else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) { + struct shared_table *st; + + /* If stopping state */ + if (stopping) { + /* Close session, push resync no more needed */ + peer->flags |= PEER_F_TEACH_COMPLETE; + appctx->st0 = PEER_SESS_ST_END; + return 0; + } + for (st = peer->tables; st; st = st->next) { + st->update = st->last_pushed = st->teaching_origin; + st->flags = 0; + } + + /* reset teaching flags to 0 */ + peer->flags &= PEER_TEACH_RESET; + } + } + else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) { + if (msg_head[1] == PEER_MSG_STKT_DEFINE) { + if (!peer_treat_definemsg(appctx, peer, msg_cur, msg_end, totl)) + return 0; + } + else if (msg_head[1] == PEER_MSG_STKT_SWITCH) { + if (!peer_treat_switchmsg(appctx, peer, msg_cur, msg_end)) + return 0; + } + else if (msg_head[1] == PEER_MSG_STKT_UPDATE || + msg_head[1] == PEER_MSG_STKT_INCUPDATE || + msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || + msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) { + int update, expire; + + update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED; + expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED; + if (!peer_treat_updatemsg(appctx, peer, update, expire, + msg_cur, msg_end, msg_len, totl)) + return 0; + + } + else if (msg_head[1] == PEER_MSG_STKT_ACK) { + if (!peer_treat_ackmsg(appctx, peer, msg_cur, msg_end)) + return 0; + } + } + else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) { + appctx->st0 = PEER_SESS_ST_ERRPROTO; + return 0; + } + + return 1; +} /* * IO Handler to handle message exchance with a peer */ @@ -1924,94 +2025,8 @@ switchstate: } msg_end += msg_len; - - if (msg_head[0] == PEER_MSG_CLASS_CONTROL) { - if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) { - struct shared_table *st; - /* Reset message: remote need resync */ - - /* prepare tables fot a global push */ - for (st = curpeer->tables; st; st = st->next) { - st->teaching_origin = st->last_pushed = st->table->update; - st->flags = 0; - } - - /* reset teaching flags to 0 */ - curpeer->flags &= PEER_TEACH_RESET; - - /* flag to start to teach lesson */ - curpeer->flags |= PEER_F_TEACH_PROCESS; - } - else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) { - if (curpeer->flags & PEER_F_LEARN_ASSIGN) { - curpeer->flags &= ~PEER_F_LEARN_ASSIGN; - curpeers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - curpeers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE); - } - curpeer->confirm++; - } - else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) { - if (curpeer->flags & PEER_F_LEARN_ASSIGN) { - curpeer->flags &= ~PEER_F_LEARN_ASSIGN; - curpeers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - - curpeer->flags |= PEER_F_LEARN_NOTUP2DATE; - curpeers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); - task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG); - } - curpeer->confirm++; - } - else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) { - struct shared_table *st; - - /* If stopping state */ - if (stopping) { - /* Close session, push resync no more needed */ - curpeer->flags |= PEER_F_TEACH_COMPLETE; - appctx->st0 = PEER_SESS_ST_END; - goto switchstate; - } - for (st = curpeer->tables; st; st = st->next) { - st->update = st->last_pushed = st->teaching_origin; - st->flags = 0; - } - - /* reset teaching flags to 0 */ - curpeer->flags &= PEER_TEACH_RESET; - } - } - else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) { - if (msg_head[1] == PEER_MSG_STKT_DEFINE) { - if (!peer_treat_definemsg(appctx, curpeer, &msg_cur, msg_end, totl)) - goto switchstate; - } - else if (msg_head[1] == PEER_MSG_STKT_SWITCH) { - if (!peer_treat_switchmsg(appctx, curpeer, &msg_cur, msg_end)) - goto switchstate; - } - else if (msg_head[1] == PEER_MSG_STKT_UPDATE - || msg_head[1] == PEER_MSG_STKT_INCUPDATE - || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED - || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) { - int update, expire; - - update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED; - expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED; - if (!peer_recv_updatemsg(appctx, curpeer, update, expire, - &msg_cur, msg_end, msg_len, totl)) - goto switchstate; - - } - else if (msg_head[1] == PEER_MSG_STKT_ACK) { - if (!peer_treat_ackmsg(appctx, curpeer, &msg_cur, msg_end)) - goto switchstate; - } - } - else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) { - appctx->st0 = PEER_SESS_ST_ERRPROTO; + if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl)) goto switchstate; - } - ignore_msg: /* skip consumed message */ co_skip(si_oc(si), totl);