mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-21 05:41:26 +02:00
MINOR: spoe: Replace sending_rate by a frequency counter
sending_rate was a counter used to evaluate the SPOE capacity to process frames. Because it was not really accurrate, it has been replaced by a frequency counter representing the number of frames handled by the SPOE per second. We just check this counter is higher than the number of streams waiting for a reply. If not, a new applet is created.
This commit is contained in:
parent
fce747bbaa
commit
6f9ea4f87b
@ -262,7 +262,9 @@ struct spoe_agent {
|
|||||||
unsigned int frame_size; /* current maximum frame size, only used to encode messages */
|
unsigned int frame_size; /* current maximum frame size, only used to encode messages */
|
||||||
unsigned int applets_act; /* # of applets alive at a time */
|
unsigned int applets_act; /* # of applets alive at a time */
|
||||||
unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
|
unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
|
||||||
unsigned int sending_rate; /* the global sending rate */
|
|
||||||
|
unsigned int processing;
|
||||||
|
struct freq_ctr processing_per_sec;
|
||||||
|
|
||||||
struct freq_ctr conn_per_sec; /* connections per second */
|
struct freq_ctr conn_per_sec; /* connections per second */
|
||||||
struct freq_ctr err_per_sec; /* connetion errors per second */
|
struct freq_ctr err_per_sec; /* connetion errors per second */
|
||||||
|
@ -1638,7 +1638,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
|
|||||||
goto next;
|
goto next;
|
||||||
|
|
||||||
case 0: /* ignore */
|
case 0: /* ignore */
|
||||||
agent->rt[tid].sending_rate++;
|
update_freq_ctr(&agent->rt[tid].processing_per_sec, 1);
|
||||||
fpa++;
|
fpa++;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -1646,7 +1646,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
agent->rt[tid].sending_rate++;
|
update_freq_ctr(&agent->rt[tid].processing_per_sec, 1);
|
||||||
fpa++;
|
fpa++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1991,7 +1991,7 @@ spoe_queue_context(struct spoe_context *ctx)
|
|||||||
|
|
||||||
/* Check if we need to create a new SPOE applet or not. */
|
/* Check if we need to create a new SPOE applet or not. */
|
||||||
if (agent->rt[tid].applets_idle &&
|
if (agent->rt[tid].applets_idle &&
|
||||||
agent->rt[tid].sending_rate)
|
agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec))
|
||||||
goto end;
|
goto end;
|
||||||
|
|
||||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
||||||
@ -2045,18 +2045,15 @@ spoe_queue_context(struct spoe_context *ctx)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add the SPOE context in the sending queue and update all running
|
/* Add the SPOE context in the sending queue */
|
||||||
* info */
|
|
||||||
LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
|
LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
|
||||||
if (agent->rt[tid].sending_rate)
|
|
||||||
agent->rt[tid].sending_rate--;
|
|
||||||
|
|
||||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
||||||
" - Add stream in sending queue"
|
" - Add stream in sending queue"
|
||||||
" - applets_act=%u - applets_idle=%u - sending_rate=%u\n",
|
" - applets_act=%u - applets_idle=%u - processing=%u\n",
|
||||||
(int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
|
(int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
|
||||||
ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
|
ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
|
||||||
agent->rt[tid].sending_rate);
|
agent->rt[tid].processing);
|
||||||
|
|
||||||
/* Finally try to wakeup the first IDLE applet found and move it at the
|
/* Finally try to wakeup the first IDLE applet found and move it at the
|
||||||
* end of the list. */
|
* end of the list. */
|
||||||
@ -2436,13 +2433,15 @@ spoe_process_actions(struct stream *s, struct spoe_context *ctx, int dir)
|
|||||||
* Functions that process SPOE events
|
* Functions that process SPOE events
|
||||||
**************************************************************************/
|
**************************************************************************/
|
||||||
static inline int
|
static inline int
|
||||||
spoe_start_processing(struct spoe_context *ctx, int dir)
|
spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir)
|
||||||
{
|
{
|
||||||
/* If a process is already started for this SPOE context, retry
|
/* If a process is already started for this SPOE context, retry
|
||||||
* later. */
|
* later. */
|
||||||
if (ctx->flags & SPOE_CTX_FL_PROCESS)
|
if (ctx->flags & SPOE_CTX_FL_PROCESS)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
agent->rt[tid].processing++;
|
||||||
|
|
||||||
/* Set the right flag to prevent request and response processing
|
/* Set the right flag to prevent request and response processing
|
||||||
* in same time. */
|
* in same time. */
|
||||||
ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
|
ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
|
||||||
@ -2452,16 +2451,20 @@ spoe_start_processing(struct spoe_context *ctx, int dir)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static inline void
|
static inline void
|
||||||
spoe_stop_processing(struct spoe_context *ctx)
|
spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx)
|
||||||
{
|
{
|
||||||
struct spoe_appctx *sa = ctx->spoe_appctx;
|
struct spoe_appctx *sa = ctx->spoe_appctx;
|
||||||
|
|
||||||
|
if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
|
||||||
|
return;
|
||||||
|
|
||||||
if (sa && sa->frag_ctx.ctx == ctx) {
|
if (sa && sa->frag_ctx.ctx == ctx) {
|
||||||
sa->frag_ctx.ctx = NULL;
|
sa->frag_ctx.ctx = NULL;
|
||||||
spoe_wakeup_appctx(sa->owner);
|
spoe_wakeup_appctx(sa->owner);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Reset the flag to allow next processing */
|
/* Reset the flag to allow next processing */
|
||||||
|
agent->rt[tid].processing--;
|
||||||
ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
|
ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
|
||||||
|
|
||||||
ctx->status_code = 0;
|
ctx->status_code = 0;
|
||||||
@ -2555,7 +2558,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
|
|||||||
s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
|
s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
|
||||||
ctx->process_exp);
|
ctx->process_exp);
|
||||||
}
|
}
|
||||||
ret = spoe_start_processing(ctx, dir);
|
ret = spoe_start_processing(agent, ctx, dir);
|
||||||
if (!ret)
|
if (!ret)
|
||||||
goto out;
|
goto out;
|
||||||
|
|
||||||
@ -2609,7 +2612,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
|
|||||||
ret = 1;
|
ret = 1;
|
||||||
|
|
||||||
end:
|
end:
|
||||||
spoe_stop_processing(ctx);
|
spoe_stop_processing(agent, ctx);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2710,7 +2713,7 @@ spoe_wakeup_context(struct spoe_context *ctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static struct spoe_context *
|
static struct spoe_context *
|
||||||
spoe_create_context(struct filter *filter)
|
spoe_create_context(struct stream *s, struct filter *filter)
|
||||||
{
|
{
|
||||||
struct spoe_config *conf = FLT_CONF(filter);
|
struct spoe_config *conf = FLT_CONF(filter);
|
||||||
struct spoe_context *ctx;
|
struct spoe_context *ctx;
|
||||||
@ -2736,17 +2739,25 @@ spoe_create_context(struct filter *filter)
|
|||||||
ctx->frame_id = 1;
|
ctx->frame_id = 1;
|
||||||
ctx->process_exp = TICK_ETERNITY;
|
ctx->process_exp = TICK_ETERNITY;
|
||||||
|
|
||||||
|
ctx->strm = s;
|
||||||
|
ctx->state = SPOE_CTX_ST_READY;
|
||||||
|
filter->ctx = ctx;
|
||||||
|
|
||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
spoe_destroy_context(struct spoe_context *ctx)
|
spoe_destroy_context(struct filter *filter)
|
||||||
{
|
{
|
||||||
|
struct spoe_config *conf = FLT_CONF(filter);
|
||||||
|
struct spoe_context *ctx = filter->ctx;
|
||||||
|
|
||||||
if (!ctx)
|
if (!ctx)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
spoe_stop_processing(ctx);
|
spoe_stop_processing(conf->agent, ctx);
|
||||||
pool_free(pool_head_spoe_ctx, ctx);
|
pool_free(pool_head_spoe_ctx, ctx);
|
||||||
|
filter->ctx = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -2907,8 +2918,7 @@ spoe_start(struct stream *s, struct filter *filter)
|
|||||||
(int)now.tv_sec, (int)now.tv_usec, agent->id,
|
(int)now.tv_sec, (int)now.tv_usec, agent->id,
|
||||||
__FUNCTION__, s);
|
__FUNCTION__, s);
|
||||||
|
|
||||||
ctx = spoe_create_context(filter);
|
if ((ctx = spoe_create_context(s, filter)) == NULL) {
|
||||||
if (ctx == NULL) {
|
|
||||||
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
|
||||||
" - failed to create SPOE context\n",
|
" - failed to create SPOE context\n",
|
||||||
(int)now.tv_sec, (int)now.tv_usec, agent->id,
|
(int)now.tv_sec, (int)now.tv_usec, agent->id,
|
||||||
@ -2919,10 +2929,6 @@ spoe_start(struct stream *s, struct filter *filter)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx->strm = s;
|
|
||||||
ctx->state = SPOE_CTX_ST_READY;
|
|
||||||
filter->ctx = ctx;
|
|
||||||
|
|
||||||
if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_TCP_REQ_FE]))
|
if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_TCP_REQ_FE]))
|
||||||
filter->pre_analyzers |= AN_REQ_INSPECT_FE;
|
filter->pre_analyzers |= AN_REQ_INSPECT_FE;
|
||||||
|
|
||||||
@ -2953,7 +2959,7 @@ spoe_stop(struct stream *s, struct filter *filter)
|
|||||||
(int)now.tv_sec, (int)now.tv_usec,
|
(int)now.tv_sec, (int)now.tv_usec,
|
||||||
((struct spoe_config *)FLT_CONF(filter))->agent->id,
|
((struct spoe_config *)FLT_CONF(filter))->agent->id,
|
||||||
__FUNCTION__, s);
|
__FUNCTION__, s);
|
||||||
spoe_destroy_context(filter->ctx);
|
spoe_destroy_context(filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -3186,7 +3192,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
|
|||||||
curagent->rt[i].frame_size = curagent->max_frame_size;
|
curagent->rt[i].frame_size = curagent->max_frame_size;
|
||||||
curagent->rt[i].applets_act = 0;
|
curagent->rt[i].applets_act = 0;
|
||||||
curagent->rt[i].applets_idle = 0;
|
curagent->rt[i].applets_idle = 0;
|
||||||
curagent->rt[i].sending_rate = 0;
|
curagent->rt[i].processing = 0;
|
||||||
LIST_INIT(&curagent->rt[i].applets);
|
LIST_INIT(&curagent->rt[i].applets);
|
||||||
LIST_INIT(&curagent->rt[i].sending_queue);
|
LIST_INIT(&curagent->rt[i].sending_queue);
|
||||||
LIST_INIT(&curagent->rt[i].waiting_queue);
|
LIST_INIT(&curagent->rt[i].waiting_queue);
|
||||||
@ -4168,7 +4174,7 @@ spoe_send_group(struct act_rule *rule, struct proxy *px,
|
|||||||
agent->id, __FUNCTION__, s, group->id);
|
agent->id, __FUNCTION__, s, group->id);
|
||||||
ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
|
ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
|
||||||
spoe_handle_processing_error(s, agent, ctx, dir);
|
spoe_handle_processing_error(s, agent, ctx, dir);
|
||||||
spoe_stop_processing(ctx);
|
spoe_stop_processing(agent, ctx);
|
||||||
return ACT_RET_CONT;
|
return ACT_RET_CONT;
|
||||||
}
|
}
|
||||||
return ACT_RET_YIELD;
|
return ACT_RET_YIELD;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user