diff --git a/include/proto/session.h b/include/proto/session.h index 049fc931c..d88a19ee3 100644 --- a/include/proto/session.h +++ b/include/proto/session.h @@ -47,6 +47,7 @@ int parse_track_counters(char **args, int *arg, int section_type, struct proxy *curpx, struct track_ctr_prm *prm, struct proxy *defpx, char **err); +int conn_session_initialize(struct connection *conn, int flag); /* Remove the refcount from the session to the tracked counters, and clear the * pointer to ensure this is only performed once. The caller is responsible for diff --git a/include/types/connection.h b/include/types/connection.h index 0e12bb5d1..b74c71ae3 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -75,6 +75,8 @@ enum { /* below we have all handshake flags grouped into one */ CO_FL_HANDSHAKE = CO_FL_SI_SEND_PROXY, + CO_FL_INIT_SESS = 0x00000800, /* initialize a session before using data */ + /* when any of these flags is set, polling is defined by socket-layer * operations, as opposed to data-layer. */ diff --git a/include/types/session.h b/include/types/session.h index cce55a4bf..0450163c0 100644 --- a/include/types/session.h +++ b/include/types/session.h @@ -50,7 +50,7 @@ #define SN_FORCE_PRST 0x00000010 /* force persistence here, even if server is down */ #define SN_MONITOR 0x00000020 /* this session comes from a monitoring system */ #define SN_CURR_SESS 0x00000040 /* a connection is currently being counted on the server */ -/* unused: 0x00000080 */ +#define SN_INITIALIZED 0x00000080 /* the session was fully initialized */ #define SN_REDISP 0x00000100 /* set if this session was redispatched from one server to another */ #define SN_CONN_TAR 0x00000200 /* set if this session is turning around before reconnecting */ #define SN_REDIRECTABLE 0x00000400 /* set if this session is redirectable (GET or HEAD) */ diff --git a/src/connection.c b/src/connection.c index 0abbe42c4..9e3d79eb9 100644 --- a/src/connection.c +++ b/src/connection.c @@ -15,6 +15,7 @@ #include #include +#include #include /* I/O callback for fd-based connections. It calls the read/write handlers @@ -25,7 +26,7 @@ int conn_fd_handler(int fd) struct connection *conn = fdtab[fd].owner; if (unlikely(!conn)) - goto leave; + return 0; process_handshake: /* The handshake callbacks are called in sequence. If either of them is @@ -47,6 +48,14 @@ int conn_fd_handler(int fd) if (!(conn->flags & CO_FL_POLL_SOCK)) __conn_sock_stop_both(conn); + /* Maybe we need to finish initializing an incoming session. The + * function may fail and cause the connection to be destroyed, thus + * we must not use it anymore and should immediately leave instead. + */ + if ((conn->flags & CO_FL_INIT_SESS) && + conn_session_initialize(conn, CO_FL_INIT_SESS) < 0) + return 0; + if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) conn->app_cb->recv(conn); @@ -80,6 +89,13 @@ int conn_fd_handler(int fd) } leave: + /* we may need to release the connection which is an embryonic session */ + if ((conn->flags & (CO_FL_ERROR|CO_FL_INIT_SESS)) == (CO_FL_ERROR|CO_FL_INIT_SESS)) { + conn->flags |= CO_FL_ERROR; + conn_session_complete(conn, CO_FL_INIT_SESS); + return 0; + } + if (conn->flags & CO_FL_NOTIFY_SI) conn_notify_si(conn); diff --git a/src/session.c b/src/session.c index 6b784cfe3..eb3249635 100644 --- a/src/session.c +++ b/src/session.c @@ -48,16 +48,19 @@ struct pool_head *pool2_session; struct list sessions; -/* This function is called from the protocol layer accept() in order to instanciate - * a new session on behalf of a given listener and frontend. It returns a positive - * value upon success, 0 if the connection can be ignored, or a negative value upon - * critical failure. The accepted file descriptor is closed if we return <= 0. +static struct task *expire_mini_session(struct task *t); +int session_complete(struct session *s); + +/* This function is called from the protocol layer accept() in order to + * instanciate a new embryonic session on behalf of a given listener and + * frontend. It returns a positive value upon success, 0 if the connection + * can be ignored, or a negative value upon critical failure. The accepted + * file descriptor is closed if we return <= 0. */ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) { struct proxy *p = l->frontend; struct session *s; - struct http_txn *txn; struct task *t; int ret; @@ -67,7 +70,12 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) if (unlikely((s = pool_alloc2(pool2_session)) == NULL)) goto out_close; - /* minimum session initialization required for monitor mode below */ + /* minimum session initialization required for an embryonic session is + * fairly low. We need very little to execute L4 ACLs, then we need a + * task to make the client-side connection live on its own. + * - flags + * - stick-entry tracking + */ s->flags = 0; s->logs.logwait = p->to_log; s->stkctr1_entry = NULL; @@ -75,40 +83,25 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) s->stkctr1_table = NULL; s->stkctr2_table = NULL; - if (unlikely((t = task_new()) == NULL)) - goto out_free_session; + s->listener = l; + s->fe = p; /* OK, we're keeping the session, so let's properly initialize the session */ - LIST_ADDQ(&sessions, &s->list); - LIST_INIT(&s->back_refs); - - s->unique_id = NULL; - s->term_trace = 0; s->si[0].conn.t.sock.fd = cfd; s->si[0].conn.ctrl = l->proto; - s->si[0].conn.flags = CO_FL_NONE | CO_FL_NOTIFY_SI; /* we're on a stream_interface */ + s->si[0].conn.flags = CO_FL_NONE; s->si[0].conn.addr.from = *addr; + set_target_client(&s->si[0].conn.target, l); + s->logs.accept_date = date; /* user-visible date for logging */ s->logs.tv_accept = now; /* corrected date for internal use */ s->uniq_id = totalconn; - p->feconn++; /* beconn will be increased once assigned */ + p->feconn++; + /* This session was accepted, count it now */ + if (p->feconn > p->fe_counters.conn_max) + p->fe_counters.conn_max = p->feconn; - proxy_inc_fe_conn_ctr(l, p); /* note: cum_beconn will be increased once assigned */ - - t->process = l->handler; - t->context = s; - t->nice = l->nice; - t->expire = TICK_ETERNITY; - - s->task = t; - s->listener = l; - - /* Note: initially, the session's backend points to the frontend. - * This changes later when switching rules are executed or - * when the default backend is assigned. - */ - s->be = s->fe = p; - s->req = s->rep = NULL; /* will be allocated later */ + proxy_inc_fe_conn_ctr(l, p); /* if this session comes from a known monitoring system, we want to ignore * it as soon as possible, which means closing it immediately for TCP, but @@ -129,13 +122,182 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) /* let's do a no-linger now to close with a single RST. */ setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger)); ret = 0; /* successful termination */ - goto out_free_task; + goto out_free_session; } - /* This session was accepted, count it now */ - if (p->feconn > p->fe_counters.conn_max) - p->fe_counters.conn_max = p->feconn; + /* Adjust some socket options */ + if (unlikely(fcntl(cfd, F_SETFL, O_NONBLOCK) == -1)) + goto out_free_session; + if (unlikely((t = task_new()) == NULL)) + goto out_free_session; + + t->context = s; + t->nice = l->nice; + s->task = t; + + /* add the various callbacks. Right now the data layer is present but + * not initialized. Also note we need to be careful as the stream int + * is not initialized yet. + */ + si_prepare_conn(&s->si[0], l->proto, l->data); + + /* finish initialization of the accepted file descriptor */ + fd_insert(cfd); + fdtab[cfd].owner = &s->si[0].conn; + fdtab[cfd].flags = 0; + fdtab[cfd].iocb = conn_fd_handler; + conn_data_want_recv(&s->si[0].conn); + if (conn_data_init(&s->si[0].conn) < 0) + goto out_free_task; + + /* OK, now either we have a pending handshake to execute with and + * then we must return to the I/O layer, or we can proceed with the + * end of the session initialization. In case of handshake, we also + * set the I/O timeout to the frontend's client timeout. + */ + + if (s->si[0].conn.flags & CO_FL_HANDSHAKE) { + t->process = expire_mini_session; + t->expire = tick_add_ifset(now_ms, p->timeout.client); + task_queue(t); + s->si[0].conn.flags |= CO_FL_INIT_SESS; + return 1; + } + + /* OK let's complete session initialization */ + ret = session_complete(s); + if (ret > 0) + return ret; + + /* Error unrolling */ + out_free_task: + task_free(t); + out_free_session: + p->feconn--; + if (s->stkctr1_entry || s->stkctr2_entry) + session_store_counters(s); + pool_free2(pool2_session, s); + out_close: + if (ret < 0 && p->mode == PR_MODE_HTTP) { + /* critical error, no more memory, try to emit a 500 response */ + struct chunk *err_msg = error_message(s, HTTP_ERR_500); + send(cfd, err_msg->str, err_msg->len, MSG_DONTWAIT|MSG_NOSIGNAL); + } + + if (fdtab[cfd].owner) + fd_delete(cfd); + else + close(cfd); + return ret; +} + +/* This function kills an existing embryonic session. It stops the connection's + * data layer, releases assigned resources, resumes the listener if it was + * disabled and finally kills the file descriptor. + */ +static void kill_mini_session(struct session *s) +{ + /* kill the connection now */ + conn_data_close(&s->si[0].conn); + + s->fe->feconn--; + if (s->stkctr1_entry || s->stkctr2_entry) + session_store_counters(s); + + if (!(s->listener->options & LI_O_UNLIMITED)) + actconn--; + jobs--; + s->listener->nbconn--; + if (s->listener->state == LI_FULL) + resume_listener(s->listener); + + /* Dequeues all of the listeners waiting for a resource */ + if (!LIST_ISEMPTY(&global_listener_queue)) + dequeue_all_listeners(&global_listener_queue); + + if (!LIST_ISEMPTY(&s->fe->listener_queue) && + (!s->fe->fe_sps_lim || freq_ctr_remain(&s->fe->fe_sess_per_sec, s->fe->fe_sps_lim, 0) > 0)) + dequeue_all_listeners(&s->fe->listener_queue); + + task_delete(s->task); + task_free(s->task); + + if (fdtab[s->si[0].conn.t.sock.fd].owner) + fd_delete(s->si[0].conn.t.sock.fd); + else + close(s->si[0].conn.t.sock.fd); + + pool_free2(pool2_session, s); +} + +/* Finish initializing a session from a connection. Returns <0 if the + * connection was killed. + */ +int conn_session_initialize(struct connection *conn, int flag) +{ + struct session *s = container_of(conn, struct session, si[0].conn); + + if (session_complete(s) > 0) { + conn->flags &= ~flag; + return 0; + } + + /* kill the connection now */ + kill_mini_session(s); + return -1; +} + +/* Manages embryonic sessions timeout. It is only called when the timeout + * strikes and performs the required cleanup. + */ +static struct task *expire_mini_session(struct task *t) +{ + struct session *s = t->context; + + if (!(t->state & TASK_WOKEN_TIMER)) + return t; + + kill_mini_session(s); + return NULL; +} + +/* This function is called from the I/O handler which detects the end of + * handshake, in order to complete initialization of a valid session. It must + * be called with an embryonic session. It returns a positive value upon + * success, 0 if the connection can be ignored, or a negative value upon + * critical failure. The accepted file descriptor is closed if we return <= 0. + */ +int session_complete(struct session *s) +{ + struct listener *l = s->listener; + struct proxy *p = s->fe; + struct http_txn *txn; + struct task *t = s->task; + int ret; + + ret = -1; /* assume unrecoverable error by default */ + + /* OK, we're keeping the session, so let's properly initialize the session */ + LIST_ADDQ(&sessions, &s->list); + LIST_INIT(&s->back_refs); + s->flags |= SN_INITIALIZED; + + s->unique_id = NULL; + s->term_trace = 0; + + t->process = l->handler; + t->context = s; + t->expire = TICK_ETERNITY; + + /* Note: initially, the session's backend points to the frontend. + * This changes later when switching rules are executed or + * when the default backend is assigned. + */ + s->be = s->fe; + s->req = s->rep = NULL; /* will be allocated later */ + + /* Let's count a session now */ proxy_inc_fe_sess_ctr(l, p); if (s->stkctr1_entry) { void *ptr; @@ -170,16 +332,12 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) s->si[0].err_loc = NULL; s->si[0].release = NULL; s->si[0].send_proxy_ofs = 0; - set_target_client(&s->si[0].conn.target, l); s->si[0].exp = TICK_ETERNITY; s->si[0].flags = SI_FL_NONE; if (likely(s->fe->options2 & PR_O2_INDEPSTR)) s->si[0].flags |= SI_FL_INDEP_STR; - /* add the various callbacks */ - si_prepare_conn(&s->si[0], l->proto, l->data); - /* pre-initialize the other side's stream interface to an INIT state. The * callbacks will be initialized before attempting to connect. */ @@ -207,10 +365,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) /* init store persistence */ s->store_count = 0; - /* Adjust some socket options */ - if (unlikely(fcntl(cfd, F_SETFL, O_NONBLOCK) == -1)) - goto out_free_task; - if (unlikely((s->req = pool_alloc2(pool2_channel)) == NULL)) goto out_free_task; /* no memory */ @@ -273,13 +427,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) txn->rsp.buf = s->rep; /* finish initialization of the accepted file descriptor */ - fd_insert(cfd); - fdtab[cfd].owner = &s->si[0].conn; - fdtab[cfd].flags = 0; - fdtab[cfd].iocb = conn_fd_handler; conn_data_want_recv(&s->si[0].conn); - if (conn_data_init(&s->si[0].conn) < 0) - goto out_free_rep; if (p->accept && (ret = p->accept(s)) <= 0) { /* Either we had an unrecoverable error (<0) or work is @@ -289,6 +437,9 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) goto out_free_rep; } + /* we want the connection handler to notify the stream interface about updates. */ + s->si[0].conn.flags |= CO_FL_NOTIFY_SI; + /* it is important not to call the wakeup function directly but to * pass through task_wakeup(), because this one knows how to apply * priorities to tasks. @@ -302,24 +453,6 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) out_free_req: pool_free2(pool2_channel, s->req); out_free_task: - p->feconn--; - if (s->stkctr1_entry || s->stkctr2_entry) - session_store_counters(s); - task_free(t); - LIST_DEL(&s->list); - out_free_session: - pool_free2(pool2_session, s); - out_close: - if (ret < 0 && s->fe->mode == PR_MODE_HTTP) { - /* critical error, no more memory, try to emit a 500 response */ - struct chunk *err_msg = error_message(s, HTTP_ERR_500); - send(cfd, err_msg->str, err_msg->len, MSG_DONTWAIT|MSG_NOSIGNAL); - } - - if (fdtab[cfd].owner) - fd_delete(cfd); - else - close(cfd); return ret; }