diff --git a/src/listener.c b/src/listener.c
index 3e080b4ff..a7e72905f 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -597,6 +597,7 @@ void listener_accept(int fd)
 {
 	struct listener *l = fdtab[fd].owner;
 	struct proxy *p;
+	__decl_hathreads(unsigned long mask);
 	int max_accept;
 	int next_conn = 0;
 	int next_feconn = 0;
@@ -844,60 +845,116 @@ void listener_accept(int fd)
 			next_actconn = 0;
 
 #if defined(USE_THREAD)
-		count = l->bind_conf->thr_count;
-		if (count > 1 && (global.tune.options & GTUNE_LISTENER_MQ)) {
+		mask = thread_mask(l->bind_conf->bind_thread);
+		if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ)) {
 			struct accept_queue_ring *ring;
-			int t1, t2, q1, q2;
+			unsigned int t, t0, t1, t2;
 
-			/* pick a first thread ID using a round robin index,
-			 * and a second thread ID using a random. The
-			 * connection will be assigned to the one with the
-			 * least connections. This provides fairness on short
+			/* The principle is that we have two running indexes,
+			 * each visiting in turn all threads bound to this
+			 * listener. The connection will be assigned to the one
+			 * with the least connections, and the other one will
+			 * be updated. This provides a good fairness on short
 			 * connections (round robin) and on long ones (conn
-			 * count).
+			 * count), without ever missing any idle thread.
 			 */
-			t1 = l->bind_conf->thr_idx;
+
+			/* keep a copy for the final update. thr_idx is composite
+			 * and made of (t2<<16) + t1.
+			 */
+			t0 = l->bind_conf->thr_idx;
 			do {
-				t2 = t1 + 1;
-				if (t2 >= count)
-					t2 = 0;
-			} while (!HA_ATOMIC_CAS(&l->bind_conf->thr_idx, &t1, t2));
+				unsigned long m1, m2;
+				int q1, q2;
 
-			t2 = (random() >> 8) % (count - 1); // 0..thr_count-2
-			t2 += t1 + 1;                       // necessarily different from t1
+				t2 = t1 = t0;
+				t2 >>= 16;
+				t1 &= 0xFFFF;
 
-			if (t2 >= count)
-				t2 -= count;
+				/* t1 walks low to high bits ;
+				 * t2 walks high to low.
+				 */
+				m1 = mask >> t1;
+				m2 = mask & (t2 ? nbits(t2 + 1) : ~0UL);
 
-			t1 = bind_map_thread_id(l->bind_conf, t1);
-			t2 = bind_map_thread_id(l->bind_conf, t2);
+				if (unlikely((signed long)m2 >= 0)) {
+					/* highest bit not set */
+					if (!m2)
+						m2 = mask;
 
-			q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
-			if (q1 >= ACCEPT_QUEUE_SIZE)
-				q1 -= ACCEPT_QUEUE_SIZE;
+					t2 = my_flsl(m2) - 1;
+				}
 
-			q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
-			if (q2 >= ACCEPT_QUEUE_SIZE)
-				q2 -= ACCEPT_QUEUE_SIZE;
+				if (unlikely(!(m1 & 1) || t1 == t2)) {
+					m1 &= ~1UL;
+					if (!m1) {
+						m1 = mask;
+						t1 = 0;
+					}
+					t1 += my_ffsl(m1) - 1;
+				}
 
-			/* make t1 the lowest loaded thread */
-			if (q1 >= ACCEPT_QUEUE_SIZE || l->thr_conn[t1] + q1 > l->thr_conn[t2] + q2)
-				t1 = t2;
+				/* now we have two distinct thread IDs belonging to the mask */
+				q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
+				if (q1 >= ACCEPT_QUEUE_SIZE)
+					q1 -= ACCEPT_QUEUE_SIZE;
 
-			/* We use deferred accepts even if it's the local thread because
-			 * tests show that it's the best performing model, likely due to
-			 * better cache locality when processing this loop.
+				q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
+				if (q2 >= ACCEPT_QUEUE_SIZE)
+					q2 -= ACCEPT_QUEUE_SIZE;
+
+				/* we have 3 possibilities now :
+				 *   q1 < q2 : t1 is less loaded than t2, so we pick it
+				 *             and update t2 (since t1 might still be
+				 *             lower than another thread)
+				 *   q1 > q2 : t2 is less loaded than t1, so we pick it
+				 *             and update t1 (since t2 might still be
+				 *             lower than another thread)
+				 *   q1 = q2 : both are equally loaded, thus we pick t1
+				 *             and update t1 as it will become more loaded
+				 *             than t2.
+				 */
+
+				q1 += l->thr_conn[t1];
+				q2 += l->thr_conn[t2];
+
+				if (q1 - q2 < 0) {
+					t = t1;
+					t2 = t2 ? t2 - 1 : LONGBITS - 1;
+				}
+				else if (q1 - q2 > 0) {
+					t = t2;
+					t1++;
+					if (t1 >= LONGBITS)
+						t1 = 0;
+				}
+				else {
+					t = t1;
+					t1++;
+					if (t1 >= LONGBITS)
+						t1 = 0;
+				}
+
+				/* new value for thr_idx */
+				t1 += (t2 << 16);
+			} while (unlikely(!HA_ATOMIC_CAS(&l->bind_conf->thr_idx, &t0, t1)));
+
+			/* We successfully selected the best thread "t" for this
+			 * connection. We use deferred accepts even if it's the
+			 * local thread because tests show that it's the best
+			 * performing model, likely due to better cache locality
+			 * when processing this loop.
 			 */
-			ring = &accept_queue_rings[t1];
+			ring = &accept_queue_rings[t];
 			if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) {
-				HA_ATOMIC_ADD(&activity[t1].accq_pushed, 1);
+				HA_ATOMIC_ADD(&activity[t].accq_pushed, 1);
 				task_wakeup(ring->task, TASK_WOKEN_IO);
 				continue;
 			}
 			/* If the ring is full we do a synchronous accept on
 			 * the local thread here.
 			 */
-			HA_ATOMIC_ADD(&activity[t1].accq_full, 1);
+			HA_ATOMIC_ADD(&activity[t].accq_full, 1);
 		}
 #endif // USE_THREAD
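
For illustration only, here is a minimal, self-contained sketch of the selection walk the patch introduces, outside of HAProxy. nbits(), my_ffsl() and my_flsl() are re-implemented locally with GCC builtins as stand-ins for the HAProxy helpers of the same name, and load[] stands in for the accept-queue depth plus l->thr_conn[]; the mask and the loop in main() are invented for the demo, so this approximates the patch's logic rather than reproducing the listener code.

/* sketch: two-index thread selection over a bound-thread mask (not HAProxy code) */
#include <stdio.h>

#define LONGBITS  (8 * (int)sizeof(long))

/* mask with the <x> lowest bits set, stand-in for HAProxy's nbits() */
static unsigned long nbits(int x)
{
	return (x >= LONGBITS) ? ~0UL : (1UL << x) - 1;
}

/* 1-based lowest/highest set bit, stand-ins for my_ffsl()/my_flsl() */
static int my_ffsl(unsigned long v) { return __builtin_ffsl((long)v); }
static int my_flsl(unsigned long v) { return LONGBITS - __builtin_clzl(v); }

/* pick a thread from <mask> (must have at least two bits set, as the patch
 * guarantees via atleast2()), updating the composite index *<idx> whose
 * layout is (t2<<16)+t1. <load> is the per-thread load estimate.
 */
static unsigned int pick_thread(unsigned long mask, unsigned int *idx, const int *load)
{
	unsigned int t, t1, t2;
	unsigned long m1, m2;

	t2 = *idx >> 16;     /* walks high to low */
	t1 = *idx & 0xFFFF;  /* walks low to high */

	m1 = mask >> t1;
	m2 = mask & (t2 ? nbits(t2 + 1) : ~0UL);

	/* re-anchor t2 on a set bit at or below its current position */
	if ((signed long)m2 >= 0) {
		if (!m2)
			m2 = mask;
		t2 = my_flsl(m2) - 1;
	}

	/* re-anchor t1 on a set bit distinct from t2 */
	if (!(m1 & 1) || t1 == t2) {
		m1 &= ~1UL;
		if (!m1) {
			m1 = mask;
			t1 = 0;
		}
		t1 += my_ffsl(m1) - 1;
	}

	if (load[t1] < load[t2]) {
		/* t1 less loaded: pick it, move t2 down to keep probing */
		t = t1;
		t2 = t2 ? t2 - 1 : LONGBITS - 1;
	}
	else if (load[t1] > load[t2]) {
		/* t2 less loaded: pick it, move t1 up */
		t = t2;
		t1 = (t1 + 1) % LONGBITS;
	}
	else {
		/* equal: pick t1 and advance it, it is about to become more loaded */
		t = t1;
		t1 = (t1 + 1) % LONGBITS;
	}

	*idx = (t2 << 16) + t1;  /* new composite value, plays the role of thr_idx */
	return t;
}

int main(void)
{
	unsigned long mask = 0x36;       /* made-up listener bound to threads 1,2,4,5 */
	int load[LONGBITS] = { 0 };
	unsigned int idx = 0;
	int i;

	for (i = 0; i < 8; i++) {
		unsigned int t = pick_thread(mask, &idx, load);
		load[t]++;               /* as queuing the connection would do */
		printf("conn %d -> thread %u (idx now %#x)\n", i, t, idx);
	}
	return 0;
}

With the made-up mask 0x36, the two indexes sweep the bound threads from both ends and the eight connections should end up spread two per thread, which is the fairness property the new comment describes.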
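The whole selection runs inside a do/while around HA_ATOMIC_CAS() so that several threads can advance the shared composite index without a lock: each works on a snapshot t0 and only commits its new (t2<<16)+t1 value if thr_idx still equals that snapshot, otherwise it retries with the fresh value. Below is a rough stand-alone illustration of that commit pattern only, using C11 atomics in place of HA_ATOMIC_CAS(); the worker() function, the pthread demo and the toy index update are assumptions of this sketch, not HAProxy code.

/* sketch: lock-free commit of a composite (t2<<16)+t1 index (not HAProxy code) */
#include <stdatomic.h>
#include <stdio.h>
#include <pthread.h>

static _Atomic unsigned int thr_idx;  /* plays the role of bind_conf->thr_idx */

static void *worker(void *arg)
{
	(void)arg;
	for (int i = 0; i < 100000; i++) {
		unsigned int t0 = atomic_load(&thr_idx);  /* snapshot, like t0 in the patch */
		unsigned int t1, t2;

		do {
			t2 = t0 >> 16;
			t1 = t0 & 0xFFFF;

			/* toy update standing in for the real selection walk:
			 * advance the low index, retreat the high one.
			 */
			t1 = (t1 + 1) & 0x3F;
			t2 = t2 ? t2 - 1 : 63;

			t1 += t2 << 16;  /* re-pack the composite value */
		} while (!atomic_compare_exchange_weak(&thr_idx, &t0, t1));
		/* on failure the CAS reloads t0 and the loop recomputes, as in the patch */
	}
	return NULL;
}

int main(void)
{
	pthread_t a, b;
	unsigned int v;

	pthread_create(&a, NULL, worker, NULL);
	pthread_create(&b, NULL, worker, NULL);
	pthread_join(a, NULL);
	pthread_join(b, NULL);

	v = atomic_load(&thr_idx);
	printf("final thr_idx = %#x (t2=%u, t1=%u)\n", v, v >> 16, v & 0xFFFF);
	return 0;
}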