mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-09-21 22:01:31 +02:00
MAJOR: threads/applet: Handle multithreading for applets
A global lock has been added to protect accesses to the list of active applets. A process mask has also been added on each applet. Like for FDs and tasks, it is used to know which threads are allowed to process an applet. Because applets are, most of time, linked to a session, it should be sticky on the same thread. But in all cases, it is the responsibility of the applet handler to lock what have to be protected in the applet context.
This commit is contained in:
parent
272e252e61
commit
1138fd0c57
@ -154,6 +154,7 @@ enum lock_label {
|
||||
SIGNALS_LOCK,
|
||||
STK_TABLE_LOCK,
|
||||
STK_SESS_LOCK,
|
||||
APPLETS_LOCK,
|
||||
LOCK_LABELS
|
||||
};
|
||||
struct lock_stat {
|
||||
@ -239,7 +240,8 @@ static inline void show_lock_stats()
|
||||
const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
|
||||
"TASK_RQ", "TASK_WQ", "POOL",
|
||||
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
|
||||
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS" };
|
||||
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
|
||||
"APPLETS" };
|
||||
int lbl;
|
||||
|
||||
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
|
||||
|
@ -31,7 +31,9 @@
|
||||
|
||||
extern unsigned int nb_applets;
|
||||
extern unsigned int applets_active_queue;
|
||||
|
||||
#ifdef USE_THREAD
|
||||
extern HA_SPINLOCK_T applet_active_lock;
|
||||
#endif
|
||||
extern struct list applet_active_queue;
|
||||
|
||||
void applet_run_active();
|
||||
@ -44,10 +46,11 @@ static int inline appctx_res_wakeup(struct appctx *appctx);
|
||||
* minimum acceptable initialization for an appctx. This means only the
|
||||
* 3 integer states st0, st1, st2 are zeroed.
|
||||
*/
|
||||
static inline void appctx_init(struct appctx *appctx)
|
||||
static inline void appctx_init(struct appctx *appctx, unsigned long thread_mask)
|
||||
{
|
||||
appctx->st0 = appctx->st1 = appctx->st2 = 0;
|
||||
appctx->io_release = NULL;
|
||||
appctx->process_mask = thread_mask;
|
||||
appctx->state = APPLET_SLEEPING;
|
||||
}
|
||||
|
||||
@ -56,7 +59,7 @@ static inline void appctx_init(struct appctx *appctx)
|
||||
* pool_free2(connection) or appctx_free(), since it's allocated from the
|
||||
* connection pool. <applet> is assigned as the applet, but it can be NULL.
|
||||
*/
|
||||
static inline struct appctx *appctx_new(struct applet *applet)
|
||||
static inline struct appctx *appctx_new(struct applet *applet, unsigned long thread_mask)
|
||||
{
|
||||
struct appctx *appctx;
|
||||
|
||||
@ -64,12 +67,12 @@ static inline struct appctx *appctx_new(struct applet *applet)
|
||||
if (likely(appctx != NULL)) {
|
||||
appctx->obj_type = OBJ_TYPE_APPCTX;
|
||||
appctx->applet = applet;
|
||||
appctx_init(appctx);
|
||||
appctx_init(appctx, thread_mask);
|
||||
LIST_INIT(&appctx->runq);
|
||||
LIST_INIT(&appctx->buffer_wait.list);
|
||||
appctx->buffer_wait.target = appctx;
|
||||
appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
|
||||
nb_applets++;
|
||||
HA_ATOMIC_ADD(&nb_applets, 1);
|
||||
}
|
||||
return appctx;
|
||||
}
|
||||
@ -83,20 +86,25 @@ static inline void __appctx_free(struct appctx *appctx)
|
||||
LIST_DEL(&appctx->runq);
|
||||
applets_active_queue--;
|
||||
}
|
||||
|
||||
if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
|
||||
LIST_DEL(&appctx->buffer_wait.list);
|
||||
LIST_INIT(&appctx->buffer_wait.list);
|
||||
}
|
||||
|
||||
pool_free2(pool2_connection, appctx);
|
||||
nb_applets--;
|
||||
HA_ATOMIC_SUB(&nb_applets, 1);
|
||||
}
|
||||
static inline void appctx_free(struct appctx *appctx)
|
||||
{
|
||||
SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
if (appctx->state & APPLET_RUNNING) {
|
||||
appctx->state |= APPLET_WANT_DIE;
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
return;
|
||||
}
|
||||
__appctx_free(appctx);
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
}
|
||||
|
||||
/* wakes up an applet when conditions have changed */
|
||||
@ -110,11 +118,14 @@ static inline void __appctx_wakeup(struct appctx *appctx)
|
||||
|
||||
static inline void appctx_wakeup(struct appctx *appctx)
|
||||
{
|
||||
SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
if (appctx->state & APPLET_RUNNING) {
|
||||
appctx->state |= APPLET_WOKEN_UP;
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
return;
|
||||
}
|
||||
__appctx_wakeup(appctx);
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
}
|
||||
|
||||
/* Callback used to wake up an applet when a buffer is available. The applet
|
||||
@ -124,18 +135,23 @@ static inline void appctx_wakeup(struct appctx *appctx)
|
||||
* requested */
|
||||
static inline int appctx_res_wakeup(struct appctx *appctx)
|
||||
{
|
||||
SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
if (appctx->state & APPLET_RUNNING) {
|
||||
if (appctx->state & APPLET_WOKEN_UP) {
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
return 0;
|
||||
}
|
||||
appctx->state |= APPLET_WOKEN_UP;
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!LIST_ISEMPTY(&appctx->runq)) {
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
return 0;
|
||||
}
|
||||
__appctx_wakeup(appctx);
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -304,7 +304,7 @@ static inline struct appctx *si_alloc_appctx(struct stream_interface *si, struct
|
||||
struct appctx *appctx;
|
||||
|
||||
si_release_endpoint(si);
|
||||
appctx = appctx_new(applet);
|
||||
appctx = appctx_new(applet, tid_bit);
|
||||
if (appctx)
|
||||
si_attach_appctx(si, appctx);
|
||||
|
||||
|
26
src/applet.c
26
src/applet.c
@ -23,6 +23,10 @@
|
||||
unsigned int nb_applets = 0;
|
||||
unsigned int applets_active_queue = 0;
|
||||
|
||||
#ifdef USE_THREAD
|
||||
HA_SPINLOCK_T applet_active_lock; /* spin lock related to applet active queue */
|
||||
#endif
|
||||
|
||||
struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
|
||||
|
||||
void applet_run_active()
|
||||
@ -34,16 +38,22 @@ void applet_run_active()
|
||||
if (!applets_active_queue)
|
||||
return;
|
||||
|
||||
SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
|
||||
curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
|
||||
while (&curr->runq != &applet_active_queue) {
|
||||
next = LIST_NEXT(&curr->runq, typeof(next), runq);
|
||||
LIST_DEL(&curr->runq);
|
||||
curr->state = APPLET_RUNNING;
|
||||
LIST_ADDQ(&applet_cur_queue, &curr->runq);
|
||||
applets_active_queue--;
|
||||
if (curr->process_mask & (1UL << tid)) {
|
||||
LIST_DEL(&curr->runq);
|
||||
curr->state = APPLET_RUNNING;
|
||||
LIST_ADDQ(&applet_cur_queue, &curr->runq);
|
||||
applets_active_queue--;
|
||||
}
|
||||
curr = next;
|
||||
}
|
||||
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
|
||||
/* The list is only scanned from the head. This guarantees that if any
|
||||
* applet removes another one, there is no side effect while walking
|
||||
* through the list.
|
||||
@ -74,6 +84,7 @@ void applet_run_active()
|
||||
/* curr was left in the list, move it back to the active list */
|
||||
LIST_DEL(&curr->runq);
|
||||
LIST_INIT(&curr->runq);
|
||||
SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
if (curr->state & APPLET_WANT_DIE) {
|
||||
curr->state = APPLET_SLEEPING;
|
||||
__appctx_free(curr);
|
||||
@ -87,6 +98,13 @@ void applet_run_active()
|
||||
curr->state = APPLET_SLEEPING;
|
||||
}
|
||||
}
|
||||
SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
__attribute__((constructor))
|
||||
static void __applet_init(void)
|
||||
{
|
||||
SPIN_INIT(&applet_active_lock);
|
||||
}
|
||||
|
@ -1930,7 +1930,7 @@ spoe_create_appctx(struct spoe_config *conf)
|
||||
struct session *sess;
|
||||
struct stream *strm;
|
||||
|
||||
if ((appctx = appctx_new(&spoe_applet)) == NULL)
|
||||
if ((appctx = appctx_new(&spoe_applet, tid_bit)) == NULL)
|
||||
goto out_error;
|
||||
|
||||
appctx->ctx.spoe.ptr = pool_alloc_dirty(pool2_spoe_appctx);
|
||||
|
@ -2376,7 +2376,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
||||
lua_setmetatable(L, -2);
|
||||
|
||||
/* Create the applet context */
|
||||
appctx = appctx_new(&update_applet);
|
||||
appctx = appctx_new(&update_applet, MAX_THREADS_MASK);
|
||||
if (!appctx) {
|
||||
hlua_pusherror(L, "socket: out of memory");
|
||||
goto out_fail_conf;
|
||||
|
@ -1839,7 +1839,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
||||
peer->statuscode = PEER_SESS_SC_CONNECTCODE;
|
||||
s = NULL;
|
||||
|
||||
appctx = appctx_new(&peer_applet);
|
||||
appctx = appctx_new(&peer_applet, tid_bit);
|
||||
if (!appctx)
|
||||
goto out_close;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user