MINOR: spoe: Add 'timeout processing' option to limit time to process an event

It is a way to set the maximum time to wait for a stream to process an event,
i.e to acquire a stream to talk with an agent, to encode all messages, to send
the NOTIFY frame, to receive the corrsponding acknowledgement and to process all
actions. It is applied on the stream that handle the client and the server
sessions.
This commit is contained in:
Christopher Faulet 2016-11-10 15:04:51 +01:00 committed by Willy Tarreau
parent a00d817aba
commit f7a3092512
2 changed files with 82 additions and 19 deletions

View File

@ -158,7 +158,7 @@ spoe-agent <name>
following keywords are supported :
- messages
- option var-prefix
- timeout hello|idle|ack
- timeout hello|idle|ack|processing
- use-backend
@ -200,7 +200,8 @@ option var-prefix <prefix>
timeout ack <timeout>
Set the maximum time to wait for an agent to receive the acknowledgement to a
NOTIFY frame.
NOTIFY frame. It is applied on the stream that handle the connection with the
agent.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
@ -210,6 +211,7 @@ timeout ack <timeout>
timeout hello <timeout>
Set the maximum time to wait for an agent to receive the AGENT-HELLO frame.
It is applied on the stream that handle the connection with the agent.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
@ -221,7 +223,21 @@ timeout hello <timeout>
timeout idle <timeout>
Set the maximum time to wait for an agent to close an idle connection.
Set the maximum time to wait for an agent to close an idle connection. It is
applied on the stream that handle the connection with the agent.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
can be in any other unit if the number is suffixed by the unit,
as explained at the top of this document.
timeout processing <timeout>
Set the maximum time to wait for a stream to process an event, i.e to acquire
a stream to talk with an agent, to encode all messages, to send the NOTIFY
frame, to receive the corrsponding acknowledgement and to process all
actions. It is applied on the stream that handle the client and the server
sessions.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but

View File

@ -190,9 +190,10 @@ struct spoe_agent {
char *name; /* Backend name used during conf parsing */
} b;
struct {
unsigned int hello; /* Max time to receive AGENT-HELLO frame */
unsigned int idle; /* Max Idle timeout */
unsigned int ack; /* Max time to acknowledge a NOTIFY frame */
unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
unsigned int idle; /* Max Idle timeout (in SPOE applet) */
unsigned int ack; /* Max time to acknowledge a NOTIFY frame (in SPOE applet)*/
unsigned int processing; /* Max time to process an event (in the main stream) */
} timeout;
char *var_pfx; /* Prefix used for vars set by the agent */
@ -232,7 +233,7 @@ struct spoe_context {
unsigned int stream_id; /* stream_id and frame_id are used */
unsigned int frame_id; /* to map NOTIFY and ACK frames */
unsigned int process_exp; /* expiration date to process an event */
};
/* Set if the handle on SIGUSR1 is registered */
@ -1636,6 +1637,9 @@ offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
{
struct spoe_context *ctx;
if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
return;
if (LIST_ISEMPTY(&agent->applet_wq))
LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
else {
@ -1787,6 +1791,9 @@ release_spoe_appctx(struct spoe_context *ctx)
/* Reset the flag to allow next processing */
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
/* Reset processing timer */
ctx->process_exp = TICK_ETERNITY;
/* Release the buffer if needed */
if (ctx->buffer != &buf_empty) {
b_free(&ctx->buffer);
@ -2084,13 +2091,14 @@ static int
process_spoe_event(struct stream *s, struct spoe_context *ctx,
enum spoe_event ev)
{
int dir, ret = 1;
struct spoe_config *conf = FLT_CONF(ctx->filter);
struct spoe_agent *agent = conf->agent;
int dir, ret = 1;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - ctx-state=%s - event=%s\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
__FUNCTION__, s, spoe_ctx_state_str[ctx->state],
agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
spoe_event_str[ev]);
dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
@ -2101,7 +2109,24 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
if (ctx->state == SPOE_CTX_ST_ERROR)
goto error;
if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to process event '%s': timeout\n",
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, spoe_event_str[ev]);
send_log(ctx->strm->be, LOG_WARNING,
"failed to process event '%s': timeout.\n",
spoe_event_str[ev]);
goto error;
}
if (ctx->state == SPOE_CTX_ST_READY) {
if (!tick_isset(ctx->process_exp)) {
ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
ctx->process_exp);
}
ret = acquire_spoe_appctx(ctx, dir);
if (ret <= 0) {
if (!ret)
@ -2182,8 +2207,9 @@ create_spoe_context(struct filter *filter)
LIST_INIT(&ctx->buffer_wait);
LIST_INIT(&ctx->applet_wait);
ctx->stream_id = 0;
ctx->frame_id = 1;
ctx->stream_id = 0;
ctx->frame_id = 1;
ctx->process_exp = TICK_ETERNITY;
return ctx;
}
@ -2399,6 +2425,19 @@ spoe_stop(struct stream *s, struct filter *filter)
}
}
/*
* Called when the stream is woken up because of expired timer.
*/
static void
spoe_check_timeouts(struct stream *s, struct filter *filter)
{
struct spoe_context *ctx = filter->ctx;
if (tick_is_expired(ctx->process_exp, now_ms))
s->task->state |= TASK_WOKEN_MSG;
}
/* Called when we are ready to filter data on a channel */
static int
spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
@ -2528,8 +2567,9 @@ struct flt_ops spoe_ops = {
.check = spoe_check,
/* Handle start/stop of SPOE */
.attach = spoe_start,
.detach = spoe_stop,
.attach = spoe_start,
.detach = spoe_stop,
.check_timeouts = spoe_check_timeouts,
/* Handle channels activity */
.channel_start_analyze = spoe_start_analyze,
@ -2589,6 +2629,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
curagent->timeout.hello = TICK_ETERNITY;
curagent->timeout.ack = TICK_ETERNITY;
curagent->timeout.idle = TICK_ETERNITY;
curagent->timeout.processing = TICK_ETERNITY;
curagent->var_pfx = NULL;
curagent->new_applets = 0;
@ -2654,8 +2695,10 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
tv = &curagent->timeout.idle;
else if (!strcmp(args[1], "ack"))
tv = &curagent->timeout.ack;
else if (!strcmp(args[1], "processing"))
tv = &curagent->timeout.processing;
else {
Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n",
Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle', 'ack' or 'processing' (got %s).\n",
file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
@ -2956,13 +2999,17 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
curagent->id, curagent->conf.file, curagent->conf.line);
goto error;
}
if (curagent->timeout.hello == TICK_ETERNITY ||
curagent->timeout.idle == TICK_ETERNITY ||
curagent->timeout.ack == TICK_ETERNITY) {
if (curagent->timeout.hello == TICK_ETERNITY ||
curagent->timeout.idle == TICK_ETERNITY ||
curagent->timeout.ack == TICK_ETERNITY ||
curagent->timeout.processing == TICK_ETERNITY) {
if (curagent->timeout.ack == TICK_ETERNITY)
curagent->timeout.ack = curagent->timeout.idle;
Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
" | While not properly invalid, you will certainly encounter various problems\n"
" | with such a configuration. To fix this, please ensure that all following\n"
" | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n",
" | timeouts are set to a non-zero value: 'hello', 'idle', 'ack', 'processing'.\n",
px->id, curagent->id, curagent->conf.file, curagent->conf.line);
}
if (curagent->var_pfx == NULL) {