MAJOR: sessions: Store multiple outgoing connections in the session.

Instead of just storing the last connection in the session, store all of
the connections, for at most MAX_SRV_LIST (currently 5) targets.
That way we can do keepalive on more than 1 outgoing connection when the
client uses HTTP/2.
This commit is contained in:
Olivier Houchard 2018-11-30 17:24:55 +01:00 committed by Willy Tarreau
parent 93c8852572
commit 00cf70f28b
6 changed files with 134 additions and 94 deletions

View File

@ -662,13 +662,8 @@ static inline void conn_force_unsubscribe(struct connection *conn)
/* Releases a connection previously allocated by conn_new() */
static inline void conn_free(struct connection *conn)
{
struct session *sess, *sess_back;
list_for_each_entry_safe(sess, sess_back, &conn->session_list, conn_list) {
sess->srv_conn = NULL;
LIST_DEL(&sess->conn_list);
LIST_INIT(&sess->conn_list);
}
/* Remove ourself from the session's connections list, if any. */
LIST_DEL(&conn->session_list);
/* If we temporarily stored the connection as the stream_interface's
* end point, remove it.
*/

View File

@ -71,6 +71,46 @@ static inline void session_store_counters(struct session *sess)
}
}
static inline void session_add_conn(struct session *sess, struct connection *conn, void *target)
{
int avail = -1;
int i;
for (i = 0; i < MAX_SRV_LIST; i++) {
if (sess->srv_list[i].target == target) {
avail = i;
break;
}
if (LIST_ISEMPTY(&sess->srv_list[i].list) && avail == -1)
avail = i;
}
if (avail == -1) {
struct connection *conn, *conn_back;
int count = 0;
/* We have no slot free, let's free the one with the fewer connections */
for (i = 0; i < MAX_SRV_LIST; i++) {
int count_list = 0;
list_for_each_entry(conn, &sess->srv_list[i].list, session_list)
count_list++;
if (count == 0 || count_list < count) {
count = count_list;
avail = i;
}
}
/* Now unown all the connections */
list_for_each_entry_safe(conn, conn_back, &sess->srv_list[avail].list, session_list) {
conn->owner = NULL;
LIST_DEL(&conn->session_list);
LIST_INIT(&conn->session_list);
if (conn->mux)
conn->mux->destroy(conn);
}
}
sess->srv_list[avail].target = target;
LIST_ADDQ(&sess->srv_list[avail].list, &conn->session_list);
}
#endif /* _PROTO_SESSION_H */

View File

