diff --git a/doc/SPOE.txt b/doc/SPOE.txt index fa0a53367..f9aac7108 100644 --- a/doc/SPOE.txt +++ b/doc/SPOE.txt @@ -158,7 +158,7 @@ spoe-agent following keywords are supported : - messages - option var-prefix - - timeout hello|idle|ack + - timeout hello|idle|ack|processing - use-backend @@ -200,7 +200,8 @@ option var-prefix timeout ack Set the maximum time to wait for an agent to receive the acknowledgement to a - NOTIFY frame. + NOTIFY frame. It is applied on the stream that handle the connection with the + agent. Arguments : is the timeout value specified in milliseconds by default, but @@ -210,6 +211,7 @@ timeout ack timeout hello Set the maximum time to wait for an agent to receive the AGENT-HELLO frame. + It is applied on the stream that handle the connection with the agent. Arguments : is the timeout value specified in milliseconds by default, but @@ -221,7 +223,21 @@ timeout hello timeout idle - Set the maximum time to wait for an agent to close an idle connection. + Set the maximum time to wait for an agent to close an idle connection. It is + applied on the stream that handle the connection with the agent. + + Arguments : + is the timeout value specified in milliseconds by default, but + can be in any other unit if the number is suffixed by the unit, + as explained at the top of this document. + + +timeout processing + Set the maximum time to wait for a stream to process an event, i.e to acquire + a stream to talk with an agent, to encode all messages, to send the NOTIFY + frame, to receive the corrsponding acknowledgement and to process all + actions. It is applied on the stream that handle the client and the server + sessions. Arguments : is the timeout value specified in milliseconds by default, but diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 12e589ebc..b3f2ef3a3 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -190,9 +190,10 @@ struct spoe_agent { char *name; /* Backend name used during conf parsing */ } b; struct { - unsigned int hello; /* Max time to receive AGENT-HELLO frame */ - unsigned int idle; /* Max Idle timeout */ - unsigned int ack; /* Max time to acknowledge a NOTIFY frame */ + unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */ + unsigned int idle; /* Max Idle timeout (in SPOE applet) */ + unsigned int ack; /* Max time to acknowledge a NOTIFY frame (in SPOE applet)*/ + unsigned int processing; /* Max time to process an event (in the main stream) */ } timeout; char *var_pfx; /* Prefix used for vars set by the agent */ @@ -232,7 +233,7 @@ struct spoe_context { unsigned int stream_id; /* stream_id and frame_id are used */ unsigned int frame_id; /* to map NOTIFY and ACK frames */ - + unsigned int process_exp; /* expiration date to process an event */ }; /* Set if the handle on SIGUSR1 is registered */ @@ -1636,6 +1637,9 @@ offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx) { struct spoe_context *ctx; + if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING) + return; + if (LIST_ISEMPTY(&agent->applet_wq)) LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list); else { @@ -1787,6 +1791,9 @@ release_spoe_appctx(struct spoe_context *ctx) /* Reset the flag to allow next processing */ ctx->flags &= ~SPOE_CTX_FL_PROCESS; + /* Reset processing timer */ + ctx->process_exp = TICK_ETERNITY; + /* Release the buffer if needed */ if (ctx->buffer != &buf_empty) { b_free(&ctx->buffer); @@ -2084,13 +2091,14 @@ static int process_spoe_event(struct stream *s, struct spoe_context *ctx, enum spoe_event ev) { - int dir, ret = 1; + struct spoe_config *conf = FLT_CONF(ctx->filter); + struct spoe_agent *agent = conf->agent; + int dir, ret = 1; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - ctx-state=%s - event=%s\n", (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, - __FUNCTION__, s, spoe_ctx_state_str[ctx->state], + agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state], spoe_event_str[ev]); dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES); @@ -2101,7 +2109,24 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, if (ctx->state == SPOE_CTX_ST_ERROR) goto error; + if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - failed to process event '%s': timeout\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, spoe_event_str[ev]); + send_log(ctx->strm->be, LOG_WARNING, + "failed to process event '%s': timeout.\n", + spoe_event_str[ev]); + goto error; + } + if (ctx->state == SPOE_CTX_ST_READY) { + if (!tick_isset(ctx->process_exp)) { + ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing); + s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire), + ctx->process_exp); + } + ret = acquire_spoe_appctx(ctx, dir); if (ret <= 0) { if (!ret) @@ -2182,8 +2207,9 @@ create_spoe_context(struct filter *filter) LIST_INIT(&ctx->buffer_wait); LIST_INIT(&ctx->applet_wait); - ctx->stream_id = 0; - ctx->frame_id = 1; + ctx->stream_id = 0; + ctx->frame_id = 1; + ctx->process_exp = TICK_ETERNITY; return ctx; } @@ -2399,6 +2425,19 @@ spoe_stop(struct stream *s, struct filter *filter) } } + +/* + * Called when the stream is woken up because of expired timer. + */ +static void +spoe_check_timeouts(struct stream *s, struct filter *filter) +{ + struct spoe_context *ctx = filter->ctx; + + if (tick_is_expired(ctx->process_exp, now_ms)) + s->task->state |= TASK_WOKEN_MSG; +} + /* Called when we are ready to filter data on a channel */ static int spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn) @@ -2528,8 +2567,9 @@ struct flt_ops spoe_ops = { .check = spoe_check, /* Handle start/stop of SPOE */ - .attach = spoe_start, - .detach = spoe_stop, + .attach = spoe_start, + .detach = spoe_stop, + .check_timeouts = spoe_check_timeouts, /* Handle channels activity */ .channel_start_analyze = spoe_start_analyze, @@ -2589,6 +2629,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) curagent->timeout.hello = TICK_ETERNITY; curagent->timeout.ack = TICK_ETERNITY; curagent->timeout.idle = TICK_ETERNITY; + curagent->timeout.processing = TICK_ETERNITY; curagent->var_pfx = NULL; curagent->new_applets = 0; @@ -2654,8 +2695,10 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) tv = &curagent->timeout.idle; else if (!strcmp(args[1], "ack")) tv = &curagent->timeout.ack; + else if (!strcmp(args[1], "processing")) + tv = &curagent->timeout.processing; else { - Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n", + Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle', 'ack' or 'processing' (got %s).\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto out; @@ -2956,13 +2999,17 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px, curagent->id, curagent->conf.file, curagent->conf.line); goto error; } - if (curagent->timeout.hello == TICK_ETERNITY || - curagent->timeout.idle == TICK_ETERNITY || - curagent->timeout.ack == TICK_ETERNITY) { + if (curagent->timeout.hello == TICK_ETERNITY || + curagent->timeout.idle == TICK_ETERNITY || + curagent->timeout.ack == TICK_ETERNITY || + curagent->timeout.processing == TICK_ETERNITY) { + if (curagent->timeout.ack == TICK_ETERNITY) + curagent->timeout.ack = curagent->timeout.idle; + Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n" " | While not properly invalid, you will certainly encounter various problems\n" " | with such a configuration. To fix this, please ensure that all following\n" - " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n", + " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack', 'processing'.\n", px->id, curagent->id, curagent->conf.file, curagent->conf.line); } if (curagent->var_pfx == NULL) {