diff --git a/include/types/global.h b/include/types/global.h index 18cc63e6c..344003a91 100644 --- a/include/types/global.h +++ b/include/types/global.h @@ -225,6 +225,7 @@ extern int actconn; /* # of active sessions */ extern int listeners; extern int jobs; /* # of active jobs (listeners, sessions, open devices) */ extern int active_peers; /* # of active peers (connection attempts and successes) */ +extern int connected_peers; /* # of really connected peers */ extern THREAD_LOCAL struct buffer trash; extern int nb_oldpids; /* contains the number of old pids found */ extern const int zero; diff --git a/include/types/stats.h b/include/types/stats.h index e59a240dd..2cb23bfdc 100644 --- a/include/types/stats.h +++ b/include/types/stats.h @@ -292,6 +292,7 @@ enum info_field { INF_JOBS, INF_LISTENERS, INF_ACTIVE_PEERS, + INF_CONNECTED_PEERS, /* must always be the last one */ INF_TOTAL_FIELDS diff --git a/src/haproxy.c b/src/haproxy.c index 129ff1d5e..03b267f4e 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -176,6 +176,7 @@ int stopping; /* non zero means stopping in progress */ int killed; /* non zero means a hard-stop is triggered */ int jobs = 0; /* number of active jobs (conns, listeners, active tasks, ...) */ int active_peers = 0; /* number of active peers (connection attempts and connected) */ +int connected_peers = 0; /* number of connected peers (verified ones) */ /* Here we store informations about the pids of the processes we may pause * or kill. We will send them a signal every 10 ms until we can bind to all diff --git a/src/peers.c b/src/peers.c index 1d6dac122..8a8e8900e 100644 --- a/src/peers.c +++ b/src/peers.c @@ -506,6 +506,8 @@ static void peer_session_release(struct appctx *appctx) /* peer session identified */ if (peer) { + if (appctx->st0 == PEER_SESS_ST_WAITMSG) + HA_ATOMIC_SUB(&connected_peers, 1); HA_ATOMIC_SUB(&active_peers, 1); HA_SPIN_LOCK(PEER_LOCK, &peer->lock); if (peer->appctx == appctx) { @@ -571,20 +573,24 @@ static void peer_io_handler(struct appctx *appctx) int repl = 0; size_t proto_len = strlen(PEER_SESSION_PROTO_NAME); unsigned int maj_ver, min_ver; + int prev_state; /* Check if the input buffer is avalaible. */ if (si_ic(si)->buf.size == 0) goto full; while (1) { + prev_state = appctx->st0; switchstate: maj_ver = min_ver = (unsigned int)-1; switch(appctx->st0) { case PEER_SESS_ST_ACCEPT: + prev_state = appctx->st0; appctx->ctx.peers.ptr = NULL; appctx->st0 = PEER_SESS_ST_GETVERSION; /* fall through */ case PEER_SESS_ST_GETVERSION: + prev_state = appctx->st0; reql = co_getline(si_oc(si), trash.area, trash.size); if (reql <= 0) { /* closed or EOL not found */ @@ -620,6 +626,7 @@ switchstate: appctx->st0 = PEER_SESS_ST_GETHOST; /* fall through */ case PEER_SESS_ST_GETHOST: + prev_state = appctx->st0; reql = co_getline(si_oc(si), trash.area, trash.size); if (reql <= 0) { /* closed or EOL not found */ @@ -650,6 +657,8 @@ switchstate: /* fall through */ case PEER_SESS_ST_GETPEER: { char *p; + + prev_state = appctx->st0; reql = co_getline(si_oc(si), trash.area, trash.size); if (reql <= 0) { /* closed or EOL not found */ @@ -725,6 +734,7 @@ switchstate: case PEER_SESS_ST_SENDSUCCESS: { struct shared_table *st; + prev_state = appctx->st0; if (!curpeer) { curpeer = appctx->ctx.peers.ptr; HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock); @@ -783,11 +793,12 @@ switchstate: /* switch to waiting message state */ + HA_ATOMIC_ADD(&connected_peers, 1); appctx->st0 = PEER_SESS_ST_WAITMSG; goto switchstate; } case PEER_SESS_ST_CONNECT: { - + prev_state = appctx->st0; if (!curpeer) { curpeer = appctx->ctx.peers.ptr; HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock); @@ -827,6 +838,7 @@ switchstate: case PEER_SESS_ST_GETSTATUS: { struct shared_table *st; + prev_state = appctx->st0; if (!curpeer) { curpeer = appctx->ctx.peers.ptr; HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock); @@ -904,6 +916,7 @@ switchstate: appctx->st0 = PEER_SESS_ST_END; goto switchstate; } + HA_ATOMIC_ADD(&connected_peers, 1); appctx->st0 = PEER_SESS_ST_WAITMSG; /* fall through */ } @@ -915,6 +928,7 @@ switchstate: unsigned char msg_head[7]; int totl = 0; + prev_state = appctx->st0; if (!curpeer) { curpeer = appctx->ctx.peers.ptr; HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock); @@ -1818,6 +1832,9 @@ incomplete: goto out; } case PEER_SESS_ST_EXIT: + if (prev_state == PEER_SESS_ST_WAITMSG) + HA_ATOMIC_SUB(&connected_peers, 1); + prev_state = appctx->st0; repl = snprintf(trash.area, trash.size, "%d\n", appctx->st1); if (ci_putblk(si_ic(si), trash.area, repl) == -1) @@ -1827,6 +1844,9 @@ incomplete: case PEER_SESS_ST_ERRSIZE: { unsigned char msg[2]; + if (prev_state == PEER_SESS_ST_WAITMSG) + HA_ATOMIC_SUB(&connected_peers, 1); + prev_state = appctx->st0; msg[0] = PEER_MSG_CLASS_ERROR; msg[1] = PEER_MSG_ERR_SIZELIMIT; @@ -1838,15 +1858,22 @@ incomplete: case PEER_SESS_ST_ERRPROTO: { unsigned char msg[2]; + if (prev_state == PEER_SESS_ST_WAITMSG) + HA_ATOMIC_SUB(&connected_peers, 1); + prev_state = appctx->st0; msg[0] = PEER_MSG_CLASS_ERROR; msg[1] = PEER_MSG_ERR_PROTOCOL; if (ci_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1) goto full; appctx->st0 = PEER_SESS_ST_END; + prev_state = appctx->st0; /* fall through */ } case PEER_SESS_ST_END: { + if (prev_state == PEER_SESS_ST_WAITMSG) + HA_ATOMIC_SUB(&connected_peers, 1); + prev_state = appctx->st0; if (curpeer) { HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock); curpeer = NULL; @@ -1893,6 +1920,8 @@ static void peer_session_forceshutdown(struct appctx *appctx) if (appctx->applet != &peer_applet) return; + if (appctx->st0 == PEER_SESS_ST_WAITMSG) + HA_ATOMIC_SUB(&connected_peers, 1); appctx->st0 = PEER_SESS_ST_END; appctx_wakeup(appctx); } diff --git a/src/stats.c b/src/stats.c index 231c17292..7ac18d0d9 100644 --- a/src/stats.c +++ b/src/stats.c @@ -134,6 +134,7 @@ const char *info_field_names[INF_TOTAL_FIELDS] = { [INF_JOBS] = "Jobs", [INF_LISTENERS] = "Listeners", [INF_ACTIVE_PEERS] = "ActivePeers", + [INF_CONNECTED_PEERS] = "ConnectedPeers", }; const char *stat_field_names[ST_F_TOTAL_FIELDS] = { @@ -3300,6 +3301,7 @@ int stats_fill_info(struct field *info, int len) info[INF_JOBS] = mkf_u32(0, jobs); info[INF_LISTENERS] = mkf_u32(0, listeners); info[INF_ACTIVE_PEERS] = mkf_u32(0, active_peers); + info[INF_CONNECTED_PEERS] = mkf_u32(0, connected_peers); return 1; }