@ -414,7 +414,7 @@ struct connection {
struct wait_event *send_wait; /* Task to wake when we're ready to send */
struct wait_event *recv_wait; /* Task to wake when we're ready to recv */
struct list list; /* attach point to various connection lists (idle, ...) */
struct list session_list; /* List of all sessions attached to this connection */
struct list session_list; /* List of attached connections to a session */
int xprt_st; /* transport layer state, initialized to zero */
int tmp_early_data; /* 1st byte of early data, if any */
int sent_early_data; /* Amount of early data we sent so far */

View File

@ -37,6 +37,13 @@
#include <types/task.h>
#include <types/vars.h>
struct sess_srv_list {
void *target;
struct list list;
};
#define MAX_SRV_LIST 5
struct session {
struct proxy *fe; /* the proxy this session depends on for the client side */
struct listener *listener; /* the listener by which the request arrived */
@ -47,8 +54,7 @@ struct session {
struct vars vars; /* list of variables for the session scope. */
struct task *task; /* handshake timeout processing */
long t_handshake; /* handshake duration, -1 = not completed */
struct connection *srv_conn; /* Server connection we last used */
struct list conn_list; /* List element for the session list in each connection */
struct sess_srv_list srv_list[MAX_SRV_LIST]; /* List of servers and the connections the session is currently responsible for */
};
#endif /* _TYPES_SESSION_H */

View File

@ -52,6 +52,7 @@
#include <proto/queue.h>
#include <proto/sample.h>
#include <proto/server.h>
#include <proto/session.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
@ -559,8 +560,9 @@ int assign_server(struct stream *s)
{
struct connection *conn;
struct server *conn_slot;
struct server *srv, *prev_srv;
struct server *srv = NULL, *prev_srv;
int err;
int i;
DPRINTF(stderr,"assign_server : s=%p\n",s);
@ -584,27 +586,27 @@ int assign_server(struct stream *s)
srv = NULL;
s->target = NULL;
conn = s->sess->srv_conn;
if (conn &&
(conn->flags & CO_FL_CONNECTED) &&
objt_server(conn->target) && __objt_server(conn->target)->proxy == s->be &&
(s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_HI &&
((s->txn && s->txn->flags & TX_PREFER_LAST) ||
((s->be->options & PR_O_PREF_LAST) &&
(!s->be->max_ka_queue ||
server_has_room(__objt_server(conn->target)) ||
(__objt_server(conn->target)->nbpend + 1) < s->be->max_ka_queue))) &&
srv_currently_usable(__objt_server(conn->target))) {
/* This stream was relying on a server in a previous request
* and the proxy has "option prefer-last-server" set
* and balance algorithm dont tell us to do otherwise, so
* let's try to reuse the same server.
*/
srv = __objt_server(conn->target);
s->target = &srv->obj_type;
for (i = 0; i < MAX_SRV_LIST; i++) {
list_for_each_entry(conn, &s->sess->srv_list[i].list, session_list) {
if (conn->flags & CO_FL_CONNECTED &&
objt_server(conn->target) &&
__objt_server(conn->target)->proxy == s->be &&
(s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_HI &&
((s->txn && s->txn->flags & TX_PREFER_LAST) ||
((s->be->options & PR_O_PREF_LAST) &&
(!s->be->max_ka_queue ||
server_has_room(__objt_server(conn->target)) ||
(__objt_server(conn->target)->nbpend + 1) < s->be->max_ka_queue))) &&
srv_currently_usable(__objt_server(conn->target))) {
srv = __objt_server(conn->target);
s->target = &srv->obj_type;
goto out_ok;
}
}
}
else if (s->be->lbprm.algo & BE_LB_KIND) {
if (s->be->lbprm.algo & BE_LB_KIND) {
/* we must check if we have at least one server available */
if (!s->be->lbprm.tot_weight) {
@ -748,6 +750,7 @@ int assign_server(struct stream *s)
goto out;
}
out_ok:
s->flags |= SF_ASSIGNED;
err = SRV_STATUS_OK;
out:
@ -1100,20 +1103,39 @@ fail:
int connect_server(struct stream *s)
{
struct connection *cli_conn = NULL;
struct connection *srv_conn;
struct connection *old_conn;
struct connection *srv_conn = NULL;
struct connection *old_conn = NULL;
struct conn_stream *srv_cs;
struct server *srv;
int reuse = 0;
int err;
int i;
for (i = 0; i < MAX_SRV_LIST; i++) {
if (s->sess->srv_list[i].target == s->target) {
list_for_each_entry(srv_conn, &s->sess->srv_list[i].list,
session_list) {
if (conn_xprt_ready(srv_conn) &&
srv_conn->mux && (srv_conn->mux->avail_streams(srv_conn) > 0)) {
reuse = 1;
break;
}
}
}
}
if (!srv_conn) {
for (i = 0; i < MAX_SRV_LIST; i++) {
if (!LIST_ISEMPTY(&s->sess->srv_list[i].list)) {
srv_conn = LIST_ELEM(&s->sess->srv_list[i].list,
struct connection *, session_list);
break;
}
}
}
old_conn = srv_conn;
srv = objt_server(s->target);
old_conn = srv_conn = s->sess->srv_conn;
if (srv_conn)
reuse = (s->target == srv_conn->target) &&
conn_xprt_ready(srv_conn) && srv_conn->mux &&
(srv_conn->mux->avail_streams(srv_conn) > 0);
if (srv && !reuse) {
srv_conn = NULL;
@ -1176,55 +1198,25 @@ int connect_server(struct stream *s)
/* We're about to use another connection, let the mux know we're
* done with this one
*/
if (old_conn != srv_conn) {
int did_switch = 0;
if (old_conn != srv_conn || !reuse) {
if (srv_conn && reuse) {
struct session *sess;
int count = 0;
struct session *sess = srv_conn->owner;
/*
* If we're attempting to reuse a connection, and
* the new connection has only one user, and there
* are no more streams available, attempt to give
* it our old connection
*/
list_for_each_entry(sess, &srv_conn->session_list,
conn_list) {
count++;
if (count > 1)
break;
}
if (count == 1) {
sess = LIST_ELEM(srv_conn->session_list.n,
struct session *, conn_list);
LIST_DEL(&sess->conn_list);
if (sess) {
if (old_conn &&
!(old_conn->flags & CO_FL_PRIVATE) &&
old_conn->mux != NULL &&
(old_conn->mux->avail_streams(old_conn) > 0) &&
(srv_conn->mux->avail_streams(srv_conn) == 1)) {
LIST_ADDQ(&old_conn->session_list, &sess->conn_list);
sess->srv_conn = old_conn;
did_switch = 1;
} else {
LIST_INIT(&sess->conn_list);
sess->srv_conn = NULL;
LIST_DEL(&old_conn->session_list);
LIST_INIT(&old_conn->session_list);
session_add_conn(sess, old_conn, s->target);
old_conn->owner = sess;
}
}
}
/*
* We didn't manage to give our old connection, destroy it
*/
if (old_conn && !did_switch) {
old_conn->owner = NULL;
LIST_DEL(&old_conn->list);
LIST_INIT(&old_conn->list);
if (old_conn->mux)
old_conn->mux->destroy(old_conn);
old_conn = NULL;
}
}
if (!reuse) {
@ -1242,9 +1234,8 @@ int connect_server(struct stream *s)
}
if (srv_conn && old_conn != srv_conn) {
srv_conn->owner = s->sess;
s->sess->srv_conn = srv_conn;
LIST_DEL(&s->sess->conn_list);
LIST_ADDQ(&srv_conn->session_list, &s->sess->conn_list);
LIST_DEL(&srv_conn->session_list);
session_add_conn(s->sess, srv_conn, s->target);
}
if (!srv_conn)

View File

@ -41,6 +41,7 @@ static struct task *session_expire_embryonic(struct task *t, void *context, unsi
struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type *origin)
{
struct session *sess;
int i;
sess = pool_alloc(pool_head_session);
if (sess) {
@ -53,21 +54,24 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
vars_init(&sess->vars, SCOPE_SESS);
sess->task = NULL;
sess->t_handshake = -1; /* handshake not done yet */
LIST_INIT(&sess->conn_list);
sess->srv_conn = NULL;
HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.conn_max,
HA_ATOMIC_ADD(&fe->feconn, 1));
if (li)
proxy_inc_fe_conn_ctr(li, fe);
HA_ATOMIC_ADD(&totalconn, 1);
HA_ATOMIC_ADD(&jobs, 1);
for (i = 0; i < MAX_SRV_LIST; i++) {
sess->srv_list[i].target = NULL;
LIST_INIT(&sess->srv_list[i].list);
}
}
return sess;
}
void session_free(struct session *sess)
{
struct connection *conn;
struct connection *conn, *conn_back;
int i;
HA_ATOMIC_SUB(&sess->fe->feconn, 1);
if (sess->listener)
@ -77,21 +81,25 @@ void session_free(struct session *sess)
conn = objt_conn(sess->origin);
if (conn != NULL && conn->mux)
conn->mux->destroy(conn);
conn = sess->srv_conn;
if (conn != NULL && conn->mux) {
LIST_DEL(&conn->list);
LIST_INIT(&conn->list);
conn->owner = NULL;
conn->mux->destroy(conn);
} else if (conn) {
/* We have a connection, but not yet an associated mux.
* So destroy it now.
*/
conn_stop_tracking(conn);
conn_full_close(conn);
conn_free(conn);
for (i = 0; i < MAX_SRV_LIST; i++) {
int count = 0;
list_for_each_entry_safe(conn, conn_back, &sess->srv_list[i].list, session_list) {
count++;
if (conn->mux) {
LIST_DEL(&conn->session_list);
LIST_INIT(&conn->session_list);
conn->owner = NULL;
conn->mux->destroy(conn);
} else {
/* We have a connection, but not yet an associated mux.
* So destroy it now.
*/
conn_stop_tracking(conn);
conn_full_close(conn);
conn_free(conn);
}
}
}
LIST_DEL(&sess->conn_list);
pool_free(pool_head_session, sess);
HA_ATOMIC_SUB(&jobs, 1);
}