diff --git a/include/proto/dumpstats.h b/include/proto/dumpstats.h index 0fe2bbefa..5183c7110 100644 --- a/include/proto/dumpstats.h +++ b/include/proto/dumpstats.h @@ -1,24 +1,24 @@ /* - include/proto/dumpstats.h - This file contains definitions of some primitives to dedicated to - statistics output. - - Copyright (C) 2000-2009 Willy Tarreau - w@1wt.eu - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation, version 2.1 - exclusively. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -*/ + * include/proto/dumpstats.h + * This file contains definitions of some primitives to dedicated to + * statistics output. + * + * Copyright (C) 2000-2010 Willy Tarreau - w@1wt.eu + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ #ifndef _PROTO_DUMPSTATS_H #define _PROTO_DUMPSTATS_H @@ -53,6 +53,7 @@ #define STAT_CLI_O_ERR 7 /* dump errors */ +int stats_accept(struct listener *l, int cfd, struct sockaddr_storage *addr); int stats_sock_parse_request(struct stream_interface *si, char *line); void stats_io_handler(struct stream_interface *si); int stats_dump_raw_to_buffer(struct session *s, struct buffer *rep); diff --git a/include/proto/frontend.h b/include/proto/frontend.h index 504acfa4a..61e0460eb 100644 --- a/include/proto/frontend.h +++ b/include/proto/frontend.h @@ -26,7 +26,7 @@ #include void get_frt_addr(struct session *s); -int event_accept(int fd); +int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr); #endif /* _PROTO_FRONTEND_H */ diff --git a/include/proto/proto_tcp.h b/include/proto/proto_tcp.h index 6d8c34cf3..5c0087b81 100644 --- a/include/proto/proto_tcp.h +++ b/include/proto/proto_tcp.h @@ -26,7 +26,6 @@ #include #include -int tcp_event_accept(int fd); int tcpv4_bind_socket(int fd, int flags, struct sockaddr_in *local, struct sockaddr_in *remote); void tcpv4_add_listener(struct listener *listener); void tcpv6_add_listener(struct listener *listener); diff --git a/include/proto/proto_uxst.h b/include/proto/proto_uxst.h index c0163fb73..b412f0ad4 100644 --- a/include/proto/proto_uxst.h +++ b/include/proto/proto_uxst.h @@ -1,23 +1,23 @@ /* - include/proto/proto_uxst.h - This file contains UNIX-stream socket protocol definitions. - - Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation, version 2.1 - exclusively. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -*/ + * include/proto/proto_uxst.h + * This file contains UNIX-stream socket protocol definitions. + * + * Copyright (C) 2000-2010 Willy Tarreau - w@1wt.eu + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ #ifndef _PROTO_PROTO_UXST_H #define _PROTO_PROTO_UXST_H @@ -26,9 +26,7 @@ #include #include -int uxst_event_accept(int fd); void uxst_add_listener(struct listener *listener); -int uxst_req_analyser_stats(struct session *s, struct buffer *req, int an_bit); #endif /* _PROTO_PROTO_UXST_H */ diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h index 929cb082f..d0d172f19 100644 --- a/include/proto/stream_sock.h +++ b/include/proto/stream_sock.h @@ -1,23 +1,23 @@ /* - include/proto/stream_sock.h - This file contains client-side definitions. - - Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation, version 2.1 - exclusively. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -*/ + * include/proto/stream_sock.h + * This file contains client-side definitions. + * + * Copyright (C) 2000-2010 Willy Tarreau - w@1wt.eu + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ #ifndef _PROTO_STREAM_SOCK_H #define _PROTO_STREAM_SOCK_H @@ -31,6 +31,7 @@ /* main event functions used to move data between sockets and buffers */ +int stream_sock_accept(int fd); int stream_sock_read(int fd); int stream_sock_write(int fd); void stream_sock_data_finish(struct stream_interface *si); diff --git a/include/types/protocols.h b/include/types/protocols.h index c13c73d36..0911a40e7 100644 --- a/include/types/protocols.h +++ b/include/types/protocols.h @@ -91,10 +91,10 @@ struct listener { unsigned int backlog; /* if set, listen backlog */ struct listener *next; /* next address for the same proxy, or NULL */ struct list proto_list; /* list in the protocol header */ - int (*accept)(int fd); /* accept() function passed to fdtab[] */ + int (*accept)(struct listener *l, int fd, struct sockaddr_storage *addr); /* upper layer's accept() */ struct task * (*handler)(struct task *t); /* protocol handler. It is a task */ int *timeout; /* pointer to client-side timeout */ - void *private; /* any private data which may be used by accept() */ + struct proxy *frontend; /* the frontend this listener belongs to, or NULL */ unsigned int analysers; /* bitmap of required protocol analysers */ int nice; /* nice value to assign to the instanciated tasks */ union { /* protocol-dependant access restrictions */ @@ -129,6 +129,7 @@ struct protocol { sa_family_t sock_family; /* socket family, for sockaddr */ socklen_t sock_addrlen; /* socket address length, used by bind() */ int l3_addrlen; /* layer3 address length, used by hashes */ + int (*accept)(int fd); /* generic accept function */ int (*read)(int fd); /* generic read function */ int (*write)(int fd); /* generic write function */ int (*bind_all)(struct protocol *proto); /* bind all unbound listeners */ diff --git a/src/cfgparse.c b/src/cfgparse.c index cc82544e2..06ebf3651 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -5349,8 +5350,8 @@ int check_config_validity() listener->maxconn = curproxy->maxconn; listener->backlog = curproxy->backlog; listener->timeout = &curproxy->timeout.client; - listener->accept = event_accept; - listener->private = curproxy; + listener->accept = frontend_accept; + listener->frontend = curproxy; listener->handler = process_session; listener->analysers |= curproxy->fe_req_ana; diff --git a/src/dumpstats.c b/src/dumpstats.c index 15174b81e..1856f2111 100644 --- a/src/dumpstats.c +++ b/src/dumpstats.c @@ -1,7 +1,7 @@ /* - * Functions dedicated to statistics output + * Functions dedicated to statistics output and the stats socket * - * Copyright 2000-2009 Willy Tarreau + * Copyright 2000-2010 Willy Tarreau * Copyright 2007-2009 Krzysztof Piotr Oledzki * * This program is free software; you can redistribute it and/or @@ -44,12 +44,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include const char stats_sock_usage_msg[] = @@ -73,6 +75,167 @@ const char stats_permission_denied_msg[] = "Permission denied\n" ""; +/* This function is called from the protocol layer accept() in order to instanciate + * a new stats socket. It returns a positive value upon success, 0 if the connection + * needs to be closed and ignored, or a negative value upon critical failure. + */ +int stats_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) +{ + struct proxy *p = l->frontend; /* attached frontend */ + struct session *s; + struct task *t; + + if ((s = pool_alloc2(pool2_session)) == NULL) { + Alert("out of memory in stats_accept().\n"); + goto out_close; + } + + LIST_ADDQ(&sessions, &s->list); + LIST_INIT(&s->back_refs); + + s->flags = 0; + s->term_trace = 0; + s->cli_addr = *addr; + + if ((t = task_new()) == NULL) { + Alert("out of memory in stats_accept().\n"); + goto out_free_session; + } + + t->process = l->handler; + t->context = s; + t->nice = l->nice; + + s->task = t; + s->listener = l; + s->fe = s->be = p; + + s->req = s->rep = NULL; /* will be allocated later */ + + s->si[0].state = s->si[0].prev_state = SI_ST_EST; + s->si[0].err_type = SI_ET_NONE; + s->si[0].err_loc = NULL; + s->si[0].owner = t; + s->si[0].update = stream_sock_data_finish; + s->si[0].shutr = stream_sock_shutr; + s->si[0].shutw = stream_sock_shutw; + s->si[0].chk_rcv = stream_sock_chk_rcv; + s->si[0].chk_snd = stream_sock_chk_snd; + s->si[0].connect = NULL; + s->si[0].iohandler = NULL; + s->si[0].fd = cfd; + s->si[0].flags = SI_FL_NONE; + if (s->fe->options2 & PR_O2_INDEPSTR) + s->si[0].flags |= SI_FL_INDEP_STR; + s->si[0].exp = TICK_ETERNITY; + + s->si[1].state = s->si[1].prev_state = SI_ST_INI; + s->si[1].err_type = SI_ET_NONE; + s->si[1].err_loc = NULL; + s->si[1].owner = t; + s->si[1].exp = TICK_ETERNITY; + s->si[1].fd = -1; /* just to help with debugging */ + s->si[1].flags = SI_FL_NONE; + if (s->be->options2 & PR_O2_INDEPSTR) + s->si[1].flags |= SI_FL_INDEP_STR; + + stream_int_register_handler(&s->si[1], stats_io_handler); + s->si[1].private = s; + s->si[1].st1 = 0; + s->si[1].st0 = STAT_CLI_INIT; + + s->srv = s->prev_srv = s->srv_conn = NULL; + s->pend_pos = NULL; + + s->store_count = 0; + + memset(&s->logs, 0, sizeof(s->logs)); + memset(&s->txn, 0, sizeof(s->txn)); + + s->logs.accept_date = date; /* user-visible date for logging */ + s->logs.tv_accept = now; /* corrected date for internal use */ + + s->data_state = DATA_ST_INIT; + s->data_source = DATA_SRC_NONE; + s->uniq_id = totalconn; + proxy_inc_fe_ctr(l, p); /* note: cum_beconn will be increased once assigned */ + + if (fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) { + Alert("accept(): cannot set the socket in non blocking mode. Giving up.\n"); + goto out_free_task; + } + + if ((s->req = pool_alloc2(pool2_buffer)) == NULL) + goto out_free_task; + + s->req->size = global.tune.bufsize; + buffer_init(s->req); + s->req->prod = &s->si[0]; + s->req->cons = &s->si[1]; + s->si[0].ib = s->si[1].ob = s->req; + s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */ + s->req->flags |= BF_READ_DONTWAIT; /* we plan to read small requests */ + + s->req->analysers = l->analysers; + + s->req->wto = TICK_ETERNITY; + s->req->cto = TICK_ETERNITY; + s->req->rto = TICK_ETERNITY; + + if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) + goto out_free_req; + + s->rep->size = global.tune.bufsize; + buffer_init(s->rep); + + s->rep->prod = &s->si[1]; + s->rep->cons = &s->si[0]; + s->si[0].ob = s->si[1].ib = s->rep; + + s->rep->rto = TICK_ETERNITY; + s->rep->cto = TICK_ETERNITY; + s->rep->wto = TICK_ETERNITY; + + s->req->rex = TICK_ETERNITY; + s->req->wex = TICK_ETERNITY; + s->req->analyse_exp = TICK_ETERNITY; + s->rep->rex = TICK_ETERNITY; + s->rep->wex = TICK_ETERNITY; + s->rep->analyse_exp = TICK_ETERNITY; + + t->expire = TICK_ETERNITY; + + if (l->timeout) { + s->req->rto = *l->timeout; + s->rep->wto = *l->timeout; + } + + fd_insert(cfd); + fdtab[cfd].owner = &s->si[0]; + fdtab[cfd].state = FD_STREADY; + fdtab[cfd].cb[DIR_RD].f = l->proto->read; + fdtab[cfd].cb[DIR_RD].b = s->req; + fdtab[cfd].cb[DIR_WR].f = l->proto->write; + fdtab[cfd].cb[DIR_WR].b = s->rep; + fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr; + fdinfo[cfd].peerlen = sizeof(s->cli_addr); + + EV_FD_SET(cfd, DIR_RD); + + task_wakeup(t, TASK_WOKEN_INIT); + return 1; + + out_free_req: + pool_free2(pool2_buffer, s->req); + out_free_task: + task_free(t); + out_free_session: + LIST_DEL(&s->list); + pool_free2(pool2_session, s); + out_close: + return -1; +} + /* This function parses a "stats" statement in the "global" section. It returns * -1 if there is any error, otherwise zero. If it returns -1, it may write an * error message into ther buffer, for at most bytes, trailing @@ -124,15 +287,16 @@ static int stats_parse_global(char **args, int section_type, struct proxy *curpx global.stats_fe->last_change = now.tv_sec; global.stats_fe->id = strdup("GLOBAL"); global.stats_fe->cap = PR_CAP_FE; + global.stats_fe->maxconn = global.stats_sock.maxconn; } global.stats_sock.state = LI_INIT; global.stats_sock.options = LI_O_NONE; - global.stats_sock.accept = uxst_event_accept; + global.stats_sock.accept = stats_accept; global.stats_sock.handler = process_session; global.stats_sock.analysers = 0; global.stats_sock.nice = -64; /* we want to boost priority for local stats */ - global.stats_sock.private = global.stats_fe; /* must point to the frontend */ + global.stats_sock.frontend = global.stats_fe; global.stats_sock.perm.ux.level = ACCESS_LVL_OPER; /* default access level */ global.stats_fe->timeout.client = MS_TO_TICKS(10000); /* default timeout of 10 seconds */ @@ -224,6 +388,8 @@ static int stats_parse_global(char **args, int section_type, struct proxy *curpx global.maxsock -= global.stats_sock.maxconn; global.stats_sock.maxconn = maxconn; global.maxsock += global.stats_sock.maxconn; + if (global.stats_fe) + global.stats_fe->maxconn = global.stats_sock.maxconn; } else { snprintf(err, errlen, "'stats' only supports 'socket', 'maxconn' and 'timeout' in 'global' section"); diff --git a/src/frontend.c b/src/frontend.c index bc354b922..ef1001d87 100644 --- a/src/frontend.c +++ b/src/frontend.c @@ -53,443 +53,359 @@ void get_frt_addr(struct session *s) s->flags |= SN_FRT_ADDR_SET; } -/* - * FIXME: This should move to the STREAM_SOCK code then split into TCP and HTTP. +/* This function is called from the protocol layer accept() in order to instanciate + * a new proxy. It returns a positive value upon success, 0 if the connection needs + * to be closed and ignored, or a negative value upon critical failure. */ - -/* - * this function is called on a read event from a listen socket, corresponding - * to an accept. It tries to accept as many connections as possible. - * It returns 0. - */ -int event_accept(int fd) { - struct listener *l = fdtab[fd].owner; - struct proxy *p = (struct proxy *)l->private; /* attached frontend */ +int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) +{ + struct proxy *p = l->frontend; struct session *s; struct http_txn *txn; struct task *t; - int cfd; - int max_accept = global.tune.maxaccept; - if (p->fe_sps_lim) { - int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0); - if (max_accept > max) - max_accept = max; + if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */ + Alert("out of memory in event_accept().\n"); + goto out_close; } - while (p->feconn < p->maxconn && actconn < global.maxconn && max_accept--) { - struct sockaddr_storage addr; - socklen_t laddr = sizeof(addr); + LIST_ADDQ(&sessions, &s->list); + LIST_INIT(&s->back_refs); - if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) { - switch (errno) { - case EAGAIN: - case EINTR: - case ECONNABORTED: - return 0; /* nothing more to accept */ - case ENFILE: - send_log(p, LOG_EMERG, - "Proxy %s reached system FD limit at %d. Please check system tunables.\n", - p->id, maxfd); - return 0; - case EMFILE: - send_log(p, LOG_EMERG, - "Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n", - p->id, maxfd); - return 0; - case ENOBUFS: - case ENOMEM: - send_log(p, LOG_EMERG, - "Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n", - p->id, maxfd); - return 0; - default: - return 0; - } + s->flags = 0; + s->term_trace = 0; + s->cli_addr = *addr; + + /* if this session comes from a known monitoring system, we want to ignore + * it as soon as possible, which means closing it immediately for TCP. + */ + if (p->mon_mask.s_addr && + addr->ss_family == AF_INET && + (((struct sockaddr_in *)addr)->sin_addr.s_addr & p->mon_mask.s_addr) == p->mon_net.s_addr) { + if (p->mode == PR_MODE_TCP) { + pool_free2(pool2_session, s); + return 0; } + s->flags |= SN_MONITOR; + } - if (l->nbconn >= l->maxconn) { - /* too many connections, we shoot this one and return. - * FIXME: it would be better to simply switch the listener's - * state to LI_FULL and disable the FD. We could re-enable - * it upon fd_delete(), but this requires all protocols to - * be switched. - */ - goto out_close; - } + if ((t = task_new()) == NULL) { /* disable this proxy for a while */ + Alert("out of memory in event_accept().\n"); + goto out_free_session; + } - if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */ - Alert("out of memory in event_accept().\n"); - EV_FD_CLR(fd, DIR_RD); - p->state = PR_STIDLE; - goto out_close; - } + if ((fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) || + (setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, + (char *) &one, sizeof(one)) == -1)) { + Alert("accept(): cannot set the socket in non blocking mode. Giving up\n"); + goto out_free_task; + } - LIST_ADDQ(&sessions, &s->list); - LIST_INIT(&s->back_refs); + if (p->options & PR_O_TCP_CLI_KA) + setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)); - s->flags = 0; - s->term_trace = 0; + if (p->options & PR_O_TCP_NOLING) + setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger)); - /* if this session comes from a known monitoring system, we want to ignore - * it as soon as possible, which means closing it immediately for TCP. + if (global.tune.client_sndbuf) + setsockopt(cfd, SOL_SOCKET, SO_SNDBUF, &global.tune.client_sndbuf, sizeof(global.tune.client_sndbuf)); + + if (global.tune.client_rcvbuf) + setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf)); + + t->process = l->handler; + t->context = s; + t->nice = l->nice; + + s->task = t; + s->listener = l; + + /* Note: initially, the session's backend points to the frontend. + * This changes later when switching rules are executed or + * when the default backend is assigned. + */ + s->be = s->fe = p; + + s->req = s->rep = NULL; /* will be allocated later */ + + s->si[0].state = s->si[0].prev_state = SI_ST_EST; + s->si[0].err_type = SI_ET_NONE; + s->si[0].err_loc = NULL; + s->si[0].owner = t; + s->si[0].update = stream_sock_data_finish; + s->si[0].shutr = stream_sock_shutr; + s->si[0].shutw = stream_sock_shutw; + s->si[0].chk_rcv = stream_sock_chk_rcv; + s->si[0].chk_snd = stream_sock_chk_snd; + s->si[0].connect = NULL; + s->si[0].iohandler = NULL; + s->si[0].fd = cfd; + s->si[0].flags = SI_FL_NONE | SI_FL_CAP_SPLTCP; /* TCP splicing capable */ + if (s->fe->options2 & PR_O2_INDEPSTR) + s->si[0].flags |= SI_FL_INDEP_STR; + s->si[0].exp = TICK_ETERNITY; + + s->si[1].state = s->si[1].prev_state = SI_ST_INI; + s->si[1].err_type = SI_ET_NONE; + s->si[1].err_loc = NULL; + s->si[1].owner = t; + s->si[1].update = stream_sock_data_finish; + s->si[1].shutr = stream_sock_shutr; + s->si[1].shutw = stream_sock_shutw; + s->si[1].chk_rcv = stream_sock_chk_rcv; + s->si[1].chk_snd = stream_sock_chk_snd; + s->si[1].connect = tcpv4_connect_server; + s->si[1].iohandler = NULL; + s->si[1].exp = TICK_ETERNITY; + s->si[1].fd = -1; /* just to help with debugging */ + s->si[1].flags = SI_FL_NONE; + if (s->be->options2 & PR_O2_INDEPSTR) + s->si[1].flags |= SI_FL_INDEP_STR; + + s->srv = s->prev_srv = s->srv_conn = NULL; + s->pend_pos = NULL; + s->conn_retries = s->be->conn_retries; + + /* init store persistence */ + s->store_count = 0; + + /* FIXME: the logs are horribly complicated now, because they are + * defined in

