diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 9973123f5..853452f53 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1309,7 +1309,7 @@ spoe_release_appctx(struct appctx *appctx) /* Destroy the task attached to this applet */ task_destroy(spoe_appctx->task); - /* Notify all waiting streams */ + /* Report an error to all streams in the appctx waiting queue */ list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) { LIST_DELETE(&ctx->list); LIST_INIT(&ctx->list); @@ -1321,8 +1321,8 @@ spoe_release_appctx(struct appctx *appctx) task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } - /* If the applet was processing a fragmented frame, notify the - * corresponding stream. */ + /* If the applet was processing a fragmented frame, report an error to + * the corresponding stream. */ if (spoe_appctx->frag_ctx.ctx) { ctx = spoe_appctx->frag_ctx.ctx; ctx->spoe_appctx = NULL; @@ -1331,7 +1331,11 @@ spoe_release_appctx(struct appctx *appctx) task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } - if (!LIST_ISEMPTY(&agent->rt[tid].waiting_queue)) { + if (!LIST_ISEMPTY(&agent->rt[tid].applets)) { + /* If there are still some running applets, remove reference on + * the current one from streams in the async waiting queue. In + * async mode, the ACK may be received from another appctx. + */ list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) { if (ctx->spoe_appctx == spoe_appctx) ctx->spoe_appctx = NULL; @@ -1339,16 +1343,25 @@ spoe_release_appctx(struct appctx *appctx) goto end; } else { - /* It is the last running applet and the sending and waiting - * queues are not empty. Try to start a new one if HAproxy is - * not stopping. + /* It is the last running applet and the sending and async + * waiting queues are not empty. So try to start a new applet if + * HAproxy is not stopping. On success, we remove reference on + * the current appctx from streams in the async waiting queue. + * In async mode, the ACK may be received from another appctx. */ if (!stopping && (!LIST_ISEMPTY(&agent->rt[tid].sending_queue) || !LIST_ISEMPTY(&agent->rt[tid].waiting_queue)) && - spoe_create_appctx(agent->spoe_conf)) + spoe_create_appctx(agent->spoe_conf)) { + list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) { + if (ctx->spoe_appctx == spoe_appctx) + ctx->spoe_appctx = NULL; + } goto end; + } - /* otherwise, notify all waiting streams */ + /* Otherwise, report an error to all streams in the sending and + * async waiting queues. + */ list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) { LIST_DELETE(&ctx->list); LIST_INIT(&ctx->list);