From 8bc8a21b25805d736c13c57b26792a8b8d8b173c Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Mon, 29 Sep 2025 15:37:11 +0200 Subject: [PATCH] MEDIUM: stick-tables: Use a per-shard expiration task Instead of having per-table expiration tasks, just use one per shard. The task will now go through all the tables to expire entries. When a table gets an expiration earlier than the one previously known, it will be put in an mt-list, and the task will be responsible for inserting it into an eb32 tree, ordered based on the next expiration. Each per-shard task will run on a different thread, so it should lead to a better load distribution than the per-table tasks. --- include/haproxy/stick_table-t.h | 12 +- src/stick_table.c | 197 ++++++++++++++++++++++---------- 2 files changed, 148 insertions(+), 61 deletions(-) diff --git a/include/haproxy/stick_table-t.h b/include/haproxy/stick_table-t.h index 19dc60088..b8a9f5ab7 100644 --- a/include/haproxy/stick_table-t.h +++ b/include/haproxy/stick_table-t.h @@ -175,7 +175,6 @@ struct stktable { */ struct ceb_node id_node; /* Stick-table are lookup by name here, indexes above.
*/ struct pool_head *pool; /* pool used to allocate sticky sessions */ - struct task *exp_task; /* expiration task */ struct task *sync_task; /* sync task */ uint64_t hash_seed; /* hash seed used by shards */ @@ -212,7 +211,11 @@ struct stktable { struct { struct eb_root keys; /* head of sticky session tree */ struct eb_root exps; /* head of sticky session expiration tree */ + struct eb32_node in_bucket; /* Each bucket maintains a tree, ordered by expiration date, this does not require sh_lock as only one task will ever modify it */ + struct mt_list in_bucket_toadd; /* To add to the bucket tree */ + __decl_thread(HA_RWLOCK_T sh_lock); /* for the trees above */ + int next_exp; /* Next expiration for this table */ } shards[CONFIG_HAP_TBL_BUCKETS]; unsigned int refcnt; /* number of local peer over all peers sections @@ -241,6 +244,13 @@ struct stktable { } conf; }; +struct stk_per_bucket { + struct eb_root tables; + struct mt_list toadd_tables; + __decl_thread(HA_SPINLOCK_T lock); /* Should not have any contention, only there in case a table gets destroyed, which should happen very rarely */ + struct task *exp_task; /* Expiration task */ +}; + extern struct stktable_data_type stktable_data_types[STKTABLE_DATA_TYPES]; /* stick table key */ diff --git a/src/stick_table.c b/src/stick_table.c index 08e08a03c..dc788e5d3 100644 --- a/src/stick_table.c +++ b/src/stick_table.c @@ -66,6 +66,8 @@ struct pool_head *pool_head_stk_ctr __read_mostly = NULL; struct stktable *stktables_list; struct ceb_root *stktable_by_name = NULL; +static struct stk_per_bucket per_bucket[CONFIG_HAP_TBL_BUCKETS]; + #define round_ptr_size(i) (((i) + (sizeof(void *) - 1)) &~ (sizeof(void *) - 1)) /* This function inserts stktable into the tree of known stick-table. 
@@ -706,24 +708,33 @@ void stktable_requeue_exp(struct stktable *t, const struct stksess *ts) { int old_exp, new_exp; int expire = ts->expire; + int bucket; + int len; if (!t->expire) return; + if (t->type == SMP_T_STR) + len = strlen((const char *)ts->key.key); + else + len = t->key_size; + + bucket = stktable_calc_shard_num(t, ts->key.key, len); /* set the task's expire to the newest expiration date. */ - old_exp = HA_ATOMIC_LOAD(&t->exp_task->expire); + old_exp = HA_ATOMIC_LOAD(&t->shards[bucket].next_exp); new_exp = tick_first(expire, old_exp); /* let's not go further if we're already up to date. We have * to make sure the compared date doesn't change under us. */ if (new_exp == old_exp && - HA_ATOMIC_CAS(&t->exp_task->expire, &old_exp, new_exp)) + HA_ATOMIC_CAS(&t->shards[bucket].next_exp, &old_exp, new_exp)) { return; + } HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->lock); - while (!HA_ATOMIC_CAS(&t->exp_task->expire, &old_exp, new_exp)) { + while (!HA_ATOMIC_CAS(&t->shards[bucket].next_exp, &old_exp, new_exp)) { if (new_exp == old_exp) break; __ha_cpu_relax(); @@ -732,9 +743,20 @@ void stktable_requeue_exp(struct stktable *t, const struct stksess *ts) HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->lock); + if (t->type == SMP_T_STR) + len = strlen((const char *)ts->key.key); + else + len = t->key_size; + + /* the timer was advanced, only the task can update it */ - if (!tick_isset(old_exp) || tick_is_lt(new_exp, old_exp)) - task_wakeup(t->exp_task, TASK_WOKEN_OTHER); + if (!tick_isset(old_exp) || tick_is_lt(new_exp, old_exp)) { + int ret; + + ret = MT_LIST_TRY_APPEND(&per_bucket[bucket].toadd_tables, &t->shards[bucket].in_bucket_toadd); + if (ret) + task_wakeup(per_bucket[bucket].exp_task, TASK_WOKEN_OTHER); + } } /* Returns a valid or initialized stksess for the specified stktable_key in the @@ -922,38 +944,57 @@ struct stksess *stktable_set_entry(struct stktable *table, struct stksess *nts) * Task processing function to trash expired sticky sessions. 
A pointer to the * task itself is returned since it never dies. */ -struct task *process_table_expire(struct task *task, void *context, unsigned int state) +struct task *process_tables_expire(struct task *task, void *context, unsigned int state) { - struct stktable *t = context; + struct stk_per_bucket *ps = context; + struct stktable *t; struct stksess *ts; - struct eb32_node *eb; + struct eb32_node *table_eb, *eb; int updt_locked; - int to_visit = STKTABLE_MAX_UPDATES_AT_ONCE; - int looped; - int exp_next; + int to_visit; int task_exp; - int shard, init_shard; - int failed_once = 0; - int purged = 0; + int shard; task_exp = TICK_ETERNITY; - /* start from a random shard number to avoid starvation in the last ones */ - shard = init_shard = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS - 1); - do { - updt_locked = 0; - looped = 0; + shard = (ps - &per_bucket[0]); - if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) { - if (purged || failed_once) { - /* already purged or second failed lock, yield and come back later */ - to_visit = 0; - break; - } - /* make sure we succeed at least once */ - failed_once = 1; - HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock); + to_visit = STKTABLE_MAX_UPDATES_AT_ONCE; + + /* + * First put all the tables to be added from the list to the tree + */ + while ((t = MT_LIST_POP(&ps->toadd_tables, struct stktable *, shards[shard].in_bucket_toadd)) != NULL) { + int next_exp = HA_ATOMIC_LOAD(&t->shards[shard].next_exp); + /* + * We're already in the tree + */ + if (tick_isset(t->shards[shard].in_bucket.key) && + tick_is_lt(t->shards[shard].in_bucket.key, next_exp)) + continue; + + eb32_delete(&t->shards[shard].in_bucket); + t->shards[shard].in_bucket.key = next_exp; + eb32_insert(&ps->tables, &t->shards[shard].in_bucket); + } + table_eb = eb32_first(&ps->tables); + + while (table_eb) { + struct eb32_node *tmpnode; + unsigned int next_exp_table = TICK_ETERNITY; + + t = eb32_entry(table_eb, struct stktable, 
shards[shard].in_bucket); + updt_locked = 0; + + if (tick_is_lt(now_ms, table_eb->key)) { + /* + * Next expiration in the future, we can give up + */ + if (!tick_isset(task_exp) || tick_is_lt(table_eb->key, task_exp)) + task_exp = table_eb->key; + break; } + HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock); eb = eb32_lookup_ge(&t->shards[shard].exps, now_ms - TIMER_LOOK_BACK); @@ -964,9 +1005,6 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int * half. Let's loop back to the beginning of the tree now if we * have not yet visited it. */ - if (looped) - break; - looped = 1; eb = eb32_first(&t->shards[shard].exps); if (likely(!eb)) break; @@ -974,8 +1012,9 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int if (likely(tick_is_lt(now_ms, eb->key))) { /* timer not expired yet, revisit it later */ - exp_next = eb->key; - goto out_unlock; + if (!tick_isset(task_exp) || tick_is_lt(eb->key, task_exp)) + task_exp = eb->key; + break; } /* Let's quit earlier if we currently hold the update lock */ @@ -1045,34 +1084,60 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int MT_LIST_DELETE(&ts->pend_updts); eb32_delete(&ts->upd); __stksess_free(t, ts); - purged++; } - /* We have found no task to expire in any tree */ - exp_next = TICK_ETERNITY; - - out_unlock: if (updt_locked) HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock); - task_exp = tick_first(task_exp, exp_next); + /* + * Now find the first element, so that we can reposition + * the table in the shard tree. 
+ */ + eb = eb32_lookup_ge(&t->shards[shard].exps, now_ms - TIMER_LOOK_BACK); + if (!eb) + eb = eb32_first(&t->shards[shard].exps); + + if (eb) + next_exp_table = eb->key; + else + next_exp_table = TICK_ETERNITY; + + if (!tick_isset(task_exp) || (tick_isset(next_exp_table) && tick_is_lt(next_exp_table, task_exp))) + task_exp = next_exp_table; HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock); + tmpnode = eb32_next(table_eb); - shard++; - if (shard >= CONFIG_HAP_TBL_BUCKETS) - shard = 0; - } while (to_visit > 0 && shard != init_shard); + if (table_eb->key != next_exp_table) { + int old_exp; + /* + * We have to move the entry in the tree + */ + old_exp = HA_ATOMIC_LOAD(&t->shards[shard].next_exp); + if (old_exp >= table_eb->key) { + HA_ATOMIC_CAS(&t->shards[shard].next_exp, &old_exp, next_exp_table); + } - if (to_visit <= 0) { + eb32_delete(table_eb); + table_eb->key = TICK_ETERNITY; + /* + * If there's more entry, just put it back into the list, + * it'll go back into the tree the next time the task runs. + */ + if (next_exp_table != TICK_ETERNITY) + MT_LIST_TRY_APPEND(&per_bucket[shard].toadd_tables, &t->shards[shard].in_bucket_toadd); + } + table_eb = tmpnode; + } + + if (tick_is_le(task_exp, now_ms)) { + /* + * More to do, we should wake up immediately. + */ task_wakeup(task, TASK_WOKEN_OTHER); } else { - /* Reset the task's expiration. We do this under the lock so as not - * to ruin a call to task_queue() in stktable_requeue_exp() if we - * were to update with TICK_ETERNITY. + /* Reset the task's expiration. 
*/ - HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->lock); task->expire = task_exp; - HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->lock); } return task; @@ -1086,7 +1151,6 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int */ int stktable_init(struct stktable *t, char **err_msg) { - static int operating_thread = 0; int peers_retval = 0; int shard; int i; @@ -1098,6 +1162,7 @@ int stktable_init(struct stktable *t, char **err_msg) t->shards[shard].keys = EB_ROOT_UNIQUE; memset(&t->shards[shard].exps, 0, sizeof(t->shards[shard].exps)); HA_RWLOCK_INIT(&t->shards[shard].sh_lock); + MT_LIST_INIT(&t->shards[shard].in_bucket_toadd); } t->updates = EB_ROOT_UNIQUE; @@ -1105,15 +1170,6 @@ int stktable_init(struct stktable *t, char **err_msg) t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED); - if ( t->expire ) { - t->exp_task = task_new_on(operating_thread); - if (!t->exp_task) - goto mem_error; - operating_thread = (operating_thread + 1) % global.nbthread; - - t->exp_task->process = process_table_expire; - t->exp_task->context = (void *)t; - } if (t->peers.p && t->peers.p->peers_fe && !(t->peers.p->peers_fe->flags & (PR_FL_DISABLED|PR_FL_STOPPED))) { peers_retval = peers_register_table(t->peers.p, t); } @@ -1176,9 +1232,16 @@ int stktable_init(struct stktable *t, char **err_msg) */ void stktable_deinit(struct stktable *t) { + int i; + if (!t) return; - task_destroy(t->exp_task); + for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) { + HA_SPIN_LOCK(OTHER_LOCK, &per_bucket[i].lock); + eb32_delete(&t->shards[i].in_bucket); + MT_LIST_DELETE(&t->shards[i].in_bucket_toadd); + HA_SPIN_UNLOCK(OTHER_LOCK, &per_bucket[i].lock); + } tasklet_free(t->updt_task); ha_free(&t->pend_updts); pool_destroy(t->pool); @@ -5897,11 +5960,25 @@ static int stkt_create_stk_ctr_pool(void) static void stkt_late_init(void) { struct sample_fetch *f; + int i; f = find_sample_fetch("src", strlen("src")); if (f) smp_fetch_src = 
f->process; stkt_create_stk_ctr_pool(); + + for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) { + MT_LIST_INIT(&per_bucket[i].toadd_tables); + per_bucket[i].tables = EB_ROOT; + per_bucket[i].exp_task = task_new_on(i % global.nbthread); + if (per_bucket[i].exp_task == NULL) { + ha_alert("Failed to allocate per-shard task!\n"); + exit(1); + } + per_bucket[i].exp_task->process = process_tables_expire; + per_bucket[i].exp_task->context = &per_bucket[i]; + HA_SPIN_INIT(&per_bucket[i].lock); + } } INITCALL0(STG_INIT_2, stkt_late_init);