MEDIUM: spoe: Use an ebtree to manage idle applets

Instead of using a list of applets with idle ones in front, we now use an
ebtree. Aapplets in the tree are idle by definition. And the key is the applet's
weight. When a new frame is queued, the first idle applet (with the lowest
weight) is woken up and its weight is increased by one. And when an applet sends
a frame to a SPOA, its weight is decremented by one.

This is empirical, but it should avoid to overuse a very few number of applets
and increase the balancing between idle applets.
This commit is contained in:
Christopher Faulet 2018-01-24 16:37:57 +01:00 committed by Willy Tarreau
parent 8f82b203d5
commit b077cdc012
2 changed files with 49 additions and 39 deletions

View File

@ -260,16 +260,18 @@ struct spoe_agent {
/* running info */ /* running info */
struct { struct {
unsigned int frame_size; /* current maximum frame size, only used to encode messages */ unsigned int frame_size; /* current maximum frame size, only used to encode messages */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
unsigned int applets_act; /* # of applets alive at a time */ unsigned int applets_act; /* # of applets alive at a time */
unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */ unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
#endif
unsigned int processing; unsigned int processing;
struct freq_ctr processing_per_sec; struct freq_ctr processing_per_sec;
struct freq_ctr conn_per_sec; /* connections per second */ struct freq_ctr conn_per_sec; /* connections per second */
struct freq_ctr err_per_sec; /* connetion errors per second */ struct freq_ctr err_per_sec; /* connetion errors per second */
struct list applets; /* List of available SPOE applets */ struct eb_root idle_applets; /* idle SPOE applets available to process data */
struct list applets; /* all SPOE applets for this agent */
struct list sending_queue; /* Queue of streams waiting to send data */ struct list sending_queue; /* Queue of streams waiting to send data */
struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */ struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */
__decl_hathreads(HA_SPINLOCK_T lock); __decl_hathreads(HA_SPINLOCK_T lock);
@ -336,6 +338,7 @@ struct spoe_appctx {
struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */ struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */ struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
struct list list; /* next spoe appctx for the same agent */ struct list list; /* next spoe appctx for the same agent */
struct eb32_node node; /* node used for applets tree */
unsigned int cur_fpa; unsigned int cur_fpa;
struct { struct {

View File

@ -46,8 +46,10 @@
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL) #if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
#define SPOE_PRINTF(x...) fprintf(x) #define SPOE_PRINTF(x...) fprintf(x)
#define SPOE_DEBUG_STMT(statement) statement
#else #else
#define SPOE_PRINTF(x...) #define SPOE_PRINTF(x...)
#define SPOE_DEBUG_STMT(statement)
#endif #endif
/* Reserved 4 bytes to the frame size. So a frame and its size can be written /* Reserved 4 bytes to the frame size. So a frame and its size can be written
@ -1212,16 +1214,20 @@ spoe_release_appctx(struct appctx *appctx)
__FUNCTION__, appctx); __FUNCTION__, appctx);
/* Remove applet from the list of running applets */ /* Remove applet from the list of running applets */
agent->rt[tid].applets_act--; SPOE_DEBUG_STMT(agent->rt[tid].applets_act--);
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
if (!LIST_ISEMPTY(&spoe_appctx->list)) { if (!LIST_ISEMPTY(&spoe_appctx->list)) {
LIST_DEL(&spoe_appctx->list); LIST_DEL(&spoe_appctx->list);
LIST_INIT(&spoe_appctx->list); LIST_INIT(&spoe_appctx->list);
} }
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
/* Shutdown the server connection, if needed */ /* Shutdown the server connection, if needed */
if (appctx->st0 != SPOE_APPCTX_ST_END) { if (appctx->st0 != SPOE_APPCTX_ST_END) {
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
agent->rt[tid].applets_idle--; eb32_delete(&spoe_appctx->node);
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
}
appctx->st0 = SPOE_APPCTX_ST_END; appctx->st0 = SPOE_APPCTX_ST_END;
if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE) if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
@ -1398,12 +1404,10 @@ spoe_handle_connecting_appctx(struct appctx *appctx)
default: default:
/* HELLO handshake is finished, set the idle timeout and /* HELLO handshake is finished, set the idle timeout and
* add the applet in the list of running applets. */ * add the applet in the list of running applets. */
agent->rt[tid].applets_idle++; SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
appctx->st0 = SPOE_APPCTX_ST_IDLE; appctx->st0 = SPOE_APPCTX_ST_IDLE;
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); SPOE_APPCTX(appctx)->node.key = 0;
LIST_DEL(&SPOE_APPCTX(appctx)->list); eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
/* Update runtinme agent info */ /* Update runtinme agent info */
HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size); HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
@ -1474,6 +1478,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
ctx->state = SPOE_CTX_ST_ERROR; ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
*skip = 1;
break; break;
case 1: /* retry */ case 1: /* retry */
@ -1491,15 +1496,14 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) || if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
(ctx->frag_ctx.flags & SPOE_FRM_FL_FIN)) (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN))
goto no_frag_frame_sent; goto no_frag_frame_sent;
else { else
*skip = 1;
goto frag_frame_sent; goto frag_frame_sent;
} }
}
goto end; goto end;
frag_frame_sent: frag_frame_sent:
appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY; appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY;
*skip = 1;
SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx; SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx;
SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id; SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id; SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
@ -1518,6 +1522,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
} }
else { else {
appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK; appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
*skip = 1;
LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
} }
SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
@ -1552,6 +1557,7 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
if (ret > 1) { if (ret > 1) {
if (*frame == SPOE_FRM_T_AGENT_DISCON) { if (*frame == SPOE_FRM_T_AGENT_DISCON) {
appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
ret = -1;
goto end; goto end;
} }
trash.len = ret + 4; trash.len = ret + 4;
@ -1616,13 +1622,12 @@ spoe_handle_processing_appctx(struct appctx *appctx)
goto next; goto next;
} }
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
" - process: fpa=%u/%u - appctx-state=%s - flags=0x%08x\n", " - process: fpa=%u/%u - appctx-state=%s - weight=%u - flags=0x%08x\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id, (int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, appctx, SPOE_APPCTX(appctx)->cur_fpa, __FUNCTION__, appctx, SPOE_APPCTX(appctx)->cur_fpa,
agent->max_fpa, spoe_appctx_state_str[appctx->st0], agent->max_fpa, spoe_appctx_state_str[appctx->st0],
SPOE_APPCTX(appctx)->flags); SPOE_APPCTX(appctx)->node.key, SPOE_APPCTX(appctx)->flags);
if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
skip_sending = 1; skip_sending = 1;
@ -1655,6 +1660,8 @@ spoe_handle_processing_appctx(struct appctx *appctx)
goto next; goto next;
case 0: /* ignore */ case 0: /* ignore */
if (SPOE_APPCTX(appctx)->node.key)
SPOE_APPCTX(appctx)->node.key--;
active_s++; active_s++;
break; break;
@ -1662,23 +1669,22 @@ spoe_handle_processing_appctx(struct appctx *appctx)
break; break;
default: default:
if (SPOE_APPCTX(appctx)->node.key)
SPOE_APPCTX(appctx)->node.key--;
active_s++; active_s++;
break; break;
} }
} }
if (active_s || active_r) { if (active_s || active_r) {
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&SPOE_APPCTX(appctx)->list);
LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s); update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s);
SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
} }
if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) { if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
appctx->st0 = SPOE_APPCTX_ST_IDLE; appctx->st0 = SPOE_APPCTX_ST_IDLE;
agent->rt[tid].applets_idle++; eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
} }
return 1; return 1;
@ -1847,7 +1853,8 @@ spoe_handle_appctx(struct appctx *appctx)
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
goto switchstate; goto switchstate;
} }
agent->rt[tid].applets_idle--; SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
eb32_delete(&SPOE_APPCTX(appctx)->node);
appctx->st0 = SPOE_APPCTX_ST_PROCESSING; appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
/* fall through */ /* fall through */
@ -1955,7 +1962,7 @@ spoe_create_appctx(struct spoe_config *conf)
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock); HA_SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list); LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock); HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
conf->agent->rt[tid].applets_act++; SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++);
task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT); task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
task_wakeup(strm->task, TASK_WOKEN_INIT); task_wakeup(strm->task, TASK_WOKEN_INIT);
@ -1983,7 +1990,7 @@ spoe_queue_context(struct spoe_context *ctx)
struct spoe_appctx *spoe_appctx; struct spoe_appctx *spoe_appctx;
/* Check if we need to create a new SPOE applet or not. */ /* Check if we need to create a new SPOE applet or not. */
if (agent->rt[tid].applets_idle && if (!eb_is_empty(&agent->rt[tid].idle_applets) &&
agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec)) agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec))
goto end; goto end;
@ -2048,17 +2055,17 @@ spoe_queue_context(struct spoe_context *ctx)
ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle, ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
agent->rt[tid].processing); agent->rt[tid].processing);
/* Finally try to wakeup the first IDLE applet found and move it at the /* Finally try to wakeup an IDLE applet. */
* end of the list. */ if (!eb_is_empty(&agent->rt[tid].idle_applets)) {
list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) { struct eb32_node *node;
appctx = spoe_appctx->owner;
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { node = eb32_first(&agent->rt[tid].idle_applets);
spoe_wakeup_appctx(appctx); spoe_appctx = eb32_entry(node, struct spoe_appctx, node);
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); if (node && spoe_appctx) {
LIST_DEL(&spoe_appctx->list); eb32_delete(&spoe_appctx->node);
LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list); spoe_appctx->node.key++;
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); eb32_insert(&agent->rt[tid].idle_applets, &spoe_appctx->node);
break; spoe_wakeup_appctx(spoe_appctx->owner);
} }
} }
return 1; return 1;
@ -3169,7 +3176,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
curagent->cps_max = 0; curagent->cps_max = 0;
curagent->eps_max = 0; curagent->eps_max = 0;
curagent->max_frame_size = MAX_FRAME_SIZE; curagent->max_frame_size = MAX_FRAME_SIZE;
curagent->max_fpa = 100; curagent->max_fpa = 20;
for (i = 0; i < SPOE_EV_EVENTS; ++i) for (i = 0; i < SPOE_EV_EVENTS; ++i)
LIST_INIT(&curagent->events[i]); LIST_INIT(&curagent->events[i]);
@ -3183,8 +3190,8 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
} }
for (i = 0; i < global.nbthread; ++i) { for (i = 0; i < global.nbthread; ++i) {
curagent->rt[i].frame_size = curagent->max_frame_size; curagent->rt[i].frame_size = curagent->max_frame_size;
curagent->rt[i].applets_act = 0; SPOE_DEBUG_STMT(curagent->rt[i].applets_act = 0);
curagent->rt[i].applets_idle = 0; SPOE_DEBUG_STMT(curagent->rt[i].applets_idle = 0);
curagent->rt[i].processing = 0; curagent->rt[i].processing = 0;
LIST_INIT(&curagent->rt[i].applets); LIST_INIT(&curagent->rt[i].applets);
LIST_INIT(&curagent->rt[i].sending_queue); LIST_INIT(&curagent->rt[i].sending_queue);