diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 8b32cf607..3a77bd175 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -155,6 +155,7 @@ enum lock_label { STK_TABLE_LOCK, STK_SESS_LOCK, APPLETS_LOCK, + PEER_LOCK, LOCK_LABELS }; struct lock_stat { @@ -241,7 +242,7 @@ static inline void show_lock_stats() "TASK_RQ", "TASK_WQ", "POOL", "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER", "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS", - "APPLETS" }; + "APPLETS", "PEER" }; int lbl; for (lbl = 0; lbl < LOCK_LABELS; lbl++) { diff --git a/include/types/applet.h b/include/types/applet.h index e96b703c8..b56c5631c 100644 --- a/include/types/applet.h +++ b/include/types/applet.h @@ -71,7 +71,7 @@ struct appctx { union { struct { - void *ptr; /* multi-purpose pointer for peers */ + void *ptr; /* current peer or NULL, do not use for something else */ } peers; /* used by the peers applet */ struct { int connected; diff --git a/include/types/peers.h b/include/types/peers.h index a77a0942b..2fc7435c1 100644 --- a/include/types/peers.h +++ b/include/types/peers.h @@ -67,6 +67,9 @@ struct peer { struct shared_table *remote_table; struct shared_table *last_local_table; struct shared_table *tables; +#ifdef USE_THREAD + HA_SPINLOCK_T lock; /* lock used to handle this peer section */ +#endif struct peer *next; /* next peer in the list */ }; diff --git a/src/cfgparse.c b/src/cfgparse.c index dd4900953..ca2d5d79d 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -2039,6 +2039,7 @@ int cfg_parse_peers(const char *file, int linenum, char **args, int kwm) newpeer->proto = proto; newpeer->xprt = xprt_get(XPRT_RAW); newpeer->sock_init_arg = NULL; + SPIN_INIT(&newpeer->lock); if (strcmp(newpeer->id, localpeer) == 0) { /* Current is local peer, it define a frontend */ diff --git a/src/peers.c b/src/peers.c index ef332eba2..2ca08fe7c 100644 --- a/src/peers.c +++ b/src/peers.c @@ -509,6 +509,7 @@ static void peer_session_release(struct appctx *appctx) /* peer session identified */ if (peer) { + SPIN_LOCK(PEER_LOCK, &peer->lock); if (peer->appctx == appctx) { /* Re-init current table pointers to force announcement on re-connect */ peer->remote_table = peer->last_local_table = NULL; @@ -525,6 +526,7 @@ static void peer_session_release(struct appctx *appctx) peer->flags &= PEER_TEACH_RESET; peer->flags &= PEER_LEARN_RESET; } + SPIN_UNLOCK(PEER_LOCK, &peer->lock); task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } } @@ -566,6 +568,7 @@ static void peer_io_handler(struct appctx *appctx) struct stream_interface *si = appctx->owner; struct stream *s = si_strm(si); struct peers *curpeers = strm_fe(s)->parent; + struct peer *curpeer = NULL; int reql = 0; int repl = 0; size_t proto_len = strlen(PEER_SESSION_PROTO_NAME); @@ -646,7 +649,6 @@ switchstate: appctx->st0 = PEER_SESS_ST_GETPEER; /* fall through */ case PEER_SESS_ST_GETPEER: { - struct peer *curpeer; char *p; reql = co_getline(si_oc(si), trash.str, trash.size); if (reql <= 0) { /* closed or EOL not found */ @@ -689,6 +691,7 @@ switchstate: goto switchstate; } + SPIN_LOCK(PEER_LOCK, &curpeer->lock); if (curpeer->appctx && curpeer->appctx != appctx) { if (curpeer->local) { /* Local connection, reply a retry */ @@ -696,6 +699,12 @@ switchstate: appctx->st1 = PEER_SESS_SC_TRYAGAIN; goto switchstate; } + + /* we're killing a connection, we must apply a random delay before + * retrying otherwise the other end will do the same and we can loop + * for a while. + */ + curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000)); peer_session_forceshutdown(curpeer->appctx); } if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) { @@ -712,9 +721,16 @@ switchstate: /* fall through */ } case PEER_SESS_ST_SENDSUCCESS: { - struct peer *curpeer = appctx->ctx.peers.ptr; struct shared_table *st; + if (!curpeer) { + curpeer = appctx->ctx.peers.ptr; + SPIN_LOCK(PEER_LOCK, &curpeer->lock); + if (curpeer->appctx != appctx) { + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + } repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE); repl = ci_putblk(si_ic(si), trash.str, repl); if (repl <= 0) { @@ -767,7 +783,15 @@ switchstate: goto switchstate; } case PEER_SESS_ST_CONNECT: { - struct peer *curpeer = appctx->ctx.peers.ptr; + + if (!curpeer) { + curpeer = appctx->ctx.peers.ptr; + SPIN_LOCK(PEER_LOCK, &curpeer->lock); + if (curpeer->appctx != appctx) { + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + } /* Send headers */ repl = snprintf(trash.str, trash.size, @@ -797,9 +821,17 @@ switchstate: /* fall through */ } case PEER_SESS_ST_GETSTATUS: { - struct peer *curpeer = appctx->ctx.peers.ptr; struct shared_table *st; + if (!curpeer) { + curpeer = appctx->ctx.peers.ptr; + SPIN_LOCK(PEER_LOCK, &curpeer->lock); + if (curpeer->appctx != appctx) { + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + } + if (si_ic(si)->flags & CF_WRITE_PARTIAL) curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE; @@ -871,7 +903,6 @@ switchstate: /* fall through */ } case PEER_SESS_ST_WAITMSG: { - struct peer *curpeer = appctx->ctx.peers.ptr; struct stksess *ts, *newts = NULL; uint32_t msg_len = 0; char *msg_cur = trash.str; @@ -879,6 +910,15 @@ switchstate: unsigned char msg_head[7]; int totl = 0; + if (!curpeer) { + curpeer = appctx->ctx.peers.ptr; + SPIN_LOCK(PEER_LOCK, &curpeer->lock); + if (curpeer->appctx != appctx) { + appctx->st0 = PEER_SESS_ST_END; + goto switchstate; + } + } + reql = co_getblk(si_oc(si), (char *)msg_head, 2*sizeof(unsigned char), totl); if (reql <= 0) /* closed or EOL not found */ goto incomplete; @@ -1417,6 +1457,7 @@ incomplete: } if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) { + SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) && ((int)(st->last_pushed - st->table->localupdate) < 0)) { struct eb32_node *eb; @@ -1447,7 +1488,6 @@ incomplete: /* We force new pushed to 1 to force identifier in update message */ new_pushed = 1; - SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock); while (1) { uint32_t msglen; struct stksess *ts; @@ -1505,8 +1545,8 @@ incomplete: /* identifier may not needed in next update message */ new_pushed = 0; } - SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); } + SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock); } else { if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) { @@ -1756,6 +1796,10 @@ incomplete: /* fall through */ } case PEER_SESS_ST_END: { + if (curpeer) { + SPIN_UNLOCK(PEER_LOCK, &curpeer->lock); + curpeer = NULL; + } si_shutw(si); si_shutr(si); si_ic(si)->flags |= CF_READ_NULL; @@ -1765,6 +1809,9 @@ incomplete: } out: si_oc(si)->flags |= CF_READ_DONTWAIT; + + if (curpeer) + SPIN_UNLOCK(PEER_LOCK, &curpeer->lock); return; full: si_applet_cant_put(si); @@ -1783,8 +1830,6 @@ static struct applet peer_applet = { */ static void peer_session_forceshutdown(struct appctx *appctx) { - struct peer *ps; - /* Note that the peer sessions which have just been created * (->st0 == PEER_SESS_ST_CONNECT) must not * be shutdown, if not, the TCP session will never be closed @@ -1797,16 +1842,7 @@ static void peer_session_forceshutdown(struct appctx *appctx) if (appctx->applet != &peer_applet) return; - ps = appctx->ctx.peers.ptr; - /* we're killing a connection, we must apply a random delay before - * retrying otherwise the other end will do the same and we can loop - * for a while. - */ - if (ps) - ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000)); - appctx->st0 = PEER_SESS_ST_END; - appctx->ctx.peers.ptr = NULL; appctx_wakeup(appctx); } @@ -1922,6 +1958,10 @@ static struct task *process_peer_sync(struct task * task) return NULL; } + /* Acquire lock for all peers of the section */ + for (ps = peers->remote; ps; ps = ps->next) + SPIN_LOCK(PEER_LOCK, &ps->lock); + if (!stopping) { /* Normal case (not soft stop)*/ @@ -2033,6 +2073,11 @@ static struct task *process_peer_sync(struct task * task) /* disconnect all connected peers */ for (ps = peers->remote; ps; ps = ps->next) { + /* we're killing a connection, we must apply a random delay before + * retrying otherwise the other end will do the same and we can loop + * for a while. + */ + ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000)); if (ps->appctx) { peer_session_forceshutdown(ps->appctx); ps->appctx = NULL; @@ -2086,6 +2131,11 @@ static struct task *process_peer_sync(struct task * task) } } } /* stopping */ + + /* Release lock for all peers of the section */ + for (ps = peers->remote; ps; ps = ps->next) + SPIN_UNLOCK(PEER_LOCK, &ps->lock); + /* Wakeup for re-connect */ return task; }