diff --git a/src/peers.c b/src/peers.c index 9ccdf55db..9afcd99f7 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1104,6 +1104,7 @@ void __peer_session_deinit(struct peer *peer) peer->flags |= PEER_F_WAIT_SYNCTASK_ACK; peer->appstate = PEER_APP_ST_STOPPING; TRACE_STATE("peer session stopping", PEERS_EV_SESS_END, peer->appctx, peer); + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } @@ -1246,6 +1247,7 @@ static inline int peer_send_msg(struct appctx *appctx, int (*peer_prepare_msg)(char *, size_t, struct peer_prep_params *), struct peer_prep_params *params) { + struct peer *peer = appctx->svcctx; int ret, msglen; TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx); @@ -1257,6 +1259,7 @@ static inline int peer_send_msg(struct appctx *appctx, return 0; } + /* message to buffer */ ret = applet_putblk(appctx, trash.area, msglen); if (ret <= 0) { @@ -1265,6 +1268,11 @@ static inline int peer_send_msg(struct appctx *appctx, appctx->st0 = PEER_SESS_ST_END; } } + else if (peer) { + /* A message was sent, rearm the heartbeat timer */ + peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); + peer->flags &= ~PEER_F_HEARTBEAT; + } TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx); return ret; @@ -2502,7 +2510,6 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee } else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) { TRACE_PROTO("Heartbeat message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); peer->rx_hbt++; } } @@ -3091,6 +3098,7 @@ switchstate: if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl)) goto switchstate; + curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); curpeer->flags |= PEER_F_ALIVE; /* skip consumed message */ @@ -3105,7 +3113,15 @@ switchstate: goto switchstate; send_msgs: - if (curpeer->flags & PEER_F_HEARTBEAT) { + /* we get here when a peer_recv_msg() returns 0 in reql */ + repl = peer_send_msgs(appctx, curpeer, curpeer->peers); + if (repl <= 0) { + if (repl == -1) + goto out; + goto switchstate; + } + + if (tick_is_expired(curpeer->heartbeat, now_ms) || (curpeer->flags & PEER_F_HEARTBEAT)) { curpeer->flags &= ~PEER_F_HEARTBEAT; repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers); if (repl <= 0) { @@ -3115,13 +3131,6 @@ send_msgs: } curpeer->tx_hbt++; } - /* we get here when a peer_recv_msg() returns 0 in reql */ - repl = peer_send_msgs(appctx, curpeer, curpeer->peers); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } /* noting more to do */ goto out; @@ -3501,64 +3510,43 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers, appctx_wakeup(peer->appctx); } else { - int update_to_push = 0; - - /* Awake session if there is data to push */ - for (st = peer->tables; st ; st = st->next) { - if (st->last_update != st->table->last_update) { - update_to_push = 1; - - /* wake up the peer handler to push local updates */ - /* There is no need to send a heartbeat message - * when some updates must be pushed. The remote - * peer will consider peer as alive when it will - * receive these updates. - */ - peer->flags &= ~PEER_F_HEARTBEAT; - /* Re-schedule another one later. */ - peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); - /* Refresh reconnect if necessary */ - if (tick_is_expired(peer->reconnect, now_ms)) - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); - /* We are going to send updates, let's ensure we will - * come back to send heartbeat messages or to reconnect. - */ - TRACE_DEVEL("wakeup peer session to send update", PEERS_EV_SESS_WAKE, NULL, peer); - task->expire = tick_first(peer->reconnect, peer->heartbeat); - appctx_wakeup(peer->appctx); - break; - } + if (tick_is_expired(peer->reconnect, now_ms)) { + if (peer->flags & PEER_F_ALIVE) { + /* This peer was alive during a 'reconnect' period. + * Flag it as not alive again for the next period. + */ + peer->flags &= ~PEER_F_ALIVE; + TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer); + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); + } + else { + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); + peer->heartbeat = TICK_ETERNITY; + TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer); + peer_session_forceshutdown(peer); + sync_peer_app_state(peers, peer); + peer->no_hbt++; + } } - /* When there are updates to send we do not reconnect - * and do not send heartbeat message either. - */ - if (!update_to_push) { - if (tick_is_expired(peer->reconnect, now_ms)) { - if (peer->flags & PEER_F_ALIVE) { - /* This peer was alive during a 'reconnect' period. - * Flag it as not alive again for the next period. - */ - peer->flags &= ~PEER_F_ALIVE; - TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer); - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); - } - else { - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); - peer->heartbeat = TICK_ETERNITY; - TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer); - peer_session_forceshutdown(peer); - sync_peer_app_state(peers, peer); - peer->no_hbt++; - } - } - else if (tick_is_expired(peer->heartbeat, now_ms)) { + + if (peer->appctx) { + if (tick_is_expired(peer->heartbeat, now_ms)) { peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); peer->flags |= PEER_F_HEARTBEAT; TRACE_DEVEL("wakeup peer session to send heartbeat message", PEERS_EV_SESS_WAKE, NULL, peer); appctx_wakeup(peer->appctx); } - task->expire = tick_first(peer->reconnect, peer->heartbeat); + else { + for (st = peer->tables; st ; st = st->next) { + if (st->last_update != st->table->last_update) { + appctx_wakeup(peer->appctx); + break; + } + } + } } + + task->expire = tick_first(peer->reconnect, peer->heartbeat); } /* else do nothing */ } /* SUCCESSCODE */