mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2026-05-04 12:41:00 +02:00
MEDIUM: tasks: implement 3 different tasklet classes with their own queues
We used to mix high latency tasks and low latency tasklets in the same list, and to even refill bulk tasklets there, causing some unfairness in certain situations (e.g. poll-less transfers between many connections saturating the machine with similarly-sized in and out network interfaces). This patch changes the mechanism to split the load into 3 lists depending on the task/tasklet's desired classes : - URGENT: this is mainly for tasklets used as deferred callbacks - NORMAL: this is for regular tasks - BULK: this is for bulk tasks/tasklets Arbitrary ratios of max_processed are picked from each of these lists in turn, with the ability to complete in one list from what was not picked in the previous one. After some quick tests, the following setup gave apparently good results both for raw TCP with splicing and for H2-to-H1 request rate: - 0 to 75% for urgent - 12 to 50% for normal - 12 to what remains for bulk Bulk is not used yet.
This commit is contained in:
parent
4ffa0b526a
commit
a62917b890
@ -245,7 +245,7 @@ static inline void tasklet_wakeup(struct tasklet *tl)
|
||||
if (likely(tl->tid < 0)) {
|
||||
/* this tasklet runs on the caller thread */
|
||||
if (LIST_ISEMPTY(&tl->list)) {
|
||||
LIST_ADDQ(&task_per_thread[tid].task_list, &tl->list);
|
||||
LIST_ADDQ(&task_per_thread[tid].tasklets[TL_URGENT], &tl->list);
|
||||
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
||||
}
|
||||
} else {
|
||||
@ -264,10 +264,10 @@ static inline void tasklet_wakeup(struct tasklet *tl)
|
||||
/* Insert a tasklet into the tasklet list. If used with a plain task instead,
|
||||
* the caller must update the task_list_size.
|
||||
*/
|
||||
static inline void tasklet_insert_into_tasklet_list(struct tasklet *tl)
|
||||
static inline void tasklet_insert_into_tasklet_list(struct list *list, struct tasklet *tl)
|
||||
{
|
||||
_HA_ATOMIC_ADD(&tasks_run_queue, 1);
|
||||
LIST_ADDQ(&sched->task_list, &tl->list);
|
||||
LIST_ADDQ(list, &tl->list);
|
||||
}
|
||||
|
||||
/* Remove the tasklet from the tasklet list. The tasklet MUST already be there.
|
||||
@ -581,7 +581,10 @@ static inline int thread_has_tasks(void)
|
||||
{
|
||||
return (!!(global_tasks_mask & tid_bit) |
|
||||
(sched->rqueue_size > 0) |
|
||||
!LIST_ISEMPTY(&sched->task_list) | !MT_LIST_ISEMPTY(&sched->shared_tasklet_list));
|
||||
!LIST_ISEMPTY(&sched->tasklets[TL_URGENT]) |
|
||||
!LIST_ISEMPTY(&sched->tasklets[TL_NORMAL]) |
|
||||
!LIST_ISEMPTY(&sched->tasklets[TL_BULK]) |
|
||||
!MT_LIST_ISEMPTY(&sched->shared_tasklet_list));
|
||||
}
|
||||
|
||||
/* adds list item <item> to work list <work> and wake up the associated task */
|
||||
|
||||
@ -50,6 +50,13 @@
|
||||
TASK_WOKEN_IO|TASK_WOKEN_SIGNAL|TASK_WOKEN_MSG| \
|
||||
TASK_WOKEN_RES)
|
||||
|
||||
enum {
|
||||
TL_URGENT = 0, /* urgent tasklets (I/O callbacks) */
|
||||
TL_NORMAL = 1, /* normal tasks */
|
||||
TL_BULK = 2, /* bulk task/tasklets, streaming I/Os */
|
||||
TL_CLASSES /* must be last */
|
||||
};
|
||||
|
||||
struct notification {
|
||||
struct list purge_me; /* Part of the list of signals to be purged in the
|
||||
case of the LUA execution stack crash. */
|
||||
@ -63,9 +70,9 @@ struct notification {
|
||||
struct task_per_thread {
|
||||
struct eb_root timers; /* tree constituting the per-thread wait queue */
|
||||
struct eb_root rqueue; /* tree constituting the per-thread run queue */
|
||||
struct list task_list; /* List of tasks to be run, mixing tasks and tasklets */
|
||||
struct mt_list shared_tasklet_list; /* Tasklet to be run, woken up by other threads */
|
||||
int task_list_size; /* Number of tasks in the task_list */
|
||||
struct list tasklets[TL_CLASSES]; /* tasklets (and/or tasks) to run, by class */
|
||||
int task_list_size; /* Number of tasks among the tasklets */
|
||||
int rqueue_size; /* Number of elements in the per-thread run queue */
|
||||
struct task *current; /* current task (not tasklet) */
|
||||
__attribute__((aligned(64))) char end[0];
|
||||
|
||||
@ -63,8 +63,10 @@ void ha_thread_dump(struct buffer *buf, int thr, int calling_tid)
|
||||
!!(global_tasks_mask & thr_bit),
|
||||
!eb_is_empty(&task_per_thread[thr].timers),
|
||||
!eb_is_empty(&task_per_thread[thr].rqueue),
|
||||
!(LIST_ISEMPTY(&task_per_thread[thr].task_list) |
|
||||
MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
|
||||
!(LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_URGENT]) &&
|
||||
LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_NORMAL]) &&
|
||||
LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
|
||||
MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
|
||||
task_per_thread[thr].task_list_size,
|
||||
task_per_thread[thr].rqueue_size,
|
||||
stuck,
|
||||
|
||||
27
src/task.c
27
src/task.c
@ -427,7 +427,7 @@ void process_runnable_tasks()
|
||||
*/
|
||||
tmp_list = MT_LIST_BEHEAD(&sched->shared_tasklet_list);
|
||||
if (tmp_list)
|
||||
LIST_SPLICE_END_DETACHED(&sched->task_list, (struct list *)tmp_list);
|
||||
LIST_SPLICE_END_DETACHED(&sched->tasklets[TL_URGENT], (struct list *)tmp_list);
|
||||
|
||||
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
|
||||
nb_tasks_cur = nb_tasks;
|
||||
@ -436,9 +436,15 @@ void process_runnable_tasks()
|
||||
if (likely(niced_tasks))
|
||||
max_processed = (max_processed + 3) / 4;
|
||||
|
||||
/* run up to 3*max_processed/4 urgent tasklets */
|
||||
done = run_tasks_from_list(&tt->tasklets[TL_URGENT], 3*(max_processed + 1) / 4);
|
||||
max_processed -= done;
|
||||
|
||||
/* pick up to (max_processed-done+1)/2 regular tasks from prio-ordered run queues */
|
||||
|
||||
/* Note: the grq lock is always held when grq is not null */
|
||||
|
||||
while (tt->task_list_size < max_processed) {
|
||||
while (tt->task_list_size < (max_processed + 1) / 2) {
|
||||
if ((global_tasks_mask & tid_bit) && !grq) {
|
||||
#ifdef USE_THREAD
|
||||
HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
|
||||
@ -489,7 +495,7 @@ void process_runnable_tasks()
|
||||
/* Make sure the entry doesn't appear to be in a list */
|
||||
LIST_INIT(&((struct tasklet *)t)->list);
|
||||
/* And add it to the local task list */
|
||||
tasklet_insert_into_tasklet_list((struct tasklet *)t);
|
||||
tasklet_insert_into_tasklet_list(&tt->tasklets[TL_NORMAL], (struct tasklet *)t);
|
||||
tt->task_list_size++;
|
||||
activity[tid].tasksw++;
|
||||
}
|
||||
@ -500,10 +506,17 @@ void process_runnable_tasks()
|
||||
grq = NULL;
|
||||
}
|
||||
|
||||
done = run_tasks_from_list(&tt->task_list, max_processed);
|
||||
/* run between max_processed/8 and max_processed/2 regular tasks */
|
||||
done = run_tasks_from_list(&tt->tasklets[TL_NORMAL], (max_processed + 1) / 2);
|
||||
max_processed -= done;
|
||||
|
||||
if (!LIST_ISEMPTY(&tt->task_list))
|
||||
/* run between max_processed/8 and max_processed bulk tasklets */
|
||||
done = run_tasks_from_list(&tt->tasklets[TL_BULK], max_processed);
|
||||
max_processed -= done;
|
||||
|
||||
if (!LIST_ISEMPTY(&sched->tasklets[TL_URGENT]) |
|
||||
!LIST_ISEMPTY(&sched->tasklets[TL_NORMAL]) |
|
||||
!LIST_ISEMPTY(&sched->tasklets[TL_BULK]))
|
||||
activity[tid].long_rq++;
|
||||
}
|
||||
|
||||
@ -607,7 +620,9 @@ static void init_task()
|
||||
#endif
|
||||
memset(&task_per_thread, 0, sizeof(task_per_thread));
|
||||
for (i = 0; i < MAX_THREADS; i++) {
|
||||
LIST_INIT(&task_per_thread[i].task_list);
|
||||
LIST_INIT(&task_per_thread[i].tasklets[TL_URGENT]);
|
||||
LIST_INIT(&task_per_thread[i].tasklets[TL_NORMAL]);
|
||||
LIST_INIT(&task_per_thread[i].tasklets[TL_BULK]);
|
||||
MT_LIST_INIT(&task_per_thread[i].shared_tasklet_list);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user