MINOR: peers: move messages treatment code to reduce the size of the I/O handler.

Extract the code of the peer I/O handler responsible of treating
any peer protocol message to create peer_treat_awaited_msg() function.
Also rename peer_recv_updatemsg() to peer_treat_updatemsg() as this
function only parse a stick-table update message already received
by peer_recv_msg().

May be backported as far as 1.5.
This commit is contained in:
Frdric Lcaille 2019-01-24 15:40:11 +01:00 committed by Willy Tarreau
parent 7d0ceeec80
commit 444243c62c

View File

@ -1139,8 +1139,8 @@ static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer
* update ID.
* <totl> is the length of the stick-table update message computed upon receipt.
*/
static int peer_recv_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp,
char **msg_cur, char *msg_end, int msg_len, int totl)
static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp,
char **msg_cur, char *msg_end, int msg_len, int totl)
{
struct stream_interface *si = appctx->owner;
struct shared_table *st = p->remote_table;
@ -1603,6 +1603,107 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
return 0;
}
/*
* Treat the awaited message with <msg_head> as header.*
* Return 1 if succeeded, 0 if not.
*/
static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *peer, unsigned char *msg_head,
char **msg_cur, char *msg_end, int msg_len, int totl)
{
struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si);
struct peers *peers = strm_fe(s)->parent;
if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
struct shared_table *st;
/* Reset message: remote need resync */
/* prepare tables fot a global push */
for (st = peer->tables; st; st = st->next) {
st->teaching_origin = st->last_pushed = st->table->update;
st->flags = 0;
}
/* reset teaching flags to 0 */
peer->flags &= PEER_TEACH_RESET;
/* flag to start to teach lesson */
peer->flags |= PEER_F_TEACH_PROCESS;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
if (peer->flags & PEER_F_LEARN_ASSIGN) {
peer->flags &= ~PEER_F_LEARN_ASSIGN;
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
}
peer->confirm++;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
if (peer->flags & PEER_F_LEARN_ASSIGN) {
peer->flags &= ~PEER_F_LEARN_ASSIGN;
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
peer->flags |= PEER_F_LEARN_NOTUP2DATE;
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
peer->confirm++;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
struct shared_table *st;
/* If stopping state */
if (stopping) {
/* Close session, push resync no more needed */
peer->flags |= PEER_F_TEACH_COMPLETE;
appctx->st0 = PEER_SESS_ST_END;
return 0;
}
for (st = peer->tables; st; st = st->next) {
st->update = st->last_pushed = st->teaching_origin;
st->flags = 0;
}
/* reset teaching flags to 0 */
peer->flags &= PEER_TEACH_RESET;
}
}
else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
if (!peer_treat_definemsg(appctx, peer, msg_cur, msg_end, totl))
return 0;
}
else if (msg_head[1] == PEER_MSG_STKT_SWITCH) {
if (!peer_treat_switchmsg(appctx, peer, msg_cur, msg_end))
return 0;
}
else if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
msg_head[1] == PEER_MSG_STKT_INCUPDATE ||
msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
int update, expire;
update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
if (!peer_treat_updatemsg(appctx, peer, update, expire,
msg_cur, msg_end, msg_len, totl))
return 0;
}
else if (msg_head[1] == PEER_MSG_STKT_ACK) {
if (!peer_treat_ackmsg(appctx, peer, msg_cur, msg_end))
return 0;
}
}
else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) {
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
return 1;
}
/*
* IO Handler to handle message exchance with a peer
*/
@ -1924,94 +2025,8 @@ switchstate:
}
msg_end += msg_len;
if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
struct shared_table *st;
/* Reset message: remote need resync */
/* prepare tables fot a global push */
for (st = curpeer->tables; st; st = st->next) {
st->teaching_origin = st->last_pushed = st->table->update;
st->flags = 0;
}
/* reset teaching flags to 0 */
curpeer->flags &= PEER_TEACH_RESET;
/* flag to start to teach lesson */
curpeer->flags |= PEER_F_TEACH_PROCESS;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
if (curpeer->flags & PEER_F_LEARN_ASSIGN) {
curpeer->flags &= ~PEER_F_LEARN_ASSIGN;
curpeers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
curpeers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
}
curpeer->confirm++;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
if (curpeer->flags & PEER_F_LEARN_ASSIGN) {
curpeer->flags &= ~PEER_F_LEARN_ASSIGN;
curpeers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
curpeer->flags |= PEER_F_LEARN_NOTUP2DATE;
curpeers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG);
}
curpeer->confirm++;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
struct shared_table *st;
/* If stopping state */
if (stopping) {
/* Close session, push resync no more needed */
curpeer->flags |= PEER_F_TEACH_COMPLETE;
appctx->st0 = PEER_SESS_ST_END;
goto switchstate;
}
for (st = curpeer->tables; st; st = st->next) {
st->update = st->last_pushed = st->teaching_origin;
st->flags = 0;
}
/* reset teaching flags to 0 */
curpeer->flags &= PEER_TEACH_RESET;
}
}
else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
if (!peer_treat_definemsg(appctx, curpeer, &msg_cur, msg_end, totl))
goto switchstate;
}
else if (msg_head[1] == PEER_MSG_STKT_SWITCH) {
if (!peer_treat_switchmsg(appctx, curpeer, &msg_cur, msg_end))
goto switchstate;
}
else if (msg_head[1] == PEER_MSG_STKT_UPDATE
|| msg_head[1] == PEER_MSG_STKT_INCUPDATE
|| msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED
|| msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
int update, expire;
update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
if (!peer_recv_updatemsg(appctx, curpeer, update, expire,
&msg_cur, msg_end, msg_len, totl))
goto switchstate;
}
else if (msg_head[1] == PEER_MSG_STKT_ACK) {
if (!peer_treat_ackmsg(appctx, curpeer, &msg_cur, msg_end))
goto switchstate;
}
}
else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) {
appctx->st0 = PEER_SESS_ST_ERRPROTO;
if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl))
goto switchstate;
}
ignore_msg:
/* skip consumed message */
co_skip(si_oc(si), totl);