MEDIUM: task: make notification_* API thread safe by default

Some notification_* functions were not thread safe by default as they
assumed only one producer would emit events for registered tasks.

While this suited well with the Lua sockets use-case, this proved to
be a limitation with some other event sources (ie: lua Queue class)

instead of having to deal with both the non thread safe and thread
safe variants (_mt suffix), which is error prone, let's make the
entire API thread safe regarding the event list.

Pruning functions still require that only one thread executes them,
with Lua this is always the case because there is one cleanup list
per context.
This commit is contained in:
Aurelien DARRAGON 2025-04-01 16:07:39 +02:00
parent 976890edda
commit 11d4d0957e
4 changed files with 15 additions and 45 deletions

View File

@ -107,13 +107,7 @@ enum {
struct notification {
struct list purge_me; /* Part of the list of signals to be purged in the
case of the LUA execution stack crash. */
union {
struct list wake_me; /* Part of list of signals to be targeted if an
event occurs. */
struct mt_list wake_me_mt; /* thread safe signal list */
} wake;
# define wake_me wake.wake_me
# define wake_me_mt wake.wake_me_mt
struct mt_list wake_me; /* thread safe signal list */
struct task *task; /* The task to be wake if an event occurs. */
__decl_thread(HA_SPINLOCK_T lock);
};

View File

@ -793,22 +793,12 @@ static inline struct notification *_notification_new(struct list *purge, struct
* events like TCP I/O or sleep functions. This function allocate
* memory for the signal.
*/
static inline struct notification *notification_new(struct list *purge, struct list *event, struct task *wakeup)
static inline struct notification *notification_new(struct list *purge, struct mt_list *event, struct task *wakeup)
{
struct notification *com = _notification_new(purge, wakeup);
if (!com)
return NULL;
LIST_APPEND(event, &com->wake_me);
return com;
}
/* thread safe variant */
static inline struct notification *notification_new_mt(struct list *purge, struct mt_list *event, struct task *wakeup)
{
struct notification *com = _notification_new(purge, wakeup);
if (!com)
return NULL;
MT_LIST_APPEND(event, &com->wake_me_mt);
MT_LIST_APPEND(event, &com->wake_me);
return com;
}
@ -872,29 +862,15 @@ static inline void _notification_wake(struct notification *com)
}
/* This function sends signals. It wakes all the tasks attached
* to a list head, and remove the signal, and free the used
* memory. The wake list is not locked because it is owned by
* only one process. before browsing this list, the caller must
* ensure to be the only one browser.
* memory.
*/
static inline void notification_wake(struct list *wake)
{
struct notification *com, *back;
/* Wake task and delete all pending communication signals. */
list_for_each_entry_safe(com, back, wake, wake_me) {
LIST_DELETE(&com->wake_me);
_notification_wake(com);
}
}
/* thread safe variant */
static inline void notification_wake_mt(struct mt_list *wake)
static inline void notification_wake(struct mt_list *wake)
{
struct notification *com;
struct mt_list back;
/* Wake task and delete all pending communication signals. */
MT_LIST_FOR_EACH_ENTRY_UNLOCKED(com, wake, wake_me_mt, back) {
MT_LIST_FOR_EACH_ENTRY_UNLOCKED(com, wake, wake_me, back) {
_notification_wake(com);
com = NULL;
}
@ -902,9 +878,9 @@ static inline void notification_wake_mt(struct mt_list *wake)
/* This function returns true is some notification are pending
*/
static inline int notification_registered(struct list *wake)
static inline int notification_registered(struct mt_list *wake)
{
return !LIST_ISEMPTY(wake);
return !MT_LIST_ISEMPTY(wake);
}
#endif /* _HAPROXY_TASK_H */

View File

@ -419,8 +419,8 @@ struct hlua_flt_ctx {
struct hlua_csk_ctx {
int connected;
struct xref xref; /* cross reference with the Lua object owner. */
struct list wake_on_read;
struct list wake_on_write;
struct mt_list wake_on_read;
struct mt_list wake_on_write;
struct appctx *appctx;
struct server *srv;
int timeout;
@ -3771,8 +3771,8 @@ __LJMP static int hlua_socket_new(lua_State *L)
ctx->srv = NULL;
ctx->timeout = 0;
ctx->appctx = appctx;
LIST_INIT(&ctx->wake_on_write);
LIST_INIT(&ctx->wake_on_read);
MT_LIST_INIT(&ctx->wake_on_write);
MT_LIST_INIT(&ctx->wake_on_read);
hlua->gc_count++;

View File

@ -557,7 +557,7 @@ static int hlua_queue_alarm(lua_State *L)
return 0; /* not reached */
}
if (!notification_new_mt(&hlua->com, &queue->wait_tasks, hlua->task))
if (!notification_new(&hlua->com, &queue->wait_tasks, hlua->task))
luaL_error(L, "out of memory");
return 0;
@ -595,7 +595,7 @@ static int hlua_queue_push(lua_State *L)
MT_LIST_APPEND(&queue->list, &item->list);
/* notify tasks waiting on queue:pop_wait() (if any) */
notification_wake_mt(&queue->wait_tasks);
notification_wake(&queue->wait_tasks);
lua_pushboolean(L, 1);
return 1;
@ -657,7 +657,7 @@ static int _hlua_queue_pop_wait(lua_State *L, int status, lua_KContext ctx)
struct hlua *hlua;
hlua = hlua_gethlua(L);
if (!notification_new_mt(&hlua->com, &queue->wait_tasks, hlua->task)) {
if (!notification_new(&hlua->com, &queue->wait_tasks, hlua->task)) {
lua_pushnil(L);
return 1; /* memory error, return nil */
}