mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-23 14:51:27 +02:00
MEDIUM: threads: Adds a set of functions to handle sync-point
A sync-point is a protected area where you have the warranty that no concurrency access is possible. It is implementated as a thread barrier to enter in the sync-point and another one to exit from it. Inside the sync-point, all threads that must do some syncrhonous processing will be called one after the other while all other threads will wait. All threads will then exit from the sync-point at the same time. A sync-point will be evaluated only when necessary because it is a costly operation. To limit the waiting time of each threads, we must have a mechanism to wakeup all threads. This is done with a pipe shared by all threads. By writting in this pipe, we will interrupt all threads blocked on a poller. The pipe is then flushed before exiting from the sync-point.
This commit is contained in:
parent
be0faa2e47
commit
339fff8a18
@ -60,6 +60,15 @@ extern THREAD_LOCAL unsigned int tid_bit; /* The bit corresponding to the thread
|
||||
*(val); \
|
||||
})
|
||||
|
||||
|
||||
#define THREAD_SYNC_INIT(m) do { /* do nothing */ } while(0)
|
||||
#define THREAD_SYNC_ENABLE() do { /* do nothing */ } while(0)
|
||||
#define THREAD_WANT_SYNC() do { /* do nothing */ } while(0)
|
||||
#define THREAD_ENTER_SYNC() do { /* do nothing */ } while(0)
|
||||
#define THREAD_EXIT_SYNC() do { /* do nothing */ } while(0)
|
||||
#define THREAD_NO_SYNC() ({ 0; })
|
||||
#define THREAD_NEED_SYNC() ({ 1; })
|
||||
|
||||
#define SPIN_INIT(l) do { /* do nothing */ } while(0)
|
||||
#define SPIN_DESTROY(l) do { /* do nothing */ } while(0)
|
||||
#define SPIN_LOCK(lbl, l) do { /* do nothing */ } while(0)
|
||||
@ -109,10 +118,27 @@ extern THREAD_LOCAL unsigned int tid_bit; /* The bit corresponding to the thread
|
||||
(*val); \
|
||||
})
|
||||
|
||||
#define THREAD_SYNC_INIT(m) thread_sync_init(m)
|
||||
#define THREAD_SYNC_ENABLE() thread_sync_enable()
|
||||
#define THREAD_WANT_SYNC() thread_want_sync()
|
||||
#define THREAD_ENTER_SYNC() thread_enter_sync()
|
||||
#define THREAD_EXIT_SYNC() thread_exit_sync()
|
||||
#define THREAD_NO_SYNC() thread_no_sync()
|
||||
#define THREAD_NEED_SYNC() thread_need_sync()
|
||||
|
||||
int thread_sync_init(unsigned long mask);
|
||||
void thread_sync_enable(void);
|
||||
void thread_want_sync(void);
|
||||
void thread_enter_sync(void);
|
||||
void thread_exit_sync(void);
|
||||
int thread_no_sync(void);
|
||||
int thread_need_sync(void);
|
||||
|
||||
#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
|
||||
|
||||
enum lock_label {
|
||||
LOCK_LABELS = 0
|
||||
THREAD_SYNC_LOCK = 0,
|
||||
LOCK_LABELS
|
||||
};
|
||||
struct lock_stat {
|
||||
uint64_t nsec_wait_for_write;
|
||||
@ -194,7 +220,7 @@ struct ha_rwlock {
|
||||
|
||||
static inline void show_lock_stats()
|
||||
{
|
||||
const char *labels[LOCK_LABELS] = {};
|
||||
const char *labels[LOCK_LABELS] = {"THREAD_SYNC" };
|
||||
int lbl;
|
||||
|
||||
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
|
||||
|
129
src/hathreads.c
129
src/hathreads.c
@ -10,25 +10,150 @@
|
||||
*
|
||||
*/
|
||||
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include <common/hathreads.h>
|
||||
#include <common/standard.h>
|
||||
#include <proto/fd.h>
|
||||
|
||||
THREAD_LOCAL unsigned int tid = 0;
|
||||
THREAD_LOCAL unsigned int tid_bit = (1UL << 0);
|
||||
|
||||
#ifdef USE_THREAD
|
||||
|
||||
static HA_SPINLOCK_T sync_lock;
|
||||
static int threads_sync_pipe[2];
|
||||
static unsigned long threads_want_sync = 0;
|
||||
static unsigned long all_threads_mask = 0;
|
||||
|
||||
#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
|
||||
struct lock_stat lock_stats[LOCK_LABELS];
|
||||
#endif
|
||||
|
||||
/* Dummy I/O handler used by the sync pipe.*/
|
||||
static void thread_sync_io_handler(int fd) { }
|
||||
|
||||
/* Initializes the sync point. It creates a pipe used by threads to wakup all
|
||||
* others when a sync is requested. It also initialize the mask of all create
|
||||
* threads. It returns 0 on success and -1 if an error occurred.
|
||||
*/
|
||||
int thread_sync_init(unsigned long mask)
|
||||
{
|
||||
int rfd;
|
||||
|
||||
if (pipe(threads_sync_pipe) < 0)
|
||||
return -1;
|
||||
|
||||
rfd = threads_sync_pipe[0];
|
||||
fcntl(rfd, F_SETFL, O_NONBLOCK);
|
||||
|
||||
fdtab[rfd].owner = NULL;
|
||||
fdtab[rfd].iocb = thread_sync_io_handler;
|
||||
fd_insert(rfd);
|
||||
|
||||
all_threads_mask = mask;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Enables the sync point. */
|
||||
void thread_sync_enable(void)
|
||||
{
|
||||
fd_want_recv(threads_sync_pipe[0]);
|
||||
}
|
||||
|
||||
/* Called when a thread want to pass into the sync point. It subscribes the
|
||||
* current thread in threads waiting for sync by update a bit-field. It this is
|
||||
* the first one, it wakeup all other threads by writing on the sync pipe.
|
||||
*/
|
||||
void thread_want_sync()
|
||||
{
|
||||
if (all_threads_mask) {
|
||||
if (HA_ATOMIC_OR(&threads_want_sync, tid_bit) == tid_bit)
|
||||
shut_your_big_mouth_gcc(write(threads_sync_pipe[1], "S", 1));
|
||||
}
|
||||
else {
|
||||
threads_want_sync = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/* Returns 1 if no thread has requested a sync. Otherwise, it returns 0. */
|
||||
int thread_no_sync()
|
||||
{
|
||||
return (threads_want_sync == 0);
|
||||
}
|
||||
|
||||
/* Returns 1 if the current thread has requested a sync. Otherwise, it returns
|
||||
* 0.
|
||||
*/
|
||||
int thread_need_sync()
|
||||
{
|
||||
return (threads_want_sync & tid_bit);
|
||||
}
|
||||
|
||||
/* Thread barrier. Synchronizes all threads at the barrier referenced by
|
||||
* <barrier>. The calling thread shall block until all other threads have called
|
||||
* thread_sync_barrier specifying the same barrier.
|
||||
*
|
||||
* If you need to use several barriers at differnt points, you need to use a
|
||||
* different <barrier> for each point.
|
||||
*/
|
||||
static inline void thread_sync_barrier(volatile unsigned long *barrier)
|
||||
{
|
||||
unsigned long old = all_threads_mask;
|
||||
|
||||
HA_ATOMIC_CAS(barrier, &old, 0);
|
||||
HA_ATOMIC_OR(barrier, tid_bit;
|
||||
while (*barrier != all_threads_mask)
|
||||
pl_cpu_relax();
|
||||
}
|
||||
|
||||
/* Enter into the sync point and lock it if the current thread has requested a
|
||||
* sync. */
|
||||
void thread_enter_sync()
|
||||
{
|
||||
static volatile unsigned long barrier = 0;
|
||||
|
||||
if (!all_threads_mask)
|
||||
return;
|
||||
|
||||
thread_sync_barrier(&barrier);
|
||||
if (threads_want_sync & tid_bit)
|
||||
SPIN_LOCK(THREAD_SYNC_LOCK, &sync_lock);
|
||||
}
|
||||
|
||||
/* Exit from the sync point and unlock it if it was previously locked. If the
|
||||
* current thread is the last one to have requested a sync, the sync pipe is
|
||||
* flushed.
|
||||
*/
|
||||
void thread_exit_sync()
|
||||
{
|
||||
static volatile unsigned long barrier = 0;
|
||||
|
||||
if (!all_threads_mask)
|
||||
return;
|
||||
|
||||
if (threads_want_sync & tid_bit)
|
||||
SPIN_UNLOCK(THREAD_SYNC_LOCK, &sync_lock);
|
||||
|
||||
if (HA_ATOMIC_AND(&threads_want_sync, ~tid_bit) == 0) {
|
||||
char c;
|
||||
|
||||
shut_your_big_mouth_gcc(read(threads_sync_pipe[0], &c, 1));
|
||||
fd_done_recv(threads_sync_pipe[0]);
|
||||
}
|
||||
|
||||
thread_sync_barrier(&barrier);
|
||||
}
|
||||
|
||||
|
||||
__attribute__((constructor))
|
||||
static void __hathreads_init(void)
|
||||
{
|
||||
|
||||
SPIN_INIT(&sync_lock);
|
||||
#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
|
||||
memset(lock_stats, 0, sizeof(lock_stats));
|
||||
#endif
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
Loading…
x
Reference in New Issue
Block a user