REORG: mworker: move IPC functions to mworker.c

Move the following functions to mworker.c:

void mworker_accept_wrapper(int fd);
void mworker_pipe_register();
This commit is contained in:
William Lallemand 2019-04-01 11:29:55 +02:00 committed by Willy Tarreau
parent 3cd95d2f1b
commit 3fa724db87
3 changed files with 63 additions and 52 deletions

View File

@ -20,4 +20,7 @@ void mworker_env_to_proc_list();
void mworker_block_signals(); void mworker_block_signals();
void mworker_unblock_signals(); void mworker_unblock_signals();
void mworker_accept_wrapper(int fd);
void mworker_pipe_register();
#endif /* PROTO_MWORKER_H_ */ #endif /* PROTO_MWORKER_H_ */

View File

@ -2598,58 +2598,6 @@ void deinit(void)
} /* end deinit() */ } /* end deinit() */
/* This is a wrapper for the sockpair FD, It tests if the socket received an
* EOF, if not, it calls listener_accept */
void mworker_accept_wrapper(int fd)
{
char c;
int ret;
while (1) {
ret = recv(fd, &c, 1, MSG_PEEK);
if (ret == -1) {
if (errno == EINTR)
continue;
if (errno == EAGAIN) {
fd_cant_recv(fd);
return;
}
break;
} else if (ret > 0) {
listener_accept(fd);
return;
} else if (ret == 0) {
/* At this step the master is down before
* this worker perform a 'normal' exit.
* So we want to exit with an error but
* other threads could currently process
* some stuff so we can't perform a clean
* deinit().
*/
exit(EXIT_FAILURE);
}
}
return;
}
/*
* This function register the accept wrapper for the sockpair of the master worker
*/
void mworker_pipe_register()
{
/* The iocb should be already initialized with listener_accept */
if (fdtab[proc_self->ipc_fd[1]].iocb == mworker_accept_wrapper)
return;
fcntl(proc_self->ipc_fd[1], F_SETFL, O_NONBLOCK);
/* In multi-tread, we need only one thread to process
* events on the pipe with master
*/
fd_insert(proc_self->ipc_fd[1], fdtab[proc_self->ipc_fd[1]].owner, mworker_accept_wrapper, 1);
fd_want_recv(proc_self->ipc_fd[1]);
}
/* Runs the polling loop */ /* Runs the polling loop */
static void run_poll_loop() static void run_poll_loop()
{ {

View File

@ -10,11 +10,16 @@
* *
*/ */
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <common/mini-clist.h> #include <common/mini-clist.h>
#include <proto/fd.h>
#include <proto/listener.h>
#include <proto/mworker.h> #include <proto/mworker.h>
#include <proto/signal.h> #include <proto/signal.h>
@ -105,3 +110,58 @@ void mworker_unblock_signals()
{ {
haproxy_unblock_signals(); haproxy_unblock_signals();
} }
/* ----- IPC FD (sockpair) related ----- */
/* This wrapper is called from the workers. It is registered instead of the
* normal listener_accept() so the worker can exit() when it detects that the
* master closed the IPC FD. If it's not a close, we just call the regular
* listener_accept() function */
void mworker_accept_wrapper(int fd)
{
char c;
int ret;
while (1) {
ret = recv(fd, &c, 1, MSG_PEEK);
if (ret == -1) {
if (errno == EINTR)
continue;
if (errno == EAGAIN) {
fd_cant_recv(fd);
return;
}
break;
} else if (ret > 0) {
listener_accept(fd);
return;
} else if (ret == 0) {
/* At this step the master is down before
* this worker perform a 'normal' exit.
* So we want to exit with an error but
* other threads could currently process
* some stuff so we can't perform a clean
* deinit().
*/
exit(EXIT_FAILURE);
}
}
return;
}
/*
* This function register the accept wrapper for the sockpair of the master worker
*/
void mworker_pipe_register()
{
/* The iocb should be already initialized with listener_accept */
if (fdtab[proc_self->ipc_fd[1]].iocb == mworker_accept_wrapper)
return;
fcntl(proc_self->ipc_fd[1], F_SETFL, O_NONBLOCK);
/* In multi-tread, we need only one thread to process
* events on the pipe with master
*/
fd_insert(proc_self->ipc_fd[1], fdtab[proc_self->ipc_fd[1]].owner, mworker_accept_wrapper, 1);
fd_want_recv(proc_self->ipc_fd[1]);
}