diff --git a/src/ev_epoll.c b/src/ev_epoll.c index f37455faf..635b8a5fe 100644 --- a/src/ev_epoll.c +++ b/src/ev_epoll.c @@ -29,7 +29,7 @@ /* private data */ static THREAD_LOCAL struct epoll_event *epoll_events = NULL; -static int epoll_fd; +static int epoll_fd[MAX_THREADS]; // per-thread epoll_fd /* This structure may be used for any purpose. Warning! do not use it in * recursive functions ! @@ -49,8 +49,14 @@ static THREAD_LOCAL struct epoll_event ev; */ REGPRM1 static void __fd_clo(int fd) { - if (unlikely(fdtab[fd].cloned)) - epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev); + if (unlikely(fdtab[fd].cloned)) { + unsigned long m = fdtab[fd].thread_mask; + int i; + + for (i = global.nbthread - 1; i >= 0; i--) + if (m & (1UL << i)) + epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, fd, &ev); + } } /* @@ -82,34 +88,36 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) fdtab[fd].state = en; HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock); - if ((eo ^ en) & FD_EV_POLLED_RW) { - /* poll status changed */ - - if ((en & FD_EV_POLLED_RW) == 0) { + if (fdtab[fd].polled_mask & tid_bit) { + if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) { /* fd removed from poll list */ opcode = EPOLL_CTL_DEL; - } - else if ((eo & FD_EV_POLLED_RW) == 0) { - /* new fd in the poll list */ - opcode = EPOLL_CTL_ADD; + HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit); } else { /* fd status changed */ opcode = EPOLL_CTL_MOD; } - - /* construct the epoll events based on new state */ - ev.events = 0; - if (en & FD_EV_POLLED_R) - ev.events |= EPOLLIN | EPOLLRDHUP; - - if (en & FD_EV_POLLED_W) - ev.events |= EPOLLOUT; - - ev.data.fd = fd; - - epoll_ctl(epoll_fd, opcode, fd, &ev); } + else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) { + /* new fd in the poll list */ + opcode = EPOLL_CTL_ADD; + HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit); + } + else { + continue; + } + + /* construct the epoll events based on new state */ + ev.events = 0; + if (en & FD_EV_POLLED_R) + ev.events |= EPOLLIN | EPOLLRDHUP; + + if (en & FD_EV_POLLED_W) + ev.events |= EPOLLOUT; + + ev.data.fd = fd; + epoll_ctl(epoll_fd[tid], opcode, fd, &ev); } fd_nbupdt = 0; @@ -129,7 +137,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) /* now let's wait for polled events */ gettimeofday(&before_poll, NULL); - status = epoll_wait(epoll_fd, epoll_events, global.tune.maxpollevents, wait_time); + status = epoll_wait(epoll_fd[tid], epoll_events, global.tune.maxpollevents, wait_time); tv_update_date(wait_time, status); measure_idle(); @@ -146,7 +154,10 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) } if (!(fdtab[fd].thread_mask & tid_bit)) { + /* FD has been migrated */ activity[tid].poll_skip++; + epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev); + HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit); continue; } @@ -178,14 +189,38 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) static int init_epoll_per_thread() { + int fd; + epoll_events = calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents); if (epoll_events == NULL) - return 0; + goto fail_alloc; + + if (tid) { + epoll_fd[tid] = epoll_create(global.maxsock + 1); + if (epoll_fd[tid] < 0) + goto fail_fd; + } + + /* we may have to unregister some events initially registered on the + * original fd when it was alone, and/or to register events on the new + * fd for this thread. Let's just mark them as updated, the poller will + * do the rest. + */ + for (fd = 0; fd < maxfd; fd++) + updt_fd_polling(fd); + return 1; + fail_fd: + free(epoll_events); + fail_alloc: + return 0; } static void deinit_epoll_per_thread() { + if (tid) + close(epoll_fd[tid]); + free(epoll_events); epoll_events = NULL; } @@ -199,8 +234,8 @@ REGPRM1 static int _do_init(struct poller *p) { p->private = NULL; - epoll_fd = epoll_create(global.maxsock + 1); - if (epoll_fd < 0) + epoll_fd[tid] = epoll_create(global.maxsock + 1); + if (epoll_fd[tid] < 0) goto fail_fd; hap_register_per_thread_init(init_epoll_per_thread); @@ -219,9 +254,9 @@ REGPRM1 static int _do_init(struct poller *p) */ REGPRM1 static void _do_term(struct poller *p) { - if (epoll_fd >= 0) { - close(epoll_fd); - epoll_fd = -1; + if (epoll_fd[tid] >= 0) { + close(epoll_fd[tid]); + epoll_fd[tid] = -1; } p->private = NULL; @@ -251,10 +286,10 @@ REGPRM1 static int _do_test(struct poller *p) */ REGPRM1 static int _do_fork(struct poller *p) { - if (epoll_fd >= 0) - close(epoll_fd); - epoll_fd = epoll_create(global.maxsock + 1); - if (epoll_fd < 0) + if (epoll_fd[tid] >= 0) + close(epoll_fd[tid]); + epoll_fd[tid] = epoll_create(global.maxsock + 1); + if (epoll_fd[tid] < 0) return 0; return 1; } @@ -268,11 +303,14 @@ __attribute__((constructor)) static void _do_register(void) { struct poller *p; + int i; if (nbpollers >= MAX_POLLERS) return; - epoll_fd = -1; + for (i = 0; i < MAX_THREADS; i++) + epoll_fd[i] = -1; + p = &pollers[nbpollers++]; p->name = "epoll";