diff --git a/doc/SPOE.txt b/doc/SPOE.txt index 016590e3f..756988f68 100644 --- a/doc/SPOE.txt +++ b/doc/SPOE.txt @@ -1200,7 +1200,8 @@ LOG_NOTICE. Otherwise, the message is logged with the level LOG_WARNING. The messages are logged using the agent's logger, if defined, and use the following format: - SPOE: [AGENT] sid=STREAM-ID st=STATUC-CODE reqT/qT/wT/resT/pT + SPOE: [AGENT] sid=STREAM-ID st=STATUC-CODE reqT/qT/wT/resT/pT \ + / / / AGENT is the agent name TYPE is EVENT of GROUP @@ -1221,6 +1222,14 @@ following format: point of view, it is the latency added by the SPOE processing. It is more or less the sum of values above. + is the numbers of idle SPOE applets + is the numbers of SPOE applets + is the numbers of streams waiting to send data + is the numbers of streams waiting for a ack + is the numbers of processing errors + is the numbers of events/groups processed + + For all these time events, -1 means the processing was interrupted before the end. So -1 for the queue time means the request was never dequeued. For fragmented frames it is harder to know when the interruption happened. diff --git a/include/types/spoe.h b/include/types/spoe.h index b7d54f344..c01a03efb 100644 --- a/include/types/spoe.h +++ b/include/types/spoe.h @@ -282,6 +282,14 @@ struct spoe_agent { __decl_hathreads(HA_SPINLOCK_T lock); } *rt; + struct { + unsigned int applets; /* # of SPOE applets */ + unsigned int idles; /* # of idle applets */ + unsigned int nb_sending; /* # of streams waiting to send data */ + unsigned int nb_waiting; /* # of streams waiting for a ack */ + unsigned long long nb_processed; /* # of frames processed by the SPOE */ + unsigned long long nb_errors; /* # of errors during the processing */ + } counters; }; /* SPOE filter configuration */ diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 3b425cc3c..7c019c412 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1233,6 +1233,7 @@ spoe_release_appctx(struct appctx *appctx) /* Remove applet from the list of running applets */ SPOE_DEBUG_STMT(agent->rt[tid].applets_act--); + HA_ATOMIC_SUB(&agent->counters.applets, 1); HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); if (!LIST_ISEMPTY(&spoe_appctx->list)) { LIST_DEL(&spoe_appctx->list); @@ -1245,6 +1246,7 @@ spoe_release_appctx(struct appctx *appctx) if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { eb32_delete(&spoe_appctx->node); SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--); + HA_ATOMIC_SUB(&agent->counters.idles, 1); } appctx->st0 = SPOE_APPCTX_ST_END; @@ -1266,6 +1268,7 @@ spoe_release_appctx(struct appctx *appctx) list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); + HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1); spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting); ctx->state = SPOE_CTX_ST_ERROR; ctx->status_code = (spoe_appctx->status_code + 0x100); @@ -1294,6 +1297,7 @@ spoe_release_appctx(struct appctx *appctx) list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); + HA_ATOMIC_SUB(&agent->counters.nb_sending, 1); spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue); ctx->state = SPOE_CTX_ST_ERROR; ctx->status_code = (spoe_appctx->status_code + 0x100); @@ -1302,6 +1306,7 @@ spoe_release_appctx(struct appctx *appctx) list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) { LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); + HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1); spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting); ctx->state = SPOE_CTX_ST_ERROR; ctx->status_code = (spoe_appctx->status_code + 0x100); @@ -1426,6 +1431,7 @@ spoe_handle_connecting_appctx(struct appctx *appctx) /* HELLO handshake is finished, set the idle timeout and * add the applet in the list of running applets. */ SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++); + HA_ATOMIC_ADD(&agent->counters.idles, 1); appctx->st0 = SPOE_APPCTX_ST_IDLE; SPOE_APPCTX(appctx)->node.key = 0; eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node); @@ -1495,6 +1501,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait); LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); + HA_ATOMIC_SUB(&agent->counters.nb_sending, 1); spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue); ctx->spoe_appctx = NULL; ctx->state = SPOE_CTX_ST_ERROR; @@ -1514,6 +1521,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait); LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); + HA_ATOMIC_SUB(&agent->counters.nb_sending, 1); spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue); ctx->spoe_appctx = SPOE_APPCTX(appctx); if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) || @@ -1548,6 +1556,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) *skip = 1; LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); } + HA_ATOMIC_ADD(&agent->counters.nb_waiting, 1); ctx->stats.tv_wait = now; SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; SPOE_APPCTX(appctx)->frag_ctx.cursid = 0; @@ -1571,6 +1580,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) static int spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip) { + struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; struct spoe_context *ctx = NULL; char *frame; int ret; @@ -1602,6 +1612,7 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip) default: LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); + HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1); spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting); ctx->stats.tv_response = now; if (ctx->spoe_appctx) { @@ -1708,6 +1719,7 @@ spoe_handle_processing_appctx(struct appctx *appctx) if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) { SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++); + HA_ATOMIC_ADD(&agent->counters.idles, 1); appctx->st0 = SPOE_APPCTX_ST_IDLE; eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node); } @@ -1871,6 +1883,7 @@ spoe_handle_appctx(struct appctx *appctx) case SPOE_APPCTX_ST_IDLE: SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--); + HA_ATOMIC_SUB(&agent->counters.idles, 1); eb32_delete(&SPOE_APPCTX(appctx)->node); if (stopping && LIST_ISEMPTY(&agent->rt[tid].sending_queue) && @@ -1988,6 +2001,7 @@ spoe_create_appctx(struct spoe_config *conf) LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list); HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock); SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++); + HA_ATOMIC_ADD(&conf->agent->counters.applets, 1); task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT); task_wakeup(strm->task, TASK_WOKEN_INIT); @@ -2072,6 +2086,7 @@ spoe_queue_context(struct spoe_context *ctx) /* Add the SPOE context in the sending queue */ LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list); + HA_ATOMIC_ADD(&agent->counters.nb_sending, 1); spoe_update_stat_time(&ctx->stats.tv_request, &ctx->stats.t_request); ctx->stats.tv_queue = now; @@ -2459,71 +2474,6 @@ spoe_process_actions(struct stream *s, struct spoe_context *ctx, int dir) /*************************************************************************** * Functions that process SPOE events **************************************************************************/ -static inline int -spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir) -{ - /* If a process is already started for this SPOE context, retry - * later. */ - if (ctx->flags & SPOE_CTX_FL_PROCESS) - return 0; - - agent->rt[tid].processing++; - ctx->stats.tv_start = now; - ctx->stats.tv_request = now; - ctx->stats.t_request = -1; - ctx->stats.t_queue = -1; - ctx->stats.t_waiting = -1; - ctx->stats.t_response = -1; - ctx->stats.t_process = -1; - - ctx->status_code = 0; - - /* Set the right flag to prevent request and response processing - * in same time. */ - ctx->flags |= ((dir == SMP_OPT_DIR_REQ) - ? SPOE_CTX_FL_REQ_PROCESS - : SPOE_CTX_FL_RSP_PROCESS); - return 1; -} - -static inline void -spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx) -{ - struct spoe_appctx *sa = ctx->spoe_appctx; - - if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) - return; - - if (sa) { - if (sa->frag_ctx.ctx == ctx) { - sa->frag_ctx.ctx = NULL; - spoe_wakeup_appctx(sa->owner); - } - else - sa->cur_fpa--; - } - - /* Reset the flag to allow next processing */ - agent->rt[tid].processing--; - ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED); - - /* Reset processing timer */ - ctx->process_exp = TICK_ETERNITY; - - spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait); - - ctx->spoe_appctx = NULL; - ctx->frag_ctx.curmsg = NULL; - ctx->frag_ctx.curarg = NULL; - ctx->frag_ctx.curoff = 0; - ctx->frag_ctx.flags = 0; - - if (!LIST_ISEMPTY(&ctx->list)) { - LIST_DEL(&ctx->list); - LIST_INIT(&ctx->list); - } -} - static void spoe_update_stats(struct stream *s, struct spoe_agent *agent, struct spoe_context *ctx, int dir) @@ -2586,6 +2536,76 @@ spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent, : SPOE_CTX_ST_NONE); } +static inline int +spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir) +{ + /* If a process is already started for this SPOE context, retry + * later. */ + if (ctx->flags & SPOE_CTX_FL_PROCESS) + return 0; + + agent->rt[tid].processing++; + ctx->stats.tv_start = now; + ctx->stats.tv_request = now; + ctx->stats.t_request = -1; + ctx->stats.t_queue = -1; + ctx->stats.t_waiting = -1; + ctx->stats.t_response = -1; + ctx->stats.t_process = -1; + + ctx->status_code = 0; + + /* Set the right flag to prevent request and response processing + * in same time. */ + ctx->flags |= ((dir == SMP_OPT_DIR_REQ) + ? SPOE_CTX_FL_REQ_PROCESS + : SPOE_CTX_FL_RSP_PROCESS); + return 1; +} + +static inline void +spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx) +{ + struct spoe_appctx *sa = ctx->spoe_appctx; + + if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) + return; + HA_ATOMIC_ADD(&agent->counters.nb_processed, 1); + if (sa) { + if (sa->frag_ctx.ctx == ctx) { + sa->frag_ctx.ctx = NULL; + spoe_wakeup_appctx(sa->owner); + } + else + sa->cur_fpa--; + } + + /* Reset the flag to allow next processing */ + agent->rt[tid].processing--; + ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED); + + /* Reset processing timer */ + ctx->process_exp = TICK_ETERNITY; + + spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait); + + ctx->spoe_appctx = NULL; + ctx->frag_ctx.curmsg = NULL; + ctx->frag_ctx.curarg = NULL; + ctx->frag_ctx.curoff = 0; + ctx->frag_ctx.flags = 0; + + if (!LIST_ISEMPTY(&ctx->list)) { + if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) + HA_ATOMIC_ADD(&agent->counters.nb_sending, 1); + else + HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1); + + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + } +} + /* Process a list of SPOE messages. First, this functions will process messages * and send them to an agent in a NOTIFY frame. Then, it will wait a ACK frame * to process corresponding actions. During all the processing, it returns 0 @@ -2600,7 +2620,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx, int ret = 1; if (ctx->state == SPOE_CTX_ST_ERROR) - goto error; + goto end; if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) { SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" @@ -2608,7 +2628,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx, (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, s); ctx->status_code = SPOE_CTX_ERR_TOUT; - goto error; + goto end; } if (ctx->state == SPOE_CTX_ST_READY) { @@ -2642,11 +2662,11 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx, goto out; ret = spoe_encode_messages(s, ctx, messages, dir, type); if (ret < 0) - goto error; + goto end; if (!ret) goto skip; if (spoe_queue_context(ctx) < 0) - goto error; + goto end; ctx->state = SPOE_CTX_ST_SENDING_MSGS; } @@ -2674,19 +2694,20 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx, out: return ret; - error: - spoe_handle_processing_error(s, agent, ctx, dir); - ret = 1; - goto end; - skip: tv_zero(&ctx->stats.tv_start); ctx->state = SPOE_CTX_ST_READY; - ret = 1; + spoe_stop_processing(agent, ctx); + return 1; end: spoe_update_stats(s, agent, ctx, dir); spoe_stop_processing(agent, ctx); + if (ctx->status_code) { + HA_ATOMIC_ADD(&agent->counters.nb_errors, 1); + spoe_handle_processing_error(s, agent, ctx, dir); + ret = 1; + } return ret; } @@ -2712,16 +2733,24 @@ spoe_process_group(struct stream *s, struct spoe_context *ctx, ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP); if (ret && ctx->stats.t_process != -1) { SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n", + " - sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, s, s->uniq_id, ctx->status_code, + __FUNCTION__, s, group->id, s->uniq_id, ctx->status_code, ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting, - ctx->stats.t_response, ctx->stats.t_process); - send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING), - "SPOE: [%s] sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n", - agent->id, group->id, s->uniq_id, ctx->status_code, - ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting, - ctx->stats.t_response, ctx->stats.t_process); + ctx->stats.t_response, ctx->stats.t_process, + agent->counters.idles, agent->counters.applets, + agent->counters.nb_sending, agent->counters.nb_waiting, + agent->counters.nb_errors, agent->counters.nb_processed, + agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec)); + if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM)) + send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING), + "SPOE: [%s] sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n", + agent->id, group->id, s->uniq_id, ctx->status_code, + ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting, + ctx->stats.t_response, ctx->stats.t_process, + agent->counters.idles, agent->counters.applets, + agent->counters.nb_sending, agent->counters.nb_waiting, + agent->counters.nb_errors, agent->counters.nb_processed); } return ret; } @@ -2750,16 +2779,24 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx, ret = spoe_process_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT); if (ret && ctx->stats.t_process != -1) { SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n", + " - sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, s, spoe_event_str[ev], s->uniq_id, ctx->status_code, ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting, - ctx->stats.t_response, ctx->stats.t_process); - send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING), - "SPOE: [%s] sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n", - agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code, - ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting, - ctx->stats.t_response, ctx->stats.t_process); + ctx->stats.t_response, ctx->stats.t_process, + agent->counters.idles, agent->counters.applets, + agent->counters.nb_sending, agent->counters.nb_waiting, + agent->counters.nb_errors, agent->counters.nb_processed, + agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec)); + if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM)) + send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING), + "SPOE: [%s] sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n", + agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code, + ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting, + ctx->stats.t_response, ctx->stats.t_process, + agent->counters.idles, agent->counters.applets, + agent->counters.nb_sending, agent->counters.nb_waiting, + agent->counters.nb_errors, agent->counters.nb_processed); } return ret; } @@ -4451,8 +4488,8 @@ spoe_send_group(struct act_rule *rule, struct proxy *px, (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, s, group->id); ctx->status_code = SPOE_CTX_ERR_INTERRUPT; - spoe_handle_processing_error(s, agent, ctx, dir); spoe_stop_processing(agent, ctx); + spoe_handle_processing_error(s, agent, ctx, dir); return ACT_RET_CONT; } return ACT_RET_YIELD;