diff --git a/src/listener.c b/src/listener.c
index 8215cec63..f97e2d7d1 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -1176,67 +1176,167 @@ void listener_accept(struct listener *l)
 #if defined(USE_THREAD)
+		if (!(global.tune.options & GTUNE_LISTENER_MQ_ANY) || stopping)
+			goto local_accept;
+
+		/* we want to perform thread rebalancing if the listener is
+		 * bound to more than one thread or if it's part of a shard
+		 * with more than one listener.
+		 */
 		mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
-		if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ_ANY) && !stopping) {
+		if (l->rx.shard_info || atleast2(mask)) {
 			struct accept_queue_ring *ring;
-			unsigned int t, t0, t1, t2;
-			int base = tg->base;
+			struct listener *new_li;
+			uint n0, n1, n2, r1, r2, t, t1, t2;
+			const struct tgroup_info *g1, *g2;
+			ulong m1, m2;
 
 			/* The principle is that we have two running indexes,
 			 * each visiting in turn all threads bound to this
-			 * listener. The connection will be assigned to the one
-			 * with the least connections, and the other one will
-			 * be updated. This provides a good fairness on short
-			 * connections (round robin) and on long ones (conn
-			 * count), without ever missing any idle thread.
+			 * listener's shard. The connection will be assigned to
+			 * the one with the least connections, and the other
+			 * one will be updated. This provides a good fairness
+			 * on short connections (round robin) and on long ones
+			 * (conn count), without ever missing any idle thread.
+			 * Each thread number is encoded as a combination of
+			 * MAX_THREADS_PER_GROUP times the receiver number and its local thread
+			 * number from 0 to MAX_THREADS_PER_GROUP - 1. The two
+			 * indexes are stored as 16 bit numbers in the thr_idx
+			 * variable.
+			 *
+			 * In the loop below we have this for each index:
+			 *   - n is the thread index
+			 *   - r is the receiver number
+			 *   - g is the receiver's thread group
+			 *   - t is the thread number in this receiver
+			 *   - m is the receiver's thread mask shifted by the thread number
 			 */
 
 			/* keep a copy for the final update. thr_idx is composite
-			 * and made of (t2<<16) + t1.
+			 * and made of (n2<<16) + n1.
 			 */
-			t0 = l->thr_idx;
-			do {
-				unsigned long m1, m2;
+			n0 = l->thr_idx;
+			while (1) {
 				int q1, q2;
 
-				t2 = t1 = t0;
-				t2 >>= 16;
-				t1 &= 0xFFFF;
+				new_li = NULL;
+
+				n2 = n1 = n0;
+				n2 >>= 16;
+				n1 &= 0xFFFF;
 
 				/* t1 walks low to high bits ;
 				 * t2 walks high to low.
 				 */
-				m1 = mask >> t1;
-				m2 = mask & (t2 ? nbits(t2 + 1) : ~0UL);
-				if (unlikely(!(m1 & 1))) {
-					m1 &= ~1UL;
-					if (!m1) {
-						m1 = mask;
-						t1 = 0;
+
+				/* calculate r1/g1/t1 first */
+				r1 = n1 / MAX_THREADS_PER_GROUP;
+				t1 = n1 % MAX_THREADS_PER_GROUP;
+
+				while (1) {
+					if (l->rx.shard_info) {
+						/* multiple listeners, take the group into account */
+						if (r1 >= l->rx.shard_info->nbgroups)
+							r1 = 0;
+
+						g1 = &ha_tgroup_info[l->rx.shard_info->members[r1]->bind_tgroup - 1];
+						m1 = l->rx.shard_info->members[r1]->bind_thread;
+					} else {
+						/* single listener */
+						r1 = 0;
+						g1 = tg;
+						m1 = l->rx.bind_thread;
 					}
-					t1 += my_ffsl(m1) - 1;
+					m1 &= _HA_ATOMIC_LOAD(&g1->threads_enabled);
+					m1 >>= t1;
+
+					/* find first existing thread */
+					if (unlikely(!(m1 & 1))) {
+						m1 &= ~1UL;
+						if (!m1) {
+							/* no more threads here, switch to
+							 * first thread of next group.
+							 */
+							t1 = 0;
+							if (l->rx.shard_info)
+								r1++;
+							/* loop again */
+							continue;
+						}
+						t1 += my_ffsl(m1) - 1;
+					}
+					/* done: r1 and t1 are OK */
+					break;
 				}
 
+				/* now r2/g2/t2 */
+				r2 = n2 / MAX_THREADS_PER_GROUP;
+				t2 = n2 % MAX_THREADS_PER_GROUP;
+
 				/* if running in round-robin mode ("fair"), we don't need
 				 * to go further.
 				 */
 				if ((global.tune.options & GTUNE_LISTENER_MQ_ANY) == GTUNE_LISTENER_MQ_FAIR) {
-					t = t1;
+					t = g1->base + t1;
+					if (l->rx.shard_info && t != tid)
+						new_li = l->rx.shard_info->members[r1]->owner;
 					goto updt_t1;
 				}
 
-				if (unlikely(!(m2 & (1UL << t2)) || t1 == t2)) {
-					/* highest bit not set */
-					if (!m2)
-						m2 = mask;
+				while (1) {
+					if (l->rx.shard_info) {
+						/* multiple listeners, take the group into account */
+						if (r2 >= l->rx.shard_info->nbgroups)
+							r2 = l->rx.shard_info->nbgroups - 1;
 
-					t2 = my_flsl(m2) - 1;
+						g2 = &ha_tgroup_info[l->rx.shard_info->members[r2]->bind_tgroup - 1];
+						m2 = l->rx.shard_info->members[r2]->bind_thread;
+					} else {
+						/* single listener */
+						r2 = 0;
+						g2 = tg;
+						m2 = l->rx.bind_thread;
+					}
+					m2 &= _HA_ATOMIC_LOAD(&g2->threads_enabled);
+					m2 &= nbits(t2 + 1);
+
+					/* find previous existing thread */
+					if (unlikely(!(m2 & (1UL << t2)) || (g1 == g2 && t1 == t2))) {
+						/* highest bit not set or colliding threads, let's check
+						 * if we still have other threads available after this
+						 * one.
+						 */
+						m2 &= ~(1UL << t2);
+						if (!m2) {
+							/* no more threads here, switch to
+							 * last thread of previous group.
+							 */
+							t2 = MAX_THREADS_PER_GROUP - 1;
+							if (l->rx.shard_info)
+								r2--;
+							/* loop again */
+							continue;
+						}
+						t2 = my_flsl(m2) - 1;
+					}
+					/* done: r2 and t2 are OK */
+					break;
 				}
 
-				/* now we have two distinct thread IDs belonging to the mask */
-				q1 = accept_queue_ring_len(&accept_queue_rings[base + t1]);
-				q2 = accept_queue_ring_len(&accept_queue_rings[base + t2]);
+				/* here we have (r1,g1,t1) that designate the first receiver, its
+				 * thread group and local thread, and (r2,g2,t2) that designate
+				 * the second receiver, its thread group and local thread.
+				 */
+				q1 = accept_queue_ring_len(&accept_queue_rings[g1->base + t1]);
+				q2 = accept_queue_ring_len(&accept_queue_rings[g2->base + t2]);
+
+				/* add to this the currently active connections */
+				if (l->rx.shard_info) {
+					q1 += _HA_ATOMIC_LOAD(&((struct listener *)l->rx.shard_info->members[r1]->owner)->thr_conn[t1]);
+					q2 += _HA_ATOMIC_LOAD(&((struct listener *)l->rx.shard_info->members[r2]->owner)->thr_conn[t2]);
+				} else {
+					q1 += _HA_ATOMIC_LOAD(&l->thr_conn[t1]);
+					q2 += _HA_ATOMIC_LOAD(&l->thr_conn[t2]);
+				}
 
 				/* we have 3 possibilities now :
 				 *   q1 < q2 : t1 is less loaded than t2, so we pick it
@@ -1250,33 +1350,64 @@ void listener_accept(struct listener *l)
 				 *             than t2.
 				 */
 
-				q1 += l->thr_conn[t1];
-				q2 += l->thr_conn[t2];
-
 				if (q1 - q2 < 0) {
-					t = t1;
-					t2 = t2 ? t2 - 1 : LONGBITS - 1;
+					t = g1->base + t1;
+
+					if (l->rx.shard_info)
+						new_li = l->rx.shard_info->members[r1]->owner;
+
+					t2--;
+					if (t2 >= MAX_THREADS_PER_GROUP) {
+						if (l->rx.shard_info)
+							r2--;
+						t2 = MAX_THREADS_PER_GROUP - 1;
+					}
 				}
 				else if (q1 - q2 > 0) {
-					t = t2;
-					t1++;
-					if (t1 >= LONGBITS)
-						t1 = 0;
+					t = g2->base + t2;
+
+					if (l->rx.shard_info)
+						new_li = l->rx.shard_info->members[r2]->owner;
+					goto updt_t1;
 				}
 				else {
-					t = t1;
+					t = g1->base + t1;
+
+					if (l->rx.shard_info)
+						new_li = l->rx.shard_info->members[r1]->owner;
 				updt_t1:
 					t1++;
-					if (t1 >= LONGBITS)
+					if (t1 >= MAX_THREADS_PER_GROUP) {
+						if (l->rx.shard_info)
+							r1++;
 						t1 = 0;
+					}
 				}
 
+				/* the target thread number is in <t> now */
+
 				/* new value for thr_idx */
-				t1 += (t2 << 16);
-			} while (unlikely(!_HA_ATOMIC_CAS(&l->thr_idx, &t0, t1)));
+				n1 = ((r1 & 63) * MAX_THREADS_PER_GROUP) + t1;
+				n2 = ((r2 & 63) * MAX_THREADS_PER_GROUP) + t2;
+				n1 += (n2 << 16);
+
+				/* try to update the index */
+				if (likely(_HA_ATOMIC_CAS(&l->thr_idx, &n0, n1)))
+					break;
+			} /* end of main while() loop */
+
+			/* we may need to update the listener in the connection
+			 * if we switched to another group.
+			 */
+			if (new_li)
+				cli_conn->target = &new_li->obj_type;
+
+			/* here we have the target thread number in <t> and we hold a
+			 * reservation in the target ring.
+			 */
 
 			if (l->rx.proto && l->rx.proto->set_affinity) {
-				if (l->rx.proto->set_affinity(cli_conn, base + t)) {
+				if (l->rx.proto->set_affinity(cli_conn, t)) {
 					/* Failed migration, stay on the same thread. */
					goto local_accept;
 				}
@@ -1288,20 +1419,22 @@ void listener_accept(struct listener *l)
 			 * performing model, likely due to better cache locality
 			 * when processing this loop.
 			 */
-			ring = &accept_queue_rings[base + t];
+			ring = &accept_queue_rings[t];
 			if (accept_queue_push_mp(ring, cli_conn)) {
-				_HA_ATOMIC_INC(&activity[base + t].accq_pushed);
+				_HA_ATOMIC_INC(&activity[t].accq_pushed);
 				tasklet_wakeup(ring->tasklet);
 				continue;
 			}
 			/* If the ring is full we do a synchronous accept on
 			 * the local thread here.
 			 */
-			_HA_ATOMIC_INC(&activity[base + t].accq_full);
+			_HA_ATOMIC_INC(&activity[t].accq_full);
 		}
 #endif // USE_THREAD
 
  local_accept:
+		/* restore the connection's listener in case we failed to migrate above */
+		cli_conn->target = &l->obj_type;
 		_HA_ATOMIC_INC(&l->thr_conn[ti->ltid]);
 		ret = l->bind_conf->accept(cli_conn);
 		if (unlikely(ret <= 0)) {
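
For readers following the composite-index comment in the first hunk: the small standalone program below is not part of the patch and does not use HAProxy's headers; it only assumes MAX_THREADS_PER_GROUP is 64, as the patch does. It shows how a (receiver, local thread) pair can be packed into each 16-bit half of a thr_idx-style word and decoded again with the same division/modulo arithmetic used above.

#include <stdint.h>
#include <stdio.h>

#define MAX_THREADS_PER_GROUP 64   /* assumed value, mirrors the patch */

/* encode receiver number <r> and local thread <t> into one 16-bit index,
 * the same way the patch builds n1/n2: (r & 63) * MAX_THREADS_PER_GROUP + t
 */
static uint16_t enc_idx(uint32_t r, uint32_t t)
{
	return (uint16_t)(((r & 63) * MAX_THREADS_PER_GROUP) + t);
}

/* pack the two walking indexes the way thr_idx does: (n2 << 16) + n1 */
static uint32_t pack_thr_idx(uint16_t n1, uint16_t n2)
{
	return ((uint32_t)n2 << 16) + n1;
}

int main(void)
{
	uint32_t thr_idx = pack_thr_idx(enc_idx(2, 5),   /* receiver 2, thread 5 */
	                                enc_idx(0, 63)); /* receiver 0, thread 63 */

	/* decode, mirroring "n2 >>= 16; n1 &= 0xFFFF" and the / and % split */
	printf("n1: r=%u t=%u\n",
	       (unsigned)((thr_idx & 0xFFFF) / MAX_THREADS_PER_GROUP),
	       (unsigned)((thr_idx & 0xFFFF) % MAX_THREADS_PER_GROUP));
	printf("n2: r=%u t=%u\n",
	       (unsigned)((thr_idx >> 16) / MAX_THREADS_PER_GROUP),
	       (unsigned)((thr_idx >> 16) % MAX_THREADS_PER_GROUP));
	return 0;
}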
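
The two inner while (1) loops rely on shifting the enabled-thread mask and jumping to the nearest set bit with my_ffsl()/my_flsl(). The sketch below approximates that single step for one 64-bit mask using the GCC/Clang builtins __builtin_ffsl() and __builtin_clzl() instead of HAProxy's helpers (an assumption for portability of the example); the group-hopping part (r1++ / r2--) is intentionally left out.

#include <stdio.h>

/* next enabled thread at or above <t> in <mask>, or -1 if none remain;
 * mirrors "m1 >>= t1; ... t1 += my_ffsl(m1) - 1;" in the patch
 */
static int next_thread(unsigned long mask, int t)
{
	mask >>= t;
	if (!mask)
		return -1;
	return t + __builtin_ffsl((long)mask) - 1;
}

/* last enabled thread at or below <t> in <mask>, or -1 if none remain;
 * mirrors "m2 &= nbits(t2 + 1); ... t2 = my_flsl(m2) - 1;"
 */
static int prev_thread(unsigned long mask, int t)
{
	mask &= (t >= 63) ? ~0UL : ((1UL << (t + 1)) - 1);
	if (!mask)
		return -1;
	return 63 - __builtin_clzl(mask);
}

int main(void)
{
	unsigned long enabled = 0xb4UL; /* threads 2, 4, 5 and 7 enabled */

	printf("next from 3: %d\n", next_thread(enabled, 3)); /* -> 4 */
	printf("prev from 6: %d\n", prev_thread(enabled, 6)); /* -> 5 */
	printf("next from 0: %d\n", next_thread(enabled, 0)); /* -> 2 */
	return 0;
}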
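
Finally, the main while (1) loop only commits its choice through _HA_ATOMIC_CAS on l->thr_idx and restarts from the freshly read value whenever another thread won the race. The single-threaded toy below is an illustration of that read / recompute / compare-and-swap pattern only, written with C11 atomics; the queue_len[] table and the pick_next() helper are invented for the example and greatly simplify the real selection logic.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define NB_THREADS 4

static _Atomic uint32_t thr_idx;                 /* shared composite index */
static int queue_len[NB_THREADS] = {3, 1, 4, 1}; /* invented per-thread load */

/* pick the less loaded of the two walking positions, store it in <target>,
 * then advance both positions (low index forward, high index backward)
 * and return the new composite value to be CASed in.
 */
static uint32_t pick_next(uint32_t old, int *target)
{
	uint32_t t1 = (old & 0xFFFF) % NB_THREADS;
	uint32_t t2 = (old >> 16) % NB_THREADS;

	*target = (queue_len[t1] <= queue_len[t2]) ? (int)t1 : (int)t2;
	t1 = (t1 + 1) % NB_THREADS;
	t2 = (t2 + NB_THREADS - 1) % NB_THREADS;
	return (t2 << 16) + t1;
}

int main(void)
{
	for (int i = 0; i < 6; i++) {
		uint32_t old = atomic_load(&thr_idx);
		uint32_t new_idx;
		int target;

		/* recompute from the latest value until the CAS succeeds */
		do {
			new_idx = pick_next(old, &target);
		} while (!atomic_compare_exchange_weak(&thr_idx, &old, new_idx));

		printf("conn %d -> thread %d\n", i, target);
		queue_len[target]++;    /* simulate the accepted connection */
	}
	return 0;
}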