MAJOR: stream interface: dynamically allocate the outgoing connection

The outgoing connection is now allocated dynamically upon the first attempt
to touch the connection's source or destination address. If this allocation
fails, we fail on SN_ERR_RESOURCE.

As we didn't use si->conn anymore, it was removed. The endpoints are released
upon session_free(), on the error path, and upon a new transaction. That way
we are able to carry the existing server's address across retries.

The stream interfaces are not initialized anymore before session_complete(),
so we could even think about allocating them dynamically as well, though
that would not provide much savings.

The session initialization now makes use of conn_new()/conn_free(). This
slightly simplifies the code and makes it more logical. The connection
initialization code is now shorter by about 120 bytes because it's done
at once, allowing the compiler to remove all redundant initializations.

The si_attach_applet() function now takes care of first detaching the
existing endpoint, and it is called from stream_int_register_handler(),
so we can safely remove the calls to si_release_endpoint() in the
application code around this call.

A call to si_detach() was made upon stream_int_unregister_handler() to
ensure we always free the allocated connection if one was allocated in
parallel to setting an applet (eg: detect HTTP proxy while proceeding
with stats maybe).
This commit is contained in:
Willy Tarreau 2013-10-11 19:34:20 +02:00
parent 2a6e8802c0
commit 32e3c6a607
8 changed files with 103 additions and 87 deletions

View File

