From b1bb1afa4741a20e5bf954f0065ae7b747a3e219 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Tue, 17 Sep 2019 11:55:52 +0200 Subject: [PATCH] MINOR: spoe: Support the async mode with several threads A different engine-id is now generated for each thread. So, it is possible to enable the async mode with several threads. This patch may be backported to older versions. --- include/types/spoe.h | 2 +- src/flt_spoe.c | 41 ++++++++++++++++------------------------- 2 files changed, 17 insertions(+), 26 deletions(-) diff --git a/include/types/spoe.h b/include/types/spoe.h index 9bba492b5..2dbf6e565 100644 --- a/include/types/spoe.h +++ b/include/types/spoe.h @@ -244,7 +244,6 @@ struct spoe_agent { } timeout; /* Config info */ - char *engine_id; /* engine-id string */ char *var_pfx; /* Prefix used for vars set by the agent */ char *var_on_error; /* Variable to set when an error occurred, in the TXN scope */ char *var_t_process; /* Variable to set to report the processing time of the last event/group, in the TXN scope */ @@ -264,6 +263,7 @@ struct spoe_agent { /* running info */ struct { + char *engine_id; /* engine-id string */ unsigned int frame_size; /* current maximum frame size, only used to encode messages */ unsigned int processing; struct freq_ctr processing_per_sec; diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 114ecf39b..fe219ea30 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -172,7 +172,6 @@ spoe_release_agent(struct spoe_agent *agent) free(agent->id); free(agent->conf.file); free(agent->var_pfx); - free(agent->engine_id); free(agent->var_on_error); free(agent->var_t_process); free(agent->var_t_total); @@ -185,8 +184,10 @@ spoe_release_agent(struct spoe_agent *agent) spoe_release_group(grp); } if (agent->rt) { - for (i = 0; i < global.nbthread; ++i) + for (i = 0; i < global.nbthread; ++i) { + free(agent->rt[i].engine_id); HA_SPIN_DESTROY(&agent->rt[i].lock); + } } free(agent->rt); free(agent); @@ -460,14 +461,14 @@ spoe_prepare_hahello_frame(struct appctx *appctx, char *frame, size_t size) goto too_big; /* (optionnal) "engine-id" K/V item, if present */ - if (agent != NULL && agent->engine_id != NULL) { + if (agent != NULL && agent->rt[tid].engine_id != NULL) { sz = SLEN(ENGINE_ID_KEY); if (spoe_encode_buffer(ENGINE_ID_KEY, sz, &p, end) == -1) goto too_big; *p++ = SPOE_DATA_T_STR; - sz = strlen(agent->engine_id); - if (spoe_encode_buffer(agent->engine_id, sz, &p, end) == -1) + sz = strlen(agent->rt[tid].engine_id); + if (spoe_encode_buffer(agent->rt[tid].engine_id, sz, &p, end) == -1) goto too_big; } @@ -3089,16 +3090,13 @@ spoe_check(struct proxy *px, struct flt_conf *fconf) return 1; } - /* finish per-thread agent initialization */ - if (global.nbthread == 1) - conf->agent->flags |= SPOE_FL_ASYNC; - if ((conf->agent->rt = calloc(global.nbthread, sizeof(*conf->agent->rt))) == NULL) { ha_alert("Proxy %s : out of memory initializing SPOE agent '%s' declared at %s:%d.\n", px->id, conf->agent->id, conf->agent->conf.file, conf->agent->conf.line); return 1; } for (i = 0; i < global.nbthread; ++i) { + conf->agent->rt[i].engine_id = NULL; conf->agent->rt[i].frame_size = conf->agent->max_frame_size; conf->agent->rt[i].processing = 0; LIST_INIT(&conf->agent->rt[i].applets); @@ -3121,12 +3119,13 @@ spoe_init_per_thread(struct proxy *p, struct flt_conf *fconf) struct spoe_config *conf = fconf->conf; struct spoe_agent *agent = conf->agent; - if (agent->engine_id == NULL) { + /* Use a != seed per process */ + if (relative_pid > 1 && tid == 0) srandom(now_ms * pid); - agent->engine_id = generate_pseudo_uuid(); - if (agent->engine_id == NULL) - return -1; - } + + agent->rt[tid].engine_id = generate_pseudo_uuid(); + if (agent->rt[tid].engine_id == NULL) + return -1; return 0; } @@ -3394,12 +3393,11 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) curagent->timeout.idle = TICK_ETERNITY; curagent->timeout.processing = TICK_ETERNITY; - curagent->engine_id = NULL; curagent->var_pfx = NULL; curagent->var_on_error = NULL; curagent->var_t_process = NULL; curagent->var_t_total = NULL; - curagent->flags = (SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION); + curagent->flags = (SPOE_FL_ASYNC | SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION); curagent->cps_max = 0; curagent->eps_max = 0; curagent->max_frame_size = MAX_FRAME_SIZE; @@ -3544,15 +3542,8 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) goto out; if (kwm == 1) curagent->flags &= ~SPOE_FL_ASYNC; - else { - if (global.nbthread == 1) - curagent->flags |= SPOE_FL_ASYNC; - else { - ha_warning("parsing [%s:%d] Async option is not supported with threads.\n", - file, linenum); - err_code |= ERR_WARN; - } - } + else + curagent->flags |= SPOE_FL_ASYNC; goto out; } else if (!strcmp(args[1], "send-frag-payload")) {