[MAJOR] first limited implementation of connection queueing.

There is no timeout yet, and the server UP/DOWN events are not used
	to export/import list of connections yet. It seems that the process
	can sometimes eat lots of user CPU (~50%) if a maxconn is set on an
	overloaded server.
This commit is contained in:
willy tarreau 2006-05-02 00:19:57 +02:00
parent fd5c879b4d
commit dfece23f7d

657
haproxy.c
View File

@ -405,6 +405,8 @@ int strlcpy2(char *dst, const char *src, int size) {
/* various other session flags, bits values 0x400000 and above */ /* various other session flags, bits values 0x400000 and above */
#define SN_MONITOR 0x00400000 /* this session comes from a monitoring system */ #define SN_MONITOR 0x00400000 /* this session comes from a monitoring system */
#define SN_ASSIGNED 0x00800000 /* no need to assign a server to this session */
#define SN_ADDR_SET 0x01000000 /* this session's server address has been set */
/* different possible states for the client side */ /* different possible states for the client side */
@ -448,6 +450,13 @@ int strlcpy2(char *dst, const char *src, int size) {
#define SRV_BIND_SRC 8 /* this server uses a specific source address */ #define SRV_BIND_SRC 8 /* this server uses a specific source address */
#define SRV_CHECKED 16 /* this server needs to be checked */ #define SRV_CHECKED 16 /* this server needs to be checked */
/* function which act on servers need to return various errors */
#define SRV_STATUS_OK 0 /* everything is OK. */
#define SRV_STATUS_INTERNAL 1 /* other unrecoverable errors. */
#define SRV_STATUS_NOSRV 2 /* no server is available */
#define SRV_STATUS_FULL 3 /* the/all server(s) are saturated */
#define SRV_STATUS_QUEUED 4 /* the/all server(s) are saturated but the connection was queued */
/* what to do when a header matches a regex */ /* what to do when a header matches a regex */
#define ACT_ALLOW 0 /* allow the request */ #define ACT_ALLOW 0 /* allow the request */
#define ACT_REPLACE 1 /* replace the matching header */ #define ACT_REPLACE 1 /* replace the matching header */
@ -622,6 +631,8 @@ struct proxy {
int srvtimeout; /* server I/O timeout (in milliseconds) */ int srvtimeout; /* server I/O timeout (in milliseconds) */
int contimeout; /* connect timeout (in milliseconds) */ int contimeout; /* connect timeout (in milliseconds) */
char *id; /* proxy id */ char *id; /* proxy id */
struct list pendconns; /* pending connections with no server assigned yet */
int nbpend; /* number of pending connections with no server assigned yet */
int nbconn; /* # of active sessions */ int nbconn; /* # of active sessions */
unsigned int cum_conn; /* cumulated number of processed sessions */ unsigned int cum_conn; /* cumulated number of processed sessions */
int maxconn; /* max # of active sessions */ int maxconn; /* max # of active sessions */
@ -1781,57 +1792,94 @@ struct task *task_queue(struct task *task) {
/*********************************************************************/ /*********************************************************************/
/* /*
* returns the first pending connection of server <s> or NULL if none. * Detaches pending connection <p>, decreases the pending count, and frees
* the pending connection. The connection might have been queued to a specific
* server as well as to the proxy. The session also gets marked unqueued.
*/ */
static inline struct pendconn *pendconn_peek(struct server *s) { static void pendconn_free(struct pendconn *p) {
LIST_DEL(&p->list);
p->sess->pend_pos = NULL;
if (p->srv)
p->srv->nbpend--;
else
p->sess->proxy->nbpend--;
pool_free(pendconn, p);
}
/* Returns the first pending connection for server <s>, which may be NULL if
* nothing is pending.
*/
static inline struct pendconn *pendconn_from_srv(struct server *s) {
if (!s->nbpend) if (!s->nbpend)
return NULL; return NULL;
return LIST_ELEM(s->pendconns.n, struct pendconn *, list); return LIST_ELEM(s->pendconns.n, struct pendconn *, list);
} }
/* /* Returns the first pending connection for proxy <px>, which may be NULL if
* Detaches pending connection <p>, decreases the pending count, and frees * nothing is pending.
* the pending connection.
*/ */
static inline void pendconn_free(struct pendconn *p) { static inline struct pendconn *pendconn_from_px(struct proxy *px) {
LIST_DEL(&p->list); if (!px->nbpend)
p->sess->pend_pos = NULL; return NULL;
p->srv->nbpend--;
pool_free(pendconn, p); return LIST_ELEM(px->pendconns.n, struct pendconn *, list);
} }
/* detaches the first pending connection for server <s> and returns its /* Detaches the next pending connection for either current session's server or
* associated session. If no pending connection is found, NULL is returned. * current session's proxy, and returns its associated session. If no pending
* connection is found, NULL is returned. Note that cur->srv cannot be NULL.
*/ */
static inline struct session *pendconn_get(struct server *s) { static struct session *pendconn_get_next_sess(struct session *cur) {
struct pendconn *p; struct pendconn *p;
struct session *sess; struct session *sess;
p = pendconn_peek(s); p = pendconn_from_srv(cur->srv);
if (!p) {
p = pendconn_from_px(cur->proxy);
if (!p) if (!p)
return NULL; return NULL;
p->sess->srv = cur->srv;
}
sess = p->sess; sess = p->sess;
pendconn_free(p); pendconn_free(p);
return sess; return sess;
} }
/* adds the session <sess> to the pending connection list of server <srv>. /* Checks if other sessions are waiting for the same server, and wakes the
* All counters and back pointers are updated accordingly. Returns NULL if * first one up. Note that cur->srv cannot be NULL.
* no memory is available, otherwise the pendconn itself.
*/ */
static struct pendconn *pendconn_add(struct server *srv, struct session *sess) { void offer_connection_slot(struct session *cur) {
struct session *sess;
sess = pendconn_get_next_sess(cur);
if (sess == NULL)
return;
task_wakeup(&rq, sess->task);
}
/* Adds the session <sess> to the pending connection list of server <sess>->srv
* or to the one of <sess>->proxy if srv is NULL. All counters and back pointers
* are updated accordingly. Returns NULL if no memory is available, otherwise the
* pendconn itself.
*/
static struct pendconn *pendconn_add(struct session *sess) {
struct pendconn *p; struct pendconn *p;
p = pool_alloc(pendconn); p = pool_alloc(pendconn);
if (!p) if (!p)
return NULL; return NULL;
LIST_ADDQ(&srv->pendconns, &p->list);
p->sess = sess;
p->srv = srv;
sess->pend_pos = p; sess->pend_pos = p;
srv->nbpend++; p->sess = sess;
p->srv = sess->srv;
if (sess->srv) {
LIST_ADDQ(&sess->srv->pendconns, &p->list);
sess->srv->nbpend++;
} else {
LIST_ADDQ(&sess->proxy->pendconns, &p->list);
sess->proxy->nbpend++;
}
return p; return p;
} }
@ -1860,7 +1908,7 @@ static int get_original_dst(int fd, struct sockaddr_in *sa, socklen_t *salen) {
/* /*
* frees the context associated to a session. It must have been removed first. * frees the context associated to a session. It must have been removed first.
*/ */
static inline void session_free(struct session *s) { static void session_free(struct session *s) {
if (s->pend_pos) if (s->pend_pos)
pendconn_free(s->pend_pos); pendconn_free(s->pend_pos);
if (s->req) if (s->req)
@ -2053,43 +2101,42 @@ static inline struct server *get_server_sh(struct proxy *px, char *addr, int len
/* /*
* This function initiates a connection to the current server (s->srv) if (s->direct) * This function marks the session as 'assigned' in direct or dispatch modes,
* is set, or to the dispatch server if (s->direct) is 0. * or tries to assign one in balance mode, according to the algorithm. It does
* It can return one of : * nothing if the session had already been assigned a server.
* - SN_ERR_NONE if everything's OK *
* - SN_ERR_SRVTO if there are no more servers * It may return :
* - SN_ERR_SRVCL if the connection was refused by the server * SRV_STATUS_OK if everything is OK.
* - SN_ERR_PRXCOND if the connection has been limited by the proxy (maxconn) * SRV_STATUS_NOSRV if no server is available
* - SN_ERR_RESOURCE if a system resource is lacking (eg: fd limits, ports, ...) * SRV_STATUS_FULL if all servers are saturated
* - SN_ERR_INTERNAL for any other purely internal errors * SRV_STATUS_INTERNAL for other unrecoverable errors.
* Additionnally, in the case of SN_ERR_RESOURCE, an emergency log will be emitted. *
* Upon successful return, the session flag SN_ASSIGNED to indicate that it does
* not need to be called anymore. This usually means that s->srv can be trusted
* in balance and direct modes. This flag is not cleared, so it's to the caller
* to clear it if required (eg: redispatch).
*
*/ */
int connect_server(struct session *s) {
int fd;
int assign_server(struct session *s) {
#ifdef DEBUG_FULL #ifdef DEBUG_FULL
fprintf(stderr,"connect_server : s=%p\n",s); fprintf(stderr,"assign_server : s=%p\n",s);
#endif #endif
if (s->flags & SN_DIRECT) { /* srv cannot be null */ if (s->pend_pos)
s->srv_addr = s->srv->addr; return SRV_STATUS_INTERNAL;
}
else if (s->proxy->options & PR_O_BALANCE) { if (!(s->flags & SN_ASSIGNED)) {
/* Ensure that srv will not be NULL */ if ((s->proxy->options & PR_O_BALANCE) && !(s->flags & SN_DIRECT)) {
if (!s->proxy->srv_act && !s->proxy->srv_bck) if (!s->proxy->srv_act && !s->proxy->srv_bck)
return SN_ERR_SRVTO; return SRV_STATUS_NOSRV;
if (s->proxy->options & PR_O_BALANCE_RR) { if (s->proxy->options & PR_O_BALANCE_RR) {
struct server *srv; s->srv = get_server_rr_with_conns(s->proxy);
if (!s->srv)
srv = get_server_rr_with_conns(s->proxy); return SRV_STATUS_FULL;
if (!srv)
srv = get_server_rr(s->proxy);
s->srv_addr = srv->addr;
s->srv = srv;
} }
else if (s->proxy->options & PR_O_BALANCE_SH) { else if (s->proxy->options & PR_O_BALANCE_SH) {
struct server *srv;
int len; int len;
if (s->cli_addr.ss_family == AF_INET) if (s->cli_addr.ss_family == AF_INET)
@ -2097,16 +2144,56 @@ int connect_server(struct session *s) {
else if (s->cli_addr.ss_family == AF_INET6) else if (s->cli_addr.ss_family == AF_INET6)
len = 16; len = 16;
else /* unknown IP family */ else /* unknown IP family */
return SN_ERR_INTERNAL; return SRV_STATUS_INTERNAL;
srv = get_server_sh(s->proxy, s->srv = get_server_sh(s->proxy,
(void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr, (void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr,
len); len);
s->srv_addr = srv->addr;
s->srv = srv;
} }
else /* unknown balancing algorithm */ else /* unknown balancing algorithm */
return SN_ERR_INTERNAL; return SRV_STATUS_INTERNAL;
}
s->flags |= SN_ASSIGNED;
}
return SRV_STATUS_OK;
}
/*
* This function assigns a server address to a session, and sets SN_ADDR_SET.
* The address is taken from the currently assigned server, or from the
* dispatch or transparent address.
*
* It may return :
* SRV_STATUS_OK if everything is OK.
* SRV_STATUS_INTERNAL for other unrecoverable errors.
*
* Upon successful return, the session flag SN_ADDR_SET is set. This flag is
* not cleared, so it's to the caller to clear it if required.
*
*/
int assign_server_address(struct session *s) {
#ifdef DEBUG_FULL
fprintf(stderr,"assign_server_address : s=%p\n",s);
#endif
if (s->flags & SN_DIRECT || s->proxy->options & PR_O_BALANCE) {
/* A server is necessarily known for this session */
if (!(s->flags & SN_ASSIGNED))
return SRV_STATUS_INTERNAL;
s->srv_addr = s->srv->addr;
/* if this server remaps proxied ports, we'll use
* the port the client connected to with an offset. */
if (s->srv->state & SRV_MAPPORTS) {
struct sockaddr_in sockname;
socklen_t namelen = sizeof(sockname);
if (!(s->proxy->options & PR_O_TRANSP) ||
get_original_dst(s->cli_fd, (struct sockaddr_in *)&sockname, &namelen) == -1)
getsockname(s->cli_fd, (struct sockaddr *)&sockname, &namelen);
s->srv_addr.sin_port = htons(ntohs(s->srv_addr.sin_port) + ntohs(sockname.sin_port));
}
} }
else if (*(int *)&s->proxy->dispatch_addr.sin_addr) { else if (*(int *)&s->proxy->dispatch_addr.sin_addr) {
/* connect to the defined dispatch addr */ /* connect to the defined dispatch addr */
@ -2118,21 +2205,102 @@ int connect_server(struct session *s) {
if (get_original_dst(s->cli_fd, &s->srv_addr, &salen) == -1) { if (get_original_dst(s->cli_fd, &s->srv_addr, &salen) == -1) {
qfprintf(stderr, "Cannot get original server address.\n"); qfprintf(stderr, "Cannot get original server address.\n");
return SRV_STATUS_INTERNAL;
}
}
s->flags |= SN_ADDR_SET;
return SRV_STATUS_OK;
}
/* This function assigns a server to session <s> if required, and can add the
* connection to either the assigned server's queue or to the proxy's queue.
*
* Returns :
*
* SRV_STATUS_OK if everything is OK.
* SRV_STATUS_NOSRV if no server is available
* SRV_STATUS_QUEUED if the connection has been queued.
* SRV_STATUS_FULL if the server(s) is/are saturated and the
* connection could not be queued.
* SRV_STATUS_INTERNAL for other unrecoverable errors.
*
*/
int assign_server_and_queue(struct session *s) {
struct pendconn *p;
int err;
if (s->pend_pos)
return SRV_STATUS_INTERNAL;
if (s->flags & SN_ASSIGNED) {
/* a server does not need to be assigned, perhaps because we're in
* direct mode, or in dispatch or transparent modes where the server
* is not needed.
*/
if (s->srv &&
s->srv->maxconn && s->srv->cur_sess >= s->srv->maxconn) {
p = pendconn_add(s);
if (p)
return SRV_STATUS_QUEUED;
else
return SRV_STATUS_FULL;
}
return SRV_STATUS_OK;
}
/* a server needs to be assigned */
err = assign_server(s);
switch (err) {
case SRV_STATUS_OK:
/* in balance mode, we might have servers with connection limits */
if (s->srv != NULL &&
s->srv->maxconn && s->srv->cur_sess >= s->srv->maxconn) {
p = pendconn_add(s);
if (p)
return SRV_STATUS_QUEUED;
else
return SRV_STATUS_FULL;
}
return SRV_STATUS_OK;
case SRV_STATUS_FULL:
/* queue this session into the proxy's queue */
p = pendconn_add(s);
if (p)
return SRV_STATUS_QUEUED;
else
return SRV_STATUS_FULL;
case SRV_STATUS_NOSRV:
case SRV_STATUS_INTERNAL:
return err;
default:
return SRV_STATUS_INTERNAL;
}
}
/*
* This function initiates a connection to the server assigned to this session
* (s->srv, s->srv_addr). It will assign a server if none is assigned yet.
* It can return one of :
* - SN_ERR_NONE if everything's OK
* - SN_ERR_SRVTO if there are no more servers
* - SN_ERR_SRVCL if the connection was refused by the server
* - SN_ERR_PRXCOND if the connection has been limited by the proxy (maxconn)
* - SN_ERR_RESOURCE if a system resource is lacking (eg: fd limits, ports, ...)
* - SN_ERR_INTERNAL for any other purely internal errors
* Additionnally, in the case of SN_ERR_RESOURCE, an emergency log will be emitted.
*/
int connect_server(struct session *s) {
int fd, err;
if (!(s->flags & SN_ADDR_SET)) {
err = assign_server_address(s);
if (err != SRV_STATUS_OK)
return SN_ERR_INTERNAL; return SN_ERR_INTERNAL;
} }
}
/* if this server remaps proxied ports, we'll use
* the port the client connected to with an offset. */
if (s->srv != NULL && s->srv->state & SRV_MAPPORTS) {
struct sockaddr_in sockname;
socklen_t namelen = sizeof(sockname);
if (!(s->proxy->options & PR_O_TRANSP) ||
get_original_dst(s->cli_fd, (struct sockaddr_in *)&sockname, &namelen) == -1)
getsockname(s->cli_fd, (struct sockaddr *)&sockname, &namelen);
s->srv_addr.sin_port = htons(ntohs(s->srv_addr.sin_port) + ntohs(sockname.sin_port));
}
if ((fd = s->srv_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { if ((fd = s->srv_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
qfprintf(stderr, "Cannot get a server socket.\n"); qfprintf(stderr, "Cannot get a server socket.\n");
@ -3624,7 +3792,7 @@ int process_cli(struct session *t) {
if (srv->state & SRV_RUNNING || t->proxy->options & PR_O_PERSIST) { if (srv->state & SRV_RUNNING || t->proxy->options & PR_O_PERSIST) {
/* we found the server and it's usable */ /* we found the server and it's usable */
t->flags &= ~SN_CK_MASK; t->flags &= ~SN_CK_MASK;
t->flags |= SN_CK_VALID | SN_DIRECT; t->flags |= SN_CK_VALID | SN_DIRECT | SN_ASSIGNED;
t->srv = srv; t->srv = srv;
break; break;
} else { } else {
@ -3879,7 +4047,7 @@ int process_cli(struct session *t) {
if (srv->state & SRV_RUNNING || t->proxy->options & PR_O_PERSIST) { if (srv->state & SRV_RUNNING || t->proxy->options & PR_O_PERSIST) {
/* we found the server and it's usable */ /* we found the server and it's usable */
t->flags &= ~SN_CK_MASK; t->flags &= ~SN_CK_MASK;
t->flags |= SN_CK_VALID | SN_DIRECT; t->flags |= SN_CK_VALID | SN_DIRECT | SN_ASSIGNED;
t->srv = srv; t->srv = srv;
break; break;
} else { } else {
@ -3976,7 +4144,7 @@ int process_cli(struct session *t) {
if (srv->state & SRV_RUNNING || t->proxy->options & PR_O_PERSIST) { if (srv->state & SRV_RUNNING || t->proxy->options & PR_O_PERSIST) {
/* we found the server and it's usable */ /* we found the server and it's usable */
t->flags &= ~SN_CK_MASK; t->flags &= ~SN_CK_MASK;
t->flags |= SN_CK_VALID | SN_DIRECT; t->flags |= SN_CK_VALID | SN_DIRECT | SN_ASSIGNED;
t->srv = srv; t->srv = srv;
break; break;
} else { } else {
@ -4312,6 +4480,150 @@ int process_cli(struct session *t) {
return 0; return 0;
} }
/* This function turns the server state into the SV_STCLOSE, and sets
* indicators accordingly. Note that if <status> is 0, no message is
* returned.
*/
void srv_close_with_err(struct session *t, int err, int finst, int status, int msglen, char *msg) {
t->srv_state = SV_STCLOSE;
if (status > 0) {
t->logs.status = status;
if (t->proxy->mode == PR_MODE_HTTP)
client_return(t, msglen, msg);
}
if (!(t->flags & SN_ERR_MASK))
t->flags |= err;
if (!(t->flags & SN_FINST_MASK))
t->flags |= finst;
}
/*
* This function checks the retry count during the connect() job.
* It updates the session's srv_state and retries, so that the caller knows
* what it has to do. It uses the last connection error to set the log when
* it expires. It returns 1 when it has expired, and 0 otherwise.
*/
int srv_count_retry_down(struct session *t, int conn_err) {
/* we are in front of a retryable error */
t->conn_retries--;
if (t->conn_retries < 0) {
/* if not retryable anymore, let's abort */
tv_eternity(&t->cnexpire);
srv_close_with_err(t, conn_err, SN_FINST_C,
503, t->proxy->errmsg.len503, t->proxy->errmsg.msg503);
/* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1;
}
return 0;
}
/*
* This function performs the retryable part of the connect() job.
* It updates the session's srv_state and retries, so that the caller knows
* what it has to do. It returns 1 when it breaks out of the loop, or 0 if
* it needs to redispatch.
*/
int srv_retryable_connect(struct session *t) {
int conn_err;
/* This loop ensures that we stop before the last retry in case of a
* redispatchable server.
*/
do {
/* initiate a connection to the server */
conn_err = connect_server(t);
switch (conn_err) {
case SN_ERR_NONE:
//fprintf(stderr,"0: c=%d, s=%d\n", c, s);
t->srv_state = SV_STCONN;
return 1;
case SN_ERR_INTERNAL:
tv_eternity(&t->cnexpire);
srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C,
500, t->proxy->errmsg.len500, t->proxy->errmsg.msg500);
/* release other sessions waiting for this server */
if (t->srv)
offer_connection_slot(t);
return 1;
}
/* ensure that we have enough retries left */
if (srv_count_retry_down(t, conn_err))
return 1;
} while (t->srv == NULL || t->conn_retries > 0 || !(t->proxy->options & PR_O_REDISP));
/* We're on our last chance, and the REDISP option was specified.
* We will ignore cookie and force to balance or use the dispatcher.
*/
t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
t->srv = NULL; /* it's left to the dispatcher to choose a server */
if ((t->flags & SN_CK_MASK) == SN_CK_VALID) {
t->flags &= ~SN_CK_MASK;
t->flags |= SN_CK_DOWN;
}
return 0;
}
/* This function performs the "redispatch" part of a connection attempt. It
* will assign a server if required, queue the connection if required, and
* handle errors that might arise at this level. It can change the server
* state. It will return 1 if it encounters an error, switches the server
* state, or has to queue a connection. Otherwise, it will return 0 indicating
* that the connection is ready to use.
*/
int srv_redispatch_connect(struct session *t) {
int conn_err;
/* We know that we don't have any connection pending, so we will
* try to get a new one, and wait in this state if it's queued
*/
conn_err = assign_server_and_queue(t);
switch (conn_err) {
case SRV_STATUS_OK:
break;
case SRV_STATUS_NOSRV:
tv_eternity(&t->cnexpire);
srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_C,
503, t->proxy->errmsg.len503, t->proxy->errmsg.msg503);
/* FIXME-20060501: we should not need this once we flush every session
* when the last server goes down.
*/
/* release other sessions waiting for this server */
if (t->srv)
offer_connection_slot(t);
return 1;
case SRV_STATUS_QUEUED:
t->srv_state = SV_STIDLE;
/* do nothing else and do not wake any other session up */
return 1;
case SRV_STATUS_FULL:
case SRV_STATUS_INTERNAL:
default:
tv_eternity(&t->cnexpire);
srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C,
500, t->proxy->errmsg.len500, t->proxy->errmsg.msg500);
/* release other sessions waiting for this server */
if (t->srv)
offer_connection_slot(t);
return 1;
}
/* if we get here, it's because we got SRV_STATUS_OK, which also
* means that the connection has not been queued.
*/
return 0;
}
/* /*
* manages the server FSM and its socket. It returns 1 if a state has changed * manages the server FSM and its socket. It returns 1 if a state has changed
@ -4340,52 +4652,38 @@ int process_srv(struct session *t) {
c == CL_STSHUTW || c == CL_STSHUTW ||
(c == CL_STSHUTR && t->req->l == 0)) { /* give up */ (c == CL_STSHUTR && t->req->l == 0)) { /* give up */
tv_eternity(&t->cnexpire); tv_eternity(&t->cnexpire);
t->srv_state = SV_STCLOSE; srv_close_with_err(t, SN_ERR_CLICL, SN_FINST_C, 0, 0, NULL);
if (!(t->flags & SN_ERR_MASK))
t->flags |= SN_ERR_CLICL;
if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_C;
return 1;
}
else { /* go to SV_STCONN */
/* initiate a connection to the server */
conn_err = connect_server(t);
if (conn_err == SN_ERR_NONE) {
//fprintf(stderr,"0: c=%d, s=%d\n", c, s);
t->srv_state = SV_STCONN;
}
else { /* try again */
while (t->conn_retries-- > 0) {
if ((t->proxy->options & PR_O_REDISP) && (t->conn_retries == 0)) {
t->flags &= ~SN_DIRECT; /* ignore cookie and force to use the dispatcher */
t->srv = NULL; /* it's left to the dispatcher to choose a server */
if ((t->flags & SN_CK_MASK) == SN_CK_VALID) {
t->flags &= ~SN_CK_MASK;
t->flags |= SN_CK_DOWN;
}
}
conn_err = connect_server(t); /* it might be possible that we have been granted an access to the
if (conn_err == SN_ERR_NONE) { * server while waiting for a free slot. Since we'll never use it,
t->srv_state = SV_STCONN; * we have to pass it on to another session.
break; */
} if (t->srv)
} offer_connection_slot(t);
if (t->conn_retries < 0) {
/* if conn_retries < 0 or other error, let's abort */
tv_eternity(&t->cnexpire);
t->srv_state = SV_STCLOSE;
t->logs.status = 503;
if (t->proxy->mode == PR_MODE_HTTP)
client_return(t, t->proxy->errmsg.len503, t->proxy->errmsg.msg503);
if (!(t->flags & SN_ERR_MASK))
t->flags |= conn_err; /* report the precise connect() error */
if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_C;
}
}
return 1; return 1;
} }
else {
/* Right now, we will need to create a connection to the server.
* We might already have tried, and got a connection pending, in
* which case we will not do anything till it's pending. It's up
* to any other session to release it and wake us up again.
*/
if (t->pend_pos)
return 0;
do {
/* first, get a connection */
if (srv_redispatch_connect(t))
return t->srv_state != SV_STIDLE;
/* try to (re-)connect to the server, and fail if we expire the
* number of retries.
*/
if (srv_retryable_connect(t))
return t->srv_state != SV_STIDLE;
} while (1);
}
} }
else if (s == SV_STCONN) { /* connection in progress */ else if (s == SV_STCONN) { /* connection in progress */
if (t->res_sw == RES_SILENT && tv_cmp2_ms(&t->cnexpire, &now) > 0) { if (t->res_sw == RES_SILENT && tv_cmp2_ms(&t->cnexpire, &now) > 0) {
@ -4393,44 +4691,35 @@ int process_srv(struct session *t) {
return 0; /* nothing changed */ return 0; /* nothing changed */
} }
else if (t->res_sw == RES_SILENT || t->res_sw == RES_ERROR) { else if (t->res_sw == RES_SILENT || t->res_sw == RES_ERROR) {
/* timeout, asynchronous connect error or first write error */
//fprintf(stderr,"2: c=%d, s=%d\n", c, s); //fprintf(stderr,"2: c=%d, s=%d\n", c, s);
/* timeout, connect error or first write error */
//FD_CLR(t->srv_fd, StaticWriteEvent);
fd_delete(t->srv_fd); fd_delete(t->srv_fd);
if (t->srv) if (t->srv)
t->srv->cur_sess--; t->srv->cur_sess--;
//close(t->srv_fd);
t->conn_retries--; if (t->res_sw == RES_SILENT)
if (t->conn_retries >= 0) {
if ((t->proxy->options & PR_O_REDISP) && (t->conn_retries == 0)) {
t->flags &= ~SN_DIRECT; /* ignore cookie and force to use the dispatcher */
t->srv = NULL; /* it's left to the dispatcher to choose a server */
if ((t->flags & SN_CK_MASK) == SN_CK_VALID) {
t->flags &= ~SN_CK_MASK;
t->flags |= SN_CK_DOWN;
}
}
conn_err = connect_server(t);
if (conn_err == SN_ERR_NONE)
return 0; /* no state changed */
}
else if (t->res_sw == RES_SILENT)
conn_err = SN_ERR_SRVTO; // it was a connect timeout. conn_err = SN_ERR_SRVTO; // it was a connect timeout.
else else
conn_err = SN_ERR_SRVCL; // it was a connect error. conn_err = SN_ERR_SRVCL; // it was an asynchronous connect error.
/* if conn_retries < 0 or other error, let's abort */ /* ensure that we have enough retries left */
tv_eternity(&t->cnexpire); if (srv_count_retry_down(t, conn_err))
t->srv_state = SV_STCLOSE;
t->logs.status = 503;
if (t->proxy->mode == PR_MODE_HTTP)
client_return(t, t->proxy->errmsg.len503, t->proxy->errmsg.msg503);
if (!(t->flags & SN_ERR_MASK))
t->flags |= conn_err;
if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_C;
/* TODO : check if there are pending connections on this server */
return 1; return 1;
do {
/* Now we will try to either reconnect to the same server or
* connect to another server. If the connection gets queued
* because all servers are saturated, then we will go back to
* the SV_STIDLE state.
*/
if (srv_retryable_connect(t))
return t->srv_state != SV_STCONN;
/* we need to redispatch the connection to another server */
if (srv_redispatch_connect(t))
return t->srv_state != SV_STCONN;
} while (1);
} }
else { /* no error or write 0 */ else { /* no error or write 0 */
t->logs.t_connect = tv_diff(&t->logs.tv_accept, &now); t->logs.t_connect = tv_diff(&t->logs.tv_accept, &now);
@ -4521,7 +4810,12 @@ int process_srv(struct session *t) {
Alert("Blocking cacheable cookie in response from instance %s, server %s.\n", t->proxy->id, t->srv->id); Alert("Blocking cacheable cookie in response from instance %s, server %s.\n", t->proxy->id, t->srv->id);
send_log(t->proxy, LOG_ALERT, "Blocking cacheable cookie in response from instance %s, server %s.\n", t->proxy->id, t->srv->id); send_log(t->proxy, LOG_ALERT, "Blocking cacheable cookie in response from instance %s, server %s.\n", t->proxy->id, t->srv->id);
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
} }
@ -4540,7 +4834,12 @@ int process_srv(struct session *t) {
t->flags |= SN_ERR_PRXCOND; t->flags |= SN_ERR_PRXCOND;
if (!(t->flags & SN_FINST_MASK)) if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_H; t->flags |= SN_FINST_H;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
@ -4974,7 +5273,12 @@ int process_srv(struct session *t) {
t->flags |= SN_ERR_SRVCL; t->flags |= SN_ERR_SRVCL;
if (!(t->flags & SN_FINST_MASK)) if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_H; t->flags |= SN_FINST_H;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
/* end of client write or end of server read. /* end of client write or end of server read.
@ -5004,7 +5308,12 @@ int process_srv(struct session *t) {
t->flags |= SN_ERR_SRVTO; t->flags |= SN_ERR_SRVTO;
if (!(t->flags & SN_FINST_MASK)) if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_H; t->flags |= SN_FINST_H;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
/* last client read and buffer empty */ /* last client read and buffer empty */
@ -5099,7 +5408,12 @@ int process_srv(struct session *t) {
t->flags |= SN_ERR_SRVCL; t->flags |= SN_ERR_SRVCL;
if (!(t->flags & SN_FINST_MASK)) if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_D; t->flags |= SN_FINST_D;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
/* last read, or end of client write */ /* last read, or end of client write */
@ -5208,7 +5522,12 @@ int process_srv(struct session *t) {
t->flags |= SN_ERR_SRVCL; t->flags |= SN_ERR_SRVCL;
if (!(t->flags & SN_FINST_MASK)) if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_D; t->flags |= SN_FINST_D;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
else if ((c == CL_STSHUTR || c == CL_STCLOSE) && (req->l == 0)) { else if ((c == CL_STSHUTR || c == CL_STCLOSE) && (req->l == 0)) {
@ -5219,7 +5538,12 @@ int process_srv(struct session *t) {
t->srv->cur_sess--; t->srv->cur_sess--;
//close(t->srv_fd); //close(t->srv_fd);
t->srv_state = SV_STCLOSE; t->srv_state = SV_STCLOSE;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
else if (tv_cmp2_ms(&t->swexpire, &now) <= 0) { else if (tv_cmp2_ms(&t->swexpire, &now) <= 0) {
@ -5234,7 +5558,12 @@ int process_srv(struct session *t) {
t->flags |= SN_ERR_SRVTO; t->flags |= SN_ERR_SRVTO;
if (!(t->flags & SN_FINST_MASK)) if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_D; t->flags |= SN_FINST_D;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
else if (req->l == 0) { else if (req->l == 0) {
@ -5271,7 +5600,12 @@ int process_srv(struct session *t) {
t->flags |= SN_ERR_SRVCL; t->flags |= SN_ERR_SRVCL;
if (!(t->flags & SN_FINST_MASK)) if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_D; t->flags |= SN_FINST_D;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
else if (t->res_sr == RES_NULL || c == CL_STSHUTW || c == CL_STCLOSE) { else if (t->res_sr == RES_NULL || c == CL_STSHUTW || c == CL_STCLOSE) {
@ -5282,7 +5616,12 @@ int process_srv(struct session *t) {
t->srv->cur_sess--; t->srv->cur_sess--;
//close(t->srv_fd); //close(t->srv_fd);
t->srv_state = SV_STCLOSE; t->srv_state = SV_STCLOSE;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
else if (tv_cmp2_ms(&t->srexpire, &now) <= 0) { else if (tv_cmp2_ms(&t->srexpire, &now) <= 0) {
@ -5297,7 +5636,12 @@ int process_srv(struct session *t) {
t->flags |= SN_ERR_SRVTO; t->flags |= SN_ERR_SRVTO;
if (!(t->flags & SN_FINST_MASK)) if (!(t->flags & SN_FINST_MASK))
t->flags |= SN_FINST_D; t->flags |= SN_FINST_D;
/* TODO : check if there are pending connections on this server */ /* We used to have a free connection slot. Since we'll never use it,
* we have to pass it on to another session.
*/
if (t->srv)
offer_connection_slot(t);
return 1; return 1;
} }
else if (rep->l == BUFSIZE) { /* no room to read more data */ else if (rep->l == BUFSIZE) { /* no room to read more data */
@ -6666,8 +7010,11 @@ int cfg_parse_listen(char *file, int linenum, char **args) {
Alert("parsing [%s:%d] : out of memory.\n", file, linenum); Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
return -1; return -1;
} }
curproxy->next = proxy; curproxy->next = proxy;
proxy = curproxy; proxy = curproxy;
LIST_INIT(&curproxy->pendconns);
curproxy->id = strdup(args[1]); curproxy->id = strdup(args[1]);
/* parse the listener address if any */ /* parse the listener address if any */