REORG: peers: Rename all occurrences to 'ps' variable

In loops on the peer list in the code, the 'ps' variable was used as a
shortcut for the peer session. However, if mays be confusing with the peers
section too. So, all occurrences to 'ps' variable were renamed to 'peer'.
This commit is contained in:
Christopher Faulet 2024-04-25 10:57:44 +02:00
parent fff5f63e10
commit 0243691de1

View File

@ -3286,7 +3286,6 @@ static void clear_peer_learning_status(struct peer *peer)
static void sync_peer_learn_state(struct peers *peers, struct peer *peer) static void sync_peer_learn_state(struct peers *peers, struct peer *peer)
{ {
struct peer *ps;
unsigned int flags = 0; unsigned int flags = 0;
if (peer->learnstate != PEER_LR_ST_FINISHED) if (peer->learnstate != PEER_LR_ST_FINISHED)
@ -3300,29 +3299,30 @@ static void sync_peer_learn_state(struct peers *peers, struct peer *peer)
} }
else { else {
/* Full resync */ /* Full resync */
struct peer *rem_peer;
int commit_a_finish = 1; int commit_a_finish = 1;
if (peer->srv->shard) { if (peer->srv->shard) {
flags |= PEERS_F_DBG_RESYNC_REMOTEPARTIAL; flags |= PEERS_F_DBG_RESYNC_REMOTEPARTIAL;
peer->flags |= PEER_F_LEARN_NOTUP2DATE; peer->flags |= PEER_F_LEARN_NOTUP2DATE;
for (ps = peers->remote; ps; ps = ps->next) { for (rem_peer = peers->remote; rem_peer; rem_peer = rem_peer->next) {
if (ps->srv->shard && ps != peer) { if (rem_peer->srv->shard && rem_peer != peer) {
HA_SPIN_LOCK(PEER_LOCK, &ps->lock); HA_SPIN_LOCK(PEER_LOCK, &rem_peer->lock);
if (ps->srv->shard == peer->srv->shard) { if (rem_peer->srv->shard == peer->srv->shard) {
/* flag all peers from same shard /* flag all peers from same shard
* notup2date to disable request * notup2date to disable request
* of a resync frm them * of a resync frm them
*/ */
ps->flags |= PEER_F_LEARN_NOTUP2DATE; rem_peer->flags |= PEER_F_LEARN_NOTUP2DATE;
} }
else if (!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { else if (!(rem_peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
/* it remains some other shards not requested /* it remains some other shards not requested
* we don't commit a resync finish to request * we don't commit a resync finish to request
* the other shards * the other shards
*/ */
commit_a_finish = 0; commit_a_finish = 0;
} }
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock); HA_SPIN_UNLOCK(PEER_LOCK, &rem_peer->lock);
} }
} }
@ -3391,7 +3391,7 @@ static void sync_peer_app_state(struct peers *peers, struct peer *peer)
/* Process the sync task for a running process. It is called from process_peer_sync() only */ /* Process the sync task for a running process. It is called from process_peer_sync() only */
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state) static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{ {
struct peer *ps; struct peer *peer;
struct shared_table *st; struct shared_table *st;
/* resync timeout set to TICK_ETERNITY means we just start /* resync timeout set to TICK_ETERNITY means we just start
@ -3419,84 +3419,84 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
} }
/* For each session */ /* For each session */
for (ps = peers->remote; ps; ps = ps->next) { for (peer = peers->remote; peer; peer = peer->next) {
HA_SPIN_LOCK(PEER_LOCK, &ps->lock); HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
sync_peer_learn_state(peers, ps); sync_peer_learn_state(peers, peer);
sync_peer_app_state(peers, ps); sync_peer_app_state(peers, peer);
/* Peer changes, if any, were now ack by the sync task. Unblock /* Peer changes, if any, were now ack by the sync task. Unblock
* the peer (any wakeup should already be performed, no need to * the peer (any wakeup should already be performed, no need to
* do it here) * do it here)
*/ */
ps->flags &= ~PEER_F_WAIT_SYNCTASK_ACK; peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
/* For each remote peers */ /* For each remote peers */
if (!ps->local) { if (!peer->local) {
if (!ps->appctx) { if (!peer->appctx) {
/* no active peer connection */ /* no active peer connection */
if (ps->statuscode == 0 || if (peer->statuscode == 0 ||
((ps->statuscode == PEER_SESS_SC_CONNECTCODE || ((peer->statuscode == PEER_SESS_SC_CONNECTCODE ||
ps->statuscode == PEER_SESS_SC_SUCCESSCODE || peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) && peer->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
tick_is_expired(ps->reconnect, now_ms))) { tick_is_expired(peer->reconnect, now_ms))) {
/* connection never tried /* connection never tried
* or previous peer connection established with success * or previous peer connection established with success
* or previous peer connection failed while connecting * or previous peer connection failed while connecting
* and reconnection timer is expired */ * and reconnection timer is expired */
/* retry a connect */ /* retry a connect */
ps->appctx = peer_session_create(peers, ps); peer->appctx = peer_session_create(peers, peer);
} }
else if (!tick_is_expired(ps->reconnect, now_ms)) { else if (!tick_is_expired(peer->reconnect, now_ms)) {
/* If previous session failed during connection /* If previous session failed during connection
* but reconnection timer is not expired */ * but reconnection timer is not expired */
/* reschedule task for reconnect */ /* reschedule task for reconnect */
task->expire = tick_first(task->expire, ps->reconnect); task->expire = tick_first(task->expire, peer->reconnect);
} }
/* else do nothing */ /* else do nothing */
} /* !ps->appctx */ } /* !peer->appctx */
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) { else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
/* current peer connection is active and established */ /* current peer connection is active and established */
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
!(peers->flags & PEERS_F_RESYNC_ASSIGN) && !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { !(peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
/* Resync from a remote is needed /* Resync from a remote is needed
* and no peer was assigned for lesson * and no peer was assigned for lesson
* and current peer may be up2date */ * and current peer may be up2date */
/* assign peer for the lesson */ /* assign peer for the lesson */
ps->learnstate = PEER_LR_ST_ASSIGNED; peer->learnstate = PEER_LR_ST_ASSIGNED;
HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN); HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
/* wake up peer handler to handle a request of resync */ /* wake up peer handler to handle a request of resync */
appctx_wakeup(ps->appctx); appctx_wakeup(peer->appctx);
} }
else { else {
int update_to_push = 0; int update_to_push = 0;
/* Awake session if there is data to push */ /* Awake session if there is data to push */
for (st = ps->tables; st ; st = st->next) { for (st = peer->tables; st ; st = st->next) {
if (st->last_pushed != st->table->localupdate) { if (st->last_pushed != st->table->localupdate) {
/* wake up the peer handler to push local updates */ /* wake up the peer handler to push local updates */
update_to_push = 1; update_to_push = 1;
/* There is no need to send a heartbeat message /* There is no need to send a heartbeat message
* when some updates must be pushed. The remote * when some updates must be pushed. The remote
* peer will consider <ps> peer as alive when it will * peer will consider <peer> peer as alive when it will
* receive these updates. * receive these updates.
*/ */
ps->flags &= ~PEER_F_HEARTBEAT; peer->flags &= ~PEER_F_HEARTBEAT;
/* Re-schedule another one later. */ /* Re-schedule another one later. */
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
/* Refresh reconnect if necessary */ /* Refresh reconnect if necessary */
if (tick_is_expired(ps->reconnect, now_ms)) if (tick_is_expired(peer->reconnect, now_ms))
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
/* We are going to send updates, let's ensure we will /* We are going to send updates, let's ensure we will
* come back to send heartbeat messages or to reconnect. * come back to send heartbeat messages or to reconnect.
*/ */
task->expire = tick_first(ps->reconnect, ps->heartbeat); task->expire = tick_first(peer->reconnect, peer->heartbeat);
appctx_wakeup(ps->appctx); appctx_wakeup(peer->appctx);
break; break;
} }
} }
@ -3504,35 +3504,35 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
* and do not send heartbeat message either. * and do not send heartbeat message either.
*/ */
if (!update_to_push) { if (!update_to_push) {
if (tick_is_expired(ps->reconnect, now_ms)) { if (tick_is_expired(peer->reconnect, now_ms)) {
if (ps->flags & PEER_F_ALIVE) { if (peer->flags & PEER_F_ALIVE) {
/* This peer was alive during a 'reconnect' period. /* This peer was alive during a 'reconnect' period.
* Flag it as not alive again for the next period. * Flag it as not alive again for the next period.
*/ */
ps->flags &= ~PEER_F_ALIVE; peer->flags &= ~PEER_F_ALIVE;
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
} }
else { else {
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
ps->heartbeat = TICK_ETERNITY; peer->heartbeat = TICK_ETERNITY;
peer_session_forceshutdown(ps); peer_session_forceshutdown(peer);
sync_peer_app_state(peers, ps); sync_peer_app_state(peers, peer);
ps->no_hbt++; peer->no_hbt++;
} }
} }
else if (tick_is_expired(ps->heartbeat, now_ms)) { else if (tick_is_expired(peer->heartbeat, now_ms)) {
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
ps->flags |= PEER_F_HEARTBEAT; peer->flags |= PEER_F_HEARTBEAT;
appctx_wakeup(ps->appctx); appctx_wakeup(peer->appctx);
} }
task->expire = tick_first(ps->reconnect, ps->heartbeat); task->expire = tick_first(peer->reconnect, peer->heartbeat);
} }
} }
/* else do nothing */ /* else do nothing */
} /* SUCCESSCODE */ } /* SUCCESSCODE */
} /* !ps->peer->local */ } /* !peer->peer->local */
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock); HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
} /* for */ } /* for */
/* Resync from remotes expired: consider resync is finished */ /* Resync from remotes expired: consider resync is finished */
@ -3558,36 +3558,36 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
/* Process the sync task for a stopping process. It is called from process_peer_sync() only */ /* Process the sync task for a stopping process. It is called from process_peer_sync() only */
static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state) static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{ {
struct peer *ps; struct peer *peer;
struct shared_table *st; struct shared_table *st;
static int dont_stop = 0; static int dont_stop = 0;
/* For each peer */ /* For each peer */
for (ps = peers->remote; ps; ps = ps->next) { for (peer = peers->remote; peer; peer = peer->next) {
HA_SPIN_LOCK(PEER_LOCK, &ps->lock); HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
sync_peer_learn_state(peers, ps); sync_peer_learn_state(peers, peer);
sync_peer_app_state(peers, ps); sync_peer_app_state(peers, peer);
/* Peer changes, if any, were now ack by the sync task. Unblock /* Peer changes, if any, were now ack by the sync task. Unblock
* the peer (any wakeup should already be performed, no need to * the peer (any wakeup should already be performed, no need to
* do it here) * do it here)
*/ */
ps->flags &= ~PEER_F_WAIT_SYNCTASK_ACK; peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
if ((state & TASK_WOKEN_SIGNAL) && !dont_stop) { if ((state & TASK_WOKEN_SIGNAL) && !dont_stop) {
/* we're killing a connection, we must apply a random delay before /* we're killing a connection, we must apply a random delay before
* retrying otherwise the other end will do the same and we can loop * retrying otherwise the other end will do the same and we can loop
* for a while. * for a while.
*/ */
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
if (ps->appctx) { if (peer->appctx) {
peer_session_forceshutdown(ps); peer_session_forceshutdown(peer);
sync_peer_app_state(peers, ps); sync_peer_app_state(peers, peer);
} }
} }
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock); HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
} }
/* We've just received the signal */ /* We've just received the signal */
@ -3603,18 +3603,18 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
} }
} }
ps = peers->local; peer = peers->local;
HA_SPIN_LOCK(PEER_LOCK, &ps->lock); HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
if (ps->flags & PEER_F_LOCAL_TEACH_COMPLETE) { if (peer->flags & PEER_F_LOCAL_TEACH_COMPLETE) {
if (dont_stop) { if (dont_stop) {
/* resync of new process was complete, current process can die now */ /* resync of new process was complete, current process can die now */
_HA_ATOMIC_DEC(&jobs); _HA_ATOMIC_DEC(&jobs);
dont_stop = 0; dont_stop = 0;
for (st = ps->tables; st ; st = st->next) for (st = peer->tables; st ; st = st->next)
HA_ATOMIC_DEC(&st->table->refcnt); HA_ATOMIC_DEC(&st->table->refcnt);
} }
} }
else if (!ps->appctx) { else if (!peer->appctx) {
/* Re-arm resync timeout if necessary */ /* Re-arm resync timeout if necessary */
if (!tick_isset(peers->resync_timeout)) if (!tick_isset(peers->resync_timeout))
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
@ -3622,10 +3622,10 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
/* If there's no active peer connection */ /* If there's no active peer connection */
if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED && if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
!tick_is_expired(peers->resync_timeout, now_ms) && !tick_is_expired(peers->resync_timeout, now_ms) &&
(ps->statuscode == 0 || (peer->statuscode == 0 ||
ps->statuscode == PEER_SESS_SC_SUCCESSCODE || peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE || peer->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
ps->statuscode == PEER_SESS_SC_TRYAGAIN)) { peer->statuscode == PEER_SESS_SC_TRYAGAIN)) {
/* The resync is finished for the local peer and /* The resync is finished for the local peer and
* the resync timeout is not expired and * the resync timeout is not expired and
* connection never tried * connection never tried
@ -3633,14 +3633,14 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
* or previous tcp connect succeeded but init state incomplete * or previous tcp connect succeeded but init state incomplete
* or during previous connect, peer replies a try again statuscode */ * or during previous connect, peer replies a try again statuscode */
if (!tick_is_expired(ps->reconnect, now_ms)) { if (!tick_is_expired(peer->reconnect, now_ms)) {
/* reconnection timer is not expired. reschedule task for reconnect */ /* reconnection timer is not expired. reschedule task for reconnect */
task->expire = tick_first(task->expire, ps->reconnect); task->expire = tick_first(task->expire, peer->reconnect);
} }
else { else {
/* connect to the local peer if we must push a local sync */ /* connect to the local peer if we must push a local sync */
if (dont_stop) { if (dont_stop) {
peer_session_create(peers, ps); peer_session_create(peers, peer);
} }
} }
} }
@ -3650,25 +3650,25 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
/* unable to resync new process, current process can die now */ /* unable to resync new process, current process can die now */
_HA_ATOMIC_DEC(&jobs); _HA_ATOMIC_DEC(&jobs);
dont_stop = 0; dont_stop = 0;
for (st = ps->tables; st ; st = st->next) for (st = peer->tables; st ; st = st->next)
HA_ATOMIC_DEC(&st->table->refcnt); HA_ATOMIC_DEC(&st->table->refcnt);
} }
} }
} }
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) { else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
/* Reset resync timeout during a resync */ /* Reset resync timeout during a resync */
peers->resync_timeout = TICK_ETERNITY; peers->resync_timeout = TICK_ETERNITY;
/* current peer connection is active and established /* current peer connection is active and established
* wake up all peer handlers to push remaining local updates */ * wake up all peer handlers to push remaining local updates */
for (st = ps->tables; st ; st = st->next) { for (st = peer->tables; st ; st = st->next) {
if (st->last_pushed != st->table->localupdate) { if (st->last_pushed != st->table->localupdate) {
appctx_wakeup(ps->appctx); appctx_wakeup(peer->appctx);
break; break;
} }
} }
} }
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock); HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
} }
/* /*