,

, and later and . + */ + + if (s->flags & SN_MONITOR) + s->logs.logwait = 0; + else + s->logs.logwait = p->to_log; + + if (s->logs.logwait & LW_REQ) + s->do_log = http_sess_log; + else + s->do_log = tcp_sess_log; + + /* default error reporting function, may be changed by analysers */ + s->srv_error = default_srv_error; + + s->logs.accept_date = date; /* user-visible date for logging */ + s->logs.tv_accept = now; /* corrected date for internal use */ + tv_zero(&s->logs.tv_request); + s->logs.t_queue = -1; + s->logs.t_connect = -1; + s->logs.t_data = -1; + s->logs.t_close = 0; + s->logs.bytes_in = s->logs.bytes_out = 0; + s->logs.prx_queue_size = 0; /* we get the number of pending conns before us */ + s->logs.srv_queue_size = 0; /* we will get this number soon */ + + s->data_source = DATA_SRC_NONE; + + s->uniq_id = totalconn; + proxy_inc_fe_ctr(l, p); /* note: cum_beconn will be increased once assigned */ + + txn = &s->txn; + /* Those variables will be checked and freed if non-NULL in + * session.c:session_free(). It is important that they are + * properly initialized. + */ + txn->sessid = NULL; + txn->srv_cookie = NULL; + txn->cli_cookie = NULL; + txn->uri = NULL; + txn->req.cap = NULL; + txn->rsp.cap = NULL; + txn->hdr_idx.v = NULL; + txn->hdr_idx.size = txn->hdr_idx.used = 0; + + if (p->mode == PR_MODE_HTTP) { + /* the captures are only used in HTTP frontends */ + if (p->nb_req_cap > 0 && + (txn->req.cap = pool_alloc2(p->req_cap_pool)) == NULL) + goto out_fail_reqcap; /* no memory */ + + if (p->nb_rsp_cap > 0 && + (txn->rsp.cap = pool_alloc2(p->rsp_cap_pool)) == NULL) + goto out_fail_rspcap; /* no memory */ + } + + if (p->acl_requires & ACL_USE_L7_ANY) { + /* we have to allocate header indexes only if we know + * that we may make use of them. This of course includes + * (mode == PR_MODE_HTTP). */ - if (addr.ss_family == AF_INET && - p->mon_mask.s_addr && - (((struct sockaddr_in *)&addr)->sin_addr.s_addr & p->mon_mask.s_addr) == p->mon_net.s_addr) { - if (p->mode == PR_MODE_TCP) { - close(cfd); - pool_free2(pool2_session, s); - continue; - } - s->flags |= SN_MONITOR; + txn->hdr_idx.size = MAX_HTTP_HDR; + + if ((txn->hdr_idx.v = pool_alloc2(p->hdr_idx_pool)) == NULL) + goto out_fail_idx; /* no memory */ + + /* and now initialize the HTTP transaction state */ + http_init_txn(s); + } + + if ((p->mode == PR_MODE_TCP || p->mode == PR_MODE_HTTP) + && (p->logfac1 >= 0 || p->logfac2 >= 0)) { + if (p->to_log) { + /* we have the client ip */ + if (s->logs.logwait & LW_CLIP) + if (!(s->logs.logwait &= ~LW_CLIP)) + s->do_log(s); } - - if ((t = task_new()) == NULL) { /* disable this proxy for a while */ - Alert("out of memory in event_accept().\n"); - EV_FD_CLR(fd, DIR_RD); - p->state = PR_STIDLE; - goto out_free_session; - } - - s->cli_addr = addr; - if (cfd >= global.maxsock) { - Alert("accept(): not enough free sockets. Raise -n argument. Giving up.\n"); - goto out_free_task; - } - - if ((fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) || - (setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, - (char *) &one, sizeof(one)) == -1)) { - Alert("accept(): cannot set the socket in non blocking mode. Giving up\n"); - goto out_free_task; - } - - if (p->options & PR_O_TCP_CLI_KA) - setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)); - - if (p->options & PR_O_TCP_NOLING) - setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger)); - - if (global.tune.client_sndbuf) - setsockopt(cfd, SOL_SOCKET, SO_SNDBUF, &global.tune.client_sndbuf, sizeof(global.tune.client_sndbuf)); - - if (global.tune.client_rcvbuf) - setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf)); - - t->process = l->handler; - t->context = s; - t->nice = l->nice; - - s->task = t; - s->listener = l; - - /* Note: initially, the session's backend points to the frontend. - * This changes later when switching rules are executed or - * when the default backend is assigned. - */ - s->be = s->fe = p; - - s->req = s->rep = NULL; /* will be allocated later */ - - s->si[0].state = s->si[0].prev_state = SI_ST_EST; - s->si[0].err_type = SI_ET_NONE; - s->si[0].err_loc = NULL; - s->si[0].owner = t; - s->si[0].update = stream_sock_data_finish; - s->si[0].shutr = stream_sock_shutr; - s->si[0].shutw = stream_sock_shutw; - s->si[0].chk_rcv = stream_sock_chk_rcv; - s->si[0].chk_snd = stream_sock_chk_snd; - s->si[0].connect = NULL; - s->si[0].iohandler = NULL; - s->si[0].fd = cfd; - s->si[0].flags = SI_FL_NONE | SI_FL_CAP_SPLTCP; /* TCP splicing capable */ - if (s->fe->options2 & PR_O2_INDEPSTR) - s->si[0].flags |= SI_FL_INDEP_STR; - s->si[0].exp = TICK_ETERNITY; - - s->si[1].state = s->si[1].prev_state = SI_ST_INI; - s->si[1].err_type = SI_ET_NONE; - s->si[1].err_loc = NULL; - s->si[1].owner = t; - s->si[1].update = stream_sock_data_finish; - s->si[1].shutr = stream_sock_shutr; - s->si[1].shutw = stream_sock_shutw; - s->si[1].chk_rcv = stream_sock_chk_rcv; - s->si[1].chk_snd = stream_sock_chk_snd; - s->si[1].connect = tcpv4_connect_server; - s->si[1].iohandler = NULL; - s->si[1].exp = TICK_ETERNITY; - s->si[1].fd = -1; /* just to help with debugging */ - s->si[1].flags = SI_FL_NONE; - if (s->be->options2 & PR_O2_INDEPSTR) - s->si[1].flags |= SI_FL_INDEP_STR; - - s->srv = s->prev_srv = s->srv_conn = NULL; - s->pend_pos = NULL; - s->conn_retries = s->be->conn_retries; - - /* init store persistence */ - s->store_count = 0; - - /* FIXME: the logs are horribly complicated now, because they are - * defined in

,

, and later and . - */ - - if (s->flags & SN_MONITOR) - s->logs.logwait = 0; - else - s->logs.logwait = p->to_log; - - if (s->logs.logwait & LW_REQ) - s->do_log = http_sess_log; - else - s->do_log = tcp_sess_log; - - /* default error reporting function, may be changed by analysers */ - s->srv_error = default_srv_error; - - s->logs.accept_date = date; /* user-visible date for logging */ - s->logs.tv_accept = now; /* corrected date for internal use */ - tv_zero(&s->logs.tv_request); - s->logs.t_queue = -1; - s->logs.t_connect = -1; - s->logs.t_data = -1; - s->logs.t_close = 0; - s->logs.bytes_in = s->logs.bytes_out = 0; - s->logs.prx_queue_size = 0; /* we get the number of pending conns before us */ - s->logs.srv_queue_size = 0; /* we will get this number soon */ - - s->data_source = DATA_SRC_NONE; - - s->uniq_id = totalconn; - totalconn++; - proxy_inc_fe_ctr(l, p); /* note: cum_beconn will be increased once assigned */ - - txn = &s->txn; - /* Those variables will be checked and freed if non-NULL in - * session.c:session_free(). It is important that they are - * properly initialized. - */ - txn->sessid = NULL; - txn->srv_cookie = NULL; - txn->cli_cookie = NULL; - txn->uri = NULL; - txn->req.cap = NULL; - txn->rsp.cap = NULL; - txn->hdr_idx.v = NULL; - txn->hdr_idx.size = txn->hdr_idx.used = 0; - - if (p->mode == PR_MODE_HTTP) { - /* the captures are only used in HTTP frontends */ - if (p->nb_req_cap > 0 && - (txn->req.cap = pool_alloc2(p->req_cap_pool)) == NULL) - goto out_fail_reqcap; /* no memory */ - - if (p->nb_rsp_cap > 0 && - (txn->rsp.cap = pool_alloc2(p->rsp_cap_pool)) == NULL) - goto out_fail_rspcap; /* no memory */ - } - - if (p->acl_requires & ACL_USE_L7_ANY) { - /* we have to allocate header indexes only if we know - * that we may make use of them. This of course includes - * (mode == PR_MODE_HTTP). - */ - txn->hdr_idx.size = MAX_HTTP_HDR; - - if ((txn->hdr_idx.v = pool_alloc2(p->hdr_idx_pool)) == NULL) - goto out_fail_idx; /* no memory */ - - /* and now initialize the HTTP transaction state */ - http_init_txn(s); - } - - if ((p->mode == PR_MODE_TCP || p->mode == PR_MODE_HTTP) - && (p->logfac1 >= 0 || p->logfac2 >= 0)) { - if (p->to_log) { - /* we have the client ip */ - if (s->logs.logwait & LW_CLIP) - if (!(s->logs.logwait &= ~LW_CLIP)) - s->do_log(s); - } - else if (s->cli_addr.ss_family == AF_INET) { - char pn[INET_ADDRSTRLEN], sn[INET_ADDRSTRLEN]; - - if (!(s->flags & SN_FRT_ADDR_SET)) - get_frt_addr(s); - - if (inet_ntop(AF_INET, (const void *)&((struct sockaddr_in *)&s->frt_addr)->sin_addr, - sn, sizeof(sn)) && - inet_ntop(AF_INET, (const void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr, - pn, sizeof(pn))) { - send_log(p, LOG_INFO, "Connect from %s:%d to %s:%d (%s/%s)\n", - pn, ntohs(((struct sockaddr_in *)&s->cli_addr)->sin_port), - sn, ntohs(((struct sockaddr_in *)&s->frt_addr)->sin_port), - p->id, (p->mode == PR_MODE_HTTP) ? "HTTP" : "TCP"); - } - } - else { - char pn[INET6_ADDRSTRLEN], sn[INET6_ADDRSTRLEN]; - - if (!(s->flags & SN_FRT_ADDR_SET)) - get_frt_addr(s); - - if (inet_ntop(AF_INET6, (const void *)&((struct sockaddr_in6 *)&s->frt_addr)->sin6_addr, - sn, sizeof(sn)) && - inet_ntop(AF_INET6, (const void *)&((struct sockaddr_in6 *)&s->cli_addr)->sin6_addr, - pn, sizeof(pn))) { - send_log(p, LOG_INFO, "Connect from %s:%d to %s:%d (%s/%s)\n", - pn, ntohs(((struct sockaddr_in6 *)&s->cli_addr)->sin6_port), - sn, ntohs(((struct sockaddr_in6 *)&s->frt_addr)->sin6_port), - p->id, (p->mode == PR_MODE_HTTP) ? "HTTP" : "TCP"); - } - } - } - - if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) { - int len; + else if (s->cli_addr.ss_family == AF_INET) { + char pn[INET_ADDRSTRLEN], sn[INET_ADDRSTRLEN]; if (!(s->flags & SN_FRT_ADDR_SET)) get_frt_addr(s); - if (s->cli_addr.ss_family == AF_INET) { - char pn[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, - (const void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr, - pn, sizeof(pn)); - - len = sprintf(trash, "%08x:%s.accept(%04x)=%04x from [%s:%d]\n", - s->uniq_id, p->id, (unsigned short)fd, (unsigned short)cfd, - pn, ntohs(((struct sockaddr_in *)&s->cli_addr)->sin_port)); + if (inet_ntop(AF_INET, (const void *)&((struct sockaddr_in *)&s->frt_addr)->sin_addr, + sn, sizeof(sn)) && + inet_ntop(AF_INET, (const void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr, + pn, sizeof(pn))) { + send_log(p, LOG_INFO, "Connect from %s:%d to %s:%d (%s/%s)\n", + pn, ntohs(((struct sockaddr_in *)&s->cli_addr)->sin_port), + sn, ntohs(((struct sockaddr_in *)&s->frt_addr)->sin_port), + p->id, (p->mode == PR_MODE_HTTP) ? "HTTP" : "TCP"); } - else { - char pn[INET6_ADDRSTRLEN]; - inet_ntop(AF_INET6, - (const void *)&((struct sockaddr_in6 *)(&s->cli_addr))->sin6_addr, - pn, sizeof(pn)); - - len = sprintf(trash, "%08x:%s.accept(%04x)=%04x from [%s:%d]\n", - s->uniq_id, p->id, (unsigned short)fd, (unsigned short)cfd, - pn, ntohs(((struct sockaddr_in6 *)(&s->cli_addr))->sin6_port)); - } - - write(1, trash, len); - } - - if ((s->req = pool_alloc2(pool2_buffer)) == NULL) - goto out_fail_req; /* no memory */ - - s->req->size = global.tune.bufsize; - buffer_init(s->req); - s->req->prod = &s->si[0]; - s->req->cons = &s->si[1]; - s->si[0].ib = s->si[1].ob = s->req; - - s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */ - - if (p->mode == PR_MODE_HTTP) - s->req->flags |= BF_READ_DONTWAIT; /* one read is usually enough */ - - /* activate default analysers enabled for this listener */ - s->req->analysers = l->analysers; - - /* note: this should not happen anymore since there's always at least the switching rules */ - if (!s->req->analysers) { - buffer_auto_connect(s->req); /* don't wait to establish connection */ - buffer_auto_close(s->req); /* let the producer forward close requests */ - } - - s->req->rto = s->fe->timeout.client; - s->req->wto = s->be->timeout.server; - s->req->cto = s->be->timeout.connect; - - if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) - goto out_fail_rep; /* no memory */ - - s->rep->size = global.tune.bufsize; - buffer_init(s->rep); - s->rep->prod = &s->si[1]; - s->rep->cons = &s->si[0]; - s->si[0].ob = s->si[1].ib = s->rep; - s->rep->analysers = 0; - - s->rep->rto = s->be->timeout.server; - s->rep->wto = s->fe->timeout.client; - s->rep->cto = TICK_ETERNITY; - - s->req->rex = TICK_ETERNITY; - s->req->wex = TICK_ETERNITY; - s->req->analyse_exp = TICK_ETERNITY; - s->rep->rex = TICK_ETERNITY; - s->rep->wex = TICK_ETERNITY; - s->rep->analyse_exp = TICK_ETERNITY; - t->expire = TICK_ETERNITY; - - fd_insert(cfd); - fdtab[cfd].owner = &s->si[0]; - fdtab[cfd].state = FD_STREADY; - fdtab[cfd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY; - if (p->options & PR_O_TCP_NOLING) - fdtab[cfd].flags |= FD_FL_TCP_NOLING; - - fdtab[cfd].cb[DIR_RD].f = l->proto->read; - fdtab[cfd].cb[DIR_RD].b = s->req; - fdtab[cfd].cb[DIR_WR].f = l->proto->write; - fdtab[cfd].cb[DIR_WR].b = s->rep; - fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr; - fdinfo[cfd].peerlen = sizeof(s->cli_addr); - - if ((p->mode == PR_MODE_HTTP && (s->flags & SN_MONITOR)) || - (p->mode == PR_MODE_HEALTH && (p->options & PR_O_HTTP_CHK))) { - /* Either we got a request from a monitoring system on an HTTP instance, - * or we're in health check mode with the 'httpchk' option enabled. In - * both cases, we return a fake "HTTP/1.0 200 OK" response and we exit. - */ - struct chunk msg; - chunk_initstr(&msg, "HTTP/1.0 200 OK\r\n\r\n"); - stream_int_retnclose(&s->si[0], &msg); /* forge a 200 response */ - s->req->analysers = 0; - t->expire = s->rep->wex; - } - else if (p->mode == PR_MODE_HEALTH) { /* health check mode, no client reading */ - struct chunk msg; - chunk_initstr(&msg, "OK\n"); - stream_int_retnclose(&s->si[0], &msg); /* forge an "OK" response */ - s->req->analysers = 0; - t->expire = s->rep->wex; } else { - EV_FD_SET(cfd, DIR_RD); + char pn[INET6_ADDRSTRLEN], sn[INET6_ADDRSTRLEN]; + + if (!(s->flags & SN_FRT_ADDR_SET)) + get_frt_addr(s); + + if (inet_ntop(AF_INET6, (const void *)&((struct sockaddr_in6 *)&s->frt_addr)->sin6_addr, + sn, sizeof(sn)) && + inet_ntop(AF_INET6, (const void *)&((struct sockaddr_in6 *)&s->cli_addr)->sin6_addr, + pn, sizeof(pn))) { + send_log(p, LOG_INFO, "Connect from %s:%d to %s:%d (%s/%s)\n", + pn, ntohs(((struct sockaddr_in6 *)&s->cli_addr)->sin6_port), + sn, ntohs(((struct sockaddr_in6 *)&s->frt_addr)->sin6_port), + p->id, (p->mode == PR_MODE_HTTP) ? "HTTP" : "TCP"); + } + } + } + + if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) { + int len; + + if (!(s->flags & SN_FRT_ADDR_SET)) + get_frt_addr(s); + + if (s->cli_addr.ss_family == AF_INET) { + char pn[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, + (const void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr, + pn, sizeof(pn)); + + len = sprintf(trash, "%08x:%s.accept(%04x)=%04x from [%s:%d]\n", + s->uniq_id, p->id, (unsigned short)l->fd, (unsigned short)cfd, + pn, ntohs(((struct sockaddr_in *)&s->cli_addr)->sin_port)); + } + else { + char pn[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, + (const void *)&((struct sockaddr_in6 *)(&s->cli_addr))->sin6_addr, + pn, sizeof(pn)); + + len = sprintf(trash, "%08x:%s.accept(%04x)=%04x from [%s:%d]\n", + s->uniq_id, p->id, (unsigned short)l->fd, (unsigned short)cfd, + pn, ntohs(((struct sockaddr_in6 *)(&s->cli_addr))->sin6_port)); } - /* it is important not to call the wakeup function directly but to - * pass through task_wakeup(), because this one knows how to apply - * priorities to tasks. + write(1, trash, len); + } + + if ((s->req = pool_alloc2(pool2_buffer)) == NULL) + goto out_fail_req; /* no memory */ + + s->req->size = global.tune.bufsize; + buffer_init(s->req); + s->req->prod = &s->si[0]; + s->req->cons = &s->si[1]; + s->si[0].ib = s->si[1].ob = s->req; + + s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */ + + if (p->mode == PR_MODE_HTTP) + s->req->flags |= BF_READ_DONTWAIT; /* one read is usually enough */ + + /* activate default analysers enabled for this listener */ + s->req->analysers = l->analysers; + + /* note: this should not happen anymore since there's always at least the switching rules */ + if (!s->req->analysers) { + buffer_auto_connect(s->req); /* don't wait to establish connection */ + buffer_auto_close(s->req); /* let the producer forward close requests */ + } + + s->req->rto = s->fe->timeout.client; + s->req->wto = s->be->timeout.server; + s->req->cto = s->be->timeout.connect; + + if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) + goto out_fail_rep; /* no memory */ + + s->rep->size = global.tune.bufsize; + buffer_init(s->rep); + s->rep->prod = &s->si[1]; + s->rep->cons = &s->si[0]; + s->si[0].ob = s->si[1].ib = s->rep; + s->rep->analysers = 0; + + s->rep->rto = s->be->timeout.server; + s->rep->wto = s->fe->timeout.client; + s->rep->cto = TICK_ETERNITY; + + s->req->rex = TICK_ETERNITY; + s->req->wex = TICK_ETERNITY; + s->req->analyse_exp = TICK_ETERNITY; + s->rep->rex = TICK_ETERNITY; + s->rep->wex = TICK_ETERNITY; + s->rep->analyse_exp = TICK_ETERNITY; + t->expire = TICK_ETERNITY; + + fd_insert(cfd); + fdtab[cfd].owner = &s->si[0]; + fdtab[cfd].state = FD_STREADY; + fdtab[cfd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY; + if (p->options & PR_O_TCP_NOLING) + fdtab[cfd].flags |= FD_FL_TCP_NOLING; + + fdtab[cfd].cb[DIR_RD].f = l->proto->read; + fdtab[cfd].cb[DIR_RD].b = s->req; + fdtab[cfd].cb[DIR_WR].f = l->proto->write; + fdtab[cfd].cb[DIR_WR].b = s->rep; + fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr; + fdinfo[cfd].peerlen = sizeof(s->cli_addr); + + if ((p->mode == PR_MODE_HTTP && (s->flags & SN_MONITOR)) || + (p->mode == PR_MODE_HEALTH && (p->options & PR_O_HTTP_CHK))) { + /* Either we got a request from a monitoring system on an HTTP instance, + * or we're in health check mode with the 'httpchk' option enabled. In + * both cases, we return a fake "HTTP/1.0 200 OK" response and we exit. */ - task_wakeup(t, TASK_WOKEN_INIT); + struct chunk msg; + chunk_initstr(&msg, "HTTP/1.0 200 OK\r\n\r\n"); + stream_int_retnclose(&s->si[0], &msg); /* forge a 200 response */ + s->req->analysers = 0; + t->expire = s->rep->wex; + } + else if (p->mode == PR_MODE_HEALTH) { /* health check mode, no client reading */ + struct chunk msg; + chunk_initstr(&msg, "OK\n"); + stream_int_retnclose(&s->si[0], &msg); /* forge an "OK" response */ + s->req->analysers = 0; + t->expire = s->rep->wex; + } + else { + EV_FD_SET(cfd, DIR_RD); + } - l->nbconn++; /* warning! right now, it's up to the handler to decrease this */ - if (l->nbconn >= l->maxconn) { - EV_FD_CLR(l->fd, DIR_RD); - l->state = LI_FULL; - } + /* it is important not to call the wakeup function directly but to + * pass through task_wakeup(), because this one knows how to apply + * priorities to tasks. + */ + task_wakeup(t, TASK_WOKEN_INIT); - p->feconn++; /* beconn will be increased later */ - if (p->feconn > p->counters.feconn_max) - p->counters.feconn_max = p->feconn; - - if (l->counters) { - if (l->nbconn > l->counters->conn_max) - l->counters->conn_max = l->nbconn; - } - - actconn++; - - // fprintf(stderr, "accepting from %p => %d conn, %d total, task=%p\n", p, actconn, totalconn, t); - } /* end of while (p->feconn < p->maxconn) */ - return 0; + return 1; /* Error unrolling */ out_fail_rep: @@ -507,8 +423,7 @@ int event_accept(int fd) { LIST_DEL(&s->list); pool_free2(pool2_session, s); out_close: - close(cfd); - return 0; + return -1; } /* set test->i to the id of the frontend */ diff --git a/src/proto_tcp.c b/src/proto_tcp.c index 1662e9065..fc020c969 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -63,6 +63,7 @@ static struct protocol proto_tcpv4 = { .sock_family = AF_INET, .sock_addrlen = sizeof(struct sockaddr_in), .l3_addrlen = 32/8, + .accept = &stream_sock_accept, .read = &stream_sock_read, .write = &stream_sock_write, .bind_all = tcp_bind_listeners, @@ -81,6 +82,7 @@ static struct protocol proto_tcpv6 = { .sock_family = AF_INET6, .sock_addrlen = sizeof(struct sockaddr_in6), .l3_addrlen = 128/8, + .accept = &stream_sock_accept, .read = &stream_sock_read, .write = &stream_sock_write, .bind_all = tcp_bind_listeners, @@ -539,19 +541,17 @@ int tcp_bind_listener(struct listener *listener, char *errmsg, int errlen) listener->fd = fd; listener->state = LI_LISTEN; - /* the function for the accept() event */ - fd_insert(fd); - fdtab[fd].cb[DIR_RD].f = listener->accept; - fdtab[fd].cb[DIR_WR].f = NULL; /* never called */ - fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL; fdtab[fd].owner = listener; /* reference the listener instead of a task */ fdtab[fd].state = FD_STLISTEN; - fdtab[fd].flags = FD_FL_TCP; - if (listener->options & LI_O_NOLINGER) - fdtab[fd].flags |= FD_FL_TCP_NOLING; + fdtab[fd].flags = FD_FL_TCP | ((listener->options & LI_O_NOLINGER) ? FD_FL_TCP_NOLING : 0); + fdtab[fd].cb[DIR_RD].f = listener->proto->accept; + fdtab[fd].cb[DIR_WR].f = NULL; /* never called */ + fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL; fdinfo[fd].peeraddr = NULL; fdinfo[fd].peerlen = 0; + fd_insert(fd); + tcp_return: if (msg && errlen) strlcpy2(errmsg, msg, errlen); diff --git a/src/proto_uxst.c b/src/proto_uxst.c index 177c5a37d..98c955552 100644 --- a/src/proto_uxst.c +++ b/src/proto_uxst.c @@ -1,7 +1,7 @@ /* * UNIX SOCK_STREAM protocol layer (uxst) * - * Copyright 2000-2009 Willy Tarreau + * Copyright 2000-2010 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -29,26 +29,17 @@ #include #include #include -#include #include #include -#include #include #include #include -#include -#include -#include -#include #include #include #include #include -#include -#include -#include #include #include @@ -68,6 +59,7 @@ static struct protocol proto_unix = { .sock_family = AF_UNIX, .sock_addrlen = sizeof(struct sockaddr_un), .l3_addrlen = sizeof(((struct sockaddr_un*)0)->sun_path),/* path len */ + .accept = &stream_sock_accept, .read = &stream_sock_read, .write = &stream_sock_write, .bind_all = uxst_bind_listeners, @@ -262,7 +254,7 @@ static int uxst_bind_listener(struct listener *listener) /* the function for the accept() event */ fd_insert(fd); - fdtab[fd].cb[DIR_RD].f = listener->accept; + fdtab[fd].cb[DIR_RD].f = listener->proto->accept; fdtab[fd].cb[DIR_WR].f = NULL; /* never called */ fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL; fdtab[fd].owner = listener; /* reference the listener instead of a task */ @@ -347,229 +339,6 @@ static int uxst_unbind_listeners(struct protocol *proto) * 4) high-level functions ********************************/ - -/* - * This function is called on a read event from a listen socket, corresponding - * to an accept. It tries to accept as many connections as possible. - * It returns 0. Since we use UNIX sockets on the local system for monitoring - * purposes and other related things, we do not need to output as many messages - * as with TCP which can fall under attack. - */ -int uxst_event_accept(int fd) { - struct listener *l = fdtab[fd].owner; - struct session *s; - struct task *t; - int cfd; - int max_accept; - - if (global.nbproc > 1) - max_accept = 8; /* let other processes catch some connections too */ - else - max_accept = -1; - - while (max_accept--) { - struct sockaddr_storage addr; - socklen_t laddr = sizeof(addr); - - if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) { - switch (errno) { - case EAGAIN: - case EINTR: - case ECONNABORTED: - return 0; /* nothing more to accept */ - case ENFILE: - /* Process reached system FD limit. Check system tunables. */ - return 0; - case EMFILE: - /* Process reached process FD limit. Check 'ulimit-n'. */ - return 0; - case ENOBUFS: - case ENOMEM: - /* Process reached system memory limit. Check system tunables. */ - return 0; - default: - return 0; - } - } - - if (l->nbconn >= l->maxconn || actconn >= global.maxconn) { - /* too many connections, we shoot this one and return. - * FIXME: it would be better to simply switch the listener's - * state to LI_FULL and disable the FD. We could re-enable - * it upon fd_delete(), but this requires all protocols to - * be switched. - */ - goto out_close; - } - - if ((s = pool_alloc2(pool2_session)) == NULL) { - Alert("out of memory in uxst_event_accept().\n"); - goto out_close; - } - - LIST_ADDQ(&sessions, &s->list); - LIST_INIT(&s->back_refs); - - s->flags = 0; - s->term_trace = 0; - - if ((t = task_new()) == NULL) { - Alert("out of memory in uxst_event_accept().\n"); - goto out_free_session; - } - - s->cli_addr = addr; - - /* FIXME: should be checked earlier */ - if (cfd >= global.maxsock) { - Alert("accept(): not enough free sockets. Raise -n argument. Giving up.\n"); - goto out_free_task; - } - - if (fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) { - Alert("accept(): cannot set the socket in non blocking mode. Giving up.\n"); - goto out_free_task; - } - - t->process = l->handler; - t->context = s; - t->nice = l->nice; - - s->task = t; - s->listener = l; - s->fe = s->be = l->private; - - s->req = s->rep = NULL; /* will be allocated later */ - - s->si[0].state = s->si[0].prev_state = SI_ST_EST; - s->si[0].err_type = SI_ET_NONE; - s->si[0].err_loc = NULL; - s->si[0].owner = t; - s->si[0].update = stream_sock_data_finish; - s->si[0].shutr = stream_sock_shutr; - s->si[0].shutw = stream_sock_shutw; - s->si[0].chk_rcv = stream_sock_chk_rcv; - s->si[0].chk_snd = stream_sock_chk_snd; - s->si[0].connect = NULL; - s->si[0].iohandler = NULL; - s->si[0].fd = cfd; - s->si[0].flags = SI_FL_NONE; - if (s->fe->options2 & PR_O2_INDEPSTR) - s->si[0].flags |= SI_FL_INDEP_STR; - s->si[0].exp = TICK_ETERNITY; - - s->si[1].state = s->si[1].prev_state = SI_ST_INI; - s->si[1].err_type = SI_ET_NONE; - s->si[1].err_loc = NULL; - s->si[1].owner = t; - s->si[1].exp = TICK_ETERNITY; - s->si[1].fd = -1; /* just to help with debugging */ - s->si[1].flags = SI_FL_NONE; - if (s->be->options2 & PR_O2_INDEPSTR) - s->si[1].flags |= SI_FL_INDEP_STR; - - stream_int_register_handler(&s->si[1], stats_io_handler); - s->si[1].private = s; - s->si[1].st1 = 0; - s->si[1].st0 = STAT_CLI_INIT; - - s->srv = s->prev_srv = s->srv_conn = NULL; - s->pend_pos = NULL; - - s->store_count = 0; - - memset(&s->logs, 0, sizeof(s->logs)); - memset(&s->txn, 0, sizeof(s->txn)); - - s->logs.accept_date = date; /* user-visible date for logging */ - s->logs.tv_accept = now; /* corrected date for internal use */ - - s->data_state = DATA_ST_INIT; - s->data_source = DATA_SRC_NONE; - s->uniq_id = totalconn; - totalconn++; - - if ((s->req = pool_alloc2(pool2_buffer)) == NULL) - goto out_free_task; - - s->req->size = global.tune.bufsize; - buffer_init(s->req); - s->req->prod = &s->si[0]; - s->req->cons = &s->si[1]; - s->si[0].ib = s->si[1].ob = s->req; - s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */ - s->req->flags |= BF_READ_DONTWAIT; /* we plan to read small requests */ - - s->req->analysers = l->analysers; - - s->req->wto = TICK_ETERNITY; - s->req->cto = TICK_ETERNITY; - s->req->rto = TICK_ETERNITY; - - if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) - goto out_free_req; - - s->rep->size = global.tune.bufsize; - buffer_init(s->rep); - - s->rep->prod = &s->si[1]; - s->rep->cons = &s->si[0]; - s->si[0].ob = s->si[1].ib = s->rep; - s->rep->analysers = 0; - - s->rep->rto = TICK_ETERNITY; - s->rep->cto = TICK_ETERNITY; - s->rep->wto = TICK_ETERNITY; - - s->req->rex = TICK_ETERNITY; - s->req->wex = TICK_ETERNITY; - s->req->analyse_exp = TICK_ETERNITY; - s->rep->rex = TICK_ETERNITY; - s->rep->wex = TICK_ETERNITY; - s->rep->analyse_exp = TICK_ETERNITY; - - t->expire = TICK_ETERNITY; - - if (l->timeout) { - s->req->rto = *l->timeout; - s->rep->wto = *l->timeout; - } - - fd_insert(cfd); - fdtab[cfd].owner = &s->si[0]; - fdtab[cfd].state = FD_STREADY; - fdtab[cfd].cb[DIR_RD].f = l->proto->read; - fdtab[cfd].cb[DIR_RD].b = s->req; - fdtab[cfd].cb[DIR_WR].f = l->proto->write; - fdtab[cfd].cb[DIR_WR].b = s->rep; - fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr; - fdinfo[cfd].peerlen = sizeof(s->cli_addr); - - EV_FD_SET(cfd, DIR_RD); - - task_wakeup(t, TASK_WOKEN_INIT); - - l->nbconn++; /* warning! right now, it's up to the handler to decrease this */ - if (l->nbconn >= l->maxconn) { - EV_FD_CLR(l->fd, DIR_RD); - l->state = LI_FULL; - } - actconn++; - } - return 0; - - out_free_req: - pool_free2(pool2_buffer, s->req); - out_free_task: - task_free(t); - out_free_session: - LIST_DEL(&s->list); - pool_free2(pool2_session, s); - out_close: - close(cfd); - return 0; -} - __attribute__((constructor)) static void __uxst_protocol_init(void) { diff --git a/src/stream_sock.c b/src/stream_sock.c index d58876277..171ed294c 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -31,6 +31,8 @@ #include #include +#include +#include #include #include #include @@ -1118,6 +1120,107 @@ void stream_sock_chk_snd(struct stream_interface *si) } } +/* This function is called on a read event from a listening socket, corresponding + * to an accept. It tries to accept as many connections as possible, and for each + * calls the listener's accept handler (generally the frontend's accept handler). + */ +int stream_sock_accept(int fd) +{ + struct listener *l = fdtab[fd].owner; + struct proxy *p = l->frontend; + int max_accept = global.tune.maxaccept; + int cfd; + int ret; + + if (unlikely(l->nbconn >= l->maxconn)) { + EV_FD_CLR(l->fd, DIR_RD); + l->state = LI_FULL; + return 0; + } + + if (p && p->fe_sps_lim) { + int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0); + if (max_accept > max) + max_accept = max; + } + + while ((!p || p->feconn < p->maxconn) && actconn < global.maxconn && max_accept--) { + struct sockaddr_storage addr; + socklen_t laddr = sizeof(addr); + + cfd = accept(fd, (struct sockaddr *)&addr, &laddr); + if (unlikely(cfd == -1)) { + switch (errno) { + case EAGAIN: + case EINTR: + case ECONNABORTED: + return 0; /* nothing more to accept */ + case ENFILE: + send_log(p, LOG_EMERG, + "Proxy %s reached system FD limit at %d. Please check system tunables.\n", + p->id, maxfd); + return 0; + case EMFILE: + send_log(p, LOG_EMERG, + "Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n", + p->id, maxfd); + return 0; + case ENOBUFS: + case ENOMEM: + send_log(p, LOG_EMERG, + "Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n", + p->id, maxfd); + return 0; + default: + return 0; + } + } + + if (unlikely(cfd >= global.maxsock)) { + Alert("accept(): not enough free sockets. Raise -n argument. Giving up.\n"); + goto out_close; + } + + ret = l->accept(l, cfd, &addr); + if (unlikely(ret < 0)) { + /* critical error encountered, generally a resource shortage */ + EV_FD_CLR(fd, DIR_RD); + p->state = PR_STIDLE; + goto out_close; + } + else if (unlikely(ret == 0)) { + /* ignore this connection */ + close(cfd); + continue; + } + + actconn++; + totalconn++; + l->nbconn++; /* warning! right now, it's up to the handler to decrease this */ + if (l->nbconn >= l->maxconn) { + EV_FD_CLR(l->fd, DIR_RD); + l->state = LI_FULL; + } + + if (p) { + p->feconn++; /* beconn will be increased later */ + if (p->feconn > p->counters.feconn_max) + p->counters.feconn_max = p->feconn; + } + + if (l->counters) { + if (l->nbconn > l->counters->conn_max) + l->counters->conn_max = l->nbconn; + } + } /* end of while (p->feconn < p->maxconn) */ + return 0; + + /* Error unrolling */ + out_close: + close(cfd); + return 0; +} + /* * Local variables: