mirror of https://git.haproxy.org/git/haproxy.git/
MAJOR: threads/task: handle multithread on task scheduler
Two global locks have been added to protect, respectively, the run queue and the wait queue. A process mask has also been added to each task. As for FDs, this mask is used to know which threads are allowed to process a task. Most tasks should grant all threads, and this should be your first intention when creating a new task, unless you have a good reason to make a task sticky to some threads. It is then the responsibility of the process callback to lock whatever must be locked in the task's context. Nevertheless, all tasks linked to a session must be sticky to the thread that created the session: the I/O handlers processing the session's FDs and these tasks must run on the same thread to avoid conflicts.
parent 209d02a257
commit c60def8368
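To illustrate the process-mask rule described in the commit message, here is a minimal standalone sketch (not HAProxy code; the struct and helper below are hypothetical stand-ins) showing how a per-task thread mask decides which threads may pick a task, mirroring the t->process_mask & (1UL << tid) test added to process_runnable_tasks() further down in this diff.

	#include <stdio.h>

	/* Stand-in for HAProxy's "all threads allowed" mask. */
	#define MAX_THREADS_MASK (~0UL)

	/* Hypothetical reduced task: only the field relevant to thread affinity. */
	struct task_stub {
		unsigned long process_mask; /* mask of thread IDs allowed to run the task */
	};

	/* Mirrors the check added to the scheduler: a thread only picks a task
	 * whose process_mask contains its own bit.
	 */
	static int runnable_on_thread(const struct task_stub *t, unsigned int tid)
	{
		return (t->process_mask & (1UL << tid)) != 0;
	}

	int main(void)
	{
		struct task_stub global_task  = { .process_mask = MAX_THREADS_MASK };
		struct task_stub session_task = { .process_mask = 1UL << 2 }; /* sticky on thread 2 */
		unsigned int tid;

		for (tid = 0; tid < 4; tid++)
			printf("thread %u: global=%d session=%d\n", tid,
			       runnable_on_thread(&global_task, tid),
			       runnable_on_thread(&session_task, tid));
		return 0;
	}

In the real patch, "global" tasks are created with task_new(MAX_THREADS_MASK) while session/stream tasks are created with task_new(tid_bit) so they stay on the thread that accepted the connection.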
@@ -142,6 +142,8 @@ enum lock_label {
 	FDCACHE_LOCK,
 	FD_LOCK,
 	POLL_LOCK,
+	TASK_RQ_LOCK,
+	TASK_WQ_LOCK,
 	POOL_LOCK,
 	LOCK_LABELS
 };
@@ -226,7 +228,7 @@ struct ha_rwlock {
 static inline void show_lock_stats()
 {
 	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
-	                                   "POOL" };
+	                                   "TASK_RQ", "TASK_WQ", "POOL" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
@@ -30,6 +30,8 @@
 #include <common/mini-clist.h>
 #include <common/standard.h>
 #include <common/ticks.h>
+#include <common/hathreads.h>
+
 #include <eb32tree.h>
 
 #include <types/global.h>
@@ -86,6 +88,10 @@ extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool2_task;
 extern struct pool_head *pool2_notification;
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T rq_lock;  /* spin lock related to run queue */
+extern HA_SPINLOCK_T wq_lock;  /* spin lock related to wait queue */
+#endif
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -103,19 +109,29 @@ static inline int task_in_wq(struct task *t)
 struct task *__task_wakeup(struct task *t);
 static inline struct task *task_wakeup(struct task *t, unsigned int f)
 {
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+
 	/* If task is running, we postpone the call
 	 * and backup the state.
 	 */
 	if (unlikely(t->state & TASK_RUNNING)) {
 		t->pending_state |= f;
+		SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 		return t;
 	}
 	if (likely(!task_in_rq(t)))
 		__task_wakeup(t);
 	t->state |= f;
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+
 	return t;
 }
 
+static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
+{
+
+	t->process_mask = thread_mask;
+}
 /*
  * Unlink the task from the wait queue, and possibly update the last_timer
  * pointer. A pointer to the task itself is returned. The task *must* already
@@ -130,8 +146,10 @@ static inline struct task *__task_unlink_wq(struct task *t)
 
 static inline struct task *task_unlink_wq(struct task *t)
 {
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (likely(task_in_wq(t)))
 		__task_unlink_wq(t);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 	return t;
 }
 
@@ -156,9 +174,10 @@ static inline struct task *__task_unlink_rq(struct task *t)
  */
 static inline struct task *task_unlink_rq(struct task *t)
 {
-	if (likely(task_in_rq(t))) {
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	if (likely(task_in_rq(t)))
 		__task_unlink_rq(t);
-	}
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 	return t;
 }
 
@@ -178,11 +197,12 @@ static inline struct task *task_delete(struct task *t)
  * state). The task is returned. This function should not be used outside of
  * task_new().
  */
-static inline struct task *task_init(struct task *t)
+static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 {
 	t->wq.node.leaf_p = NULL;
 	t->rq.node.leaf_p = NULL;
 	t->pending_state = t->state = TASK_SLEEPING;
+	t->process_mask = thread_mask;
 	t->nice = 0;
 	t->calls = 0;
 	t->expire = TICK_ETERNITY;
@@ -194,12 +214,12 @@ static inline struct task *task_init(struct task *t)
 * case of lack of memory. The task count is incremented. Tasks should only
 * be allocated this way, and must be freed using task_free().
 */
-static inline struct task *task_new(void)
+static inline struct task *task_new(unsigned long thread_mask)
 {
 	struct task *t = pool_alloc2(pool2_task);
 	if (t) {
-		nb_tasks++;
-		task_init(t);
+		HA_ATOMIC_ADD(&nb_tasks, 1);
+		task_init(t, thread_mask);
 	}
 	return t;
 }
@@ -213,7 +233,7 @@ static inline void task_free(struct task *t)
 	pool_free2(pool2_task, t);
 	if (unlikely(stopping))
 		pool_flush2(pool2_task);
-	nb_tasks--;
+	HA_ATOMIC_SUB(&nb_tasks, 1);
 }
 
 /* Place <task> into the wait queue, where it may already be. If the expiration
@@ -234,8 +254,10 @@ static inline void task_queue(struct task *task)
 	if (!tick_isset(task->expire))
 		return;
 
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 		__task_queue(task);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -244,15 +266,18 @@ static inline void task_queue(struct task *task)
 */
 static inline void task_schedule(struct task *task, int when)
 {
+	/* TODO: mthread, check if there is no tisk with this test */
 	if (task_in_rq(task))
 		return;
 
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (task_in_wq(task))
 		when = tick_first(when, task->expire);
 
 	task->expire = when;
 	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 		__task_queue(task);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* This function register a new signal. "lua" is the current lua
@@ -69,6 +69,7 @@ struct task {
 	void *context;			/* the task's context */
 	struct eb32_node wq;		/* ebtree node used to hold the task in the wait queue */
 	int expire;			/* next expiration date for this task, in ticks */
+	unsigned long process_mask;	/* mask of thread IDs authorized to process the task */
 };
 
 /*
@@ -42,6 +42,7 @@
 #include <common/time.h>
 #include <common/uri_auth.h>
 #include <common/namespace.h>
+#include <common/hathreads.h>
 
 #include <types/capture.h>
 #include <types/compression.h>
@@ -8862,7 +8863,7 @@ int check_config_validity()
 		}
 
 		/* create the task associated with the proxy */
-		curproxy->task = task_new();
+		curproxy->task = task_new(MAX_THREADS_MASK);
 		if (curproxy->task) {
 			curproxy->task->context = curproxy;
 			curproxy->task->process = manage_proxy;
@@ -2226,7 +2226,7 @@ static int start_check_task(struct check *check, int mininter,
 {
 	struct task *t;
 	/* task for the check */
-	if ((t = task_new()) == NULL) {
+	if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
 		Alert("Starting [%s:%s] check: out of memory.\n",
 		      check->server->proxy->id, check->server->id);
 		return 0;
@@ -2272,7 +2272,7 @@ static int start_checks()
 	for (px = proxy; px; px = px->next) {
 		for (s = px->srv; s; s = s->next) {
 			if (s->slowstart) {
-				if ((t = task_new()) == NULL) {
+				if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
 					Alert("Starting [%s:%s] check: out of memory.\n", px->id, s->id);
 					return ERR_ALERT | ERR_FATAL;
 				}
@@ -3130,7 +3130,7 @@ int init_email_alert(struct mailers *mls, struct proxy *p, char **err)
 	check->port = 587;
 	//check->server = s;
 
-	if ((t = task_new()) == NULL) {
+	if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
 		memprintf(err, "out of memory while allocating mailer alerts task");
 		goto error;
 	}
@@ -1851,7 +1851,7 @@ static int dns_finalize_config(void)
 		}
 
 		/* Create the task associated to the resolvers section */
-		if ((t = task_new()) == NULL) {
+		if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
 			Alert("config : resolvers '%s' : out of memory.\n", resolvers->id);
 			err_code |= (ERR_ALERT|ERR_ABORT);
 			goto err;
@@ -1939,7 +1939,7 @@ spoe_create_appctx(struct spoe_config *conf)
 	memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
 
 	appctx->st0 = SPOE_APPCTX_ST_CONNECT;
-	if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL)
+	if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL)
 		goto out_free_spoe_appctx;
 
 	SPOE_APPCTX(appctx)->owner = appctx;
@@ -1975,10 +1975,10 @@ spoe_create_appctx(struct spoe_config *conf)
 	strm->do_log = NULL;
 	strm->res.flags |= CF_READ_DONTWAIT;
 
-	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
 	conf->agent->applets_act++;
 
+	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	task_wakeup(strm->task, TASK_WOKEN_INIT);
 	return appctx;
 
@@ -1511,7 +1511,7 @@ static void init(int argc, char **argv)
 		exit(2);
 	}
 
-	global_listener_queue_task = task_new();
+	global_listener_queue_task = task_new(MAX_THREADS_MASK);
 	if (!global_listener_queue_task) {
 		Alert("Out of memory when initializing global task\n");
 		exit(1);
@@ -5450,7 +5450,7 @@ static int hlua_register_task(lua_State *L)
 	if (!hlua)
 		WILL_LJMP(luaL_error(L, "lua out of memory error."));
 
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	task->context = hlua;
 	task->process = hlua_process_task;
 
@@ -6031,7 +6031,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx, struct proxy *px, struct str
 	ctx->ctx.hlua_apptcp.flags = 0;
 
 	/* Create task used by signal to wakeup applets. */
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	if (!task) {
 		SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n",
 			 ctx->rule->arg.hlua_rule->fcn.name);
@@ -6232,7 +6232,7 @@ static int hlua_applet_http_init(struct appctx *ctx, struct proxy *px, struct st
 	ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11;
 
 	/* Create task used by signal to wakeup applets. */
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	if (!task) {
 		SEND_ERR(px, "Lua applet http '%s': out of memory.\n",
 			 ctx->rule->arg.hlua_rule->fcn.name);
@@ -6777,7 +6777,7 @@ static int hlua_cli_parse_fct(char **args, struct appctx *appctx, void *private)
 	 * We use the same wakeup fonction than the Lua applet_tcp and
 	 * applet_http. It is absolutely compatible.
 	 */
-	appctx->ctx.hlua_cli.task = task_new();
+	appctx->ctx.hlua_cli.task = task_new(MAX_THREADS_MASK);
 	if (!appctx->ctx.hlua_cli.task) {
 		SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name);
 		goto error;
@@ -2055,7 +2055,7 @@ void peers_init_sync(struct peers *peers)
 
 	list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
 		listener->maxconn = peers->peers_fe->maxconn;
-	peers->sync_task = task_new();
+	peers->sync_task = task_new(MAX_THREADS_MASK);
 	peers->sync_task->process = process_peer_sync;
 	peers->sync_task->context = (void *)peers;
 	peers->sighandler = signal_register_task(0, peers->sync_task, 0);
@@ -979,7 +979,7 @@ void soft_stop(void)
 
 	stopping = 1;
 	if (tick_isset(global.hard_stop_after)) {
-		task = task_new();
+		task = task_new(MAX_THREADS_MASK);
 		if (task) {
 			task->process = hard_stop;
 			task_schedule(task, tick_add(now_ms, global.hard_stop_after));
@@ -241,7 +241,7 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
 	 *           conn -- owner ---> task <-----+
 	 */
 	if (cli_conn->flags & (CO_FL_HANDSHAKE | CO_FL_EARLY_SSL_HS)) {
-		if (unlikely((sess->task = task_new()) == NULL))
+		if (unlikely((sess->task = task_new(tid_bit)) == NULL))
 			goto out_free_sess;
 
 		conn_set_xprt_done_cb(cli_conn, conn_complete_session);
@@ -436,7 +436,7 @@ int stktable_init(struct stktable *t)
 
 	t->exp_next = TICK_ETERNITY;
 	if ( t->expire ) {
-		t->exp_task = task_new();
+		t->exp_task = task_new(MAX_THREADS_MASK);
 		t->exp_task->process = process_table_expire;
 		t->exp_task->context = (void *)t;
 	}
@@ -163,7 +163,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
 	s->flags |= SF_INITIALIZED;
 	s->unique_id = NULL;
 
-	if ((t = task_new()) == NULL)
+	if ((t = task_new(tid_bit)) == NULL)
 		goto out_fail_alloc;
 
 	s->task = t;
src/task.c
@@ -36,6 +36,10 @@ unsigned int tasks_run_queue_cur = 0; /* copy of the run queue size */
 unsigned int nb_tasks_cur = 0;     /* copy of the tasks count */
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 
+#ifdef USE_THREAD
+HA_SPINLOCK_T rq_lock;        /* spin lock related to run queue */
+HA_SPINLOCK_T wq_lock;        /* spin lock related to wait queue */
+#endif
 
 static struct eb_root timers;      /* sorted timers tree */
 static struct eb_root rqueue;      /* tree constituting the run queue */
@@ -113,22 +117,29 @@ int wake_expired_tasks()
 {
 	struct task *task;
 	struct eb32_node *eb;
+	int ret;
 
 	while (1) {
+		SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+  lookup_next:
 		eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
-		if (unlikely(!eb)) {
+		if (!eb) {
 			/* we might have reached the end of the tree, typically because
 			 * <now_ms> is in the first half and we're first scanning the last
 			 * half. Let's loop back to the beginning of the tree now.
 			 */
 			eb = eb32_first(&timers);
-			if (likely(!eb))
+			if (likely(!eb)) {
+				SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 				break;
+			}
 		}
 
 		if (likely(tick_is_lt(now_ms, eb->key))) {
+			ret = eb->key;
+			SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 			/* timer not expired yet, revisit it later */
-			return eb->key;
+			return ret;
 		}
 
 		/* timer looks expired, detach it from the queue */
@@ -150,10 +161,11 @@ int wake_expired_tasks()
 		 */
 		if (!tick_is_expired(task->expire, now_ms)) {
 			if (!tick_isset(task->expire))
-				continue;
+				goto lookup_next;
 			__task_queue(task);
-			continue;
+			goto lookup_next;
 		}
+		SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 		task_wakeup(task, TASK_WOKEN_TIMER);
 	}
 
@@ -192,6 +204,7 @@ void process_runnable_tasks()
 	if (likely(niced_tasks))
 		max_processed = (max_processed + 3) / 4;
 
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 	while (max_processed > 0) {
 		/* Note: this loop is one of the fastest code path in
 		 * the whole program. It should not be re-arranged
@@ -216,12 +229,14 @@ void process_runnable_tasks()
 		while (local_tasks_count < 16) {
 			t = eb32_entry(rq_next, struct task, rq);
 			rq_next = eb32_next(rq_next);
-			/* detach the task from the queue */
-			__task_unlink_rq(t);
-			t->state |= TASK_RUNNING;
-			t->pending_state = 0;
-			t->calls++;
-			local_tasks[local_tasks_count++] = t;
+			if (t->process_mask & (1UL << tid)) {
+				/* detach the task from the queue */
+				__task_unlink_rq(t);
+				t->state |= TASK_RUNNING;
+				t->pending_state = 0;
+				t->calls++;
+				local_tasks[local_tasks_count++] = t;
+			}
 			if (!rq_next) {
 				if (rewind || !(rq_next = eb32_first(&rqueue))) {
 					break;
@@ -233,6 +248,7 @@ void process_runnable_tasks()
 		if (!local_tasks_count)
 			break;
 
+		SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 
 		for (i = 0; i < local_tasks_count ; i++) {
 			t = local_tasks[i];
@@ -247,6 +263,7 @@ void process_runnable_tasks()
 		}
 
 		max_processed -= local_tasks_count;
+		SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 		for (i = 0; i < local_tasks_count ; i++) {
 			t = local_tasks[i];
 			if (likely(t != NULL)) {
@@ -263,6 +280,7 @@ void process_runnable_tasks()
 			}
 		}
 	}
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -270,6 +288,8 @@ int init_task()
 {
 	memset(&timers, 0, sizeof(timers));
 	memset(&rqueue, 0, sizeof(rqueue));
+	SPIN_INIT(&wq_lock);
+	SPIN_INIT(&rq_lock);
 	pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
 	if (!pool2_task)
 		return 0;