diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 774fe7b67..39d82208e 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -167,6 +167,7 @@ enum lock_label { COMP_POOL_LOCK, LUA_LOCK, NOTIF_LOCK, + SPOE_APPLET_LOCK, LOCK_LABELS }; struct lock_stat { @@ -255,7 +256,7 @@ static inline void show_lock_stats() "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS", "APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS", "PATREF", "PATEXP", "PATLRU", "VARS", "COMP_POOL", "LUA", - "NOTIF" }; + "NOTIF", "SPOE_APPLET" }; int lbl; for (lbl = 0; lbl < LOCK_LABELS; lbl++) { diff --git a/include/types/spoe.h b/include/types/spoe.h index 108bc980a..aead2ba98 100644 --- a/include/types/spoe.h +++ b/include/types/spoe.h @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -251,17 +252,23 @@ struct spoe_agent { struct list messages; /* list of all messages attached to this SPOE agent */ /* running info */ - unsigned int frame_size; /* current maximum frame size, only used to encode messages */ - unsigned int applets_act; /* # of applets alive at a time */ - unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */ - unsigned int sending_rate; /* the global sending rate */ + struct { + unsigned int frame_size; /* current maximum frame size, only used to encode messages */ + unsigned int applets_act; /* # of applets alive at a time */ + unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */ + unsigned int sending_rate; /* the global sending rate */ - struct freq_ctr conn_per_sec; /* connections per second */ - struct freq_ctr err_per_sec; /* connetion errors per second */ + struct freq_ctr conn_per_sec; /* connections per second */ + struct freq_ctr err_per_sec; /* connetion errors per second */ - struct list applets; /* List of available SPOE applets */ - struct list sending_queue; /* Queue of streams waiting to send data */ - struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */ + struct list applets; /* List of available SPOE applets */ + struct list sending_queue; /* Queue of streams waiting to send data */ + struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */ + +#ifdef USE_THREAD + HA_SPINLOCK_T lock; +#endif + } *rt; }; diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 7fc4ed87f..938faabd2 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -153,6 +153,7 @@ spoe_release_agent(struct spoe_agent *agent) { struct spoe_message *msg, *msgback; struct spoe_group *grp, *grpback; + int i; if (!agent) return; @@ -169,6 +170,9 @@ spoe_release_agent(struct spoe_agent *agent) LIST_DEL(&grp->list); spoe_release_group(grp); } + for (i = 0; i < global.nbthread; ++i) + SPIN_DESTROY(&agent->rt[i].lock); + free(agent->rt); free(agent); } @@ -974,7 +978,7 @@ spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx, /* Try to find the corresponding SPOE context */ if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { - list_for_each_entry((*ctx), &agent->waiting_queue, list) { + list_for_each_entry((*ctx), &agent->rt[tid].waiting_queue, list) { if ((*ctx)->stream_id == (unsigned int)stream_id && (*ctx)->frame_id == (unsigned int)frame_id) goto found; @@ -1234,7 +1238,7 @@ spoe_release_appctx(struct appctx *appctx) __FUNCTION__, appctx); /* Remove applet from the list of running applets */ - agent->applets_act--; + agent->rt[tid].applets_act--; if (!LIST_ISEMPTY(&spoe_appctx->list)) { LIST_DEL(&spoe_appctx->list); LIST_INIT(&spoe_appctx->list); @@ -1243,7 +1247,7 @@ spoe_release_appctx(struct appctx *appctx) /* Shutdown the server connection, if needed */ if (appctx->st0 != SPOE_APPCTX_ST_END) { if (appctx->st0 == SPOE_APPCTX_ST_IDLE) - agent->applets_idle--; + agent->rt[tid].applets_idle--; appctx->st0 = SPOE_APPCTX_ST_END; if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE) @@ -1284,18 +1288,18 @@ spoe_release_appctx(struct appctx *appctx) &spoe_appctx->buffer_wait); pool_free2(pool2_spoe_appctx, spoe_appctx); - if (!LIST_ISEMPTY(&agent->applets)) + if (!LIST_ISEMPTY(&agent->rt[tid].applets)) goto end; /* If this was the last running applet, notify all waiting streams */ - list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) { + list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); ctx->state = SPOE_CTX_ST_ERROR; ctx->status_code = (spoe_appctx->status_code + 0x100); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } - list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) { + list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); ctx->state = SPOE_CTX_ST_ERROR; @@ -1305,10 +1309,9 @@ spoe_release_appctx(struct appctx *appctx) end: /* Update runtinme agent info */ - agent->frame_size = agent->max_frame_size; - list_for_each_entry(spoe_appctx, &agent->applets, list) - agent->frame_size = MIN(spoe_appctx->max_frame_size, - agent->frame_size); + agent->rt[tid].frame_size = agent->max_frame_size; + list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) + HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, spoe_appctx->max_frame_size); } static int @@ -1421,14 +1424,15 @@ spoe_handle_connecting_appctx(struct appctx *appctx) default: /* HELLO handshake is finished, set the idle timeout and * add the applet in the list of running applets. */ - agent->applets_idle++; + agent->rt[tid].applets_idle++; appctx->st0 = SPOE_APPCTX_ST_IDLE; + SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); LIST_DEL(&SPOE_APPCTX(appctx)->list); - LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list); + LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list); + SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); /* Update runtinme agent info */ - agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size, - agent->frame_size); + HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size); goto next; } @@ -1465,13 +1469,13 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) ret = spoe_prepare_hafrag_frame(appctx, ctx, frame, SPOE_APPCTX(appctx)->max_frame_size); } - else if (LIST_ISEMPTY(&agent->sending_queue)) { + else if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) { *skip = 1; ret = 1; goto end; } else { - ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list); + ctx = LIST_NEXT(&agent->rt[tid].sending_queue, typeof(ctx), list); ret = spoe_prepare_hanotify_frame(appctx, ctx, frame, SPOE_APPCTX(appctx)->max_frame_size); @@ -1532,7 +1536,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) no_frag_frame_sent: if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - LIST_ADDQ(&agent->waiting_queue, &ctx->list); + LIST_ADDQ(&agent->rt[tid].waiting_queue, &ctx->list); } else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) { appctx->st0 = SPOE_APPCTX_ST_PROCESSING; @@ -1660,7 +1664,7 @@ spoe_handle_processing_appctx(struct appctx *appctx) goto next; case 0: /* ignore */ - agent->sending_rate++; + agent->rt[tid].sending_rate++; fpa++; break; @@ -1668,7 +1672,7 @@ spoe_handle_processing_appctx(struct appctx *appctx) break; default: - agent->sending_rate++; + agent->rt[tid].sending_rate++; fpa++; break; } @@ -1703,11 +1707,13 @@ spoe_handle_processing_appctx(struct appctx *appctx) stop: if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) { appctx->st0 = SPOE_APPCTX_ST_IDLE; - agent->applets_idle++; + agent->rt[tid].applets_idle++; } if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) { + SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); LIST_DEL(&SPOE_APPCTX(appctx)->list); - LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list); + LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list); + SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); if (fpa) SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); @@ -1868,14 +1874,14 @@ spoe_handle_appctx(struct appctx *appctx) case SPOE_APPCTX_ST_IDLE: if (stopping && - LIST_ISEMPTY(&agent->sending_queue) && + LIST_ISEMPTY(&agent->rt[tid].sending_queue) && LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) { SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; goto switchstate; } - agent->applets_idle--; + agent->rt[tid].applets_idle--; appctx->st0 = SPOE_APPCTX_ST_PROCESSING; /* fall through */ @@ -1909,6 +1915,9 @@ spoe_handle_appctx(struct appctx *appctx) return; } out: + if (stopping) + spoe_wakeup_appctx(appctx); + if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY) task_queue(SPOE_APPCTX(appctx)->task); si_oc(si)->flags |= CF_READ_DONTWAIT; @@ -1940,7 +1949,7 @@ spoe_create_appctx(struct spoe_config *conf) memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size); appctx->st0 = SPOE_APPCTX_ST_CONNECT; - if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL) + if ((SPOE_APPCTX(appctx)->task = task_new(1UL << tid)) == NULL) goto out_free_spoe_appctx; SPOE_APPCTX(appctx)->owner = appctx; @@ -1976,8 +1985,10 @@ spoe_create_appctx(struct spoe_config *conf) strm->do_log = NULL; strm->res.flags |= CF_READ_DONTWAIT; - LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list); - conf->agent->applets_act++; + SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock); + LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list); + SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock); + conf->agent->rt[tid].applets_act++; task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT); task_wakeup(strm->task, TASK_WOKEN_INIT); @@ -2008,9 +2019,9 @@ spoe_queue_context(struct spoe_context *ctx) min_applets = min_applets_act(agent); /* Check if we need to create a new SPOE applet or not. */ - if (agent->applets_act >= min_applets && - agent->applets_idle && - agent->sending_rate) + if (agent->rt[tid].applets_act >= min_applets && + agent->rt[tid].applets_idle && + agent->rt[tid].sending_rate) goto end; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" @@ -2031,7 +2042,7 @@ spoe_queue_context(struct spoe_context *ctx) /* Do not try to create a new applet if we have reached the maximum of * connection per seconds */ if (agent->cps_max > 0) { - if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) { + if (!freq_ctr_remain(&agent->rt[tid].conn_per_sec, agent->cps_max, 0)) { SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - cannot create SPOE appctx: max CPS reached\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, @@ -2052,41 +2063,43 @@ spoe_queue_context(struct spoe_context *ctx) goto end; } - if (agent->applets_act <= min_applets) + if (agent->rt[tid].applets_act <= min_applets) SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST; /* Increase the per-process number of cumulated connections */ if (agent->cps_max > 0) - update_freq_ctr(&agent->conn_per_sec, 1); + update_freq_ctr(&agent->rt[tid].conn_per_sec, 1); end: /* The only reason to return an error is when there is no applet */ - if (LIST_ISEMPTY(&agent->applets)) { + if (LIST_ISEMPTY(&agent->rt[tid].applets)) { ctx->status_code = SPOE_CTX_ERR_RES; return -1; } /* Add the SPOE context in the sending queue and update all running * info */ - LIST_ADDQ(&agent->sending_queue, &ctx->list); - if (agent->sending_rate) - agent->sending_rate--; + LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list); + if (agent->rt[tid].sending_rate) + agent->rt[tid].sending_rate--; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - Add stream in sending queue" " - applets_act=%u - applets_idle=%u - sending_rate=%u\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, - ctx->strm, agent->applets_act, agent->applets_idle, - agent->sending_rate); + ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle, + agent->rt[tid].sending_rate); /* Finally try to wakeup the first IDLE applet found and move it at the * end of the list. */ - list_for_each_entry(spoe_appctx, &agent->applets, list) { + list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) { appctx = spoe_appctx->owner; if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { spoe_wakeup_appctx(appctx); + SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); LIST_DEL(&spoe_appctx->list); - LIST_ADDQ(&agent->applets, &spoe_appctx->list); + LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list); + SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); break; } } @@ -2189,7 +2202,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx, char *p, *end; p = ctx->buffer->p; - end = p + agent->frame_size - FRAME_HDR_SIZE; + end = p + agent->rt[tid].frame_size - FRAME_HDR_SIZE; if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */ /* Resume encoding of a SPOE message */ @@ -2239,7 +2252,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx, (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, s, ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"), - ctx->frag_ctx.spoe_appctx, (agent->frame_size - FRAME_HDR_SIZE), + ctx->frag_ctx.spoe_appctx, (agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p); ctx->buffer->i = p - ctx->buffer->p; @@ -2263,7 +2276,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx, (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx, ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff, - (agent->frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p); + (agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p); ctx->buffer->i = p - ctx->buffer->p; ctx->flags |= SPOE_CTX_FL_FRAGMENTED; @@ -2504,7 +2517,7 @@ spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent, struct spoe_context *ctx, int dir) { if (agent->eps_max > 0) - update_freq_ctr(&agent->err_per_sec, 1); + update_freq_ctr(&agent->rt[tid].err_per_sec, 1); if (agent->var_on_error) { struct sample smp; @@ -2557,7 +2570,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx, if (ctx->state == SPOE_CTX_ST_READY) { if (agent->eps_max > 0) { - if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) { + if (!freq_ctr_remain(&agent->rt[tid].err_per_sec, agent->eps_max, 0)) { SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - skip processing of messages: max EPS reached\n", (int)now.tv_sec, (int)now.tv_usec, @@ -2791,6 +2804,7 @@ spoe_sig_stop(struct sig_handler *sh) struct spoe_config *conf; struct spoe_agent *agent; struct spoe_appctx *spoe_appctx; + int i; if (fconf->id != spoe_filter_id) continue; @@ -2798,8 +2812,11 @@ spoe_sig_stop(struct sig_handler *sh) conf = fconf->conf; agent = conf->agent; - list_for_each_entry(spoe_appctx, &agent->applets, list) { - spoe_wakeup_appctx(spoe_appctx->owner); + for (i = 0; i < global.nbthread; ++i) { + SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock); + list_for_each_entry(spoe_appctx, &agent->rt[i].applets, list) + spoe_wakeup_appctx(spoe_appctx->owner); + SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock); } } p = p->next; @@ -3177,7 +3194,9 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) curagent->engine_id = NULL; curagent->var_pfx = NULL; curagent->var_on_error = NULL; - curagent->flags = (SPOE_FL_PIPELINING | SPOE_FL_ASYNC | SPOE_FL_SND_FRAGMENTATION); + curagent->flags = (SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION); + if (global.nbthread == 1) + curagent->flags |= SPOE_FL_ASYNC; curagent->cps_max = 0; curagent->eps_max = 0; curagent->max_frame_size = MAX_FRAME_SIZE; @@ -3189,14 +3208,21 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) LIST_INIT(&curagent->groups); LIST_INIT(&curagent->messages); - curagent->frame_size = curagent->max_frame_size; - curagent->applets_act = 0; - curagent->applets_idle = 0; - curagent->sending_rate = 0; - - LIST_INIT(&curagent->applets); - LIST_INIT(&curagent->sending_queue); - LIST_INIT(&curagent->waiting_queue); + if ((curagent->rt = calloc(global.nbthread, sizeof(*curagent->rt))) == NULL) { + Alert("parsing [%s:%d] : out of memory.\n", file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + for (i = 0; i < global.nbthread; ++i) { + curagent->rt[i].frame_size = curagent->max_frame_size; + curagent->rt[i].applets_act = 0; + curagent->rt[i].applets_idle = 0; + curagent->rt[i].sending_rate = 0; + LIST_INIT(&curagent->rt[i].applets); + LIST_INIT(&curagent->rt[i].sending_queue); + LIST_INIT(&curagent->rt[i].waiting_queue); + SPIN_INIT(&curagent->rt[i].lock); + } } else if (!strcmp(args[0], "use-backend")) { if (!*args[1]) { @@ -3320,8 +3346,15 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) goto out; if (kwm == 1) curagent->flags &= ~SPOE_FL_ASYNC; - else - curagent->flags |= SPOE_FL_ASYNC; + else { + if (global.nbthread == 1) + curagent->flags |= SPOE_FL_ASYNC; + else { + Warning("parsing [%s:%d] Async option is not supported with threads.\n", + file, linenum); + err_code |= ERR_WARN; + } + } goto out; } else if (!strcmp(args[1], "send-frag-payload")) {