diff --git a/include/proto/task.h b/include/proto/task.h
index 59ac38258..c8004143b 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -89,11 +89,14 @@ extern unsigned int tasks_run_queue_cur;
 extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
 extern struct pool_head *pool_head_task;
+extern struct pool_head *pool_head_tasklet;
 extern struct pool_head *pool_head_notification;
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
 extern struct eb_root rqueue; /* tree constituting the run queue */
 extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
+extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
+extern int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
 
 __decl_hathreads(extern HA_SPINLOCK_T rq_lock);  /* spin lock related to run queue */
 __decl_hathreads(extern HA_SPINLOCK_T wq_lock);  /* spin lock related to wait queue */
@@ -101,7 +104,10 @@ __decl_hathreads(extern HA_SPINLOCK_T wq_lock); /* spin lock related to wait qu
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
 {
-	return t->rq.node.leaf_p != NULL;
+	/* Check if leaf_p is NULL, in case it's not in the runqueue, and if
+	 * it's not 0x1, which would mean it's in the tasklet list.
+	 */
+	return t->rq.node.leaf_p != NULL && t->rq.node.leaf_p != (void *)0x1;
 }
 
 /* return 0 if task is in wait queue, otherwise non-zero */
@@ -122,7 +128,7 @@ static inline void task_wakeup(struct task *t, unsigned int f)
 #ifdef USE_THREAD
 	struct eb_root *root;
 
-	if (t->thread_mask == tid_bit && global.nbthread > 1)
+	if (t->thread_mask == tid_bit || global.nbthread == 1)
 		root = &rqueue_local[tid];
 	else
 		root = &rqueue;
@@ -172,7 +178,6 @@ static inline struct task *task_unlink_wq(struct task *t)
 static inline struct task *__task_unlink_rq(struct task *t)
 {
 	eb32sc_delete(&t->rq);
-	HA_ATOMIC_SUB(&tasks_run_queue, 1);
 	if (likely(t->nice))
 		HA_ATOMIC_SUB(&niced_tasks, 1);
 	return t;
@@ -195,6 +200,41 @@ static inline struct task *task_unlink_rq(struct task *t)
 	return t;
 }
 
+static inline void tasklet_wakeup(struct tasklet *tl)
+{
+	LIST_ADDQ(&task_list[tid], &tl->list);
+	task_list_size[tid]++;
+	HA_ATOMIC_ADD(&tasks_run_queue, 1);
+
+}
+
+static inline void task_insert_into_tasklet_list(struct task *t)
+{
+	struct tasklet *tl;
+	void *expected = NULL;
+
+	/* Protect ourself against anybody trying to insert the task into
+	 * another runqueue. We set leaf_p to 0x1 to indicate that the node is
+	 * not in a tree but that it's in the tasklet list. See task_in_rq().
+	 */
+	if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1)))
+		return;
+	task_list_size[tid]++;
+	tl = (struct tasklet *)t;
+	LIST_ADDQ(&task_list[tid], &tl->list);
+}
+
+static inline void task_remove_from_task_list(struct task *t)
+{
+	LIST_DEL(&((struct tasklet *)t)->list);
+	task_list_size[tid]--;
+	HA_ATOMIC_SUB(&tasks_run_queue, 1);
+	if (!TASK_IS_TASKLET(t)) {
+		t->rq.node.leaf_p = NULL; // was 0x1
+		__ha_barrier_store();
+	}
+}
+
 /*
  * Unlinks the task and adjusts run queue stats.
  * A pointer to the task itself is returned.
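For context, here is a minimal usage sketch of the new tasklet API declared above. It is illustrative only and not part of the patch: struct my_ctx, my_iocb and the my_ctx_* helpers are hypothetical names, and the sketch relies on struct tasklet, tasklet_new() and pool_head_tasklet introduced in the following hunks.

#include <proto/task.h>

/* Hypothetical per-connection context owning one tasklet. */
struct my_ctx {
	struct tasklet *tl;
	int fd;
};

/* Tasklets reuse the task callback signature; by design the scheduler does
 * not hand the tasklet back to the callback, so only ctx and state are used.
 */
static struct task *my_iocb(struct task *t, void *ctx, unsigned short state)
{
	struct my_ctx *mc = ctx;

	/* ... perform the pending I/O work for mc->fd ... */
	return NULL;
}

static int my_ctx_init(struct my_ctx *mc)
{
	mc->tl = tasklet_new();           /* allocated from pool_head_tasklet */
	if (!mc->tl)
		return -1;
	mc->tl->process = my_iocb;
	mc->tl->context = mc;
	return 0;
}

/* Called on the owning thread when the fd becomes ready: append the tasklet
 * to task_list[tid]; it will run on the next process_runnable_tasks() pass.
 */
static void my_ctx_ready(struct my_ctx *mc)
{
	tasklet_wakeup(mc->tl);
}

Note that tasklet_wakeup() takes no lock: it only touches the calling thread's task_list[tid] and atomically bumps tasks_run_queue, which is what makes it cheaper than task_wakeup() on the I/O path.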
@@ -223,6 +263,24 @@ static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 	return t;
 }
 
+static inline void tasklet_init(struct tasklet *t)
+{
+	t->nice = -32768;
+	t->calls = 0;
+	t->state = 0;
+	t->list.p = t->list.n = NULL;
+}
+
+static inline struct tasklet *tasklet_new(void)
+{
+	struct tasklet *t = pool_alloc(pool_head_tasklet);
+
+	if (t) {
+		tasklet_init(t);
+	}
+	return t;
+}
+
 /*
  * Allocate and initialise a new task. The new task is returned, or NULL in
  * case of lack of memory. The task count is incremented. Tasks should only
@@ -262,6 +320,13 @@ static inline void task_free(struct task *t)
 }
 
+static inline void tasklet_free(struct tasklet *tl)
+{
+	pool_free(pool_head_tasklet, tl);
+	if (unlikely(stopping))
+		pool_flush(pool_head_tasklet);
+}
+
 /* Place into the wait queue, where it may already be. If the expiration
  * timer is infinite, do nothing and rely on wake_expired_task to clean up.
  */
diff --git a/include/types/task.h b/include/types/task.h
index cff235155..be7b6f3ab 100644
--- a/include/types/task.h
+++ b/include/types/task.h
@@ -60,20 +60,35 @@ struct notification {
 	__decl_hathreads(HA_SPINLOCK_T lock);
 };
 
+/* This part is common between struct task and struct tasklet so that tasks
+ * can be used as-is as tasklets.
+ */
+#define TASK_COMMON						\
	struct {						\
		unsigned short state; /* task state : bitfield of TASK_* */	\
		short nice; /* task prio from -1024 to +1024, or -32768 for tasklets */	\
		unsigned int calls; /* number of times process was called */	\
		struct task *(*process)(struct task *t, void *ctx, unsigned short state); /* the function which processes the task */	\
		void *context; /* the task's context */		\
	}
+
 /* The base for all tasks */
 struct task {
+	TASK_COMMON;			/* must be at the beginning! */
 	struct eb32sc_node rq;		/* ebtree node used to hold the task in the run queue */
-	unsigned short state;		/* task state : bit field of TASK_* */
-	unsigned short pending_state;	/* pending states for running talk */
-	short nice;			/* the task's current nice value from -1024 to +1024 */
-	unsigned int calls;		/* number of times ->process() was called */
-	struct task * (*process)(struct task *t, void *ctx, unsigned short state); /* the function which processes the task */
-	void *context;			/* the task's context */
 	struct eb32_node wq;		/* ebtree node used to hold the task in the wait queue */
 	int expire;			/* next expiration date for this task, in ticks */
 	unsigned long thread_mask;	/* mask of thread IDs authorized to process the task */
 };
 
+/* lightweight tasks, without priority, mainly used for I/Os */
+struct tasklet {
+	TASK_COMMON;			/* must be at the beginning! */
+	struct list list;
+};
+
+#define TASK_IS_TASKLET(t) ((t)->nice == -32768)
+
 /*
  * The task callback (->process) is responsible for updating ->expire. It must
  * return a pointer to the task itself, except if the task has been deleted, in
diff --git a/src/task.c b/src/task.c
index 876b837e8..3032010ba 100644
--- a/src/task.c
+++ b/src/task.c
@@ -25,6 +25,7 @@
 #include
 
 struct pool_head *pool_head_task;
+struct pool_head *pool_head_tasklet;
 
 /* This is the memory pool containing all the signal structs. These
  * struct are used to store each requiered signal between two tasks.
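A short illustration of why TASK_COMMON must stay at the top of both structures (illustrative only, not part of the patch; run_one() is a hypothetical helper): an element of the mixed list can be viewed through struct task and told apart with TASK_IS_TASKLET(), which is the same cast game the scheduler plays in src/task.c below.

#include <types/task.h>
#include <proto/task.h>

/* Hypothetical helper: run one element picked from a mixed tasks/tasklets
 * list. Because TASK_COMMON is the first member of both struct task and
 * struct tasklet, the cast below only ever touches the shared header.
 */
static void run_one(struct tasklet *tl)
{
	struct task *t = (struct task *)tl;

	if (!t->process)
		return;
	if (TASK_IS_TASKLET(t)) {
		/* nice == -32768: no rq/wq node, no expire date, just call it */
		t->process(NULL, t->context, t->state);
	} else {
		/* a full task also carries its ebtree nodes, expire and thread_mask */
		t->process(t, t->context, t->state);
	}
}

The -32768 value written by tasklet_init() sits outside the -1024..+1024 range allowed for task niceness, so the nice field can double as the discriminant tested by TASK_IS_TASKLET().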
@@ -41,6 +42,9 @@ unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */
 THREAD_LOCAL struct task *curr_task = NULL; /* task currently running or NULL */
 THREAD_LOCAL struct eb32sc_node *rq_next = NULL; /* Next task to be potentially run */
 
+struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
+int task_list_size[MAX_THREADS]; /* Number of tasks in the task_list */
+
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
@@ -240,20 +244,32 @@ void process_runnable_tasks()
 	tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
 	nb_tasks_cur = nb_tasks;
 	max_processed = 200;
-	if (unlikely(global.nbthread <= 1)) {
-		/* when no lock is needed, this loop is much faster */
+
+	if (likely(global.nbthread > 1)) {
+		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 		if (!(active_tasks_mask & tid_bit)) {
+			HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 			activity[tid].empty_rq++;
 			return;
 		}
-		active_tasks_mask &= ~tid_bit;
-		rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-		while (1) {
-			if (!rq_next) {
-				/* we might have reached the end of the tree, typically because
-				 * <rqueue_ticks> is in the first half and we're first scanning
-				 * the last half. Let's loop back to the beginning of the tree now.
+		average = tasks_run_queue / global.nbthread;
+
+		/* Get some elements from the global run queue and put it in the
+		 * local run queue. To try to keep a bit of fairness, just get as
+		 * much elements from the global list as to have a bigger local queue
+		 * than the average.
+		 */
+		while (rqueue_size[tid] <= average) {
+
+			/* we have to restart looking up after every batch */
+			rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+			if (unlikely(!rq_next)) {
+				/* either we just started or we reached the end
+				 * of the tree, typically because <rqueue_ticks>
+				 * is in the first half and we're first scanning
+				 * the last half. Let's loop back to the beginning
+				 * of the tree now.
 				 */
 				rq_next = eb32sc_first(&rqueue, tid_bit);
 				if (!rq_next)
@@ -266,90 +282,22 @@ void process_runnable_tasks()
 
 			/* detach the task from the queue */
 			__task_unlink_rq(t);
-			t->state |= TASK_RUNNING;
-
-			t->calls++;
-			curr_task = t;
-			/* This is an optimisation to help the processor's branch
-			 * predictor take this most common call.
-			 */
-			if (likely(t->process == process_stream))
-				t = process_stream(t, t->context, t->state);
-			else {
-				if (t->process != NULL)
-					t = t->process(t, t->context, t->state);
-				else {
-					__task_free(t);
-					t = NULL;
-				}
-			}
-			curr_task = NULL;
-
-			if (likely(t != NULL)) {
-				t->state &= ~TASK_RUNNING;
-				/* If there is a pending state
-				 * we have to wake up the task
-				 * immediatly, else we defer
-				 * it into wait queue
-				 */
-				if (t->state)
-					__task_wakeup(t, &rqueue);
-				else
-					task_queue(t);
-			}
-
-			max_processed--;
-			if (max_processed <= 0) {
-				active_tasks_mask |= tid_bit;
-				activity[tid].long_rq++;
-				break;
-			}
+			__task_wakeup(t, &rqueue_local[tid]);
 		}
-		return;
-	}
-	HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
-	if (!(active_tasks_mask & tid_bit)) {
 		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-		activity[tid].empty_rq++;
-		return;
-	}
-
-	average = tasks_run_queue / global.nbthread;
-
-	/* Get some elements from the global run queue and put it in the
-	 * local run queue. To try to keep a bit of fairness, just get as
-	 * much elements from the global list as to have a bigger local queue
-	 * than the average.
-	 */
-	while (rqueue_size[tid] <= average) {
-
-		/* we have to restart looking up after every batch */
-		rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-		if (unlikely(!rq_next)) {
-			/* either we just started or we reached the end
-			 * of the tree, typically because <rqueue_ticks>
-			 * is in the first half and we're first scanning
-			 * the last half. Let's loop back to the beginning
-			 * of the tree now.
-			 */
-			rq_next = eb32sc_first(&rqueue, tid_bit);
-			if (!rq_next)
-				break;
+	} else {
+		if (!(active_tasks_mask & tid_bit)) {
+			activity[tid].empty_rq++;
+			return;
 		}
-
-		t = eb32sc_entry(rq_next, struct task, rq);
-		rq_next = eb32sc_next(rq_next, tid_bit);
-
-		/* detach the task from the queue */
-		__task_unlink_rq(t);
-		__task_wakeup(t, &rqueue_local[tid]);
 	}
-
-	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 
 	active_tasks_mask &= ~tid_bit;
-	while (1) {
-		unsigned short state;
+	/* Get some tasks from the run queue, make sure we don't
+	 * get too much in the task list, but put a bit more than
+	 * the max that will be run, to give a bit more fairness
+	 */
+	while (max_processed + 20 > task_list_size[tid]) {
 		/* Note: this loop is one of the fastest code path in
 		 * the whole program. It should not be re-arranged
 		 * without a good reason.
 		 */
@@ -370,18 +318,42 @@ void process_runnable_tasks()
 		}
 		t = eb32sc_entry(rq_next, struct task, rq);
 		rq_next = eb32sc_next(rq_next, tid_bit);
+		/* Make sure nobody re-adds the task in the runqueue */
+		HA_ATOMIC_OR(&t->state, TASK_RUNNING);
 
-		state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
 		/* detach the task from the queue */
 		__task_unlink_rq(t);
-		t->calls++;
-		max_processed--;
+		/* And add it to the local task list */
+		task_insert_into_tasklet_list(t);
+	}
+	while (max_processed > 0 && !LIST_ISEMPTY(&task_list[tid])) {
+		struct task *t;
+		unsigned short state;
+		void *ctx;
+		struct task *(*process)(struct task *t, void *ctx, unsigned short state);
+
+		t = (struct task *)LIST_ELEM(task_list[tid].n, struct tasklet *, list);
+		state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
+		__ha_barrier_store();
+		task_remove_from_task_list(t);
+
+		ctx = t->context;
+		process = t->process;
 		rqueue_size[tid]--;
-		curr_task = t;
-		if (likely(t->process == process_stream))
-			t = process_stream(t, t->context, state);
-		else
-			t = t->process(t, t->context, state);
+		t->calls++;
+		curr_task = (struct task *)t;
+		if (TASK_IS_TASKLET(t))
+			t = NULL;
+		if (likely(process == process_stream))
+			t = process_stream(t, ctx, state);
+		else {
+			if (t->process != NULL)
+				t = process(t, ctx, state);
+			else {
+				__task_free(t);
+				t = NULL;
+			}
+		}
 		curr_task = NULL;
 		/* If there is a pending state we have to wake up the task
 		 * immediatly, else we defer it into wait queue
@@ -412,11 +384,17 @@ int init_task()
 	memset(&rqueue, 0, sizeof(rqueue));
 	HA_SPIN_INIT(&wq_lock);
 	HA_SPIN_INIT(&rq_lock);
-	for (i = 0; i < MAX_THREADS; i++)
+	for (i = 0; i < MAX_THREADS; i++) {
 		memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
+		LIST_INIT(&task_list[i]);
+		task_list_size[i] = 0;
+	}
 	pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
 	if (!pool_head_task)
 		return 0;
+	pool_head_tasklet = create_pool("tasklet", sizeof(struct tasklet), MEM_F_SHARED);
+	if (!pool_head_tasklet)
+		return 0;
 	pool_head_notification = create_pool("notification", sizeof(struct notification), MEM_F_SHARED);
 	if (!pool_head_notification)
 		return 0;
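To summarise the reworked process_runnable_tasks(): with threads, the global run queue is first drained under the lock into the per-thread run queue until the local queue holds more than the per-thread average; the local run queue is then drained into task_list[tid] as long as fewer than max_processed + 20 entries are pending; finally at most max_processed entries are popped from task_list[tid] and executed, tasks and tasklets alike. The standalone model below (plain C, not HAProxy code, every name in it is made up) mimics only that refill/drain budgeting:

#include <stdio.h>

#define MAX_PROCESSED 200

/* One mixed queue entry: a task or a tasklet, kept in FIFO order. */
struct entry {
	struct entry *next;
	int is_tasklet;
	int id;
};

static struct entry *head;
static struct entry **tailp = &head;
static int list_size;

static void list_append(struct entry *e)
{
	e->next = NULL;
	*tailp = e;
	tailp = &e->next;
	list_size++;
}

/* Refill phase: pull from a "run queue" while fewer than budget + 20 entries
 * are pending, mirroring "max_processed + 20 > task_list_size[tid]".
 */
static void refill(struct entry *(*pull)(void))
{
	struct entry *e;

	while (MAX_PROCESSED + 20 > list_size && (e = pull()) != NULL)
		list_append(e);
}

/* Run phase: execute at most MAX_PROCESSED entries, oldest first. */
static void run_some(void)
{
	int budget = MAX_PROCESSED;

	while (budget > 0 && head != NULL) {
		struct entry *e = head;

		head = e->next;
		if (!head)
			tailp = &head;
		list_size--;
		budget--;
		printf("running %s %d\n", e->is_tasklet ? "tasklet" : "task", e->id);
	}
}

/* Fake run queue feeding the model. */
static struct entry fake_rq[4];
static int fake_idx;

static struct entry *pull_fake(void)
{
	if (fake_idx >= 4)
		return NULL;
	fake_rq[fake_idx].id = fake_idx;
	fake_rq[fake_idx].is_tasklet = fake_idx & 1;
	return &fake_rq[fake_idx++];
}

int main(void)
{
	refill(pull_fake);
	run_some();
	return 0;
}

The +20 headroom queues slightly more entries than will actually run in one pass, which is the "bit more fairness" the comment in the hunk above refers to.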