diff --git a/include/haproxy/task-t.h b/include/haproxy/task-t.h index 34d9a4565..c46cdcc65 100644 --- a/include/haproxy/task-t.h +++ b/include/haproxy/task-t.h @@ -40,6 +40,7 @@ #define TASK_SELF_WAKING 0x0010 /* task/tasklet found waking itself */ #define TASK_KILLED 0x0020 /* task/tasklet killed, may now be freed */ #define TASK_IN_LIST 0x0040 /* tasklet is in a tasklet list */ +#define TASK_HEAVY 0x0080 /* this task/tasklet is extremely heavy */ #define TASK_WOKEN_INIT 0x0100 /* woken up for initialisation purposes */ #define TASK_WOKEN_TIMER 0x0200 /* woken up because of expired timer */ diff --git a/src/task.c b/src/task.c index a9a6cc1a1..1cb8352f9 100644 --- a/src/task.c +++ b/src/task.c @@ -114,7 +114,7 @@ void __tasklet_wakeup_on(struct tasklet *tl, int thr) { if (likely(thr < 0)) { /* this tasklet runs on the caller thread */ - if (tl->state & TASK_SELF_WAKING) { + if (tl->state & (TASK_SELF_WAKING|TASK_HEAVY)) { LIST_ADDQ(&sched->tasklets[TL_BULK], &tl->list); sched->tl_class_mask |= 1 << TL_BULK; } @@ -437,6 +437,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[]) unsigned int done = 0; unsigned int queue; unsigned short state; + char heavy_calls = 0; void *ctx; for (queue = 0; queue < TL_CLASSES;) { @@ -483,7 +484,20 @@ unsigned int run_tasks_from_lists(unsigned int budgets[]) budgets[queue]--; t = (struct task *)LIST_ELEM(tl_queues[queue].n, struct tasklet *, list); - state = t->state & (TASK_SHARED_WQ|TASK_SELF_WAKING|TASK_KILLED); + state = t->state & (TASK_SHARED_WQ|TASK_SELF_WAKING|TASK_HEAVY|TASK_KILLED); + + if (state & TASK_HEAVY) { + /* This is a heavy task. We'll call no more than one + * per function call. If we called one already, we'll + * return and announce the max possible weight so that + * the caller doesn't come back too soon. + */ + if (heavy_calls) { + done = INT_MAX; // 11ms instead of 3 without this + break; // too many heavy tasks processed already + } + heavy_calls = 1; + } ti->flags &= ~TI_FL_STUCK; // this thread is still running activity[tid].ctxsw++;