@ -70,13 +70,24 @@ static inline void si_set_state(struct stream_interface *si, int state)
si->state = si->prev_state = state;
}
static inline void si_detach(struct stream_interface *si)
/* release the endpoint if it's a connection, then nullify it */
static inline void si_release_endpoint(struct stream_interface *si)
{
si->ops = &si_embedded_ops;
struct connection *conn;
conn = objt_conn(si->end);
if (conn)
pool_free2(pool2_connection, conn);
si->end = NULL;
si->appctx.applet = NULL;
}
static inline void si_detach(struct stream_interface *si)
{
si_release_endpoint(si);
si->ops = &si_embedded_ops;
}
/* Attach connection <conn> to the stream interface <si>. The stream interface
* is configured to work with a connection and the connection it configured
* with a stream interface data layer.
@ -90,6 +101,7 @@ static inline void si_attach_conn(struct stream_interface *si, struct connection
static inline void si_attach_applet(struct stream_interface *si, struct si_applet *applet)
{
si_release_endpoint(si);
si->ops = &si_embedded_ops;
si->appctx.applet = applet;
si->appctx.obj_type = OBJ_TYPE_APPCTX;
@ -129,6 +141,28 @@ static inline void si_applet_release(struct stream_interface *si)
applet->release(si);
}
/* Returns the stream interface's existing connection if one such already
* exists, or tries to allocate and initialize a new one which is then
* assigned to the stream interface.
*/
static inline struct connection *si_alloc_conn(struct stream_interface *si)
{
struct connection *conn;
/* we return the connection whether it's a real connection or NULL
* in case another entity (an applet) is registered instead.
*/
conn = objt_conn(si->end);
if (si->end)
return conn;
conn = conn_new();
if (conn)
si_attach_conn(si, conn);
return conn;
}
/* Sends a shutr to the connection using the data layer */
static inline void si_shutr(struct stream_interface *si)
{

View File

@ -164,7 +164,6 @@ struct stream_interface {
unsigned int err_type; /* first error detected, one of SI_ET_* */
enum obj_type *end; /* points to the end point (connection or appctx) */
struct connection *conn; /* pre-allocated connection */
struct si_ops *ops; /* general operations at the stream interface layer */
/* struct members below are the "remote" part, as seen from the buffer side */

View File

@ -708,14 +708,14 @@ int assign_server(struct session *s)
* Upon successful return, the session flag SN_ADDR_SET is set. This flag is
* not cleared, so it's to the caller to clear it if required.
*
* The address is set on si->conn only. This connection is expected to be
* already allocated and initialized.
* The caller is responsible for having already assigned a connection
* to si->end.
*
*/
int assign_server_address(struct session *s)
{
struct connection *cli_conn = objt_conn(s->req->prod->end);
struct connection *srv_conn = s->req->cons->conn;
struct connection *srv_conn = objt_conn(s->req->cons->end);
#ifdef DEBUG_FULL
fprintf(stderr,"assign_server_address : s=%p\n",s);
@ -907,7 +907,7 @@ int assign_server_and_queue(struct session *s)
/* If an explicit source binding is specified on the server and/or backend, and
* this source makes use of the transparent proxy, then it is extracted now and
* assigned to the session's pending connection. This function assumes that an
* outgoing connection has already been allocated into s->req->cons->conn.
* outgoing connection has already been assigned to s->req->cons->end.
*/
static void assign_tproxy_address(struct session *s)
{
@ -915,7 +915,7 @@ static void assign_tproxy_address(struct session *s)
struct server *srv = objt_server(s->target);
struct conn_src *src;
struct connection *cli_conn;
struct connection *srv_conn = s->req->cons->conn;
struct connection *srv_conn = objt_conn(s->req->cons->end);
if (srv && srv->conn_src.opts & CO_SRC_BIND)
src = &srv->conn_src;
@ -982,10 +982,13 @@ static void assign_tproxy_address(struct session *s)
int connect_server(struct session *s)
{
struct connection *cli_conn;
struct connection *srv_conn = s->req->cons->conn;
struct connection *srv_conn = si_alloc_conn(s->req->cons);
struct server *srv;
int err;
if (!srv_conn)
return SN_ERR_RESOURCE;
if (!(s->flags & SN_ADDR_SET)) {
err = assign_server_address(s);
if (err != SRV_STATUS_OK)

View File

@ -156,12 +156,6 @@ extern const char *stat_status_codes[];
*/
static int stats_accept(struct session *s)
{
/* we have a dedicated I/O handler for the CLI/stats, so we can safely
* release the pre-allocated connection that we will never use.
*/
pool_free2(pool2_connection, s->si[1].conn);
s->si[1].conn = NULL;
stream_int_register_handler(&s->si[1], &cli_applet);
s->target = &cli_applet.obj_type; // for logging only
s->si[1].appctx.st1 = 0;
@ -4034,13 +4028,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
http_msg_state_str(sess->txn.req.msg_state), http_msg_state_str(sess->txn.rsp.msg_state));
chunk_appendf(&trash,
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p conn0=%p exp=%s, et=0x%03x)\n",
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n",
&sess->si[0],
si_state_str(sess->si[0].state),
sess->si[0].flags,
obj_type_name(sess->si[0].end),
obj_base_ptr(sess->si[0].end),
sess->si[0].conn,
sess->si[0].exp ?
tick_is_expired(sess->si[0].exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(sess->si[0].exp - now_ms),
@ -4048,13 +4041,12 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se
sess->si[0].err_type);
chunk_appendf(&trash,
" si[1]=%p (state=%s flags=0x%02x endp1=%s:%p conn1=%p exp=%s, et=0x%03x)\n",
" si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s, et=0x%03x)\n",
&sess->si[1],
si_state_str(sess->si[1].state),
sess->si[1].flags,
obj_type_name(sess->si[1].end),
obj_base_ptr(sess->si[1].end),
sess->si[1].conn,
sess->si[1].exp ?
tick_is_expired(sess->si[1].exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(sess->si[1].exp - now_ms),

View File

@ -1083,12 +1083,6 @@ static void peer_session_forceshutdown(struct session * session)
*/
int peer_accept(struct session *s)
{
/* we have a dedicated I/O handler for the peers, so we can safely
* release the pre-allocated connection that we will never use.
*/
pool_free2(pool2_connection, s->si[1].conn);
s->si[1].conn = NULL;
stream_int_register_handler(&s->si[1], &peer_applet);
s->target = &peer_applet.obj_type; // for logging only
s->si[1].appctx.ctx.peers.ptr = s;
@ -1122,15 +1116,13 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
struct session *s;
struct http_txn *txn;
struct task *t;
struct connection *conn;
if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */
Alert("out of memory in peer_session_create().\n");
goto out_close;
}
if (unlikely((s->si[1].conn = pool_alloc2(pool2_connection)) == NULL))
goto out_fail_conn1;
LIST_ADDQ(&sessions, &s->list);
LIST_INIT(&s->back_refs);
@ -1151,7 +1143,6 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
t->context = s;
t->nice = l->nice;
memcpy(&s->si[1].conn->addr.to, &peer->addr, sizeof(s->si[1].conn->addr.to));
s->task = t;
s->listener = l;
@ -1163,7 +1154,6 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
s->req = s->rep = NULL; /* will be allocated later */
s->si[0].conn = NULL;
si_reset(&s->si[0], t);
si_set_state(&s->si[0], SI_ST_EST);
@ -1183,15 +1173,19 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
if (s->be->options2 & PR_O2_INDEPSTR)
s->si[1].flags |= SI_FL_INDEP_STR;
/* will automatically prepare the stream interface to connect to the
/* automatically prepare the stream interface to connect to the
* pre-initialized connection in si->conn.
*/
conn_init(s->si[1].conn);
conn_prepare(s->si[1].conn, peer->proto, peer->xprt);
si_attach_conn(&s->si[1], s->si[1].conn);
if (unlikely((conn = conn_new()) == NULL))
goto out_fail_conn1;
conn_prepare(conn, peer->proto, peer->xprt);
si_attach_conn(&s->si[1], conn);
conn->target = s->target = &s->be->obj_type;
memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
session_init_srv_conn(s);
s->si[1].conn->target = s->target = &s->be->obj_type;
s->pend_pos = NULL;
/* init store persistence */
@ -1302,11 +1296,11 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
out_fail_req_buf:
pool_free2(pool2_channel, s->req);
out_fail_req:
conn_free(conn);
out_fail_conn1:
task_free(t);
out_free_session:
LIST_DEL(&s->list);
pool_free2(pool2_connection, s->si[1].conn);
out_fail_conn1:
pool_free2(pool2_session, s);
out_close:
return s;

View File

@ -3723,7 +3723,22 @@ int http_process_request(struct session *s, struct channel *req, int an_bit)
* allocated on the server side.
*/
if ((s->be->options & PR_O_HTTP_PROXY) && !(s->flags & SN_ADDR_SET)) {
url2sa(req->buf->p + msg->sl.rq.u, msg->sl.rq.u_l, &s->req->cons->conn->addr.to);
struct connection *conn;
if (unlikely((conn = si_alloc_conn(req->cons)) == NULL)) {
txn->req.msg_state = HTTP_MSG_ERROR;
txn->status = 500;
req->analysers = 0;
stream_int_retnclose(req->prod, http_error_message(s, HTTP_ERR_500));
if (!(s->flags & SN_ERR_MASK))
s->flags |= SN_ERR_RESOURCE;
if (!(s->flags & SN_FINST_MASK))
s->flags |= SN_FINST_R;
return 0;
}
url2sa(req->buf->p + msg->sl.rq.u, msg->sl.rq.u_l, &conn->addr.to);
}
/*
@ -4291,11 +4306,8 @@ void http_end_txn_clean_session(struct session *s)
s->target = NULL;
/* reinitialize the connection to the server */
conn_init(s->req->cons->conn);
s->req->cons->state = s->req->cons->prev_state = SI_ST_INI;
s->req->cons->end = NULL;
si_release_endpoint(s->req->cons);
s->req->cons->err_type = SI_ET_NONE;
s->req->cons->conn_retries = 0; /* used for logging too */
s->req->cons->exp = TICK_ETERNITY;

View File

@ -81,14 +81,18 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
ret = -1; /* assume unrecoverable error by default */
if (unlikely((s = pool_alloc2(pool2_session)) == NULL))
if (unlikely((cli_conn = conn_new()) == NULL))
goto out_close;
if (unlikely((cli_conn = s->si[0].conn = pool_alloc2(pool2_connection)) == NULL))
goto out_fail_conn0;
conn_prepare(cli_conn, l->proto, l->xprt);
if (unlikely((s->si[1].conn = pool_alloc2(pool2_connection)) == NULL))
goto out_fail_conn1;
cli_conn->t.sock.fd = cfd;
cli_conn->addr.from = *addr;
cli_conn->flags |= CO_FL_ADDR_FROM_SET;
cli_conn->target = &l->obj_type;
if (unlikely((s = pool_alloc2(pool2_session)) == NULL))
goto out_free_conn;
/* minimum session initialization required for an embryonic session is
* fairly low. We need very little to execute L4 ACLs, then we need a
@ -105,16 +109,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
s->listener = l;
s->fe = p;
/* OK, we're keeping the session, so let's properly initialize the session.
* We first have to initialize the client-side connection.
*/
conn_init(cli_conn);
cli_conn->t.sock.fd = cfd;
cli_conn->flags |= CO_FL_ADDR_FROM_SET;
conn_prepare(cli_conn, l->proto, l->xprt);
cli_conn->addr.from = *addr;
cli_conn->target = &l->obj_type;
/* On a mini-session, the connection is directly attached to the
* session's target so that we don't need to initialize the stream
* interfaces. Another benefit is that it's easy to detect a mini-
@ -123,11 +117,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
*/
s->target = &cli_conn->obj_type;
/* The server side is not used yet, but just initialize it to avoid
* confusing some debugging or "show sess" for example.
*/
s->si[1].conn->obj_type = OBJ_TYPE_NONE;
s->logs.accept_date = date; /* user-visible date for logging */
s->logs.tv_accept = now; /* corrected date for internal use */
s->uniq_id = totalconn;
@ -185,7 +174,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
goto out_free_session;
}
/* wait for a PROXY protocol header */
if (l->options & LI_O_ACC_PROXY) {
cli_conn->flags |= CO_FL_ACCEPT_PROXY;
@ -239,13 +227,11 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
p->feconn--;
if (s->stkctr[0].entry || s->stkctr[1].entry)
session_store_counters(s);
pool_free2(pool2_connection, s->si[1].conn);
out_fail_conn1:
s->si[0].conn->flags &= ~CO_FL_XPRT_TRACKED;
conn_xprt_close(s->si[0].conn);
pool_free2(pool2_connection, s->si[0].conn);
out_fail_conn0:
pool_free2(pool2_session, s);
out_free_conn:
cli_conn->flags &= ~CO_FL_XPRT_TRACKED;
conn_xprt_close(cli_conn);
conn_free(cli_conn);
out_close:
if (ret < 0 && l->xprt == &raw_sock && p->mode == PR_MODE_HTTP) {
/* critical error, no more memory, try to emit a 500 response */
@ -354,8 +340,7 @@ static void kill_mini_session(struct session *s)
task_delete(s->task);
task_free(s->task);
pool_free2(pool2_connection, s->si[1].conn);
pool_free2(pool2_connection, s->si[0].conn);
pool_free2(pool2_connection, conn);
pool_free2(pool2_session, s);
}
@ -425,17 +410,6 @@ int session_complete(struct session *s)
LIST_ADDQ(&sessions, &s->list);
LIST_INIT(&s->back_refs);
/* attach the incoming connection to the stream interface now */
si_reset(&s->si[0], t);
si_set_state(&s->si[0], SI_ST_EST);
if (likely(s->fe->options2 & PR_O2_INDEPSTR))
s->si[0].flags |= SI_FL_INDEP_STR;
s->si[0].conn = conn;
conn_prepare(conn, l->proto, l->xprt);
si_attach_conn(&s->si[0], conn);
s->flags |= SN_INITIALIZED;
s->unique_id = NULL;
@ -470,12 +444,20 @@ int session_complete(struct session *s)
s->stkctr[i].table->data_arg[STKTABLE_DT_SESS_RATE].u, 1);
}
/* this part should be common with other protocols */
si_reset(&s->si[0], t);
si_set_state(&s->si[0], SI_ST_EST);
/* attach the incoming connection to the stream interface now */
si_attach_conn(&s->si[0], conn);
if (likely(s->fe->options2 & PR_O2_INDEPSTR))
s->si[0].flags |= SI_FL_INDEP_STR;
/* pre-initialize the other side's stream interface to an INIT state. The
* callbacks will be initialized before attempting to connect.
*/
si_reset(&s->si[1], t);
conn_init(s->si[1].conn);
s->si[1].conn->target = NULL;
si_detach(&s->si[1]);
if (likely(s->fe->options2 & PR_O2_INDEPSTR))
@ -673,8 +655,8 @@ static void session_free(struct session *s)
bref->ref = s->list.n;
}
LIST_DEL(&s->list);
pool_free2(pool2_connection, s->si[1].conn);
pool_free2(pool2_connection, s->si[0].conn);
si_release_endpoint(&s->si[1]);
si_release_endpoint(&s->si[0]);
pool_free2(pool2_session, s);
/* We may want to free the maximum amount of pools if the proxy is stopping */

View File

@ -364,8 +364,8 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_
*/
void stream_int_unregister_handler(struct stream_interface *si)
{
si_detach(si);
si->owner = NULL;
si->end = NULL;
}
/* This callback is used to send a valid PROXY protocol line to a socket being