MEDIUM: thread/spoe: Make the SPOE thread-safe

Because there is not migration mechanism yet, all runtime information about an
SPOE agent are thread-local and async exchanges with agents are disabled when we
have serveral threads. Howerver, pipelining is still available. So for now, the
thread part of the SPOE is pretty simple.
This commit is contained in:
Christopher Faulet 2017-09-25 14:48:02 +02:00 committed by Willy Tarreau
parent 738a6d76f6
commit 24289f2e07
3 changed files with 109 additions and 68 deletions

View File

@ -167,6 +167,7 @@ enum lock_label {
COMP_POOL_LOCK, COMP_POOL_LOCK,
LUA_LOCK, LUA_LOCK,
NOTIF_LOCK, NOTIF_LOCK,
SPOE_APPLET_LOCK,
LOCK_LABELS LOCK_LABELS
}; };
struct lock_stat { struct lock_stat {
@ -255,7 +256,7 @@ static inline void show_lock_stats()
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS", "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
"APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS", "APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS",
"PATREF", "PATEXP", "PATLRU", "VARS", "COMP_POOL", "LUA", "PATREF", "PATEXP", "PATLRU", "VARS", "COMP_POOL", "LUA",
"NOTIF" }; "NOTIF", "SPOE_APPLET" };
int lbl; int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) { for (lbl = 0; lbl < LOCK_LABELS; lbl++) {

View File

@ -24,6 +24,7 @@
#include <common/buffer.h> #include <common/buffer.h>
#include <common/mini-clist.h> #include <common/mini-clist.h>
#include <common/hathreads.h>
#include <types/filters.h> #include <types/filters.h>
#include <types/freq_ctr.h> #include <types/freq_ctr.h>
@ -251,6 +252,7 @@ struct spoe_agent {
struct list messages; /* list of all messages attached to this SPOE agent */ struct list messages; /* list of all messages attached to this SPOE agent */
/* running info */ /* running info */
struct {
unsigned int frame_size; /* current maximum frame size, only used to encode messages */ unsigned int frame_size; /* current maximum frame size, only used to encode messages */
unsigned int applets_act; /* # of applets alive at a time */ unsigned int applets_act; /* # of applets alive at a time */
unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */ unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
@ -263,6 +265,11 @@ struct spoe_agent {
struct list sending_queue; /* Queue of streams waiting to send data */ struct list sending_queue; /* Queue of streams waiting to send data */
struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */ struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */
#ifdef USE_THREAD
HA_SPINLOCK_T lock;
#endif
} *rt;
}; };
/* SPOE filter configuration */ /* SPOE filter configuration */

View File

@ -153,6 +153,7 @@ spoe_release_agent(struct spoe_agent *agent)
{ {
struct spoe_message *msg, *msgback; struct spoe_message *msg, *msgback;
struct spoe_group *grp, *grpback; struct spoe_group *grp, *grpback;
int i;
if (!agent) if (!agent)
return; return;
@ -169,6 +170,9 @@ spoe_release_agent(struct spoe_agent *agent)
LIST_DEL(&grp->list); LIST_DEL(&grp->list);
spoe_release_group(grp); spoe_release_group(grp);
} }
for (i = 0; i < global.nbthread; ++i)
SPIN_DESTROY(&agent->rt[i].lock);
free(agent->rt);
free(agent); free(agent);
} }
@ -974,7 +978,7 @@ spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
/* Try to find the corresponding SPOE context */ /* Try to find the corresponding SPOE context */
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
list_for_each_entry((*ctx), &agent->waiting_queue, list) { list_for_each_entry((*ctx), &agent->rt[tid].waiting_queue, list) {
if ((*ctx)->stream_id == (unsigned int)stream_id && if ((*ctx)->stream_id == (unsigned int)stream_id &&
(*ctx)->frame_id == (unsigned int)frame_id) (*ctx)->frame_id == (unsigned int)frame_id)
goto found; goto found;
@ -1234,7 +1238,7 @@ spoe_release_appctx(struct appctx *appctx)
__FUNCTION__, appctx); __FUNCTION__, appctx);
/* Remove applet from the list of running applets */ /* Remove applet from the list of running applets */
agent->applets_act--; agent->rt[tid].applets_act--;
if (!LIST_ISEMPTY(&spoe_appctx->list)) { if (!LIST_ISEMPTY(&spoe_appctx->list)) {
LIST_DEL(&spoe_appctx->list); LIST_DEL(&spoe_appctx->list);
LIST_INIT(&spoe_appctx->list); LIST_INIT(&spoe_appctx->list);
@ -1243,7 +1247,7 @@ spoe_release_appctx(struct appctx *appctx)
/* Shutdown the server connection, if needed */ /* Shutdown the server connection, if needed */
if (appctx->st0 != SPOE_APPCTX_ST_END) { if (appctx->st0 != SPOE_APPCTX_ST_END) {
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
agent->applets_idle--; agent->rt[tid].applets_idle--;
appctx->st0 = SPOE_APPCTX_ST_END; appctx->st0 = SPOE_APPCTX_ST_END;
if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE) if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
@ -1284,18 +1288,18 @@ spoe_release_appctx(struct appctx *appctx)
&spoe_appctx->buffer_wait); &spoe_appctx->buffer_wait);
pool_free2(pool2_spoe_appctx, spoe_appctx); pool_free2(pool2_spoe_appctx, spoe_appctx);
if (!LIST_ISEMPTY(&agent->applets)) if (!LIST_ISEMPTY(&agent->rt[tid].applets))
goto end; goto end;
/* If this was the last running applet, notify all waiting streams */ /* If this was the last running applet, notify all waiting streams */
list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) { list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
LIST_DEL(&ctx->list); LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list); LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR; ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100); ctx->status_code = (spoe_appctx->status_code + 0x100);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
} }
list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) { list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
LIST_DEL(&ctx->list); LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list); LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR; ctx->state = SPOE_CTX_ST_ERROR;
@ -1305,10 +1309,9 @@ spoe_release_appctx(struct appctx *appctx)
end: end:
/* Update runtinme agent info */ /* Update runtinme agent info */
agent->frame_size = agent->max_frame_size; agent->rt[tid].frame_size = agent->max_frame_size;
list_for_each_entry(spoe_appctx, &agent->applets, list) list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list)
agent->frame_size = MIN(spoe_appctx->max_frame_size, HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, spoe_appctx->max_frame_size);
agent->frame_size);
} }
static int static int
@ -1421,14 +1424,15 @@ spoe_handle_connecting_appctx(struct appctx *appctx)
default: default:
/* HELLO handshake is finished, set the idle timeout and /* HELLO handshake is finished, set the idle timeout and
* add the applet in the list of running applets. */ * add the applet in the list of running applets. */
agent->applets_idle++; agent->rt[tid].applets_idle++;
appctx->st0 = SPOE_APPCTX_ST_IDLE; appctx->st0 = SPOE_APPCTX_ST_IDLE;
SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&SPOE_APPCTX(appctx)->list); LIST_DEL(&SPOE_APPCTX(appctx)->list);
LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list); LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
/* Update runtinme agent info */ /* Update runtinme agent info */
agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size, HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
agent->frame_size);
goto next; goto next;
} }
@ -1465,13 +1469,13 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
ret = spoe_prepare_hafrag_frame(appctx, ctx, frame, ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
SPOE_APPCTX(appctx)->max_frame_size); SPOE_APPCTX(appctx)->max_frame_size);
} }
else if (LIST_ISEMPTY(&agent->sending_queue)) { else if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
*skip = 1; *skip = 1;
ret = 1; ret = 1;
goto end; goto end;
} }
else { else {
ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list); ctx = LIST_NEXT(&agent->rt[tid].sending_queue, typeof(ctx), list);
ret = spoe_prepare_hanotify_frame(appctx, ctx, frame, ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
SPOE_APPCTX(appctx)->max_frame_size); SPOE_APPCTX(appctx)->max_frame_size);
@ -1532,7 +1536,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
no_frag_frame_sent: no_frag_frame_sent:
if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) { if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
appctx->st0 = SPOE_APPCTX_ST_PROCESSING; appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
LIST_ADDQ(&agent->waiting_queue, &ctx->list); LIST_ADDQ(&agent->rt[tid].waiting_queue, &ctx->list);
} }
else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) { else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
appctx->st0 = SPOE_APPCTX_ST_PROCESSING; appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
@ -1660,7 +1664,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
goto next; goto next;
case 0: /* ignore */ case 0: /* ignore */
agent->sending_rate++; agent->rt[tid].sending_rate++;
fpa++; fpa++;
break; break;
@ -1668,7 +1672,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
break; break;
default: default:
agent->sending_rate++; agent->rt[tid].sending_rate++;
fpa++; fpa++;
break; break;
} }
@ -1703,11 +1707,13 @@ spoe_handle_processing_appctx(struct appctx *appctx)
stop: stop:
if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) { if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
appctx->st0 = SPOE_APPCTX_ST_IDLE; appctx->st0 = SPOE_APPCTX_ST_IDLE;
agent->applets_idle++; agent->rt[tid].applets_idle++;
} }
if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) { if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&SPOE_APPCTX(appctx)->list); LIST_DEL(&SPOE_APPCTX(appctx)->list);
LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list); LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
if (fpa) if (fpa)
SPOE_APPCTX(appctx)->task->expire = SPOE_APPCTX(appctx)->task->expire =
tick_add_ifset(now_ms, agent->timeout.idle); tick_add_ifset(now_ms, agent->timeout.idle);
@ -1868,14 +1874,14 @@ spoe_handle_appctx(struct appctx *appctx)
case SPOE_APPCTX_ST_IDLE: case SPOE_APPCTX_ST_IDLE:
if (stopping && if (stopping &&
LIST_ISEMPTY(&agent->sending_queue) && LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) { LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
SPOE_APPCTX(appctx)->task->expire = SPOE_APPCTX(appctx)->task->expire =
tick_add_ifset(now_ms, agent->timeout.idle); tick_add_ifset(now_ms, agent->timeout.idle);
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
goto switchstate; goto switchstate;
} }
agent->applets_idle--; agent->rt[tid].applets_idle--;
appctx->st0 = SPOE_APPCTX_ST_PROCESSING; appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
/* fall through */ /* fall through */
@ -1909,6 +1915,9 @@ spoe_handle_appctx(struct appctx *appctx)
return; return;
} }
out: out:
if (stopping)
spoe_wakeup_appctx(appctx);
if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY) if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
task_queue(SPOE_APPCTX(appctx)->task); task_queue(SPOE_APPCTX(appctx)->task);
si_oc(si)->flags |= CF_READ_DONTWAIT; si_oc(si)->flags |= CF_READ_DONTWAIT;
@ -1940,7 +1949,7 @@ spoe_create_appctx(struct spoe_config *conf)
memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size); memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
appctx->st0 = SPOE_APPCTX_ST_CONNECT; appctx->st0 = SPOE_APPCTX_ST_CONNECT;
if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL) if ((SPOE_APPCTX(appctx)->task = task_new(1UL << tid)) == NULL)
goto out_free_spoe_appctx; goto out_free_spoe_appctx;
SPOE_APPCTX(appctx)->owner = appctx; SPOE_APPCTX(appctx)->owner = appctx;
@ -1976,8 +1985,10 @@ spoe_create_appctx(struct spoe_config *conf)
strm->do_log = NULL; strm->do_log = NULL;
strm->res.flags |= CF_READ_DONTWAIT; strm->res.flags |= CF_READ_DONTWAIT;
LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list); SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
conf->agent->applets_act++; LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
conf->agent->rt[tid].applets_act++;
task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT); task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
task_wakeup(strm->task, TASK_WOKEN_INIT); task_wakeup(strm->task, TASK_WOKEN_INIT);
@ -2008,9 +2019,9 @@ spoe_queue_context(struct spoe_context *ctx)
min_applets = min_applets_act(agent); min_applets = min_applets_act(agent);
/* Check if we need to create a new SPOE applet or not. */ /* Check if we need to create a new SPOE applet or not. */
if (agent->applets_act >= min_applets && if (agent->rt[tid].applets_act >= min_applets &&
agent->applets_idle && agent->rt[tid].applets_idle &&
agent->sending_rate) agent->rt[tid].sending_rate)
goto end; goto end;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@ -2031,7 +2042,7 @@ spoe_queue_context(struct spoe_context *ctx)
/* Do not try to create a new applet if we have reached the maximum of /* Do not try to create a new applet if we have reached the maximum of
* connection per seconds */ * connection per seconds */
if (agent->cps_max > 0) { if (agent->cps_max > 0) {
if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) { if (!freq_ctr_remain(&agent->rt[tid].conn_per_sec, agent->cps_max, 0)) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - cannot create SPOE appctx: max CPS reached\n", " - cannot create SPOE appctx: max CPS reached\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id, (int)now.tv_sec, (int)now.tv_usec, agent->id,
@ -2052,41 +2063,43 @@ spoe_queue_context(struct spoe_context *ctx)
goto end; goto end;
} }
if (agent->applets_act <= min_applets) if (agent->rt[tid].applets_act <= min_applets)
SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST; SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST;
/* Increase the per-process number of cumulated connections */ /* Increase the per-process number of cumulated connections */
if (agent->cps_max > 0) if (agent->cps_max > 0)
update_freq_ctr(&agent->conn_per_sec, 1); update_freq_ctr(&agent->rt[tid].conn_per_sec, 1);
end: end:
/* The only reason to return an error is when there is no applet */ /* The only reason to return an error is when there is no applet */
if (LIST_ISEMPTY(&agent->applets)) { if (LIST_ISEMPTY(&agent->rt[tid].applets)) {
ctx->status_code = SPOE_CTX_ERR_RES; ctx->status_code = SPOE_CTX_ERR_RES;
return -1; return -1;
} }
/* Add the SPOE context in the sending queue and update all running /* Add the SPOE context in the sending queue and update all running
* info */ * info */
LIST_ADDQ(&agent->sending_queue, &ctx->list); LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
if (agent->sending_rate) if (agent->rt[tid].sending_rate)
agent->sending_rate--; agent->rt[tid].sending_rate--;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - Add stream in sending queue" " - Add stream in sending queue"
" - applets_act=%u - applets_idle=%u - sending_rate=%u\n", " - applets_act=%u - applets_idle=%u - sending_rate=%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
ctx->strm, agent->applets_act, agent->applets_idle, ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
agent->sending_rate); agent->rt[tid].sending_rate);
/* Finally try to wakeup the first IDLE applet found and move it at the /* Finally try to wakeup the first IDLE applet found and move it at the
* end of the list. */ * end of the list. */
list_for_each_entry(spoe_appctx, &agent->applets, list) { list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) {
appctx = spoe_appctx->owner; appctx = spoe_appctx->owner;
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
spoe_wakeup_appctx(appctx); spoe_wakeup_appctx(appctx);
SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&spoe_appctx->list); LIST_DEL(&spoe_appctx->list);
LIST_ADDQ(&agent->applets, &spoe_appctx->list); LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list);
SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
break; break;
} }
} }
@ -2189,7 +2202,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
char *p, *end; char *p, *end;
p = ctx->buffer->p; p = ctx->buffer->p;
end = p + agent->frame_size - FRAME_HDR_SIZE; end = p + agent->rt[tid].frame_size - FRAME_HDR_SIZE;
if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */ if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
/* Resume encoding of a SPOE message */ /* Resume encoding of a SPOE message */
@ -2239,7 +2252,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
(int)now.tv_sec, (int)now.tv_usec, (int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, agent->id, __FUNCTION__, s,
((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"), ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
ctx->frag_ctx.spoe_appctx, (agent->frame_size - FRAME_HDR_SIZE), ctx->frag_ctx.spoe_appctx, (agent->rt[tid].frame_size - FRAME_HDR_SIZE),
p - ctx->buffer->p); p - ctx->buffer->p);
ctx->buffer->i = p - ctx->buffer->p; ctx->buffer->i = p - ctx->buffer->p;
@ -2263,7 +2276,7 @@ spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
(int)now.tv_sec, (int)now.tv_usec, (int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx, agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx,
ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff, ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
(agent->frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p); (agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
ctx->buffer->i = p - ctx->buffer->p; ctx->buffer->i = p - ctx->buffer->p;
ctx->flags |= SPOE_CTX_FL_FRAGMENTED; ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
@ -2504,7 +2517,7 @@ spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
struct spoe_context *ctx, int dir) struct spoe_context *ctx, int dir)
{ {
if (agent->eps_max > 0) if (agent->eps_max > 0)
update_freq_ctr(&agent->err_per_sec, 1); update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
if (agent->var_on_error) { if (agent->var_on_error) {
struct sample smp; struct sample smp;
@ -2557,7 +2570,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
if (ctx->state == SPOE_CTX_ST_READY) { if (ctx->state == SPOE_CTX_ST_READY) {
if (agent->eps_max > 0) { if (agent->eps_max > 0) {
if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) { if (!freq_ctr_remain(&agent->rt[tid].err_per_sec, agent->eps_max, 0)) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - skip processing of messages: max EPS reached\n", " - skip processing of messages: max EPS reached\n",
(int)now.tv_sec, (int)now.tv_usec, (int)now.tv_sec, (int)now.tv_usec,
@ -2791,6 +2804,7 @@ spoe_sig_stop(struct sig_handler *sh)
struct spoe_config *conf; struct spoe_config *conf;
struct spoe_agent *agent; struct spoe_agent *agent;
struct spoe_appctx *spoe_appctx; struct spoe_appctx *spoe_appctx;
int i;
if (fconf->id != spoe_filter_id) if (fconf->id != spoe_filter_id)
continue; continue;
@ -2798,8 +2812,11 @@ spoe_sig_stop(struct sig_handler *sh)
conf = fconf->conf; conf = fconf->conf;
agent = conf->agent; agent = conf->agent;
list_for_each_entry(spoe_appctx, &agent->applets, list) { for (i = 0; i < global.nbthread; ++i) {
SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
list_for_each_entry(spoe_appctx, &agent->rt[i].applets, list)
spoe_wakeup_appctx(spoe_appctx->owner); spoe_wakeup_appctx(spoe_appctx->owner);
SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
} }
} }
p = p->next; p = p->next;
@ -3177,7 +3194,9 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
curagent->engine_id = NULL; curagent->engine_id = NULL;
curagent->var_pfx = NULL; curagent->var_pfx = NULL;
curagent->var_on_error = NULL; curagent->var_on_error = NULL;
curagent->flags = (SPOE_FL_PIPELINING | SPOE_FL_ASYNC | SPOE_FL_SND_FRAGMENTATION); curagent->flags = (SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
if (global.nbthread == 1)
curagent->flags |= SPOE_FL_ASYNC;
curagent->cps_max = 0; curagent->cps_max = 0;
curagent->eps_max = 0; curagent->eps_max = 0;
curagent->max_frame_size = MAX_FRAME_SIZE; curagent->max_frame_size = MAX_FRAME_SIZE;
@ -3189,14 +3208,21 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
LIST_INIT(&curagent->groups); LIST_INIT(&curagent->groups);
LIST_INIT(&curagent->messages); LIST_INIT(&curagent->messages);
curagent->frame_size = curagent->max_frame_size; if ((curagent->rt = calloc(global.nbthread, sizeof(*curagent->rt))) == NULL) {
curagent->applets_act = 0; Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
curagent->applets_idle = 0; err_code |= ERR_ALERT | ERR_ABORT;
curagent->sending_rate = 0; goto out;
}
LIST_INIT(&curagent->applets); for (i = 0; i < global.nbthread; ++i) {
LIST_INIT(&curagent->sending_queue); curagent->rt[i].frame_size = curagent->max_frame_size;
LIST_INIT(&curagent->waiting_queue); curagent->rt[i].applets_act = 0;
curagent->rt[i].applets_idle = 0;
curagent->rt[i].sending_rate = 0;
LIST_INIT(&curagent->rt[i].applets);
LIST_INIT(&curagent->rt[i].sending_queue);
LIST_INIT(&curagent->rt[i].waiting_queue);
SPIN_INIT(&curagent->rt[i].lock);
}
} }
else if (!strcmp(args[0], "use-backend")) { else if (!strcmp(args[0], "use-backend")) {
if (!*args[1]) { if (!*args[1]) {
@ -3320,8 +3346,15 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
goto out; goto out;
if (kwm == 1) if (kwm == 1)
curagent->flags &= ~SPOE_FL_ASYNC; curagent->flags &= ~SPOE_FL_ASYNC;
else else {
if (global.nbthread == 1)
curagent->flags |= SPOE_FL_ASYNC; curagent->flags |= SPOE_FL_ASYNC;
else {
Warning("parsing [%s:%d] Async option is not supported with threads.\n",
file, linenum);
err_code |= ERR_WARN;
}
}
goto out; goto out;
} }
else if (!strcmp(args[1], "send-frag-payload")) { else if (!strcmp(args[1], "send-frag-payload")) {