MEDIUM: stick-tables: Use a per-shard expiration task

Instead of having per-table expiration tasks, just use one per shard.
The task will now go through all the tables to expire entries. When a
table gets an expiration earlier than the one previously known, it will
be put in a mt-list, and the task will be responsible to put it into an
eb32, ordered based on the next expiration.
Each per-shard task will run on a different thread, so it should lead to
a better load distribution than the per-table tasks.
This commit is contained in:
Olivier Houchard 2025-09-29 15:37:11 +02:00 committed by Olivier Houchard
parent 945aa0ea82
commit 8bc8a21b25
2 changed files with 148 additions and 61 deletions

View File

@ -175,7 +175,6 @@ struct stktable {
*/
struct ceb_node id_node; /* Stick-table are lookup by name here, indexes <id> above. */
struct pool_head *pool; /* pool used to allocate sticky sessions */
struct task *exp_task; /* expiration task */
struct task *sync_task; /* sync task */
uint64_t hash_seed; /* hash seed used by shards */
@ -212,7 +211,11 @@ struct stktable {
struct {
struct eb_root keys; /* head of sticky session tree */
struct eb_root exps; /* head of sticky session expiration tree */
struct eb32_node in_bucket; /* Each bucket maintains a tree, ordered by expiration date, this does not require sh_lock as only one task will ever modify it */
struct mt_list in_bucket_toadd; /* To add to the bucket tree */
__decl_thread(HA_RWLOCK_T sh_lock); /* for the trees above */
int next_exp; /* Next expiration for this table */
} shards[CONFIG_HAP_TBL_BUCKETS];
unsigned int refcnt; /* number of local peer over all peers sections
@ -241,6 +244,13 @@ struct stktable {
} conf;
};
struct stk_per_bucket {
struct eb_root tables;
struct mt_list toadd_tables;
__decl_thread(HA_SPINLOCK_T lock); /* Should not have any contention, only there in case a table gets destroyed, which should happen very rarely */
struct task *exp_task; /* Expiration task */
};
extern struct stktable_data_type stktable_data_types[STKTABLE_DATA_TYPES];
/* stick table key */

View File

@ -66,6 +66,8 @@ struct pool_head *pool_head_stk_ctr __read_mostly = NULL;
struct stktable *stktables_list;
struct ceb_root *stktable_by_name = NULL;
static struct stk_per_bucket per_bucket[CONFIG_HAP_TBL_BUCKETS];
#define round_ptr_size(i) (((i) + (sizeof(void *) - 1)) &~ (sizeof(void *) - 1))
/* This function inserts stktable <t> into the tree of known stick-table.
@ -706,24 +708,33 @@ void stktable_requeue_exp(struct stktable *t, const struct stksess *ts)
{
int old_exp, new_exp;
int expire = ts->expire;
int bucket;
int len;
if (!t->expire)
return;
if (t->type == SMP_T_STR)
len = strlen((const char *)ts->key.key);
else
len = t->key_size;
bucket = stktable_calc_shard_num(t, ts->key.key, len);
/* set the task's expire to the newest expiration date. */
old_exp = HA_ATOMIC_LOAD(&t->exp_task->expire);
old_exp = HA_ATOMIC_LOAD(&t->shards[bucket].next_exp);
new_exp = tick_first(expire, old_exp);
/* let's not go further if we're already up to date. We have
* to make sure the compared date doesn't change under us.
*/
if (new_exp == old_exp &&
HA_ATOMIC_CAS(&t->exp_task->expire, &old_exp, new_exp))
HA_ATOMIC_CAS(&t->shards[bucket].next_exp, &old_exp, new_exp)) {
return;
}
HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->lock);
while (!HA_ATOMIC_CAS(&t->exp_task->expire, &old_exp, new_exp)) {
while (!HA_ATOMIC_CAS(&t->shards[bucket].next_exp, &old_exp, new_exp)) {
if (new_exp == old_exp)
break;
__ha_cpu_relax();
@ -732,9 +743,20 @@ void stktable_requeue_exp(struct stktable *t, const struct stksess *ts)
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->lock);
if (t->type == SMP_T_STR)
len = strlen((const char *)ts->key.key);
else
len = t->key_size;
/* the timer was advanced, only the task can update it */
if (!tick_isset(old_exp) || tick_is_lt(new_exp, old_exp))
task_wakeup(t->exp_task, TASK_WOKEN_OTHER);
if (!tick_isset(old_exp) || tick_is_lt(new_exp, old_exp)) {
int ret;
ret = MT_LIST_TRY_APPEND(&per_bucket[bucket].toadd_tables, &t->shards[bucket].in_bucket_toadd);
if (ret)
task_wakeup(per_bucket[bucket].exp_task, TASK_WOKEN_OTHER);
}
}
/* Returns a valid or initialized stksess for the specified stktable_key in the
@ -922,38 +944,57 @@ struct stksess *stktable_set_entry(struct stktable *table, struct stksess *nts)
* Task processing function to trash expired sticky sessions. A pointer to the
* task itself is returned since it never dies.
*/
struct task *process_table_expire(struct task *task, void *context, unsigned int state)
struct task *process_tables_expire(struct task *task, void *context, unsigned int state)
{
struct stktable *t = context;
struct stk_per_bucket *ps = context;
struct stktable *t;
struct stksess *ts;
struct eb32_node *eb;
struct eb32_node *table_eb, *eb;
int updt_locked;
int to_visit = STKTABLE_MAX_UPDATES_AT_ONCE;
int looped;
int exp_next;
int to_visit;
int task_exp;
int shard, init_shard;
int failed_once = 0;
int purged = 0;
int shard;
task_exp = TICK_ETERNITY;
/* start from a random shard number to avoid starvation in the last ones */
shard = init_shard = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS - 1);
do {
updt_locked = 0;
looped = 0;
shard = (ps - &per_bucket[0]);
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
if (purged || failed_once) {
/* already purged or second failed lock, yield and come back later */
to_visit = 0;
to_visit = STKTABLE_MAX_UPDATES_AT_ONCE;
/*
* First put all the tables to be added from the list to the tree
*/
while ((t = MT_LIST_POP(&ps->toadd_tables, struct stktable *, shards[shard].in_bucket_toadd)) != NULL) {
int next_exp = HA_ATOMIC_LOAD(&t->shards[shard].next_exp);
/*
* We're already in the tree
*/
if (tick_isset(t->shards[shard].in_bucket.key) &&
tick_is_lt(t->shards[shard].in_bucket.key, next_exp))
continue;
eb32_delete(&t->shards[shard].in_bucket);
t->shards[shard].in_bucket.key = next_exp;
eb32_insert(&ps->tables, &t->shards[shard].in_bucket);
}
table_eb = eb32_first(&ps->tables);
while (table_eb) {
struct eb32_node *tmpnode;
unsigned int next_exp_table = TICK_ETERNITY;
t = eb32_entry(table_eb, struct stktable, shards[shard].in_bucket);
updt_locked = 0;
if (tick_is_lt(now_ms, table_eb->key)) {
/*
* Next expiration in the future, we can give up
*/
if (!tick_isset(task_exp) || tick_is_lt(table_eb->key, task_exp))
task_exp = table_eb->key;
break;
}
/* make sure we succeed at least once */
failed_once = 1;
HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
}
eb = eb32_lookup_ge(&t->shards[shard].exps, now_ms - TIMER_LOOK_BACK);
@ -964,9 +1005,6 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
* half. Let's loop back to the beginning of the tree now if we
* have not yet visited it.
*/
if (looped)
break;
looped = 1;
eb = eb32_first(&t->shards[shard].exps);
if (likely(!eb))
break;
@ -974,8 +1012,9 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
if (likely(tick_is_lt(now_ms, eb->key))) {
/* timer not expired yet, revisit it later */
exp_next = eb->key;
goto out_unlock;
if (!tick_isset(task_exp) || tick_is_lt(eb->key, task_exp))
task_exp = eb->key;
break;
}
/* Let's quit earlier if we currently hold the update lock */
@ -1045,34 +1084,60 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
MT_LIST_DELETE(&ts->pend_updts);
eb32_delete(&ts->upd);
__stksess_free(t, ts);
purged++;
}
/* We have found no task to expire in any tree */
exp_next = TICK_ETERNITY;
out_unlock:
if (updt_locked)
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
task_exp = tick_first(task_exp, exp_next);
/*
* Now find the first element, so that we can reposition
* the table in the shard tree.
*/
eb = eb32_lookup_ge(&t->shards[shard].exps, now_ms - TIMER_LOOK_BACK);
if (!eb)
eb = eb32_first(&t->shards[shard].exps);
if (eb)
next_exp_table = eb->key;
else
next_exp_table = TICK_ETERNITY;
if (!tick_isset(task_exp) || (tick_isset(next_exp_table) && tick_is_lt(next_exp_table, task_exp)))
task_exp = next_exp_table;
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
tmpnode = eb32_next(table_eb);
shard++;
if (shard >= CONFIG_HAP_TBL_BUCKETS)
shard = 0;
} while (to_visit > 0 && shard != init_shard);
if (table_eb->key != next_exp_table) {
int old_exp;
/*
* We have to move the entry in the tree
*/
old_exp = HA_ATOMIC_LOAD(&t->shards[shard].next_exp);
if (old_exp >= table_eb->key) {
HA_ATOMIC_CAS(&t->shards[shard].next_exp, &old_exp, next_exp_table);
}
if (to_visit <= 0) {
eb32_delete(table_eb);
table_eb->key = TICK_ETERNITY;
/*
* If there's more entry, just put it back into the list,
* it'll go back into the tree the next time the task runs.
*/
if (next_exp_table != TICK_ETERNITY)
MT_LIST_TRY_APPEND(&per_bucket[shard].toadd_tables, &t->shards[shard].in_bucket_toadd);
}
table_eb = tmpnode;
}
if (tick_is_le(task_exp, now_ms)) {
/*
* More to do, we should wake up immediately.
*/
task_wakeup(task, TASK_WOKEN_OTHER);
} else {
/* Reset the task's expiration. We do this under the lock so as not
* to ruin a call to task_queue() in stktable_requeue_exp() if we
* were to update with TICK_ETERNITY.
/* Reset the task's expiration.
*/
HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->lock);
task->expire = task_exp;
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->lock);
}
return task;
@ -1086,7 +1151,6 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
*/
int stktable_init(struct stktable *t, char **err_msg)
{
static int operating_thread = 0;
int peers_retval = 0;
int shard;
int i;
@ -1098,6 +1162,7 @@ int stktable_init(struct stktable *t, char **err_msg)
t->shards[shard].keys = EB_ROOT_UNIQUE;
memset(&t->shards[shard].exps, 0, sizeof(t->shards[shard].exps));
HA_RWLOCK_INIT(&t->shards[shard].sh_lock);
MT_LIST_INIT(&t->shards[shard].in_bucket_toadd);
}
t->updates = EB_ROOT_UNIQUE;
@ -1105,15 +1170,6 @@ int stktable_init(struct stktable *t, char **err_msg)
t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);
if ( t->expire ) {
t->exp_task = task_new_on(operating_thread);
if (!t->exp_task)
goto mem_error;
operating_thread = (operating_thread + 1) % global.nbthread;
t->exp_task->process = process_table_expire;
t->exp_task->context = (void *)t;
}
if (t->peers.p && t->peers.p->peers_fe && !(t->peers.p->peers_fe->flags & (PR_FL_DISABLED|PR_FL_STOPPED))) {
peers_retval = peers_register_table(t->peers.p, t);
}
@ -1176,9 +1232,16 @@ int stktable_init(struct stktable *t, char **err_msg)
*/
void stktable_deinit(struct stktable *t)
{
int i;
if (!t)
return;
task_destroy(t->exp_task);
for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) {
HA_SPIN_LOCK(OTHER_LOCK, &per_bucket[i].lock);
eb32_delete(&t->shards[i].in_bucket);
MT_LIST_DELETE(&t->shards[i].in_bucket_toadd);
HA_SPIN_UNLOCK(OTHER_LOCK, &per_bucket[i].lock);
}
tasklet_free(t->updt_task);
ha_free(&t->pend_updts);
pool_destroy(t->pool);
@ -5897,11 +5960,25 @@ static int stkt_create_stk_ctr_pool(void)
static void stkt_late_init(void)
{
struct sample_fetch *f;
int i;
f = find_sample_fetch("src", strlen("src"));
if (f)
smp_fetch_src = f->process;
stkt_create_stk_ctr_pool();
for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) {
MT_LIST_INIT(&per_bucket[i].toadd_tables);
per_bucket[i].tables = EB_ROOT;
per_bucket[i].exp_task = task_new_on(i % global.nbthread);
if (per_bucket[i].exp_task == NULL) {
ha_alert("Failed to allocate per-shard task!\n");
exit(1);
}
per_bucket[i].exp_task->process = process_tables_expire;
per_bucket[i].exp_task->context = &per_bucket[i];
HA_SPIN_INIT(&per_bucket[i].lock);
}
}
INITCALL0(STG_INIT_2, stkt_late_init);