diff --git a/include/common/hathreads.h b/include/common/hathreads.h index e997ea3a9..8b32cf607 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -154,6 +154,7 @@ enum lock_label { SIGNALS_LOCK, STK_TABLE_LOCK, STK_SESS_LOCK, + APPLETS_LOCK, LOCK_LABELS }; struct lock_stat { @@ -239,7 +240,8 @@ static inline void show_lock_stats() const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL", "TASK_RQ", "TASK_WQ", "POOL", "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER", - "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS" }; + "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS", + "APPLETS" }; int lbl; for (lbl = 0; lbl < LOCK_LABELS; lbl++) { diff --git a/include/proto/applet.h b/include/proto/applet.h index 3cf8578c1..d9f0ce2df 100644 --- a/include/proto/applet.h +++ b/include/proto/applet.h @@ -31,7 +31,9 @@ extern unsigned int nb_applets; extern unsigned int applets_active_queue; - +#ifdef USE_THREAD +extern HA_SPINLOCK_T applet_active_lock; +#endif extern struct list applet_active_queue; void applet_run_active(); @@ -44,10 +46,11 @@ static int inline appctx_res_wakeup(struct appctx *appctx); * minimum acceptable initialization for an appctx. This means only the * 3 integer states st0, st1, st2 are zeroed. */ -static inline void appctx_init(struct appctx *appctx) +static inline void appctx_init(struct appctx *appctx, unsigned long thread_mask) { appctx->st0 = appctx->st1 = appctx->st2 = 0; appctx->io_release = NULL; + appctx->process_mask = thread_mask; appctx->state = APPLET_SLEEPING; } @@ -56,7 +59,7 @@ static inline void appctx_init(struct appctx *appctx) * pool_free2(connection) or appctx_free(), since it's allocated from the * connection pool. is assigned as the applet, but it can be NULL. */ -static inline struct appctx *appctx_new(struct applet *applet) +static inline struct appctx *appctx_new(struct applet *applet, unsigned long thread_mask) { struct appctx *appctx; @@ -64,12 +67,12 @@ static inline struct appctx *appctx_new(struct applet *applet) if (likely(appctx != NULL)) { appctx->obj_type = OBJ_TYPE_APPCTX; appctx->applet = applet; - appctx_init(appctx); + appctx_init(appctx, thread_mask); LIST_INIT(&appctx->runq); LIST_INIT(&appctx->buffer_wait.list); appctx->buffer_wait.target = appctx; appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup; - nb_applets++; + HA_ATOMIC_ADD(&nb_applets, 1); } return appctx; } @@ -83,20 +86,25 @@ static inline void __appctx_free(struct appctx *appctx) LIST_DEL(&appctx->runq); applets_active_queue--; } + if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) { LIST_DEL(&appctx->buffer_wait.list); LIST_INIT(&appctx->buffer_wait.list); } + pool_free2(pool2_connection, appctx); - nb_applets--; + HA_ATOMIC_SUB(&nb_applets, 1); } static inline void appctx_free(struct appctx *appctx) { + SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); if (appctx->state & APPLET_RUNNING) { appctx->state |= APPLET_WANT_DIE; + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); return; } __appctx_free(appctx); + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); } /* wakes up an applet when conditions have changed */ @@ -110,11 +118,14 @@ static inline void __appctx_wakeup(struct appctx *appctx) static inline void appctx_wakeup(struct appctx *appctx) { + SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); if (appctx->state & APPLET_RUNNING) { appctx->state |= APPLET_WOKEN_UP; + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); return; } __appctx_wakeup(appctx); + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); } /* Callback used to wake up an applet when a buffer is available. The applet @@ -124,18 +135,23 @@ static inline void appctx_wakeup(struct appctx *appctx) * requested */ static inline int appctx_res_wakeup(struct appctx *appctx) { + SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); if (appctx->state & APPLET_RUNNING) { if (appctx->state & APPLET_WOKEN_UP) { + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); return 0; } appctx->state |= APPLET_WOKEN_UP; + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); return 1; } if (!LIST_ISEMPTY(&appctx->runq)) { + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); return 0; } __appctx_wakeup(appctx); + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); return 1; } diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 70a323425..c6578ef07 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -304,7 +304,7 @@ static inline struct appctx *si_alloc_appctx(struct stream_interface *si, struct struct appctx *appctx; si_release_endpoint(si); - appctx = appctx_new(applet); + appctx = appctx_new(applet, tid_bit); if (appctx) si_attach_appctx(si, appctx); diff --git a/src/applet.c b/src/applet.c index 4e70d8c02..b0783e6a6 100644 --- a/src/applet.c +++ b/src/applet.c @@ -23,6 +23,10 @@ unsigned int nb_applets = 0; unsigned int applets_active_queue = 0; +#ifdef USE_THREAD +HA_SPINLOCK_T applet_active_lock; /* spin lock related to applet active queue */ +#endif + struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue); void applet_run_active() @@ -34,16 +38,22 @@ void applet_run_active() if (!applets_active_queue) return; + SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); + curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq); while (&curr->runq != &applet_active_queue) { next = LIST_NEXT(&curr->runq, typeof(next), runq); - LIST_DEL(&curr->runq); - curr->state = APPLET_RUNNING; - LIST_ADDQ(&applet_cur_queue, &curr->runq); - applets_active_queue--; + if (curr->process_mask & (1UL << tid)) { + LIST_DEL(&curr->runq); + curr->state = APPLET_RUNNING; + LIST_ADDQ(&applet_cur_queue, &curr->runq); + applets_active_queue--; + } curr = next; } + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); + /* The list is only scanned from the head. This guarantees that if any * applet removes another one, there is no side effect while walking * through the list. @@ -74,6 +84,7 @@ void applet_run_active() /* curr was left in the list, move it back to the active list */ LIST_DEL(&curr->runq); LIST_INIT(&curr->runq); + SPIN_LOCK(APPLETS_LOCK, &applet_active_lock); if (curr->state & APPLET_WANT_DIE) { curr->state = APPLET_SLEEPING; __appctx_free(curr); @@ -87,6 +98,13 @@ void applet_run_active() curr->state = APPLET_SLEEPING; } } + SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock); } } } + +__attribute__((constructor)) +static void __applet_init(void) +{ + SPIN_INIT(&applet_active_lock); +} diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 9543f8fe2..aa3f37a10 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1930,7 +1930,7 @@ spoe_create_appctx(struct spoe_config *conf) struct session *sess; struct stream *strm; - if ((appctx = appctx_new(&spoe_applet)) == NULL) + if ((appctx = appctx_new(&spoe_applet, tid_bit)) == NULL) goto out_error; appctx->ctx.spoe.ptr = pool_alloc_dirty(pool2_spoe_appctx); diff --git a/src/hlua.c b/src/hlua.c index 36b1b3ff4..d5d07de34 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2376,7 +2376,7 @@ __LJMP static int hlua_socket_new(lua_State *L) lua_setmetatable(L, -2); /* Create the applet context */ - appctx = appctx_new(&update_applet); + appctx = appctx_new(&update_applet, MAX_THREADS_MASK); if (!appctx) { hlua_pusherror(L, "socket: out of memory"); goto out_fail_conf; diff --git a/src/peers.c b/src/peers.c index 25f1ba32b..ef332eba2 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1839,7 +1839,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer peer->statuscode = PEER_SESS_SC_CONNECTCODE; s = NULL; - appctx = appctx_new(&peer_applet); + appctx = appctx_new(&peer_applet, tid_bit); if (!appctx) goto out_close;