MEDIUM: lua-thread: Add the lua-load-per-thread directive

The goal is to allow execution of one main lua state per thread.

This patch contains the main job. The lua init is done using these
steps:
 - "lua-load-per-thread" loads the lua code in the first thread
 - it creates the structs
 - it stores loaded files
 - the 1st step load is completed (execution of hlua_post_init)
   and now, we known the number of threads
 - we initilize lua states for all remaining threads
 - for each one, we load the lua file
 - for each one, we execute post-init

Once all is loaded, we control consistency of functions references.
The rules are:
 - a function reference cannot be in the shared lua state and in
   a per-thread lua state at the same time.
 - if a function reference is declared in a per-thread lua state, it
   must be declared in all per-thread lua states
This commit is contained in:
Thierry Fournier 2020-11-29 00:37:41 +01:00 committed by Willy Tarreau
parent c749259dff
commit 59f11be436
2 changed files with 252 additions and 14 deletions

View File

@ -837,6 +837,7 @@ The following keywords are supported in the "global" section :
- log-tag
- log-send-hostname
- lua-load
- lua-load-per-thread
- lua-prepend-path
- mworker-max-reloads
- nbproc
@ -1363,9 +1364,31 @@ log-tag <string>
running on the same host. See also the per-proxy "log-tag" directive.
lua-load <file>
This global directive loads and executes a Lua file. This directive can be
This global directive loads and executes a Lua file in the shared context
that is visible to all threads. Any variable set in such a context is visible
from any thread. This is the easiest and recommended way to load Lua programs
but it will not scale well if a lot of Lua calls are performed, as only one
thread may be running on the global state at a time. A program loaded this
way will always see 0 in the "core.thread" variable. This directive can be
used multiple times.
lua-load-per-thread <file>
This global directive loads and executes a Lua file into each started thread.
Any global variable has a thread-local visibility so that each thread could
see a different value. As such it is strongly recommended not to use global
variables in programs loaded this way. An independent copy is loaded and
initialized for each thread, everything is done sequentially and in the
thread's numeric order from 1 to nbthread. If some operations need to be
performed only once, the program should check the "core.thread" variable to
figure what thread is being initialized. Programs loaded this way will run
concurrently on all threads and will be highly scalable. This is the
recommended way to load simple functions that register sample-fetches,
converters, actions or services once it is certain the program doesn't depend
on global variables. For the sake of simplicity, the directive is available
even if only one thread is used and even if threads are disabled (in which
case it will be equivalent to lua-load). This directive can be used multiple
times.
lua-prepend-path <string> [<type>]
Prepends the given string followed by a semicolon to Lua's package.<type>
variable.

View File

