MINOR: peers: move send code to reduce the size of the I/O handler.

This patch extracts the code responsible of sending peer protocol
messages from the peer I/O handler to create a new function and to
reduce the size of this handler.

May be backported as far as 1.5.
This commit is contained in:
Frdric Lcaille 2019-01-24 17:33:48 +01:00 committed by Willy Tarreau
parent 444243c62c
commit 25e1d5e435

View File

@ -1704,6 +1704,110 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
return 1;
}
/*
* Send any message to <peer> peer.
* Returns 1 if succeeded, or -1 or 0 if failed.
* -1 means an internal error occured, 0 is for a peer protocol error leading
* to a peer state change (from the peer I/O handler point of view).
*/
static inline int peer_send_msgs(struct appctx *appctx, struct peer *peer)
{
int repl;
struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si);
struct peers *peers = strm_fe(s)->parent;
/* Need to request a resync */
if ((peer->flags & PEER_F_LEARN_ASSIGN) &&
(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
!(peers->flags & PEERS_F_RESYNC_PROCESS)) {
repl = peer_send_resync_reqmsg(appctx);
if (repl <= 0)
return repl;
peers->flags |= PEERS_F_RESYNC_PROCESS;
}
/* Nothing to read, now we start to write */
if (peer->tables) {
struct shared_table *st;
struct shared_table *last_local_table;
last_local_table = peer->last_local_table;
if (!last_local_table)
last_local_table = peer->tables;
st = last_local_table->next;
while (1) {
if (!st)
st = peer->tables;
/* It remains some updates to ack */
if (st->last_get != st->last_acked) {
repl = peer_send_ackmsg(st, appctx);
if (repl <= 0)
return repl;
st->last_acked = st->last_get;
}
if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
if (!(peer->flags & PEER_F_LEARN_ASSIGN) &&
((int)(st->last_pushed - st->table->localupdate) < 0)) {
repl = peer_send_teach_process_msgs(appctx, peer, st);
if (repl <= 0) {
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
return repl;
}
}
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
else {
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
repl = peer_send_teach_stage1_msgs(appctx, peer, st);
if (repl <= 0)
return repl;
}
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
repl = peer_send_teach_stage2_msgs(appctx, peer, st);
if (repl <= 0)
return repl;
}
}
if (st == last_local_table)
break;
st = st->next;
}
}
if ((peer->flags & PEER_F_TEACH_PROCESS) && !(peer->flags & PEER_F_TEACH_FINISHED)) {
repl = peer_send_resync_finishedmsg(appctx, peer);
if (repl <= 0)
return repl;
/* flag finished message sent */
peer->flags |= PEER_F_TEACH_FINISHED;
}
/* Confirm finished or partial messages */
while (peer->confirm) {
repl = peer_send_resync_confirmsg(appctx);
if (repl <= 0)
return repl;
peer->confirm--;
}
return 1;
}
/*
* IO Handler to handle message exchance with a peer
*/
@ -2042,111 +2146,11 @@ incomplete:
goto switchstate;
}
/* Need to request a resync */
if ((curpeer->flags & PEER_F_LEARN_ASSIGN) &&
(curpeers->flags & PEERS_F_RESYNC_ASSIGN) &&
!(curpeers->flags & PEERS_F_RESYNC_PROCESS)) {
repl = peer_send_resync_reqmsg(appctx);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
curpeers->flags |= PEERS_F_RESYNC_PROCESS;
}
/* Nothing to read, now we start to write */
if (curpeer->tables) {
struct shared_table *st;
struct shared_table *last_local_table;
last_local_table = curpeer->last_local_table;
if (!last_local_table)
last_local_table = curpeer->tables;
st = last_local_table->next;
while (1) {
if (!st)
st = curpeer->tables;
/* It remains some updates to ack */
if (st->last_get != st->last_acked) {
repl = peer_send_ackmsg(st, appctx);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
st->last_acked = st->last_get;
}
if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) {
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
((int)(st->last_pushed - st->table->localupdate) < 0)) {
repl = peer_send_teach_process_msgs(appctx, curpeer, st);
if (repl <= 0) {
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
if (repl == -1)
goto out;
goto switchstate;
}
}
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
else {
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
repl = peer_send_teach_stage1_msgs(appctx, curpeer, st);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
}
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
repl = peer_send_teach_stage2_msgs(appctx, curpeer, st);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
}
}
if (st == last_local_table)
break;
st = st->next;
}
}
if ((curpeer->flags & PEER_F_TEACH_PROCESS) && !(curpeer->flags & PEER_F_TEACH_FINISHED)) {
repl = peer_send_resync_finishedmsg(appctx, curpeer);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
/* flag finished message sent */
curpeer->flags |= PEER_F_TEACH_FINISHED;
}
/* Confirm finished or partial messages */
while (curpeer->confirm) {
repl = peer_send_resync_confirmsg(appctx);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
curpeer->confirm--;
repl = peer_send_msgs(appctx, curpeer);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
/* noting more to do */