diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h index ebd26f0c6..9b4f5744b 100644 --- a/include/haproxy/applet.h +++ b/include/haproxy/applet.h @@ -38,6 +38,7 @@ extern unsigned int nb_applets; extern struct pool_head *pool_head_appctx; struct task *task_run_applet(struct task *t, void *context, unsigned int state); +struct task *task_process_applet(struct task *t, void *context, unsigned int state); int appctx_buf_available(void *arg); void *applet_reserve_svcctx(struct appctx *appctx, size_t size); void applet_reset_svcctx(struct appctx *appctx); diff --git a/src/applet.c b/src/applet.c index 89bea0238..81bb99964 100644 --- a/src/applet.c +++ b/src/applet.c @@ -238,7 +238,10 @@ struct appctx *appctx_new_on(struct applet *applet, struct sedesc *sedesc, int t } appctx->sedesc = sedesc; - appctx->t->process = task_run_applet; + if (applet->rcv_buf != NULL && applet->snd_buf != NULL) + appctx->t->process = task_process_applet; + else + appctx->t->process = task_run_applet; appctx->t->context = appctx; appctx->inbuf = BUF_NULL; @@ -663,3 +666,72 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) TRACE_LEAVE(APPLET_EV_PROCESS, app); return t; } + + +/* Default applet handler based on IN/OUT buffers. It is a true task here, no a tasklet */ +struct task *task_process_applet(struct task *t, void *context, unsigned int state) +{ + struct appctx *app = context; + struct stconn *sc; + unsigned int rate; + + TRACE_ENTER(APPLET_EV_PROCESS, app); + + if (app->state & APPLET_WANT_DIE) { + TRACE_DEVEL("APPCTX want die, release it", APPLET_EV_FREE, app); + __appctx_free(app); + return NULL; + } + + if (se_fl_test(app->sedesc, SE_FL_ORPHAN)) { + /* Finalize init of orphan appctx. .init callback function must + * be defined and it must finalize appctx startup. + */ + BUG_ON(!app->applet->init); + + if (appctx_init(app) == -1) { + TRACE_DEVEL("APPCTX init failed", APPLET_EV_FREE|APPLET_EV_ERR, app); + appctx_free_on_early_error(app); + return NULL; + } + BUG_ON(!app->sess || !appctx_sc(app) || !appctx_strm(app)); + TRACE_DEVEL("APPCTX initialized", APPLET_EV_PROCESS, app); + } + + sc = appctx_sc(app); + + sc_applet_sync_send(sc); + + /* We always pretend the applet can't get and doesn't want to + * put, it's up to it to change this if needed. This ensures + * that one applet which ignores any event will not spin. + */ + applet_need_more_data(app); + applet_have_no_more_data(app); + + app->applet->fct(app); + + TRACE_POINT(APPLET_EV_PROCESS, app); + + sc_applet_sync_recv(sc); + + /* TODO: May be move in appctx_rcv_buf or sc_applet_process ? */ + if (sc_waiting_room(sc) && (sc->flags & SC_FL_ABRT_DONE)) { + sc_ep_set(sc, SE_FL_EOS|SE_FL_ERROR); + } + + /* measure the call rate and check for anomalies when too high */ + if (((b_size(sc_ib(sc)) && sc->flags & SC_FL_NEED_BUFF) || // asks for a buffer which is present + (b_size(sc_ib(sc)) && !b_data(sc_ib(sc)) && sc->flags & SC_FL_NEED_ROOM) || // asks for room in an empty buffer + (b_data(sc_ob(sc)) && sc_is_send_allowed(sc)) || // asks for data already present + (!b_data(sc_ib(sc)) && b_data(sc_ob(sc)) && // didn't return anything ... + (!(sc_oc(sc)->flags & CF_WRITE_EVENT) && (sc->flags & SC_FL_SHUT_WANTED))))) { // ... and left data pending after a shut + rate = update_freq_ctr(&app->call_rate, 1); + if (rate >= 100000 && app->call_rate.prev_ctr) // looped like this more than 100k times over last second + stream_dump_and_crash(&app->obj_type, read_freq_ctr(&app->call_rate)); + } + + sc->app_ops->wake(sc); + TRACE_LEAVE(APPLET_EV_PROCESS, app); + return t; +}