diff --git a/include/haproxy/listener-t.h b/include/haproxy/listener-t.h index c29328c8c..89bf7bc7c 100644 --- a/include/haproxy/listener-t.h +++ b/include/haproxy/listener-t.h @@ -43,6 +43,7 @@ struct protocol; struct xprt_ops; struct proxy; struct fe_counters; +struct connection; /* listener state */ enum li_state { @@ -197,7 +198,7 @@ struct listener { int maxconn; /* maximum connections allowed on this listener */ unsigned int backlog; /* if set, listen backlog */ int maxaccept; /* if set, max number of connections accepted at once (-1 when disabled) */ - int (*accept)(struct listener *l, int fd, struct sockaddr_storage *addr); /* upper layer's accept() */ + int (*accept)(struct connection *conn); /* upper layer's accept() */ enum obj_type *default_target; /* default target to use for accepted sessions or NULL */ /* cache line boundary */ struct mt_list wait_queue; /* link element to make the listener wait for something (LI_LIMITED) */ @@ -254,29 +255,14 @@ struct bind_kw_list { struct bind_kw kw[VAR_ARRAY]; }; -/* This is used to create the accept queue, optimized to be 64 bytes long. 
*/ -struct accept_queue_entry { - struct listener *listener; // 8 bytes - int fd __attribute__((aligned(8))); // 4 bytes - int addr_len; // 4 bytes - - union { - sa_family_t family; // 2 bytes - struct sockaddr_in in; // 16 bytes - struct sockaddr_in6 in6; // 28 bytes - } addr; // this is normally 28 bytes - /* 20-bytes hole here */ - char pad0[0] __attribute((aligned(64))); -}; - /* The per-thread accept queue ring, must be a power of two minus 1 */ -#define ACCEPT_QUEUE_SIZE ((1<<8) - 1) +#define ACCEPT_QUEUE_SIZE ((1<<10) - 1) struct accept_queue_ring { unsigned int head; unsigned int tail; struct tasklet *tasklet; /* tasklet of the thread owning this ring */ - struct accept_queue_entry entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64))); + struct connection *entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64))); }; diff --git a/include/haproxy/session.h b/include/haproxy/session.h index 59945b2ba..6a24d8a59 100644 --- a/include/haproxy/session.h +++ b/include/haproxy/session.h @@ -35,7 +35,7 @@ extern struct pool_head *pool_head_sess_srv_list; struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type *origin); void session_free(struct session *sess); -int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr); +int session_accept_fd(struct connection *cli_conn); int conn_complete_session(struct connection *conn); /* Remove the refcount from the session to the tracked counters, and clear the diff --git a/src/listener.c b/src/listener.c index 5a375db68..b7dd9348c 100644 --- a/src/listener.c +++ b/src/listener.c @@ -55,22 +55,18 @@ static struct task *manage_global_listener_queue(struct task *t, void *context, struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { }; /* dequeue and process a pending connection from the local accept queue (single - * consumer). Returns the accepted fd or -1 if none was found. The listener is - * placed into *li. 
The address is copied into *addr for no more than *addr_len - * bytes, and the address length is returned into *addr_len. + * consumer). Returns the accepted connection or NULL if none was found. */ -int accept_queue_pop_sc(struct accept_queue_ring *ring, struct listener **li, void *addr, int *addr_len) +struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring) { - struct accept_queue_entry *e; unsigned int pos, next; - struct listener *ptr; - int len; - int fd; + struct connection *ptr; + struct connection **e; pos = ring->head; if (pos == ring->tail) - return -1; + return NULL; next = pos + 1; if (next >= ACCEPT_QUEUE_SIZE) @@ -80,42 +76,28 @@ int accept_queue_pop_sc(struct accept_queue_ring *ring, struct listener **li, vo /* wait for the producer to update the listener's pointer */ while (1) { - ptr = e->listener; + ptr = *e; __ha_barrier_load(); if (ptr) break; pl_cpu_relax(); } - fd = e->fd; - len = e->addr_len; - if (len > *addr_len) - len = *addr_len; - - if (likely(len > 0)) - memcpy(addr, &e->addr, len); - /* release the entry */ - e->listener = NULL; + *e = NULL; __ha_barrier_store(); ring->head = next; - - *addr_len = len; - *li = ptr; - - return fd; + return ptr; } -/* tries to push a new accepted connection <fd> into ring <ring> for listener - *
<li>, from address <addr> whose length is <addr_len>. Returns non-zero if it - * succeeds, or zero if the ring is full. Supports multiple producers. +/* tries to push a new accepted connection <conn> into ring <ring>. Returns + * non-zero if it succeeds, or zero if the ring is full. Supports multiple + * producers. */ -int accept_queue_push_mp(struct accept_queue_ring *ring, int fd, - struct listener *li, const void *addr, int addr_len) +int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn) { - struct accept_queue_entry *e; unsigned int pos, next; pos = ring->tail; @@ -127,22 +109,8 @@ int accept_queue_push_mp(struct accept_queue_ring *ring, int fd, return 0; // ring full } while (unlikely(!_HA_ATOMIC_CAS(&ring->tail, &pos, next))); - - e = &ring->entry[pos]; - - if (addr_len > sizeof(e->addr)) - addr_len = sizeof(e->addr); - - if (addr_len) - memcpy(&e->addr, addr, addr_len); - - e->addr_len = addr_len; - e->fd = fd; - + ring->entry[pos] = conn; __ha_barrier_store(); - /* now commit the change */ - - e->listener = li; return 1; } @@ -150,25 +118,23 @@ int accept_queue_push_mp(struct accept_queue_ring *ring, int fd, static struct task *accept_queue_process(struct task *t, void *context, unsigned short state) { struct accept_queue_ring *ring = context; + struct connection *conn; struct listener *li; - struct sockaddr_storage addr; unsigned int max_accept; - int addr_len; int ret; - int fd; /* if global.tune.maxaccept is -1, then max_accept is UINT_MAX. It * is not really illimited, but it is probably enough. */ max_accept = global.tune.maxaccept ? 
global.tune.maxaccept : 64; for (; max_accept; max_accept--) { - addr_len = sizeof(addr); - fd = accept_queue_pop_sc(ring, &li, &addr, &addr_len); - if (fd < 0) + conn = accept_queue_pop_sc(ring); + if (!conn) break; + li = __objt_listener(conn->target); _HA_ATOMIC_ADD(&li->thr_conn[tid], 1); - ret = li->accept(li, fd, &addr); + ret = li->accept(conn); if (ret <= 0) { /* connection was terminated by the application */ continue; @@ -722,6 +688,7 @@ int listener_backlog(const struct listener *l) void listener_accept(int fd) { struct listener *l = fdtab[fd].owner; + struct connection *cli_conn; struct proxy *p; unsigned int max_accept; int next_conn = 0; @@ -928,6 +895,25 @@ void listener_accept(int fd) if (unlikely(master == 1)) fcntl(cfd, F_SETFD, FD_CLOEXEC); + /* we'll have to at least allocate a connection, assign the listener + * to conn->target, set the source address, and set the fd. + */ + cli_conn = conn_new(&l->obj_type); + if (cli_conn) { + cli_conn->handle.fd = cfd; + cli_conn->flags |= CO_FL_ADDR_FROM_SET; + if (!sockaddr_alloc(&cli_conn->src, &addr, laddr)) { + conn_free(cli_conn); + cli_conn = NULL; + } + } + + if (!cli_conn) { + /* no more memory, give up! */ + close(cfd); + continue; + } + /* The connection was accepted, it must be counted as such */ if (l->counters) HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn); @@ -948,6 +934,7 @@ void listener_accept(int fd) send_log(p, LOG_EMERG, "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n", p->id); + conn_free(cli_conn); close(cfd); expire = tick_add(now_ms, 1000); /* try again in 1 second */ goto limit_global; @@ -962,6 +949,7 @@ void listener_accept(int fd) next_feconn = 0; next_actconn = 0; + #if defined(USE_THREAD) mask = thread_mask(l->rx.settings->bind_thread) & all_threads_mask; if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) { @@ -1064,7 +1052,7 @@ void listener_accept(int fd) * when processing this loop. 
*/ ring = &accept_queue_rings[t]; - if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) { + if (accept_queue_push_mp(ring, cli_conn)) { _HA_ATOMIC_ADD(&activity[t].accq_pushed, 1); tasklet_wakeup(ring->tasklet); continue; @@ -1077,7 +1065,7 @@ void listener_accept(int fd) #endif // USE_THREAD _HA_ATOMIC_ADD(&l->thr_conn[tid], 1); - ret = l->accept(l, cfd, &addr); + ret = l->accept(cli_conn); if (unlikely(ret <= 0)) { /* The connection was closed by stream_accept(). Either * we just have to ignore it (ret == 0) or it's a critical diff --git a/src/session.c b/src/session.c index f6e4fe438..2673482ff 100644 --- a/src/session.c +++ b/src/session.c @@ -129,29 +129,22 @@ static void session_count_new(struct session *sess) /* This function is called from the protocol layer accept() in order to * instantiate a new session on behalf of a given listener and frontend. It * returns a positive value upon success, 0 if the connection can be ignored, - * or a negative value upon critical failure. The accepted file descriptor is + * or a negative value upon critical failure. The accepted connection is * closed if we return <= 0. If no handshake is needed, it immediately tries - * to instantiate a new stream. The created connection's owner points to the - * new session until the upper layers are created. + * to instantiate a new stream. The connection must already have been filled + * with the incoming connection handle (a fd), a target (the listener) and a + * source address. 
*/ -int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr) +int session_accept_fd(struct connection *cli_conn) { - struct connection *cli_conn; + struct listener *l = __objt_listener(cli_conn->target); struct proxy *p = l->bind_conf->frontend; + int cfd = cli_conn->handle.fd; struct session *sess; int ret; - ret = -1; /* assume unrecoverable error by default */ - if (unlikely((cli_conn = conn_new(&l->obj_type)) == NULL)) - goto out_close; - - if (!sockaddr_alloc(&cli_conn->src, addr, sizeof(*addr))) - goto out_free_conn; - - cli_conn->handle.fd = cfd; - cli_conn->flags |= CO_FL_ADDR_FROM_SET; cli_conn->proxy_netns = l->rx.settings->netns; conn_prepare(cli_conn, l->rx.proto, l->bind_conf->xprt); @@ -282,7 +275,6 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr conn_stop_tracking(cli_conn); conn_xprt_close(cli_conn); conn_free(cli_conn); - out_close: listener_release(l); if (ret < 0 && l->bind_conf->xprt == xprt_get(XPRT_RAW) && p->mode == PR_MODE_HTTP && l->bind_conf->mux_proto == NULL) {