diff --git a/include/types/spoe.h b/include/types/spoe.h index 659dd273d..32d231146 100644 --- a/include/types/spoe.h +++ b/include/types/spoe.h @@ -260,16 +260,18 @@ struct spoe_agent { /* running info */ struct { unsigned int frame_size; /* current maximum frame size, only used to encode messages */ +#if defined(DEBUG_SPOE) || defined(DEBUG_FULL) unsigned int applets_act; /* # of applets alive at a time */ unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */ - +#endif unsigned int processing; struct freq_ctr processing_per_sec; struct freq_ctr conn_per_sec; /* connections per second */ struct freq_ctr err_per_sec; /* connetion errors per second */ - struct list applets; /* List of available SPOE applets */ + struct eb_root idle_applets; /* idle SPOE applets available to process data */ + struct list applets; /* all SPOE applets for this agent */ struct list sending_queue; /* Queue of streams waiting to send data */ struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */ __decl_hathreads(HA_SPINLOCK_T lock); @@ -336,6 +338,7 @@ struct spoe_appctx { struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */ struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */ struct list list; /* next spoe appctx for the same agent */ + struct eb32_node node; /* node used for applets tree */ unsigned int cur_fpa; struct { diff --git a/src/flt_spoe.c b/src/flt_spoe.c index ae34c3b90..5848fdc7b 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -46,8 +46,10 @@ #if defined(DEBUG_SPOE) || defined(DEBUG_FULL) #define SPOE_PRINTF(x...) fprintf(x) +#define SPOE_DEBUG_STMT(statement) statement #else #define SPOE_PRINTF(x...) +#define SPOE_DEBUG_STMT(statement) #endif /* Reserved 4 bytes to the frame size. So a frame and its size can be written @@ -1212,16 +1214,20 @@ spoe_release_appctx(struct appctx *appctx) __FUNCTION__, appctx); /* Remove applet from the list of running applets */ - agent->rt[tid].applets_act--; + SPOE_DEBUG_STMT(agent->rt[tid].applets_act--); + HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); if (!LIST_ISEMPTY(&spoe_appctx->list)) { LIST_DEL(&spoe_appctx->list); LIST_INIT(&spoe_appctx->list); } + HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); /* Shutdown the server connection, if needed */ if (appctx->st0 != SPOE_APPCTX_ST_END) { - if (appctx->st0 == SPOE_APPCTX_ST_IDLE) - agent->rt[tid].applets_idle--; + if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { + eb32_delete(&spoe_appctx->node); + SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--); + } appctx->st0 = SPOE_APPCTX_ST_END; if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE) @@ -1398,12 +1404,10 @@ spoe_handle_connecting_appctx(struct appctx *appctx) default: /* HELLO handshake is finished, set the idle timeout and * add the applet in the list of running applets. */ - agent->rt[tid].applets_idle++; + SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++); appctx->st0 = SPOE_APPCTX_ST_IDLE; - HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); - LIST_DEL(&SPOE_APPCTX(appctx)->list); - LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list); - HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); + SPOE_APPCTX(appctx)->node.key = 0; + eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node); /* Update runtinme agent info */ HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size); @@ -1474,6 +1478,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) ctx->state = SPOE_CTX_ST_ERROR; ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + *skip = 1; break; case 1: /* retry */ @@ -1491,15 +1496,14 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) || (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN)) goto no_frag_frame_sent; - else { - *skip = 1; + else goto frag_frame_sent; - } } goto end; frag_frame_sent: appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY; + *skip = 1; SPOE_APPCTX(appctx)->frag_ctx.ctx = ctx; SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id; SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id; @@ -1518,6 +1522,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip) } else { appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK; + *skip = 1; LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list); } SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL; @@ -1552,6 +1557,7 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip) if (ret > 1) { if (*frame == SPOE_FRM_T_AGENT_DISCON) { appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + ret = -1; goto end; } trash.len = ret + 4; @@ -1616,13 +1622,12 @@ spoe_handle_processing_appctx(struct appctx *appctx) goto next; } - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - process: fpa=%u/%u - appctx-state=%s - flags=0x%08x\n", + " - process: fpa=%u/%u - appctx-state=%s - weight=%u - flags=0x%08x\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx, SPOE_APPCTX(appctx)->cur_fpa, agent->max_fpa, spoe_appctx_state_str[appctx->st0], - SPOE_APPCTX(appctx)->flags); + SPOE_APPCTX(appctx)->node.key, SPOE_APPCTX(appctx)->flags); if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) skip_sending = 1; @@ -1655,6 +1660,8 @@ spoe_handle_processing_appctx(struct appctx *appctx) goto next; case 0: /* ignore */ + if (SPOE_APPCTX(appctx)->node.key) + SPOE_APPCTX(appctx)->node.key--; active_s++; break; @@ -1662,23 +1669,22 @@ spoe_handle_processing_appctx(struct appctx *appctx) break; default: + if (SPOE_APPCTX(appctx)->node.key) + SPOE_APPCTX(appctx)->node.key--; active_s++; break; } } if (active_s || active_r) { - HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); - LIST_DEL(&SPOE_APPCTX(appctx)->list); - LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list); - HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); - update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s); SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle); } + if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) { + SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++); appctx->st0 = SPOE_APPCTX_ST_IDLE; - agent->rt[tid].applets_idle++; + eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node); } return 1; @@ -1847,7 +1853,8 @@ spoe_handle_appctx(struct appctx *appctx) appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; goto switchstate; } - agent->rt[tid].applets_idle--; + SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--); + eb32_delete(&SPOE_APPCTX(appctx)->node); appctx->st0 = SPOE_APPCTX_ST_PROCESSING; /* fall through */ @@ -1955,7 +1962,7 @@ spoe_create_appctx(struct spoe_config *conf) HA_SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock); LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list); HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock); - conf->agent->rt[tid].applets_act++; + SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++); task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT); task_wakeup(strm->task, TASK_WOKEN_INIT); @@ -1983,7 +1990,7 @@ spoe_queue_context(struct spoe_context *ctx) struct spoe_appctx *spoe_appctx; /* Check if we need to create a new SPOE applet or not. */ - if (agent->rt[tid].applets_idle && + if (!eb_is_empty(&agent->rt[tid].idle_applets) && agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec)) goto end; @@ -2048,17 +2055,17 @@ spoe_queue_context(struct spoe_context *ctx) ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle, agent->rt[tid].processing); - /* Finally try to wakeup the first IDLE applet found and move it at the - * end of the list. */ - list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) { - appctx = spoe_appctx->owner; - if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { - spoe_wakeup_appctx(appctx); - HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); - LIST_DEL(&spoe_appctx->list); - LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list); - HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock); - break; + /* Finally try to wakeup an IDLE applet. */ + if (!eb_is_empty(&agent->rt[tid].idle_applets)) { + struct eb32_node *node; + + node = eb32_first(&agent->rt[tid].idle_applets); + spoe_appctx = eb32_entry(node, struct spoe_appctx, node); + if (node && spoe_appctx) { + eb32_delete(&spoe_appctx->node); + spoe_appctx->node.key++; + eb32_insert(&agent->rt[tid].idle_applets, &spoe_appctx->node); + spoe_wakeup_appctx(spoe_appctx->owner); } } return 1; @@ -3169,7 +3176,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) curagent->cps_max = 0; curagent->eps_max = 0; curagent->max_frame_size = MAX_FRAME_SIZE; - curagent->max_fpa = 100; + curagent->max_fpa = 20; for (i = 0; i < SPOE_EV_EVENTS; ++i) LIST_INIT(&curagent->events[i]); @@ -3183,8 +3190,8 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) } for (i = 0; i < global.nbthread; ++i) { curagent->rt[i].frame_size = curagent->max_frame_size; - curagent->rt[i].applets_act = 0; - curagent->rt[i].applets_idle = 0; + SPOE_DEBUG_STMT(curagent->rt[i].applets_act = 0); + SPOE_DEBUG_STMT(curagent->rt[i].applets_idle = 0); curagent->rt[i].processing = 0; LIST_INIT(&curagent->rt[i].applets); LIST_INIT(&curagent->rt[i].sending_queue);