@ -136,6 +136,11 @@ static struct list referenced_functions = LIST_HEAD_INIT(referenced_functions);
*/
static int hlua_state_id;
/* This is a NULL-terminated list of lua file which are referenced to load per thread */
static char **per_thread_load = NULL;
lua_State *hlua_init_state(int thread_id);
#define SET_SAFE_LJMP_L(__L, __HLUA) \
({ \
int ret; \
@ -282,6 +287,14 @@ static int hlua_lua2smp(lua_State *L, int ud, struct sample *smp);
__LJMP static int hlua_http_get_headers(lua_State *L, struct http_msg *msg);
struct prepend_path {
struct list l;
char *type;
char *path;
};
static struct list prepend_path_list = LIST_HEAD_INIT(prepend_path_list);
#define SEND_ERR(__be, __fmt, __args...) \
do { \
send_log(__be, LOG_ERR, __fmt, ## __args); \
@ -6400,7 +6413,13 @@ static int hlua_register_task(lua_State *L)
if (!hlua)
WILL_LJMP(luaL_error(L, "Lua out of memory error."));
task = task_new(MAX_THREADS_MASK);
/* We are in the common lua state, execute the task anywhere,
* otherwise, inherit the current thread identifier
*/
if (state_id == 0)
task = task_new(MAX_THREADS_MASK);
else
task = task_new(tid_bit);
if (!task)
WILL_LJMP(luaL_error(L, "Lua out of memory error."));
@ -6711,8 +6730,13 @@ __LJMP static int hlua_register_converters(lua_State *L)
chunk_printf(trash, "lua.%s", name);
sc = find_sample_conv(trash->area, trash->data);
if (sc != NULL) {
ha_warning("Trying to register converter 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", name);
fcn = sc->private;
if (fcn->function_ref[hlua_state_id] != -1) {
ha_warning("Trying to register converter 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", name);
}
fcn->function_ref[hlua_state_id] = ref;
return 0;
}
/* Allocate and fill the sample fetch keyword struct. */
@ -6779,8 +6803,13 @@ __LJMP static int hlua_register_fetches(lua_State *L)
chunk_printf(trash, "lua.%s", name);
sf = find_sample_fetch(trash->area, trash->data);
if (sf != NULL) {
ha_warning("Trying to register sample-fetch 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", name);
fcn = sf->private;
if (fcn->function_ref[hlua_state_id] != -1) {
ha_warning("Trying to register sample-fetch 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", name);
}
fcn->function_ref[hlua_state_id] = ref;
return 0;
}
/* Allocate and fill the sample fetch keyword struct. */
@ -7643,8 +7672,16 @@ __LJMP static int hlua_register_action(lua_State *L)
akw = NULL;
}
if (akw != NULL) {
ha_warning("Trying to register action 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", name);
fcn = akw->private;
if (fcn->function_ref[hlua_state_id] != -1) {
ha_warning("Trying to register action 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", name);
}
fcn->function_ref[hlua_state_id] = ref;
/* pop the environment string. */
lua_pop(L, 1);
continue;
}
/* Check required environment. Only accepted "http" or "tcp". */
@ -7765,8 +7802,13 @@ __LJMP static int hlua_register_service(lua_State *L)
chunk_printf(trash, "lua.%s", name);
akw = service_find(trash->area);
if (akw != NULL) {
ha_warning("Trying to register service 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", name);
fcn = akw->private;
if (fcn->function_ref[hlua_state_id] != -1) {
ha_warning("Trying to register service 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", name);
}
fcn->function_ref[hlua_state_id] = ref;
return 0;
}
/* Allocate and fill the sample fetch keyword struct. */
@ -8037,8 +8079,13 @@ __LJMP static int hlua_register_cli(lua_State *L)
}
cli_kw = cli_find_kw_exact((char **)kw);
if (cli_kw != NULL) {
ha_warning("Trying to register CLI keyword 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", trash->area);
fcn = cli_kw->private;
if (fcn->function_ref[hlua_state_id] != -1) {
ha_warning("Trying to register CLI keyword 'lua.%s' more than once. "
"This will become a hard error in version 2.5.\n", trash->area);
}
fcn->function_ref[hlua_state_id] = ref_io;
return 0;
}
/* Allocate and fill the sample fetch keyword struct. */
@ -8243,10 +8290,56 @@ static int hlua_load(char **args, int section_type, struct proxy *curpx,
return -1;
}
/* loading for global state */
hlua_state_id = 0;
ha_set_tid(0);
return hlua_load_state(args[1], hlua_states[0], err);
}
static int hlua_load_per_thread(char **args, int section_type, struct proxy *curpx,
struct proxy *defpx, const char *file, int line,
char **err)
{
int len;
if (*(args[1]) == 0) {
memprintf(err, "'%s' expects a file as parameter.\n", args[0]);
return -1;
}
if (per_thread_load == NULL) {
/* allocate the first entry large enough to store the final NULL */
per_thread_load = calloc(1, sizeof(*per_thread_load));
if (per_thread_load == NULL) {
memprintf(err, "out of memory error");
return -1;
}
}
/* count used entries */
for (len = 0; per_thread_load[len] != NULL; len++)
;
per_thread_load = realloc(per_thread_load, (len + 2) * sizeof(*per_thread_load));
if (per_thread_load == NULL) {
memprintf(err, "out of memory error");
return -1;
}
per_thread_load[len] = strdup(args[1]);
per_thread_load[len + 1] = NULL;
if (per_thread_load[len] == NULL) {
memprintf(err, "out of memory error");
return -1;
}
/* loading for thread 1 only */
hlua_state_id = 1;
ha_set_tid(0);
return hlua_load_state(args[1], hlua_states[1], err);
}
/* Prepend the given <path> followed by a semicolon to the `package.<type>` variable
* in the given <ctx>.
*/
@ -8269,6 +8362,8 @@ static int hlua_config_prepend_path(char **args, int section_type, struct proxy
{
char *path;
char *type = "path";
struct prepend_path *p;
if (too_many_args(2, args, err, NULL)) {
return -1;
}
@ -8287,13 +8382,33 @@ static int hlua_config_prepend_path(char **args, int section_type, struct proxy
type = args[2];
}
return hlua_prepend_path(hlua_states[0], type, path);
p = calloc(1, sizeof(*p));
if (p == NULL) {
memprintf(err, "out of memory error");
return -1;
}
p->path = strdup(path);
if (p->path == NULL) {
memprintf(err, "out of memory error");
return -1;
}
p->type = strdup(type);
if (p->type == NULL) {
memprintf(err, "out of memory error");
return -1;
}
LIST_ADDQ(&prepend_path_list, &p->l);
hlua_prepend_path(hlua_states[0], type, path);
hlua_prepend_path(hlua_states[1], type, path);
return 0;
}
/* configuration keywords declaration */
static struct cfg_kw_list cfg_kws = {{ },{
{ CFG_GLOBAL, "lua-prepend-path", hlua_config_prepend_path },
{ CFG_GLOBAL, "lua-load", hlua_load },
{ CFG_GLOBAL, "lua-load-per-thread", hlua_load_per_thread },
{ CFG_GLOBAL, "tune.lua.session-timeout", hlua_session_timeout },
{ CFG_GLOBAL, "tune.lua.task-timeout", hlua_task_timeout },
{ CFG_GLOBAL, "tune.lua.service-timeout", hlua_applet_timeout },
@ -8402,6 +8517,12 @@ int hlua_post_init_state(lua_State *L)
int hlua_post_init()
{
int ret;
int i;
int errors;
char *err = NULL;
struct hlua_function *fcn;
#if USE_OPENSSL
/* Initialize SSL server. */
if (socket_ssl.xprt->prepare_srv) {
@ -8414,7 +8535,89 @@ int hlua_post_init()
/* Perform post init of common thread */
hlua_state_id = 0;
return hlua_post_init_state(hlua_states[hlua_state_id]);
ha_set_tid(0);
ret = hlua_post_init_state(hlua_states[hlua_state_id]);
if (ret == 0)
return 0;
/* init remaining lua states and load files */
for (hlua_state_id = 2; hlua_state_id < global.nbthread + 1; hlua_state_id++) {
/* set thread context */
ha_set_tid(hlua_state_id - 1);
/* Init lua state */
hlua_states[hlua_state_id] = hlua_init_state(hlua_state_id);
/* Load lua files */
for (i = 0; per_thread_load && per_thread_load[i]; i++) {
ret = hlua_load_state(per_thread_load[i], hlua_states[hlua_state_id], &err);
if (ret != 0) {
ha_alert("Lua init: %s\n", err);
return 0;
}
}
}
/* Reset thread context */
ha_set_tid(0);
/* Execute post init for all states */
for (hlua_state_id = 1; hlua_state_id < global.nbthread + 1; hlua_state_id++) {
/* set thread context */
ha_set_tid(hlua_state_id - 1);
/* run post init */
ret = hlua_post_init_state(hlua_states[hlua_state_id]);
if (ret == 0)
return 0;
}
/* Reset thread context */
ha_set_tid(0);
/* control functions registering. Each function must have:
* - only the function_ref[0] set positive and all other to -1
* - only the function_ref[0] set to -1 and all other positive
* This ensure a same reference is not used both in shared
* lua state and thread dedicated lua state. Note: is the case
* reach, the shared state is prioritary, but the bug will be
* complicated to found for the end user.
*/
errors = 0;
list_for_each_entry(fcn, &referenced_functions, l) {
ret = 0;
for (i = 1; i < global.nbthread + 1; i++) {
if (fcn->function_ref[i] == -1)
ret--;
else
ret++;
}
if (abs(ret) != global.nbthread) {
ha_alert("Lua function '%s' is not referenced in all thread. "
"Expect function in all thread or in none thread.\n", fcn->name);
errors++;
continue;
}
if ((fcn->function_ref[0] == -1) == (ret < 0)) {
ha_alert("Lua function '%s' is referenced both ins shared Lua context (throught lua-load) "
"and per-thread Lua context (throught lua-load-per-thread). these two context "
"exclusive.\n", fcn->name);
errors++;
}
}
if (errors > 0)
return 0;
/* after this point, this global wil no longer used, so set to
* -1 in order to have probably a segfault if someone use it
*/
hlua_state_id = -1;
return 1;
}
/* The memory allocator used by the Lua stack. <ud> is a pointer to the
@ -8471,6 +8674,7 @@ lua_State *hlua_init_state(int thread_num)
const char *error_msg;
void **context;
lua_State *L;
struct prepend_path *pp;
#ifdef USE_OPENSSL
struct srv_kw *kw;
int tmp_error;
@ -8521,6 +8725,10 @@ lua_State *hlua_init_state(int thread_num)
#undef HLUA_PREPEND_PATH_TOSTRING
#undef HLUA_PREPEND_PATH_TOSTRING1
/* Apply configured prepend path */
list_for_each_entry(pp, &prepend_path_list, l)
hlua_prepend_path(L, pp->type, pp->path);
/*
*
* Create "core" object.
@ -9066,12 +9274,19 @@ void hlua_init(void) {
/* Init state for common/shared lua parts */
hlua_state_id = 0;
ha_set_tid(0);
hlua_states[0] = hlua_init_state(0);
/* Init state 1 for thread 0. We have at least one thread. */
hlua_state_id = 1;
ha_set_tid(0);
hlua_states[1] = hlua_init_state(1);
}
static void hlua_deinit()
{
lua_close(hlua_states[0]);
lua_close(hlua_states[1]);
}
REGISTER_POST_DEINIT(hlua_deinit);