diff --git a/include/types/peers.h b/include/types/peers.h index 8f8eefa1c..8b7712abd 100644 --- a/include/types/peers.h +++ b/include/types/peers.h @@ -39,6 +39,7 @@ struct peer_session { struct shared_table *table; /* shared table */ struct peer *peer; /* current peer */ struct stream *stream; /* current transport stream */ + struct appctx *appctx; /* the appctx running it */ unsigned int flags; /* peer session flags */ unsigned int statuscode; /* current/last session status code */ unsigned int update; /* current peer acked update */ diff --git a/src/peers.c b/src/peers.c index 3dc3711e2..d637e88d6 100644 --- a/src/peers.c +++ b/src/peers.c @@ -193,6 +193,7 @@ static void peer_session_release(struct appctx *appctx) if (ps) { if (ps->stream == s) { ps->stream = NULL; + ps->appctx = NULL; if (ps->flags & PEER_F_LEARN_ASSIGN) { /* unassign current peer for learning */ ps->flags &= ~(PEER_F_LEARN_ASSIGN); @@ -421,6 +422,7 @@ switchstate: peer_session_forceshutdown(ps->stream); } ps->stream = s; + ps->appctx = appctx; break; } } @@ -1173,6 +1175,8 @@ static struct stream *peer_session_create(struct peer *peer, struct peer_session actconn++; totalconn++; + ps->appctx = appctx; + ps->stream = s; return s; /* Error unrolling */ @@ -1260,11 +1264,11 @@ static struct task *process_peer_sync(struct task * task) st->flags |= SHTABLE_F_RESYNC_ASSIGN; /* awake peer stream task to handle a request of resync */ - task_wakeup(ps->stream->task, TASK_WOKEN_MSG); + appctx_wakeup(ps->appctx); } else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) { /* awake peer stream task to push local updates */ - task_wakeup(ps->stream->task, TASK_WOKEN_MSG); + appctx_wakeup(ps->appctx); } /* else do nothing */ } /* SUCCESSCODE */ @@ -1305,6 +1309,7 @@ static struct task *process_peer_sync(struct task * task) if (ps->stream) { peer_session_forceshutdown(ps->stream); ps->stream = NULL; + ps->appctx = NULL; } } } @@ -1330,7 +1335,7 @@ static struct task *process_peer_sync(struct task * task) * or during previous connect, peer replies a try again statuscode */ /* connect to the peer */ - ps->stream = peer_session_create(ps->peer, ps); + peer_session_create(ps->peer, ps); } else { /* Other error cases */ @@ -1346,7 +1351,7 @@ static struct task *process_peer_sync(struct task * task) (int)(ps->pushed - ps->table->table->localupdate) < 0) { /* current stream active and established awake stream to push remaining local updates */ - task_wakeup(ps->stream->task, TASK_WOKEN_MSG); + appctx_wakeup(ps->appctx); } } /* stopping */ /* Wakeup for re-connect */