From f7a30925126f100dd10714cd3dd0213ee4695e14 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Thu, 10 Nov 2016 15:04:51 +0100 Subject: [PATCH] MINOR: spoe: Add 'timeout processing' option to limit time to process an event It is a way to set the maximum time to wait for a stream to process an event, i.e to acquire a stream to talk with an agent, to encode all messages, to send the NOTIFY frame, to receive the corrsponding acknowledgement and to process all actions. It is applied on the stream that handle the client and the server sessions. --- doc/SPOE.txt | 22 ++++++++++++-- src/flt_spoe.c | 79 ++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 82 insertions(+), 19 deletions(-) diff --git a/doc/SPOE.txt b/doc/SPOE.txt index fa0a53367..f9aac7108 100644 --- a/doc/SPOE.txt +++ b/doc/SPOE.txt @@ -158,7 +158,7 @@ spoe-agent following keywords are supported : - messages - option var-prefix - - timeout hello|idle|ack + - timeout hello|idle|ack|processing - use-backend @@ -200,7 +200,8 @@ option var-prefix timeout ack Set the maximum time to wait for an agent to receive the acknowledgement to a - NOTIFY frame. + NOTIFY frame. It is applied on the stream that handle the connection with the + agent. Arguments : is the timeout value specified in milliseconds by default, but @@ -210,6 +211,7 @@ timeout ack timeout hello Set the maximum time to wait for an agent to receive the AGENT-HELLO frame. + It is applied on the stream that handle the connection with the agent. Arguments : is the timeout value specified in milliseconds by default, but @@ -221,7 +223,21 @@ timeout hello timeout idle - Set the maximum time to wait for an agent to close an idle connection. + Set the maximum time to wait for an agent to close an idle connection. It is + applied on the stream that handle the connection with the agent. + + Arguments : + is the timeout value specified in milliseconds by default, but + can be in any other unit if the number is suffixed by the unit, + as explained at the top of this document. + + +timeout processing + Set the maximum time to wait for a stream to process an event, i.e to acquire + a stream to talk with an agent, to encode all messages, to send the NOTIFY + frame, to receive the corrsponding acknowledgement and to process all + actions. It is applied on the stream that handle the client and the server + sessions. Arguments : is the timeout value specified in milliseconds by default, but diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 12e589ebc..b3f2ef3a3 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -190,9 +190,10 @@ struct spoe_agent { char *name; /* Backend name used during conf parsing */ } b; struct { - unsigned int hello; /* Max time to receive AGENT-HELLO frame */ - unsigned int idle; /* Max Idle timeout */ - unsigned int ack; /* Max time to acknowledge a NOTIFY frame */ + unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */ + unsigned int idle; /* Max Idle timeout (in SPOE applet) */ + unsigned int ack; /* Max time to acknowledge a NOTIFY frame (in SPOE applet)*/ + unsigned int processing; /* Max time to process an event (in the main stream) */ } timeout; char *var_pfx; /* Prefix used for vars set by the agent */ @@ -232,7 +233,7 @@ struct spoe_context { unsigned int stream_id; /* stream_id and frame_id are used */ unsigned int frame_id; /* to map NOTIFY and ACK frames */ - + unsigned int process_exp; /* expiration date to process an event */ }; /* Set if the handle on SIGUSR1 is registered */ @@ -1636,6 +1637,9 @@ offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx) { struct spoe_context *ctx; + if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING) + return; + if (LIST_ISEMPTY(&agent->applet_wq)) LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list); else { @@ -1787,6 +1791,9 @@ release_spoe_appctx(struct spoe_context *ctx) /* Reset the flag to allow next processing */ ctx->flags &= ~SPOE_CTX_FL_PROCESS; + /* Reset processing timer */ + ctx->process_exp = TICK_ETERNITY; + /* Release the buffer if needed */ if (ctx->buffer != &buf_empty) { b_free(&ctx->buffer); @@ -2084,13 +2091,14 @@ static int process_spoe_event(struct stream *s, struct spoe_context *ctx, enum spoe_event ev) { - int dir, ret = 1; + struct spoe_config *conf = FLT_CONF(ctx->filter); + struct spoe_agent *agent = conf->agent; + int dir, ret = 1; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - ctx-state=%s - event=%s\n", (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, - __FUNCTION__, s, spoe_ctx_state_str[ctx->state], + agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state], spoe_event_str[ev]); dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES); @@ -2101,7 +2109,24 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, if (ctx->state == SPOE_CTX_ST_ERROR) goto error; + if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - failed to process event '%s': timeout\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, spoe_event_str[ev]); + send_log(ctx->strm->be, LOG_WARNING, + "failed to process event '%s': timeout.\n", + spoe_event_str[ev]); + goto error; + } + if (ctx->state == SPOE_CTX_ST_READY) { + if (!tick_isset(ctx->process_exp)) { + ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing); + s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire), + ctx->process_exp); + } + ret = acquire_spoe_appctx(ctx, dir); if (ret <= 0) { if (!ret) @@ -2182,8 +2207,9 @@ create_spoe_context(struct filter *filter) LIST_INIT(&ctx->buffer_wait); LIST_INIT(&ctx->applet_wait); - ctx->stream_id = 0; - ctx->frame_id = 1; + ctx->stream_id = 0; + ctx->frame_id = 1; + ctx->process_exp = TICK_ETERNITY; return ctx; } @@ -2399,6 +2425,19 @@ spoe_stop(struct stream *s, struct filter *filter) } } + +/* + * Called when the stream is woken up because of expired timer. + */ +static void +spoe_check_timeouts(struct stream *s, struct filter *filter) +{ + struct spoe_context *ctx = filter->ctx; + + if (tick_is_expired(ctx->process_exp, now_ms)) + s->task->state |= TASK_WOKEN_MSG; +} + /* Called when we are ready to filter data on a channel */ static int spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn) @@ -2528,8 +2567,9 @@ struct flt_ops spoe_ops = { .check = spoe_check, /* Handle start/stop of SPOE */ - .attach = spoe_start, - .detach = spoe_stop, + .attach = spoe_start, + .detach = spoe_stop, + .check_timeouts = spoe_check_timeouts, /* Handle channels activity */ .channel_start_analyze = spoe_start_analyze, @@ -2589,6 +2629,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) curagent->timeout.hello = TICK_ETERNITY; curagent->timeout.ack = TICK_ETERNITY; curagent->timeout.idle = TICK_ETERNITY; + curagent->timeout.processing = TICK_ETERNITY; curagent->var_pfx = NULL; curagent->new_applets = 0; @@ -2654,8 +2695,10 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) tv = &curagent->timeout.idle; else if (!strcmp(args[1], "ack")) tv = &curagent->timeout.ack; + else if (!strcmp(args[1], "processing")) + tv = &curagent->timeout.processing; else { - Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n", + Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle', 'ack' or 'processing' (got %s).\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto out; @@ -2956,13 +2999,17 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px, curagent->id, curagent->conf.file, curagent->conf.line); goto error; } - if (curagent->timeout.hello == TICK_ETERNITY || - curagent->timeout.idle == TICK_ETERNITY || - curagent->timeout.ack == TICK_ETERNITY) { + if (curagent->timeout.hello == TICK_ETERNITY || + curagent->timeout.idle == TICK_ETERNITY || + curagent->timeout.ack == TICK_ETERNITY || + curagent->timeout.processing == TICK_ETERNITY) { + if (curagent->timeout.ack == TICK_ETERNITY) + curagent->timeout.ack = curagent->timeout.idle; + Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n" " | While not properly invalid, you will certainly encounter various problems\n" " | with such a configuration. To fix this, please ensure that all following\n" - " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n", + " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack', 'processing'.\n", px->id, curagent->id, curagent->conf.file, curagent->conf.line); } if (curagent->var_pfx == NULL) {