MEDIUM: connections: Attempt to get idle connections from other threads.

In connect_server(), if we no longer have any idle connections for the
current thread, attempt to use the new "takeover" mux method to steal a
connection from another thread.
This should have no impact right now, since no mux implements the takeover method yet.
Olivier Houchard, 2020-03-06 18:18:56 +01:00 (committed by Olivier Houchard)
parent d2489e00b0
commit 566df309c6
4 changed files with 99 additions and 39 deletions
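
For context: the "takeover" method referenced above was added to the mux layer by a preceding commit and is not shown in this diff. Judging from the call site in conn_backend_get() below, it takes only the connection and returns 0 when the calling thread has successfully become its owner. A minimal sketch of the assumed contract (not part of this patch):

	/* Assumed shape of the new mux callback; the real definition lives in
	 * the connection layer's mux_ops structure.
	 */
	int (*takeover)(struct connection *conn);

	/* caller side, as used in conn_backend_get() below: */
	if (conn->mux->takeover && conn->mux->takeover(conn) == 0) {
		/* the connection now belongs to the calling thread */
	}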


@@ -477,7 +477,8 @@ static inline void conn_free(struct connection *conn)
 	if (conn->idle_time > 0) {
 		struct server *srv = __objt_server(conn->target);
 		_HA_ATOMIC_SUB(&srv->curr_idle_conns, 1);
-		srv->curr_idle_thr[tid]--;
+		_HA_ATOMIC_SUB(conn->flags & CO_FL_SAFE_LIST ? &srv->curr_safe_nb : &srv->curr_idle_nb, 1);
+		_HA_ATOMIC_SUB(&srv->curr_idle_thr[tid], 1);
 	}
 	conn_force_unsubscribe(conn);
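
The ternary above decrements whichever per-list counter matches the list the connection was parked on, based on the CO_FL_SAFE_LIST / CO_FL_IDLE_LIST flag set in srv_add_to_idle_list() below. As an illustration only (this helper does not exist in the tree), the selection amounts to:

	/* Illustration only: counter matching the list a parked connection is on. */
	static inline unsigned int *conn_idle_counter(struct server *srv,
	                                              struct connection *conn)
	{
		return (conn->flags & CO_FL_SAFE_LIST) ? &srv->curr_safe_nb
		                                       : &srv->curr_idle_nb;
	}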


@@ -262,11 +262,16 @@ static inline int srv_add_to_idle_list(struct server *srv, struct connection *co
 		return 0;
 	}
 	MT_LIST_DEL(&conn->list);
-	conn->flags = (conn->flags &~ CO_FL_LIST_MASK) |
-	    (is_safe ? CO_FL_SAFE_LIST : CO_FL_IDLE_LIST);
-	MT_LIST_ADDQ(is_safe ? &srv->safe_conns[tid] : &srv->idle_conns[tid],
-	    (struct mt_list *)&conn->list);
-	srv->curr_idle_thr[tid]++;
+	if (is_safe) {
+		conn->flags = (conn->flags & ~CO_FL_LIST_MASK) | CO_FL_SAFE_LIST;
+		MT_LIST_ADDQ(&srv->safe_conns[tid], (struct mt_list *)&conn->list);
+		_HA_ATOMIC_ADD(&srv->curr_safe_nb, 1);
+	} else {
+		conn->flags = (conn->flags & ~CO_FL_LIST_MASK) | CO_FL_IDLE_LIST;
+		MT_LIST_ADDQ(&srv->idle_conns[tid], (struct mt_list *)&conn->list);
+		_HA_ATOMIC_ADD(&srv->curr_idle_nb, 1);
+	}
+	_HA_ATOMIC_ADD(&srv->curr_idle_thr[tid], 1);
 	conn->idle_time = now_ms;
 	__ha_barrier_full();
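
For orientation, a hypothetical caller of this helper (typically a mux releasing its last stream on a backend connection), assuming the usual convention that a non-zero return means the connection was successfully parked; park_or_close() below is illustrative and not part of the tree:

	/* Hypothetical: park a finished backend connection, or really close it. */
	static void park_or_close(struct connection *conn, int is_safe)
	{
		struct server *srv = objt_server(conn->target);

		/* "is_safe" would be set once the connection has already carried a
		 * full request/response, making it acceptable even for a client's
		 * first request.
		 */
		if (!srv || !srv_add_to_idle_list(srv, conn, is_safe))
			conn_free(conn);
	}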


@@ -227,7 +227,9 @@ struct server {
 	struct list *available_conns;           /* Connection in used, but with still new streams available */
 	unsigned int pool_purge_delay;          /* Delay before starting to purge the idle conns pool */
 	unsigned int max_idle_conns;            /* Max number of connection allowed in the orphan connections list */
-	unsigned int curr_idle_conns;           /* Current number of orphan idling connections */
+	unsigned int curr_idle_conns;           /* Current number of orphan idling connections, both the idle and the safe lists */
+	unsigned int curr_idle_nb;              /* Current number of connections in the idle list */
+	unsigned int curr_safe_nb;              /* Current number of connections in the safe list */
 	unsigned int *curr_idle_thr;            /* Current number of orphan idling connections per thread */
 	int max_reuse;                          /* Max number of requests on a same connection */
 	struct eb32_node idle_node;             /* When to next do cleanup in the idle connections */
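
Splitting the counters by list type is what lets connect_server() below skip the lookup entirely when the relevant list is empty, before taking any lock. The intended relationship between the fields, expressed as a hypothetical debug check (not part of the patch, and only expected to hold while the lists are quiescent):

	/* Hypothetical consistency check: the global counter covers both lists,
	 * and the per-thread counters sum up to it.
	 */
	static int srv_idle_counters_consistent(const struct server *srv, int nbthread)
	{
		unsigned int per_thr = 0;
		int i;

		for (i = 0; i < nbthread; i++)
			per_thr += srv->curr_idle_thr[i];

		return srv->curr_idle_conns == srv->curr_safe_nb + srv->curr_idle_nb &&
		       srv->curr_idle_conns == per_thr;
	}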


@@ -1074,6 +1074,58 @@ static void assign_tproxy_address(struct stream *s)
 #endif
 }
 
+/* Attempt to get a backend connection from the specified mt_list array
+ * (safe or idle connections).
+ */
+static struct connection *conn_backend_get(struct server *srv, int is_safe)
+{
+	struct mt_list *mt_list = is_safe ? srv->safe_conns : srv->idle_conns;
+	struct connection *conn;
+	int i;
+	int found = 0;
+
+	/* We need to lock even if this is our own list, because another
+	 * thread may be trying to migrate that connection, and we don't want
+	 * to end up with two threads using the same connection.
+	 */
+	HA_SPIN_LOCK(OTHER_LOCK, &toremove_lock[tid]);
+	conn = MT_LIST_POP(&mt_list[tid], struct connection *, list);
+	HA_SPIN_UNLOCK(OTHER_LOCK, &toremove_lock[tid]);
+
+	/* If we found a connection in our own list, and we don't have to
+	 * steal one from another thread, then we're done.
+	 */
+	if (conn)
+		return conn;
+
+	/* Lookup all other threads for an idle connection, starting from tid + 1 */
+	for (i = tid; !found && (i = ((i + 1 == global.nbthread) ? 0 : i + 1)) != tid;) {
+		struct mt_list *elt1, elt2;
+
+		HA_SPIN_LOCK(OTHER_LOCK, &toremove_lock[i]);
+		mt_list_for_each_entry_safe(conn, &mt_list[i], list, elt1, elt2) {
+			if (conn->mux->takeover && conn->mux->takeover(conn) == 0) {
+				MT_LIST_DEL_SAFE(elt1);
+				found = 1;
+				break;
+			}
+		}
+		HA_SPIN_UNLOCK(OTHER_LOCK, &toremove_lock[i]);
+	}
+
+	if (!found)
+		conn = NULL;
+	else {
+		conn->idle_time = 0;
+		_HA_ATOMIC_SUB(&srv->curr_idle_conns, 1);
+		_HA_ATOMIC_SUB(&srv->curr_idle_thr[i], 1);
+		_HA_ATOMIC_SUB(is_safe ? &srv->curr_safe_nb : &srv->curr_idle_nb, 1);
+		__ha_barrier_atomic_store();
+		LIST_ADDQ(&srv->available_conns[tid], mt_list_to_list(&conn->list));
+	}
+	return conn;
+}
+
 /*
  * This function initiates a connection to the server assigned to this stream
  * (s->target, s->si[1].addr.to). It will assign a server if none
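
Since the commit message notes that no mux implements takeover yet, the following is a purely hypothetical sketch of what a mux-side implementation would have to guarantee for the stealing loop in conn_backend_get() above to be safe: either fully rebind its thread-local state (tasks, tasklets, subscriptions) to the calling thread and return 0, or refuse with a non-zero return and leave the connection untouched. All my_mux_* names are placeholders:

	/* Purely hypothetical; no mux provides this at the time of this patch. */
	static int my_mux_takeover(struct connection *conn)
	{
		struct my_mux_ctx *ctx = conn->ctx;   /* mux-private context */

		/* Refuse if anything is still actively bound to the original thread
		 * (pending I/O, subscribed events, running tasks).
		 */
		if (my_mux_is_busy(ctx))
			return -1;

		/* Rebind timers, tasklets and polling to the calling thread (tid). */
		if (my_mux_rebind_to_thread(ctx, tid) < 0)
			return -1;

		return 0; /* the caller now owns the connection */
	}
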
@@ -1148,32 +1200,39 @@ int connect_server(struct stream *s)
 	 * that there is no concurrency issues.
 	 */
 	if (srv->available_conns && !LIST_ISEMPTY(&srv->available_conns[tid]) &&
-	    ((s->be->options & PR_O_REUSE_MASK) != PR_O_REUSE_NEVR))
+	    ((s->be->options & PR_O_REUSE_MASK) != PR_O_REUSE_NEVR)) {
 		srv_conn = LIST_ELEM(srv->available_conns[tid].n, struct connection *, list);
-	if (srv->idle_conns && !MT_LIST_ISEMPTY(&srv->idle_conns[tid]) &&
-	    ((s->be->options & PR_O_REUSE_MASK) != PR_O_REUSE_NEVR &&
-	     s->txn && (s->txn->flags & TX_NOT_FIRST))) {
-		srv_conn = MT_LIST_POP(&srv->idle_conns[tid], struct connection *, list);
+		reuse = 1;
 	}
-	else if (srv->safe_conns && !MT_LIST_ISEMPTY(&srv->safe_conns[tid]) &&
-		 ((s->txn && (s->txn->flags & TX_NOT_FIRST)) ||
-		  (s->be->options & PR_O_REUSE_MASK) >= PR_O_REUSE_AGGR)) {
-		srv_conn = MT_LIST_POP(&srv->safe_conns[tid], struct connection *, list);
-	}
-	else if (srv->idle_conns && !MT_LIST_ISEMPTY(&srv->idle_conns[tid]) &&
-		 (s->be->options & PR_O_REUSE_MASK) == PR_O_REUSE_ALWS) {
-		srv_conn = MT_LIST_POP(&srv->idle_conns[tid], struct connection *, list);
-	}
-	/* If we've picked a connection from the pool, we now have to
-	 * detach it. We may have to get rid of the previous idle
-	 * connection we had, so for this we try to swap it with the
-	 * other owner's. That way it may remain alive for others to
-	 * pick.
-	 */
-	if (srv_conn) {
-		reuse_orphan = 1;
-		reuse = 1;
-		srv_conn->flags &= ~CO_FL_LIST_MASK;
+	else if (!srv_conn && srv->curr_idle_conns > 0) {
+		if (srv->idle_conns &&
+		    ((s->be->options & PR_O_REUSE_MASK) != PR_O_REUSE_NEVR &&
+		     s->txn && (s->txn->flags & TX_NOT_FIRST)) &&
+		    srv->curr_idle_nb > 0) {
+			srv_conn = conn_backend_get(srv, 0);
+		}
+		else if (srv->safe_conns &&
+			 ((s->txn && (s->txn->flags & TX_NOT_FIRST)) ||
+			  (s->be->options & PR_O_REUSE_MASK) >= PR_O_REUSE_AGGR) &&
+			 srv->curr_safe_nb > 0) {
+			srv_conn = conn_backend_get(srv, 1);
+		}
+		else if (srv->idle_conns &&
+			 ((s->be->options & PR_O_REUSE_MASK) == PR_O_REUSE_ALWS) &&
+			 srv->curr_idle_nb > 0) {
+			srv_conn = conn_backend_get(srv, 0);
+		}
+		/* If we've picked a connection from the pool, we now have to
+		 * detach it. We may have to get rid of the previous idle
+		 * connection we had, so for this we try to swap it with the
+		 * other owner's. That way it may remain alive for others to
+		 * pick.
+		 */
+		if (srv_conn) {
+			reuse_orphan = 1;
+			reuse = 1;
+			srv_conn->flags &= ~CO_FL_LIST_MASK;
+		}
 	}
 	}
 	}
@@ -1247,14 +1306,7 @@ int connect_server(struct stream *s)
 	 * list and add it back to the idle list.
 	 */
 	if (reuse) {
-		if (reuse_orphan) {
-			srv_conn->idle_time = 0;
-			_HA_ATOMIC_SUB(&srv->curr_idle_conns, 1);
-			__ha_barrier_atomic_store();
-			srv->curr_idle_thr[tid]--;
-			LIST_ADDQ(&srv->available_conns[tid], mt_list_to_list(&srv_conn->list));
-		}
-		else {
+		if (!reuse_orphan) {
 			if (srv_conn->flags & CO_FL_SESS_IDLE) {
 				struct session *sess = srv_conn->owner;