diff --git a/include/proto/queue.h b/include/proto/queue.h index 092747a03..6899aee48 100644 --- a/include/proto/queue.h +++ b/include/proto/queue.h @@ -2,7 +2,7 @@ include/proto/queue.h This file defines everything related to queues. - Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu + Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -38,7 +38,7 @@ int init_pendconn(); struct session *pendconn_get_next_sess(struct server *srv, struct proxy *px); struct pendconn *pendconn_add(struct session *sess); void pendconn_free(struct pendconn *p); -void process_srv_queue(struct task *t, struct timeval *next); +void process_srv_queue(struct server *s); unsigned int srv_dynamic_maxconn(const struct server *s); @@ -68,8 +68,7 @@ static inline struct pendconn *pendconn_from_px(const struct proxy *px) { */ static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) { return (s && (s->nbpend || p->nbpend) && - (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)) && - s->queue_mgt); + (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s))); } #endif /* _PROTO_QUEUE_H */ diff --git a/include/proto/session.h b/include/proto/session.h index 4b86af28e..348e65086 100644 --- a/include/proto/session.h +++ b/include/proto/session.h @@ -2,7 +2,7 @@ include/proto/session.h This file defines everything related to sessions. - Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu + Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -34,6 +34,7 @@ void session_free(struct session *s); int init_session(); void session_process_counters(struct session *s); +void sess_change_server(struct session *sess, struct server *newsrv); #endif /* _PROTO_SESSION_H */ diff --git a/include/types/server.h b/include/types/server.h index 0e2183ef5..bb55629b2 100644 --- a/include/types/server.h +++ b/include/types/server.h @@ -78,12 +78,12 @@ struct server { char *rdr_pfx; /* the redirection prefix */ struct proxy *proxy; /* the proxy this server belongs to */ + int served; /* # of active sessions currently being served (ie not pending) */ int cur_sess, cur_sess_max; /* number of currently active sessions (including syn_sent) */ unsigned maxconn, minconn; /* max # of active sessions (0 = unlimited), min# for dynamic limit. */ int nbpend, nbpend_max; /* number of pending connections */ int maxqueue; /* maximum number of pending connections allowed */ struct list pendconns; /* pending connections */ - struct task *queue_mgt; /* the task associated to the queue processing */ struct sockaddr_in addr; /* the address to connect to */ struct sockaddr_in source_addr; /* the address to which we want to bind for connect() */ diff --git a/include/types/session.h b/include/types/session.h index 08fdc8609..8885b966b 100644 --- a/include/types/session.h +++ b/include/types/session.h @@ -81,6 +81,19 @@ /* WARNING: if new fields are added, they must be initialized in event_accept() * and freed in session_free() ! */ + +/* + * Note: some session flags have dependencies : + * - SN_DIRECT cannot exist without SN_ASSIGNED, because a server is + * immediately assigned when SN_DIRECT is determined. Both must be cleared + * when clearing SN_DIRECT (eg: redispatch). + * - ->srv has no meaning without SN_ASSIGNED and must not be checked without + * it. ->prev_srv should be used to check previous ->srv. If SN_ASSIGNED is + * set and sess->srv is NULL, then it is a dispatch or proxy mode. + * - a session being processed has srv_conn set. + * - srv_conn might remain after SN_DIRECT has been reset, but the assigned + * server should eventually be released. + */ struct session { struct task *task; /* the task associated with this session */ /* application specific below */ @@ -97,7 +110,9 @@ struct session { struct sockaddr_storage cli_addr; /* the client address */ struct sockaddr_storage frt_addr; /* the frontend address reached by the client if SN_FRT_ADDR_SET is set */ struct sockaddr_in srv_addr; /* the address to connect to */ - struct server *srv; /* the server being used */ + struct server *srv; /* the server the session will be running or has been running on */ + struct server *srv_conn; /* session already has a slot on a server and is not in queue */ + struct server *prev_srv; /* the server the was running on, after a redispatch, otherwise NULL */ struct pendconn *pend_pos; /* if not NULL, points to the position in the pending queue */ struct http_txn txn; /* current HTTP transaction being processed. Should become a list. */ struct { diff --git a/src/backend.c b/src/backend.c index 649756787..cdd8c9074 100644 --- a/src/backend.c +++ b/src/backend.c @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -40,6 +41,7 @@ #include #include #include +#include #include #include @@ -785,7 +787,7 @@ static struct server *fwrr_get_next_server(struct proxy *p, struct server *srvto fwrr_update_position(grp, srv); fwrr_dequeue_srv(srv); grp->curr_pos++; - if (!srv->maxconn || srv->cur_sess < srv_dynamic_maxconn(srv)) { + if (!srv->maxconn || (!srv->nbpend && srv->served < srv_dynamic_maxconn(srv))) { /* make sure it is not the server we are trying to exclude... */ if (srv != srvtoavoid || avoided) break; @@ -851,7 +853,7 @@ static inline void fwlc_dequeue_srv(struct server *s) */ static inline void fwlc_queue_srv(struct server *s) { - s->lb_node.key = s->cur_sess * SRV_EWGHT_MAX / s->eweight; + s->lb_node.key = s->served * SRV_EWGHT_MAX / s->eweight; eb32_insert(s->lb_tree, &s->lb_node); } @@ -1099,7 +1101,7 @@ static struct server *fwlc_get_next_server(struct proxy *p, struct server *srvto struct server *s; s = eb32_entry(node, struct server, lb_node); - if (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)) { + if (!s->maxconn || (!s->nbpend && s->served < srv_dynamic_maxconn(s))) { if (s != srvtoavoid) { srv = s; break; @@ -1281,117 +1283,163 @@ struct server *get_server_ph_post(struct session *s) /* - * This function marks the session as 'assigned' in direct or dispatch modes, - * or tries to assign one in balance mode, according to the algorithm. It does - * nothing if the session had already been assigned a server. + * This function applies the load-balancing algorithm to the session, as + * defined by the backend it is assigned to. The session is then marked as + * 'assigned'. + * + * This function MAY NOT be called with SN_ASSIGNED already set. If the session + * had a server previously assigned, it is rebalanced, trying to avoid the same + * server. + * The function tries to keep the original connection slot if it reconnects to + * the same server, otherwise it releases it and tries to offer it. + * + * It is illegal to call this function with a session in a queue. * * It may return : - * SRV_STATUS_OK if everything is OK. s->srv will be valid. - * SRV_STATUS_NOSRV if no server is available. s->srv = NULL. - * SRV_STATUS_FULL if all servers are saturated. s->srv = NULL. + * SRV_STATUS_OK if everything is OK. Session assigned to ->srv + * SRV_STATUS_NOSRV if no server is available. Session is not ASSIGNED + * SRV_STATUS_FULL if all servers are saturated. Session is not ASSIGNED * SRV_STATUS_INTERNAL for other unrecoverable errors. * - * Upon successful return, the session flag SN_ASSIGNED to indicate that it does - * not need to be called anymore. This usually means that s->srv can be trusted - * in balance and direct modes. This flag is not cleared, so it's to the caller - * to clear it if required (eg: redispatch). + * Upon successful return, the session flag SN_ASSIGNED is set to indicate that + * it does not need to be called anymore. This means that s->srv can be trusted + * in balance and direct modes. * */ int assign_server(struct session *s) { - struct server *srvtoavoid; + struct server *conn_slot; + int err; #ifdef DEBUG_FULL fprintf(stderr,"assign_server : s=%p\n",s); #endif - srvtoavoid = s->srv; + err = SRV_STATUS_INTERNAL; + if (unlikely(s->pend_pos || s->flags & SN_ASSIGNED)) + goto out_err; + + s->prev_srv = s->prev_srv; + conn_slot = s->srv_conn; + + /* We have to release any connection slot before applying any LB algo, + * otherwise we may erroneously end up with no available slot. + */ + if (conn_slot) + sess_change_server(s, NULL); + + /* We will now try to find the good server and store it into srv>. + * Note that srv> may be NULL in case of dispatch or proxy mode, + * as well as if no server is available (check error code). + */ + s->srv = NULL; + if (s->be->lbprm.algo & BE_LB_ALGO) { + int len; + /* we must check if we have at least one server available */ + if (!s->be->lbprm.tot_weight) { + err = SRV_STATUS_NOSRV; + goto out; + } - if (s->pend_pos) - return SRV_STATUS_INTERNAL; - - if (!(s->flags & SN_ASSIGNED)) { - if (s->be->lbprm.algo & BE_LB_ALGO) { - int len; - - if (s->flags & SN_DIRECT) { - s->flags |= SN_ASSIGNED; - return SRV_STATUS_OK; + switch (s->be->lbprm.algo & BE_LB_ALGO) { + case BE_LB_ALGO_RR: + s->srv = fwrr_get_next_server(s->be, s->prev_srv); + if (!s->srv) { + err = SRV_STATUS_FULL; + goto out; + } + break; + case BE_LB_ALGO_LC: + s->srv = fwlc_get_next_server(s->be, s->prev_srv); + if (!s->srv) { + err = SRV_STATUS_FULL; + goto out; + } + break; + case BE_LB_ALGO_SH: + if (s->cli_addr.ss_family == AF_INET) + len = 4; + else if (s->cli_addr.ss_family == AF_INET6) + len = 16; + else { + /* unknown IP family */ + err = SRV_STATUS_INTERNAL; + goto out; } - - if (!s->be->lbprm.tot_weight) - return SRV_STATUS_NOSRV; - - switch (s->be->lbprm.algo & BE_LB_ALGO) { - case BE_LB_ALGO_RR: - s->srv = fwrr_get_next_server(s->be, srvtoavoid); - if (!s->srv) - return SRV_STATUS_FULL; - break; - case BE_LB_ALGO_LC: - s->srv = fwlc_get_next_server(s->be, srvtoavoid); - if (!s->srv) - return SRV_STATUS_FULL; - break; - case BE_LB_ALGO_SH: - if (s->cli_addr.ss_family == AF_INET) - len = 4; - else if (s->cli_addr.ss_family == AF_INET6) - len = 16; - else /* unknown IP family */ - return SRV_STATUS_INTERNAL; - s->srv = get_server_sh(s->be, - (void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr, - len); - break; - case BE_LB_ALGO_UH: - /* URI hashing */ - s->srv = get_server_uh(s->be, + s->srv = get_server_sh(s->be, + (void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr, + len); + break; + case BE_LB_ALGO_UH: + /* URI hashing */ + s->srv = get_server_uh(s->be, + s->txn.req.sol + s->txn.req.sl.rq.u, + s->txn.req.sl.rq.u_l); + break; + case BE_LB_ALGO_PH: + /* URL Parameter hashing */ + if (s->txn.meth == HTTP_METH_POST && + memchr(s->txn.req.sol + s->txn.req.sl.rq.u, '&', + s->txn.req.sl.rq.u_l ) == NULL) + s->srv = get_server_ph_post(s); + else + s->srv = get_server_ph(s->be, s->txn.req.sol + s->txn.req.sl.rq.u, s->txn.req.sl.rq.u_l); - break; - case BE_LB_ALGO_PH: - /* URL Parameter hashing */ - if (s->txn.meth == HTTP_METH_POST && - memchr(s->txn.req.sol + s->txn.req.sl.rq.u, '&', - s->txn.req.sl.rq.u_l ) == NULL) - s->srv = get_server_ph_post(s); - else - s->srv = get_server_ph(s->be, - s->txn.req.sol + s->txn.req.sl.rq.u, - s->txn.req.sl.rq.u_l); + if (!s->srv) { + /* parameter not found, fall back to round robin on the map */ + s->srv = get_server_rr_with_conns(s->be, s->prev_srv); if (!s->srv) { - /* parameter not found, fall back to round robin on the map */ - s->srv = get_server_rr_with_conns(s->be, srvtoavoid); - if (!s->srv) - return SRV_STATUS_FULL; + err = SRV_STATUS_FULL; + goto out; } - break; - default: - /* unknown balancing algorithm */ - return SRV_STATUS_INTERNAL; - } - if (s->srv != srvtoavoid) { - s->be->cum_lbconn++; - s->srv->cum_lbconn++; } + break; + default: + /* unknown balancing algorithm */ + err = SRV_STATUS_INTERNAL; + goto out; } - else if (s->be->options & PR_O_HTTP_PROXY) { - if (!s->srv_addr.sin_addr.s_addr) - return SRV_STATUS_NOSRV; + if (s->srv != s->prev_srv) { + s->be->cum_lbconn++; + s->srv->cum_lbconn++; } - else if (!*(int *)&s->be->dispatch_addr.sin_addr && - !(s->fe->options & PR_O_TRANSP)) { - return SRV_STATUS_NOSRV; - } - s->flags |= SN_ASSIGNED; } - return SRV_STATUS_OK; + else if (s->be->options & PR_O_HTTP_PROXY) { + if (!s->srv_addr.sin_addr.s_addr) { + err = SRV_STATUS_NOSRV; + goto out; + } + } + else if (!*(int *)&s->be->dispatch_addr.sin_addr && + !(s->fe->options & PR_O_TRANSP)) { + err = SRV_STATUS_NOSRV; + goto out; + } + + s->flags |= SN_ASSIGNED; + err = SRV_STATUS_OK; + out: + + /* Either we take back our connection slot, or we offer it to someone + * else if we don't need it anymore. + */ + if (conn_slot) { + if (conn_slot == s->srv) { + sess_change_server(s, s->srv); + } else { + if (may_dequeue_tasks(conn_slot, s->be)) + process_srv_queue(conn_slot); + } + } + + out_err: + return err; } @@ -1465,6 +1513,11 @@ int assign_server_address(struct session *s) /* This function assigns a server to session if required, and can add the * connection to either the assigned server's queue or to the proxy's queue. + * If ->srv_conn is set, the session is first released from the server. + * It may also be called with SN_DIRECT and/or SN_ASSIGNED though. It will + * be called before any connection and after any retry or redispatch occurs. + * + * It is not allowed to call this function with a session in a queue. * * Returns : * @@ -1472,92 +1525,89 @@ int assign_server_address(struct session *s) * SRV_STATUS_NOSRV if no server is available. s->srv = NULL. * SRV_STATUS_QUEUED if the connection has been queued. * SRV_STATUS_FULL if the server(s) is/are saturated and the - * connection could not be queued. + * connection could not be queued in s->srv, + * which may be NULL if we queue on the backend. * SRV_STATUS_INTERNAL for other unrecoverable errors. * */ int assign_server_and_queue(struct session *s) { struct pendconn *p; - struct server *srv; int err; if (s->pend_pos) return SRV_STATUS_INTERNAL; - if (s->flags & SN_ASSIGNED) { - if ((s->flags & SN_REDIRECTABLE) && s->srv && s->srv->rdr_len) { - /* server scheduled for redirection, and already assigned. We - * don't want to go further nor check the queue. + err = SRV_STATUS_OK; + if (!(s->flags & SN_ASSIGNED)) { + err = assign_server(s); + if (s->prev_srv) { + /* This session was previously assigned to a server. We have to + * update the session's and the server's stats : + * - if the server changed : + * - set TX_CK_DOWN if txn.flags was TX_CK_VALID + * - set SN_REDISP if it was successfully redispatched + * - increment srv->redispatches and be->redispatches + * - if the server remained the same : update retries. */ - return SRV_STATUS_OK; - } - if (s->srv && s->srv->maxqueue > 0 && s->srv->nbpend >= s->srv->maxqueue) { - /* it's left to the dispatcher to choose a server */ - s->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET); - } else { - /* a server does not need to be assigned, perhaps because we're in - * direct mode, or in dispatch or transparent modes where the server - * is not needed. - */ - if (s->srv && - s->srv->maxconn && s->srv->cur_sess >= srv_dynamic_maxconn(s->srv)) { - p = pendconn_add(s); - if (p) - return SRV_STATUS_QUEUED; - else - return SRV_STATUS_FULL; + if (s->prev_srv != s->srv) { + if ((s->txn.flags & TX_CK_MASK) == TX_CK_VALID) { + s->txn.flags &= ~TX_CK_MASK; + s->txn.flags |= TX_CK_DOWN; + } + s->flags |= SN_REDISP; + s->prev_srv->redispatches++; + s->be->redispatches++; + } else { + s->prev_srv->retries++; + s->be->retries++; } - return SRV_STATUS_OK; - } - } - - /* a server needs to be assigned */ - srv = s->srv; - err = assign_server(s); - - if (srv) { - if (srv != s->srv) { - /* This session was previously dispatched to another server: - * - set TX_CK_DOWN if txn.flags was TX_CK_VALID - * - set SN_REDISP if it was successfully redispatched - * - increment srv->redispatches and be->redispatches - */ - - if ((s->txn.flags & TX_CK_MASK) == TX_CK_VALID) { - s->txn.flags &= ~TX_CK_MASK; - s->txn.flags |= TX_CK_DOWN; - } - - s->flags |= SN_REDISP; - - srv->redispatches++; - s->be->redispatches++; - } else { - srv->retries++; - s->be->retries++; } } switch (err) { case SRV_STATUS_OK: - if ((s->flags & SN_REDIRECTABLE) && s->srv && s->srv->rdr_len) { - /* server supporting redirection and it is possible. - * Let's report that and ignore maxconn ! + /* we have SN_ASSIGNED set */ + if (!s->srv) + return SRV_STATUS_OK; /* dispatch or proxy mode */ + + /* If we already have a connection slot, no need to check any queue */ + if (s->srv_conn == s->srv) + return SRV_STATUS_OK; + + /* OK, this session already has an assigned server, but no + * connection slot yet. Either it is a redispatch, or it was + * assigned from persistence information (direct mode). + */ + if ((s->flags & SN_REDIRECTABLE) && s->srv->rdr_len) { + /* server scheduled for redirection, and already assigned. We + * don't want to go further nor check the queue. */ + sess_change_server(s, s->srv); /* not really needed in fact */ return SRV_STATUS_OK; } - /* in balance mode, we might have servers with connection limits */ - if (s->srv && - s->srv->maxconn && s->srv->cur_sess >= srv_dynamic_maxconn(s->srv)) { + /* We might have to queue this session if the assigned server is full. + * We know we have to queue it into the server's queue, so if a maxqueue + * is set on the server, we must also check that the server's queue is + * not full, in which case we have to return FULL. + */ + if (s->srv->maxconn && + (s->srv->nbpend || s->srv->served >= srv_dynamic_maxconn(s->srv))) { + + if (s->srv->maxqueue > 0 && s->srv->nbpend >= s->srv->maxqueue) + return SRV_STATUS_FULL; + p = pendconn_add(s); if (p) return SRV_STATUS_QUEUED; else - return SRV_STATUS_FULL; + return SRV_STATUS_INTERNAL; } + + /* OK, we can use this server. Let's reserve our place */ + sess_change_server(s, s->srv); return SRV_STATUS_OK; case SRV_STATUS_FULL: @@ -1566,11 +1616,14 @@ int assign_server_and_queue(struct session *s) if (p) return SRV_STATUS_QUEUED; else - return SRV_STATUS_FULL; + return SRV_STATUS_INTERNAL; case SRV_STATUS_NOSRV: + return err; + case SRV_STATUS_INTERNAL: return err; + default: return SRV_STATUS_INTERNAL; } @@ -1808,7 +1861,7 @@ int srv_count_retry_down(struct session *t, int conn_err) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } return 0; @@ -1851,7 +1904,7 @@ int srv_retryable_connect(struct session *t) t->be->failed_conns++; /* release other sessions waiting for this server */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } /* ensure that we have enough retries left */ @@ -1865,13 +1918,14 @@ int srv_retryable_connect(struct session *t) */ /* let's try to offer this slot to anybody */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); if (t->srv) t->srv->cum_sess++; //FIXME? /* it's left to the dispatcher to choose a server */ t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET); + t->prev_srv = t->srv; return 0; } @@ -1891,11 +1945,34 @@ int srv_redispatch_connect(struct session *t) /* We know that we don't have any connection pending, so we will * try to get a new one, and wait in this state if it's queued */ + redispatch: conn_err = assign_server_and_queue(t); switch (conn_err) { case SRV_STATUS_OK: break; + case SRV_STATUS_FULL: + /* The server has reached its maxqueue limit. Either PR_O_REDISP is set + * and we can redispatch to another server, or it is not and we return + * 503. This only makes sense in DIRECT mode however, because normal LB + * algorithms would never select such a server, and hash algorithms + * would bring us on the same server again. Note that t->srv is set in + * this case. + */ + if ((t->flags & SN_DIRECT) && (t->be->options & PR_O_REDISP)) { + t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET); + t->prev_srv = t->srv; + goto redispatch; + } + + tv_eternity(&t->req->cex); + srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q, + 503, error_message(t, HTTP_ERR_503)); + + t->srv->failed_conns++; + t->be->failed_conns++; + return 1; + case SRV_STATUS_NOSRV: /* note: it is guaranteed that t->srv == NULL here */ tv_eternity(&t->req->cex); @@ -1903,18 +1980,15 @@ int srv_redispatch_connect(struct session *t) 503, error_message(t, HTTP_ERR_503)); t->be->failed_conns++; - return 1; case SRV_STATUS_QUEUED: - /* note: we use the connect expiration date for the queue. */ if (!tv_add_ifset(&t->req->cex, &now, &t->be->timeout.queue)) tv_eternity(&t->req->cex); t->srv_state = SV_STIDLE; /* do nothing else and do not wake any other session up */ return 1; - case SRV_STATUS_FULL: case SRV_STATUS_INTERNAL: default: tv_eternity(&t->req->cex); @@ -1928,7 +2002,7 @@ int srv_redispatch_connect(struct session *t) /* release other sessions waiting for this server */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } /* if we get here, it's because we got SRV_STATUS_OK, which also diff --git a/src/cfgparse.c b/src/cfgparse.c index 7dbadd1e7..d34bfbe1c 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -1,7 +1,7 @@ /* * Configuration parser * - * Copyright 2000-2007 Willy Tarreau + * Copyright 2000-2008 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -3007,26 +3007,6 @@ int readcfgfile(const char *file) goto err; } - if (newsrv->maxconn > 0) { - struct task *t; - - if ((t = pool_alloc2(pool2_task)) == NULL) { - Alert("parsing [%s:%d] : out of memory.\n", file, linenum); - goto err; - } - - t->qlist.p = NULL; - t->wq = NULL; - t->state = TASK_IDLE; - t->process = process_srv_queue; - t->context = newsrv; - newsrv->queue_mgt = t; - - /* never run it unless specifically woken up */ - tv_eternity(&t->expire); - task_queue(t); - } - if (newsrv->trackit) { struct proxy *px; struct server *srv; diff --git a/src/client.c b/src/client.c index 67748e637..1dd318f5e 100644 --- a/src/client.c +++ b/src/client.c @@ -188,7 +188,7 @@ int event_accept(int fd) { s->cli_fd = cfd; s->srv_fd = -1; - s->srv = NULL; + s->srv = s->prev_srv = s->srv_conn = NULL; s->pend_pos = NULL; s->conn_retries = s->be->conn_retries; diff --git a/src/haproxy.c b/src/haproxy.c index fc82d2f8f..1e3c6ced7 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -1,6 +1,6 @@ /* * HA-Proxy : High Availability-enabled HTTP/TCP proxy - * Copyright 2000-2007 Willy Tarreau . + * Copyright 2000-2008 Willy Tarreau . * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License diff --git a/src/proto_http.c b/src/proto_http.c index eb261ab8f..0311748cd 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -1,7 +1,7 @@ /* * HTTP protocol analyzer * - * Copyright 2000-2007 Willy Tarreau + * Copyright 2000-2008 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -2569,9 +2569,9 @@ int process_srv(struct session *t) * to any other session to release it and wake us up again. */ if (t->pend_pos) { - if (!tv_isle(&req->cex, &now)) + if (!tv_isle(&req->cex, &now)) { return 0; - else { + } else { /* we've been waiting too long here */ tv_eternity(&req->cex); t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now); @@ -2660,8 +2660,7 @@ int process_srv(struct session *t) fd_delete(t->srv_fd); if (t->srv) { t->srv->cur_sess--; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } } @@ -2689,8 +2688,7 @@ int process_srv(struct session *t) fd_delete(t->srv_fd); if (t->srv) { t->srv->cur_sess--; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } if (!(req->flags & BF_WRITE_STATUS)) @@ -2721,10 +2719,11 @@ int process_srv(struct session *t) */ /* let's try to offer this slot to anybody */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); /* it's left to the dispatcher to choose a server */ t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET); + t->prev_srv = t->srv; /* first, get a connection */ if (srv_redispatch_connect(t)) @@ -2884,8 +2883,7 @@ int process_srv(struct session *t) if (t->srv) { t->srv->cur_sess--; t->srv->failed_resp++; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } t->be->failed_resp++; t->srv_state = SV_STCLOSE; @@ -2899,7 +2897,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (t->srv && may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -2928,8 +2926,7 @@ int process_srv(struct session *t) if (t->srv) { t->srv->cur_sess--; t->srv->failed_resp++; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } t->be->failed_resp++; t->srv_state = SV_STCLOSE; @@ -2943,7 +2940,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (t->srv && may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -3098,8 +3095,7 @@ int process_srv(struct session *t) if (t->srv) { t->srv->cur_sess--; t->srv->failed_resp++; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } cur_proxy->failed_resp++; return_srv_prx_502: @@ -3117,7 +3113,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (t->srv && may_dequeue_tasks(t->srv, cur_proxy)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } } @@ -3127,8 +3123,7 @@ int process_srv(struct session *t) if (t->srv) { t->srv->cur_sess--; t->srv->failed_secu++; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } cur_proxy->denied_resp++; goto return_srv_prx_502; @@ -3264,8 +3259,7 @@ int process_srv(struct session *t) if (t->srv) { t->srv->cur_sess--; t->srv->failed_secu++; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } t->be->denied_resp++; @@ -3353,8 +3347,7 @@ int process_srv(struct session *t) if (t->srv) { t->srv->cur_sess--; t->srv->failed_resp++; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } t->be->failed_resp++; t->srv_state = SV_STCLOSE; @@ -3366,7 +3359,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -3462,8 +3455,7 @@ int process_srv(struct session *t) if (t->srv) { t->srv->cur_sess--; t->srv->failed_resp++; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } t->be->failed_resp++; //close(t->srv_fd); @@ -3476,7 +3468,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -3486,8 +3478,7 @@ int process_srv(struct session *t) fd_delete(t->srv_fd); if (t->srv) { t->srv->cur_sess--; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } //close(t->srv_fd); t->srv_state = SV_STCLOSE; @@ -3495,7 +3486,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -3505,8 +3496,7 @@ int process_srv(struct session *t) fd_delete(t->srv_fd); if (t->srv) { t->srv->cur_sess--; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } //close(t->srv_fd); t->srv_state = SV_STCLOSE; @@ -3518,7 +3508,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -3545,8 +3535,7 @@ int process_srv(struct session *t) if (t->srv) { t->srv->cur_sess--; t->srv->failed_resp++; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } t->be->failed_resp++; //close(t->srv_fd); @@ -3559,7 +3548,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -3569,8 +3558,7 @@ int process_srv(struct session *t) fd_delete(t->srv_fd); if (t->srv) { t->srv->cur_sess--; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } //close(t->srv_fd); t->srv_state = SV_STCLOSE; @@ -3578,7 +3566,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -3588,8 +3576,7 @@ int process_srv(struct session *t) fd_delete(t->srv_fd); if (t->srv) { t->srv->cur_sess--; - if (t->srv->proxy->lbprm.server_drop_conn) - t->srv->proxy->lbprm.server_drop_conn(t->srv); + sess_change_server(t, NULL); } //close(t->srv_fd); t->srv_state = SV_STCLOSE; @@ -3601,7 +3588,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } diff --git a/src/proto_uxst.c b/src/proto_uxst.c index 556649c35..b708689c1 100644 --- a/src/proto_uxst.c +++ b/src/proto_uxst.c @@ -1,7 +1,7 @@ /* * UNIX SOCK_STREAM protocol layer (uxst) * - * Copyright 2000-2007 Willy Tarreau + * Copyright 2000-2008 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -917,7 +917,7 @@ static int process_uxst_srv(struct session *t) */ /* let's try to offer this slot to anybody */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); if (t->srv) t->srv->failed_conns++; @@ -1004,7 +1004,7 @@ static int process_uxst_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -1112,7 +1112,7 @@ static int process_uxst_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -1128,7 +1128,7 @@ static int process_uxst_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -1148,7 +1148,7 @@ static int process_uxst_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -1187,7 +1187,7 @@ static int process_uxst_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -1203,7 +1203,7 @@ static int process_uxst_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } @@ -1223,7 +1223,7 @@ static int process_uxst_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(t->srv->queue_mgt); + process_srv_queue(t->srv); return 1; } diff --git a/src/queue.c b/src/queue.c index 4d383f03f..178f1877c 100644 --- a/src/queue.c +++ b/src/queue.c @@ -1,7 +1,7 @@ /* * Queue management functions. * - * Copyright 2000-2007 Willy Tarreau + * Copyright 2000-2008 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -63,36 +63,34 @@ unsigned int srv_dynamic_maxconn(const struct server *s) /* - * Manages a server's connection queue. If woken up, will try to dequeue as - * many pending sessions as possible, and wake them up. The task has nothing - * else to do, so it always returns ETERNITY. + * Manages a server's connection queue. This function will try to dequeue as + * many pending sessions as possible, and wake them up. */ -void process_srv_queue(struct task *t, struct timeval *next) +void process_srv_queue(struct server *s) { - struct server *s = (struct server*)t->context; struct proxy *p = s->proxy; - int xferred; + int maxconn; /* First, check if we can handle some connections queued at the proxy. We * will take as many as we can handle. */ - for (xferred = 0; s->cur_sess + xferred < srv_dynamic_maxconn(s); xferred++) { - struct session *sess; - sess = pendconn_get_next_sess(s, p); + maxconn = srv_dynamic_maxconn(s); + while (s->served < maxconn) { + struct session *sess = pendconn_get_next_sess(s, p); if (sess == NULL) break; task_wakeup(sess->task); } - - tv_eternity(next); } /* Detaches the next pending connection from either a server or a proxy, and * returns its associated session. If no pending connection is found, NULL is - * returned. Note that neither nor can be NULL. + * returned. Note that neither nor may be NULL. * Priority is given to the oldest request in the queue if both and * have pending requests. This ensures that no request will be left unserved. + * The session is immediately marked as "assigned", and both its and + * are set to , */ struct session *pendconn_get_next_sess(struct server *srv, struct proxy *px) { @@ -114,13 +112,23 @@ struct session *pendconn_get_next_sess(struct server *srv, struct proxy *px) } sess = ps->sess; pendconn_free(ps); + + /* we want to note that the session has now been assigned a server */ + sess->flags |= SN_ASSIGNED; + sess->srv = srv; + sess->srv_conn = srv; + srv->served++; + if (px->lbprm.server_take_conn) + px->lbprm.server_take_conn(srv); + return sess; } /* Adds the session to the pending connection list of server ->srv * or to the one of ->proxy if srv is NULL. All counters and back pointers * are updated accordingly. Returns NULL if no memory is available, otherwise the - * pendconn itself. + * pendconn itself. If the session was already marked as served, its flag is + * cleared. It is illegal to call this function with a non-NULL sess->srv_conn. */ struct pendconn *pendconn_add(struct session *sess) { @@ -133,7 +141,8 @@ struct pendconn *pendconn_add(struct session *sess) sess->pend_pos = p; p->sess = sess; p->srv = sess->srv; - if (sess->srv) { + + if (sess->flags & SN_ASSIGNED && sess->srv) { LIST_ADDQ(&sess->srv->pendconns, &p->list); sess->srv->nbpend++; sess->logs.srv_queue_size += sess->srv->nbpend; diff --git a/src/session.c b/src/session.c index 594f7df1f..e3a736d2b 100644 --- a/src/session.c +++ b/src/session.c @@ -1,7 +1,7 @@ /* * Server management functions. * - * Copyright 2000-2007 Willy Tarreau + * Copyright 2000-2008 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -40,6 +41,16 @@ void session_free(struct session *s) if (s->pend_pos) pendconn_free(s->pend_pos); + if (s->srv) /* there may be requests left pending in queue */ + process_srv_queue(s->srv); + if (unlikely(s->srv_conn)) { + /* the session still has a reserved slot on a server, but + * it should normally be only the same as the one above, + * so this should not happen in fact. + */ + sess_change_server(s, NULL); + } + if (s->req) pool_free2(pool2_buffer, s->req); if (s->rep) @@ -135,6 +146,34 @@ void session_process_counters(struct session *s) } } +/* + * This function adjusts sess->srv_conn and maintains the previous and new + * server's served session counts. Setting newsrv to NULL is enough to release + * current connection slot. This function also notifies any LB algo which might + * expect to be informed about any change in the number of active sessions on a + * server. + */ +void sess_change_server(struct session *sess, struct server *newsrv) +{ + if (sess->srv_conn == newsrv) + return; + + if (sess->srv_conn) { + sess->srv_conn->served--; + if (sess->srv_conn->proxy->lbprm.server_drop_conn) + sess->srv_conn->proxy->lbprm.server_drop_conn(sess->srv_conn); + sess->srv_conn = NULL; + } + + if (newsrv) { + newsrv->served++; + if (newsrv->proxy->lbprm.server_take_conn) + newsrv->proxy->lbprm.server_take_conn(newsrv); + sess->srv_conn = newsrv; + } +} + + /* * Local variables: * c-indent-level: 8