mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-22 14:21:25 +02:00
MINOR: tasks: move the list walking code to its own function
New function run_tasks_from_list() will run over a tasklet list and will run all the tasks and tasklets it finds there within a limit of <max> that is passed in arggument. This is a preliminary work for scheduler QoS improvements.
This commit is contained in:
parent
876b411f2b
commit
4ffa0b526a
153
src/task.c
153
src/task.c
@ -315,6 +315,85 @@ int next_timer_expiry()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Walks over tasklet list <list> and run at most <max> of them. Returns
|
||||||
|
* the number of entries effectively processed (tasks and tasklets merged).
|
||||||
|
* The count of tasks in the list for the current thread is adjusted.
|
||||||
|
*/
|
||||||
|
static int run_tasks_from_list(struct list *list, int max)
|
||||||
|
{
|
||||||
|
struct task *(*process)(struct task *t, void *ctx, unsigned short state);
|
||||||
|
struct task *t;
|
||||||
|
unsigned short state;
|
||||||
|
void *ctx;
|
||||||
|
int done = 0;
|
||||||
|
|
||||||
|
while (done < max && !LIST_ISEMPTY(list)) {
|
||||||
|
t = (struct task *)LIST_ELEM(list->n, struct tasklet *, list);
|
||||||
|
state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING;
|
||||||
|
state = _HA_ATOMIC_XCHG(&t->state, state);
|
||||||
|
__ha_barrier_atomic_store();
|
||||||
|
__tasklet_remove_from_tasklet_list((struct tasklet *)t);
|
||||||
|
|
||||||
|
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
||||||
|
activity[tid].ctxsw++;
|
||||||
|
ctx = t->context;
|
||||||
|
process = t->process;
|
||||||
|
t->calls++;
|
||||||
|
|
||||||
|
if (TASK_IS_TASKLET(t)) {
|
||||||
|
process(NULL, ctx, state);
|
||||||
|
done++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* OK then this is a regular task */
|
||||||
|
|
||||||
|
task_per_thread[tid].task_list_size--;
|
||||||
|
if (unlikely(t->call_date)) {
|
||||||
|
uint64_t now_ns = now_mono_time();
|
||||||
|
|
||||||
|
t->lat_time += now_ns - t->call_date;
|
||||||
|
t->call_date = now_ns;
|
||||||
|
}
|
||||||
|
|
||||||
|
sched->current = t;
|
||||||
|
__ha_barrier_store();
|
||||||
|
if (likely(process == process_stream))
|
||||||
|
t = process_stream(t, ctx, state);
|
||||||
|
else if (process != NULL)
|
||||||
|
t = process(t, ctx, state);
|
||||||
|
else {
|
||||||
|
__task_free(t);
|
||||||
|
sched->current = NULL;
|
||||||
|
__ha_barrier_store();
|
||||||
|
/* We don't want max_processed to be decremented if
|
||||||
|
* we're just freeing a destroyed task, we should only
|
||||||
|
* do so if we really ran a task.
|
||||||
|
*/
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
sched->current = NULL;
|
||||||
|
__ha_barrier_store();
|
||||||
|
/* If there is a pending state we have to wake up the task
|
||||||
|
* immediately, else we defer it into wait queue
|
||||||
|
*/
|
||||||
|
if (t != NULL) {
|
||||||
|
if (unlikely(t->call_date)) {
|
||||||
|
t->cpu_time += now_mono_time() - t->call_date;
|
||||||
|
t->call_date = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
|
||||||
|
if (state & TASK_WOKEN_ANY)
|
||||||
|
task_wakeup(t, 0);
|
||||||
|
else
|
||||||
|
task_queue(t);
|
||||||
|
}
|
||||||
|
done++;
|
||||||
|
}
|
||||||
|
return done;
|
||||||
|
}
|
||||||
|
|
||||||
/* The run queue is chronologically sorted in a tree. An insertion counter is
|
/* The run queue is chronologically sorted in a tree. An insertion counter is
|
||||||
* used to assign a position to each task. This counter may be combined with
|
* used to assign a position to each task. This counter may be combined with
|
||||||
* other variables (eg: nice value) to set the final position in the tree. The
|
* other variables (eg: nice value) to set the final position in the tree. The
|
||||||
@ -334,7 +413,7 @@ void process_runnable_tasks()
|
|||||||
struct eb32sc_node *lrq = NULL; // next local run queue entry
|
struct eb32sc_node *lrq = NULL; // next local run queue entry
|
||||||
struct eb32sc_node *grq = NULL; // next global run queue entry
|
struct eb32sc_node *grq = NULL; // next global run queue entry
|
||||||
struct task *t;
|
struct task *t;
|
||||||
int max_processed;
|
int max_processed, done;
|
||||||
struct mt_list *tmp_list;
|
struct mt_list *tmp_list;
|
||||||
|
|
||||||
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
||||||
@ -421,76 +500,8 @@ void process_runnable_tasks()
|
|||||||
grq = NULL;
|
grq = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (max_processed > 0 && !LIST_ISEMPTY(&tt->task_list)) {
|
done = run_tasks_from_list(&tt->task_list, max_processed);
|
||||||
struct task *t;
|
max_processed -= done;
|
||||||
unsigned short state;
|
|
||||||
void *ctx;
|
|
||||||
struct task *(*process)(struct task *t, void *ctx, unsigned short state);
|
|
||||||
|
|
||||||
t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
|
|
||||||
state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING;
|
|
||||||
state = _HA_ATOMIC_XCHG(&t->state, state);
|
|
||||||
__ha_barrier_atomic_store();
|
|
||||||
__tasklet_remove_from_tasklet_list((struct tasklet *)t);
|
|
||||||
|
|
||||||
ti->flags &= ~TI_FL_STUCK; // this thread is still running
|
|
||||||
activity[tid].ctxsw++;
|
|
||||||
ctx = t->context;
|
|
||||||
process = t->process;
|
|
||||||
t->calls++;
|
|
||||||
|
|
||||||
if (TASK_IS_TASKLET(t)) {
|
|
||||||
process(NULL, ctx, state);
|
|
||||||
max_processed--;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* OK then this is a regular task */
|
|
||||||
|
|
||||||
tt->task_list_size--;
|
|
||||||
if (unlikely(t->call_date)) {
|
|
||||||
uint64_t now_ns = now_mono_time();
|
|
||||||
|
|
||||||
t->lat_time += now_ns - t->call_date;
|
|
||||||
t->call_date = now_ns;
|
|
||||||
}
|
|
||||||
|
|
||||||
sched->current = t;
|
|
||||||
__ha_barrier_store();
|
|
||||||
if (likely(process == process_stream))
|
|
||||||
t = process_stream(t, ctx, state);
|
|
||||||
else if (process != NULL)
|
|
||||||
t = process(t, ctx, state);
|
|
||||||
else {
|
|
||||||
__task_free(t);
|
|
||||||
sched->current = NULL;
|
|
||||||
__ha_barrier_store();
|
|
||||||
/* We don't want max_processed to be decremented if
|
|
||||||
* we're just freeing a destroyed task, we should only
|
|
||||||
* do so if we really ran a task.
|
|
||||||
*/
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
sched->current = NULL;
|
|
||||||
__ha_barrier_store();
|
|
||||||
/* If there is a pending state we have to wake up the task
|
|
||||||
* immediately, else we defer it into wait queue
|
|
||||||
*/
|
|
||||||
if (t != NULL) {
|
|
||||||
if (unlikely(t->call_date)) {
|
|
||||||
t->cpu_time += now_mono_time() - t->call_date;
|
|
||||||
t->call_date = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
|
|
||||||
if (state & TASK_WOKEN_ANY)
|
|
||||||
task_wakeup(t, 0);
|
|
||||||
else
|
|
||||||
task_queue(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
max_processed--;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!LIST_ISEMPTY(&tt->task_list))
|
if (!LIST_ISEMPTY(&tt->task_list))
|
||||||
activity[tid].long_rq++;
|
activity[tid].long_rq++;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user