MEDIUM: peer: Improve management of reconnect timer and heartbeat messages

heartbeat messages are sent to keep a connection active. However, a
heartbeat messages was sent periodically, even if some other messages were
sent during this period. It is not an issue but it is useless. heartbeat
messages should only be sent if nothing was sent to the peer since a
moment. So, in this patch, the heartbeat timer is rearmed each time a message
is sent.

On the receiver side, the reconnect timer was only rearmed when a heartbeat
message was received instead of rearming it for any messages. Again, it is
not an issue because the inactivity is managed with PEER_F_ALIVE flag. This
flag is removed when the reconnect timer timed out but it is reinserted when
something is received. But an periodic wakeup may be uselessly performed.
So, in this patch, the reconnect timer is rearmed each time a message is
received.
This commit is contained in:
Christopher Faulet 2025-10-14 14:09:07 +02:00
parent 922a1d4e3a
commit 2f1a1e0de3

View File

@ -1245,6 +1245,7 @@ static inline int peer_send_msg(struct appctx *appctx,
int (*peer_prepare_msg)(char *, size_t, struct peer_prep_params *),
struct peer_prep_params *params)
{
struct peer *peer = appctx->svcctx;
int ret, msglen;
TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx);
@ -1264,6 +1265,11 @@ static inline int peer_send_msg(struct appctx *appctx,
appctx->st0 = PEER_SESS_ST_END;
}
}
else if (peer) {
/* A message was sent, rearm the heartbeat timer */
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
peer->flags &= ~PEER_F_HEARTBEAT;
}
TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx);
return ret;
@ -2516,7 +2522,6 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
}
else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
TRACE_PROTO("Heartbeat message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
peer->rx_hbt++;
}
}
@ -3108,6 +3113,7 @@ switchstate:
if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl))
goto switchstate;
curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
curpeer->flags |= PEER_F_ALIVE;
/* skip consumed message */
@ -3122,8 +3128,15 @@ switchstate:
goto switchstate;
send_msgs:
if (curpeer->flags & PEER_F_HEARTBEAT) {
curpeer->flags &= ~PEER_F_HEARTBEAT;
/* we get here when a peer_recv_msg() returns 0 in reql */
repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
if (tick_is_expired(curpeer->heartbeat, now_ms) || (curpeer->flags & PEER_F_HEARTBEAT)) {
repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers);
if (repl <= 0) {
if (repl == -1)
@ -3132,13 +3145,6 @@ send_msgs:
}
curpeer->tx_hbt++;
}
/* we get here when a peer_recv_msg() returns 0 in reql */
repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
/* noting more to do */
goto out;
@ -3518,65 +3524,43 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
appctx_wakeup(peer->appctx);
}
else {
int update_to_push = 0;
/* Awake session if there is data to push */
for (st = peer->tables; st ; st = st->next) {
if (tick_is_le(peer->last_update, st->table->last_update))
update_to_push = 1;
if (update_to_push == 1) {
/* wake up the peer handler to push local updates */
/* There is no need to send a heartbeat message
* when some updates must be pushed. The remote
* peer will consider <peer> peer as alive when it will
* receive these updates.
if (tick_is_expired(peer->reconnect, now_ms)) {
if (peer->flags & PEER_F_ALIVE) {
/* This peer was alive during a 'reconnect' period.
* Flag it as not alive again for the next period.
*/
peer->flags &= ~PEER_F_HEARTBEAT;
/* Re-schedule another one later. */
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
/* Refresh reconnect if necessary */
if (tick_is_expired(peer->reconnect, now_ms))
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
/* We are going to send updates, let's ensure we will
* come back to send heartbeat messages or to reconnect.
*/
TRACE_DEVEL("wakeup peer session to send update", PEERS_EV_SESS_WAKE, NULL, peer);
task->expire = tick_first(peer->reconnect, peer->heartbeat);
appctx_wakeup(peer->appctx);
break;
peer->flags &= ~PEER_F_ALIVE;
TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer);
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
}
else {
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
peer->heartbeat = TICK_ETERNITY;
TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer);
peer_session_forceshutdown(peer);
sync_peer_app_state(peers, peer);
peer->no_hbt++;
}
}
/* When there are updates to send we do not reconnect
* and do not send heartbeat message either.
*/
if (!update_to_push) {
if (tick_is_expired(peer->reconnect, now_ms)) {
if (peer->flags & PEER_F_ALIVE) {
/* This peer was alive during a 'reconnect' period.
* Flag it as not alive again for the next period.
*/
peer->flags &= ~PEER_F_ALIVE;
TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer);
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
}
else {
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
peer->heartbeat = TICK_ETERNITY;
TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer);
peer_session_forceshutdown(peer);
sync_peer_app_state(peers, peer);
peer->no_hbt++;
}
}
else if (tick_is_expired(peer->heartbeat, now_ms)) {
if (peer->appctx) {
if (tick_is_expired(peer->heartbeat, now_ms)) {
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
peer->flags |= PEER_F_HEARTBEAT;
TRACE_DEVEL("wakeup peer session to send heartbeat message", PEERS_EV_SESS_WAKE, NULL, peer);
appctx_wakeup(peer->appctx);
}
task->expire = tick_first(peer->reconnect, peer->heartbeat);
else {
for (st = peer->tables; st ; st = st->next) {
if (tick_is_le(peer->last_update, st->table->last_update)) {
appctx_wakeup(peer->appctx);
break;
}
}
}
}
task->expire = tick_first(peer->reconnect, peer->heartbeat);
}
/* else do nothing */
} /* SUCCESSCODE */