MEDIUM: threads/listeners: Make listeners thread-safe

First, we use atomic operations to update jobs/totalconn/actconn variables,
listener's nbconn variable and listener's counters. Then we add a lock on
listeners to protect access to their information. And finally, listener queues
(global and per proxy) are also protected by a lock. Here, because access to
these queues are unusal, we use the same lock for all queues instead of a global
one for the global queue and a lock per proxy for others.
This commit is contained in:
Christopher Faulet 2017-05-30 15:36:50 +02:00 committed by Willy Tarreau
parent b79a94c9f3
commit 8d8aa0d681
12 changed files with 165 additions and 87 deletions

View File

@ -651,6 +651,7 @@ static inline void b_reset(struct buffer *buf)
buf->o = 0; buf->o = 0;
buf->i = 0; buf->i = 0;
buf->p = buf->data; buf->p = buf->data;
} }
/* Allocates a buffer and replaces *buf with this buffer. If no memory is /* Allocates a buffer and replaces *buf with this buffer. If no memory is

View File

@ -145,6 +145,8 @@ enum lock_label {
TASK_RQ_LOCK, TASK_RQ_LOCK,
TASK_WQ_LOCK, TASK_WQ_LOCK,
POOL_LOCK, POOL_LOCK,
LISTENER_LOCK,
LISTENER_QUEUE_LOCK,
SIGNALS_LOCK, SIGNALS_LOCK,
LOCK_LABELS LOCK_LABELS
}; };
@ -230,7 +232,7 @@ static inline void show_lock_stats()
{ {
const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL", const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
"TASK_RQ", "TASK_WQ", "POOL", "TASK_RQ", "TASK_WQ", "POOL",
"SIGNALS" }; "LISTENER", "LISTENER_QUEUE", "SIGNALS" };
int lbl; int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) { for (lbl = 0; lbl < LOCK_LABELS; lbl++) {

View File

@ -112,7 +112,7 @@ static void inline proxy_inc_fe_conn_ctr(struct listener *l, struct proxy *fe)
{ {
fe->fe_counters.cum_conn++; fe->fe_counters.cum_conn++;
if (l->counters) if (l->counters)
l->counters->cum_conn++; HA_ATOMIC_ADD(&l->counters->cum_conn, 1);
update_freq_ctr(&fe->fe_conn_per_sec, 1); update_freq_ctr(&fe->fe_conn_per_sec, 1);
if (fe->fe_conn_per_sec.curr_ctr > fe->fe_counters.cps_max) if (fe->fe_conn_per_sec.curr_ctr > fe->fe_counters.cps_max)
@ -124,7 +124,7 @@ static void inline proxy_inc_fe_sess_ctr(struct listener *l, struct proxy *fe)
{ {
fe->fe_counters.cum_sess++; fe->fe_counters.cum_sess++;
if (l->counters) if (l->counters)
l->counters->cum_sess++; HA_ATOMIC_ADD(&l->counters->cum_sess, 1);
update_freq_ctr(&fe->fe_sess_per_sec, 1); update_freq_ctr(&fe->fe_sess_per_sec, 1);
if (fe->fe_sess_per_sec.curr_ctr > fe->fe_counters.sps_max) if (fe->fe_sess_per_sec.curr_ctr > fe->fe_counters.sps_max)
fe->fe_counters.sps_max = fe->fe_sess_per_sec.curr_ctr; fe->fe_counters.sps_max = fe->fe_sess_per_sec.curr_ctr;

View File

@ -32,6 +32,8 @@
#include <common/config.h> #include <common/config.h>
#include <common/mini-clist.h> #include <common/mini-clist.h>
#include <common/hathreads.h>
#include <types/obj_type.h> #include <types/obj_type.h>
#include <eb32tree.h> #include <eb32tree.h>
@ -198,6 +200,10 @@ struct listener {
int tcp_ut; /* for TCP, user timeout */ int tcp_ut; /* for TCP, user timeout */
char *interface; /* interface name or NULL */ char *interface; /* interface name or NULL */
#ifdef USE_THREAD
HA_SPINLOCK_T lock;
#endif
const struct netns_entry *netns; /* network namespace of the listener*/ const struct netns_entry *netns; /* network namespace of the listener*/
struct list by_fe; /* chaining in frontend's list of listeners */ struct list by_fe; /* chaining in frontend's list of listeners */

View File

@ -38,6 +38,11 @@
#include <proto/stream.h> #include <proto/stream.h>
#include <proto/task.h> #include <proto/task.h>
#ifdef USE_THREAD
/* listner_queue lock (same for global and per proxy queues) */
static HA_SPINLOCK_T lq_lock;
#endif
/* List head of all known bind keywords */ /* List head of all known bind keywords */
static struct bind_kw_list bind_keywords = { static struct bind_kw_list bind_keywords = {
.list = LIST_HEAD_INIT(bind_keywords.list) .list = LIST_HEAD_INIT(bind_keywords.list)
@ -53,6 +58,7 @@ struct xfer_sock_list *xfer_sock_list = NULL;
*/ */
static void enable_listener(struct listener *listener) static void enable_listener(struct listener *listener)
{ {
SPIN_LOCK(LISTENER_LOCK, &listener->lock);
if (listener->state == LI_LISTEN) { if (listener->state == LI_LISTEN) {
if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) && if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
listener->bind_conf->bind_proc && listener->bind_conf->bind_proc &&
@ -75,6 +81,7 @@ static void enable_listener(struct listener *listener)
listener->state = LI_FULL; listener->state = LI_FULL;
} }
} }
SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
} }
/* This function removes the specified listener's file descriptor from the /* This function removes the specified listener's file descriptor from the
@ -83,13 +90,19 @@ static void enable_listener(struct listener *listener)
*/ */
static void disable_listener(struct listener *listener) static void disable_listener(struct listener *listener)
{ {
SPIN_LOCK(LISTENER_LOCK, &listener->lock);
if (listener->state < LI_READY) if (listener->state < LI_READY)
return; goto end;
if (listener->state == LI_READY) if (listener->state == LI_READY)
fd_stop_recv(listener->fd); fd_stop_recv(listener->fd);
if (listener->state == LI_LIMITED) if (listener->state == LI_LIMITED) {
SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
LIST_DEL(&listener->wait_queue); LIST_DEL(&listener->wait_queue);
SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
}
listener->state = LI_LISTEN; listener->state = LI_LISTEN;
end:
SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
} }
/* This function tries to temporarily disable a listener, depending on the OS /* This function tries to temporarily disable a listener, depending on the OS
@ -101,8 +114,12 @@ static void disable_listener(struct listener *listener)
*/ */
int pause_listener(struct listener *l) int pause_listener(struct listener *l)
{ {
int ret = 1;
SPIN_LOCK(LISTENER_LOCK, &l->lock);
if (l->state <= LI_ZOMBIE) if (l->state <= LI_ZOMBIE)
return 1; goto end;
if (l->proto->pause) { if (l->proto->pause) {
/* Returns < 0 in case of failure, 0 if the listener /* Returns < 0 in case of failure, 0 if the listener
@ -110,18 +127,25 @@ int pause_listener(struct listener *l)
*/ */
int ret = l->proto->pause(l); int ret = l->proto->pause(l);
if (ret < 0) if (ret < 0) {
return 0; ret = 0;
goto end;
}
else if (ret == 0) else if (ret == 0)
return 1; goto end;
} }
if (l->state == LI_LIMITED) if (l->state == LI_LIMITED) {
SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
LIST_DEL(&l->wait_queue); LIST_DEL(&l->wait_queue);
SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
}
fd_stop_recv(l->fd); fd_stop_recv(l->fd);
l->state = LI_PAUSED; l->state = LI_PAUSED;
return 1; end:
SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
return ret;
} }
/* This function tries to resume a temporarily disabled listener. Paused, full, /* This function tries to resume a temporarily disabled listener. Paused, full,
@ -134,12 +158,16 @@ int pause_listener(struct listener *l)
* stopped it. If the resume fails, 0 is returned and an error might be * stopped it. If the resume fails, 0 is returned and an error might be
* displayed. * displayed.
*/ */
int resume_listener(struct listener *l) static int __resume_listener(struct listener *l)
{ {
int ret = 1;
SPIN_LOCK(LISTENER_LOCK, &l->lock);
if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) && if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
l->bind_conf->bind_proc && l->bind_conf->bind_proc &&
!(l->bind_conf->bind_proc & (1UL << (relative_pid - 1)))) !(l->bind_conf->bind_proc & (1UL << (relative_pid - 1))))
return 1; goto end;
if (l->state == LI_ASSIGNED) { if (l->state == LI_ASSIGNED) {
char msg[100]; char msg[100];
@ -151,42 +179,66 @@ int resume_listener(struct listener *l)
else if (err & ERR_WARN) else if (err & ERR_WARN)
Warning("Resuming listener: %s\n", msg); Warning("Resuming listener: %s\n", msg);
if (err & (ERR_FATAL | ERR_ABORT)) if (err & (ERR_FATAL | ERR_ABORT)) {
return 0; ret = 0;
goto end;
}
} }
if (l->state < LI_PAUSED || l->state == LI_ZOMBIE) if (l->state < LI_PAUSED || l->state == LI_ZOMBIE) {
return 0; ret = 0;
goto end;
}
if (l->proto->sock_prot == IPPROTO_TCP && if (l->proto->sock_prot == IPPROTO_TCP &&
l->state == LI_PAUSED && l->state == LI_PAUSED &&
listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0) listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0) {
return 0; ret = 0;
goto end;
}
if (l->state == LI_READY) if (l->state == LI_READY)
return 1; goto end;
if (l->state == LI_LIMITED) if (l->state == LI_LIMITED)
LIST_DEL(&l->wait_queue); LIST_DEL(&l->wait_queue);
if (l->nbconn >= l->maxconn) { if (l->nbconn >= l->maxconn) {
l->state = LI_FULL; l->state = LI_FULL;
return 1; goto end;
} }
fd_want_recv(l->fd); fd_want_recv(l->fd);
l->state = LI_READY; l->state = LI_READY;
return 1; end:
SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
return ret;
}
int resume_listener(struct listener *l)
{
int ret;
SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
ret = __resume_listener(l);
SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
return ret;
} }
/* Marks a ready listener as full so that the stream code tries to re-enable /* Marks a ready listener as full so that the stream code tries to re-enable
* it upon next close() using resume_listener(). * it upon next close() using resume_listener().
*
* Note: this function is only called from listener_accept so <l> is already
* locked.
*/ */
static void listener_full(struct listener *l) static void listener_full(struct listener *l)
{ {
if (l->state >= LI_READY) { if (l->state >= LI_READY) {
if (l->state == LI_LIMITED) if (l->state == LI_LIMITED) {
SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
LIST_DEL(&l->wait_queue); LIST_DEL(&l->wait_queue);
SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
}
fd_stop_recv(l->fd); fd_stop_recv(l->fd);
l->state = LI_FULL; l->state = LI_FULL;
@ -195,11 +247,16 @@ static void listener_full(struct listener *l)
/* Marks a ready listener as limited so that we only try to re-enable it when /* Marks a ready listener as limited so that we only try to re-enable it when
* resources are free again. It will be queued into the specified queue. * resources are free again. It will be queued into the specified queue.
*
* Note: this function is only called from listener_accept so <l> is already
* locked.
*/ */
static void limit_listener(struct listener *l, struct list *list) static void limit_listener(struct listener *l, struct list *list)
{ {
if (l->state == LI_READY) { if (l->state == LI_READY) {
SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
LIST_ADDQ(list, &l->wait_queue); LIST_ADDQ(list, &l->wait_queue);
SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
fd_stop_recv(l->fd); fd_stop_recv(l->fd);
l->state = LI_LIMITED; l->state = LI_LIMITED;
} }
@ -239,22 +296,28 @@ void dequeue_all_listeners(struct list *list)
{ {
struct listener *listener, *l_back; struct listener *listener, *l_back;
SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
list_for_each_entry_safe(listener, l_back, list, wait_queue) { list_for_each_entry_safe(listener, l_back, list, wait_queue) {
/* This cannot fail because the listeners are by definition in /* This cannot fail because the listeners are by definition in
* the LI_LIMITED state. The function also removes the entry * the LI_LIMITED state. The function also removes the entry
* from the queue. * from the queue.
*/ */
resume_listener(listener); __resume_listener(listener);
} }
SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
} }
static int do_unbind_listener(struct listener *listener, int do_close) static int do_unbind_listener(struct listener *listener, int do_close)
{ {
SPIN_LOCK(LISTENER_LOCK, &listener->lock);
if (listener->state == LI_READY) if (listener->state == LI_READY)
fd_stop_recv(listener->fd); fd_stop_recv(listener->fd);
if (listener->state == LI_LIMITED) if (listener->state == LI_LIMITED) {
SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
LIST_DEL(&listener->wait_queue); LIST_DEL(&listener->wait_queue);
SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
}
if (listener->state >= LI_PAUSED) { if (listener->state >= LI_PAUSED) {
if (do_close) { if (do_close) {
@ -265,6 +328,7 @@ static int do_unbind_listener(struct listener *listener, int do_close)
fd_remove(listener->fd); fd_remove(listener->fd);
listener->state = LI_ASSIGNED; listener->state = LI_ASSIGNED;
} }
SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
return ERR_NONE; return ERR_NONE;
} }
@ -335,8 +399,9 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss,
proto->add(l, port); proto->add(l, port);
jobs++; SPIN_INIT(&l->lock);
listeners++; HA_ATOMIC_ADD(&jobs, 1);
HA_ATOMIC_ADD(&listeners, 1);
} }
return 1; return 1;
} }
@ -351,11 +416,14 @@ void delete_listener(struct listener *listener)
{ {
if (listener->state != LI_ASSIGNED) if (listener->state != LI_ASSIGNED)
return; return;
SPIN_LOCK(LISTENER_LOCK, &listener->lock);
listener->state = LI_INIT; listener->state = LI_INIT;
LIST_DEL(&listener->proto_list); LIST_DEL(&listener->proto_list);
listener->proto->nb_listeners--; listener->proto->nb_listeners--;
listeners--; HA_ATOMIC_SUB(&jobs, 1);
jobs--; HA_ATOMIC_SUB(&listeners, 1);
SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
} }
/* This function is called on a read event from a listening socket, corresponding /* This function is called on a read event from a listening socket, corresponding
@ -374,9 +442,12 @@ void listener_accept(int fd)
static int accept4_broken; static int accept4_broken;
#endif #endif
if (SPIN_TRYLOCK(LISTENER_LOCK, &l->lock))
return;
if (unlikely(l->nbconn >= l->maxconn)) { if (unlikely(l->nbconn >= l->maxconn)) {
listener_full(l); listener_full(l);
return; goto end;
} }
if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) { if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) {
@ -425,7 +496,7 @@ void listener_accept(int fd)
/* frontend accept rate limit was reached */ /* frontend accept rate limit was reached */
limit_listener(l, &p->listener_queue); limit_listener(l, &p->listener_queue);
task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0))); task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0)));
return; goto end;
} }
if (max_accept > max) if (max_accept > max)
@ -440,16 +511,17 @@ void listener_accept(int fd)
while (max_accept--) { while (max_accept--) {
struct sockaddr_storage addr; struct sockaddr_storage addr;
socklen_t laddr = sizeof(addr); socklen_t laddr = sizeof(addr);
unsigned int count;
if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) { if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
limit_listener(l, &global_listener_queue); limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
return; goto end;
} }
if (unlikely(p && p->feconn >= p->maxconn)) { if (unlikely(p && p->feconn >= p->maxconn)) {
limit_listener(l, &p->listener_queue); limit_listener(l, &p->listener_queue);
return; goto end;
} }
#ifdef USE_ACCEPT4 #ifdef USE_ACCEPT4
@ -477,7 +549,7 @@ void listener_accept(int fd)
goto transient_error; goto transient_error;
} }
fd_cant_recv(fd); fd_cant_recv(fd);
return; /* nothing more to accept */ goto end; /* nothing more to accept */
case EINVAL: case EINVAL:
/* might be trying to accept on a shut fd (eg: soft stop) */ /* might be trying to accept on a shut fd (eg: soft stop) */
goto transient_error; goto transient_error;
@ -516,23 +588,19 @@ void listener_accept(int fd)
close(cfd); close(cfd);
limit_listener(l, &global_listener_queue); limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
return; goto end;
} }
/* increase the per-process number of cumulated connections */ /* increase the per-process number of cumulated connections */
if (!(l->options & LI_O_UNLIMITED)) { if (!(l->options & LI_O_UNLIMITED)) {
update_freq_ctr(&global.conn_per_sec, 1); count = update_freq_ctr(&global.conn_per_sec, 1);
if (global.conn_per_sec.curr_ctr > global.cps_max) HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
global.cps_max = global.conn_per_sec.curr_ctr; HA_ATOMIC_ADD(&actconn, 1);
actconn++;
} }
l->nbconn++; count = HA_ATOMIC_ADD(&l->nbconn, 1);
if (l->counters)
if (l->counters) { HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, count);
if (l->nbconn > l->counters->conn_max)
l->counters->conn_max = l->nbconn;
}
ret = l->accept(l, cfd, &addr); ret = l->accept(l, cfd, &addr);
if (unlikely(ret <= 0)) { if (unlikely(ret <= 0)) {
@ -542,8 +610,8 @@ void listener_accept(int fd)
* listener (ret < 0). * listener (ret < 0).
*/ */
if (!(l->options & LI_O_UNLIMITED)) if (!(l->options & LI_O_UNLIMITED))
actconn--; HA_ATOMIC_SUB(&actconn, 1);
l->nbconn--; HA_ATOMIC_SUB(&l->nbconn, 1);
if (ret == 0) /* successful termination */ if (ret == 0) /* successful termination */
continue; continue;
@ -552,21 +620,18 @@ void listener_accept(int fd)
if (l->nbconn >= l->maxconn) { if (l->nbconn >= l->maxconn) {
listener_full(l); listener_full(l);
return; goto end;
} }
/* increase the per-process number of cumulated connections */ /* increase the per-process number of cumulated connections */
if (!(l->options & LI_O_UNLIMITED)) { if (!(l->options & LI_O_UNLIMITED)) {
update_freq_ctr(&global.sess_per_sec, 1); count = update_freq_ctr(&global.sess_per_sec, 1);
if (global.sess_per_sec.curr_ctr > global.sps_max) HA_ATOMIC_UPDATE_MAX(&global.sps_max, count);
global.sps_max = global.sess_per_sec.curr_ctr;
} }
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
if (!(l->options & LI_O_UNLIMITED) && l->bind_conf && l->bind_conf->is_ssl) { if (!(l->options & LI_O_UNLIMITED) && l->bind_conf && l->bind_conf->is_ssl) {
count = update_freq_ctr(&global.ssl_per_sec, 1);
update_freq_ctr(&global.ssl_per_sec, 1); HA_ATOMIC_UPDATE_MAX(&global.ssl_max, count);
if (global.ssl_per_sec.curr_ctr > global.ssl_max)
global.ssl_max = global.ssl_per_sec.curr_ctr;
} }
#endif #endif
@ -575,7 +640,7 @@ void listener_accept(int fd)
/* we've exhausted max_accept, so there is no need to poll again */ /* we've exhausted max_accept, so there is no need to poll again */
stop: stop:
fd_done_recv(fd); fd_done_recv(fd);
return; goto end;
transient_error: transient_error:
/* pause the listener and try again in 100 ms */ /* pause the listener and try again in 100 ms */
@ -584,7 +649,8 @@ void listener_accept(int fd)
wait_expire: wait_expire:
limit_listener(l, &global_listener_queue); limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire)); task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire));
return; end:
SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
} }
/* Notify the listener that a connection initiated from it was released. This /* Notify the listener that a connection initiated from it was released. This
@ -596,8 +662,8 @@ void listener_release(struct listener *l)
struct proxy *fe = l->bind_conf->frontend; struct proxy *fe = l->bind_conf->frontend;
if (!(l->options & LI_O_UNLIMITED)) if (!(l->options & LI_O_UNLIMITED))
actconn--; HA_ATOMIC_SUB(&actconn, 1);
l->nbconn--; HA_ATOMIC_SUB(&l->nbconn, 1);
if (l->state == LI_FULL) if (l->state == LI_FULL)
resume_listener(l); resume_listener(l);
@ -946,6 +1012,7 @@ static void __listener_init(void)
sample_register_fetches(&smp_kws); sample_register_fetches(&smp_kws);
acl_register_keywords(&acl_kws); acl_register_keywords(&acl_kws);
bind_register_keywords(&bind_kws); bind_register_keywords(&bind_kws);
SPIN_INIT(&lq_lock);
} }
/* /*

View File

@ -24,6 +24,7 @@
#include <common/config.h> #include <common/config.h>
#include <common/time.h> #include <common/time.h>
#include <common/standard.h> #include <common/standard.h>
#include <common/hathreads.h>
#include <types/global.h> #include <types/global.h>
#include <types/listener.h> #include <types/listener.h>
@ -1974,7 +1975,7 @@ static struct task *process_peer_sync(struct task * task)
/* We've just recieved the signal */ /* We've just recieved the signal */
if (!(peers->flags & PEERS_F_DONOTSTOP)) { if (!(peers->flags & PEERS_F_DONOTSTOP)) {
/* add DO NOT STOP flag if not present */ /* add DO NOT STOP flag if not present */
jobs++; HA_ATOMIC_ADD(&jobs, 1);
peers->flags |= PEERS_F_DONOTSTOP; peers->flags |= PEERS_F_DONOTSTOP;
ps = peers->local; ps = peers->local;
for (st = ps->tables; st ; st = st->next) for (st = ps->tables; st ; st = st->next)
@ -1994,7 +1995,7 @@ static struct task *process_peer_sync(struct task * task)
if (ps->flags & PEER_F_TEACH_COMPLETE) { if (ps->flags & PEER_F_TEACH_COMPLETE) {
if (peers->flags & PEERS_F_DONOTSTOP) { if (peers->flags & PEERS_F_DONOTSTOP) {
/* resync of new process was complete, current process can die now */ /* resync of new process was complete, current process can die now */
jobs--; HA_ATOMIC_ADD(&jobs, 1);
peers->flags &= ~PEERS_F_DONOTSTOP; peers->flags &= ~PEERS_F_DONOTSTOP;
for (st = ps->tables; st ; st = st->next) for (st = ps->tables; st ; st = st->next)
st->table->syncing--; st->table->syncing--;
@ -2018,7 +2019,7 @@ static struct task *process_peer_sync(struct task * task)
/* Other error cases */ /* Other error cases */
if (peers->flags & PEERS_F_DONOTSTOP) { if (peers->flags & PEERS_F_DONOTSTOP) {
/* unable to resync new process, current process can die now */ /* unable to resync new process, current process can die now */
jobs--; HA_ATOMIC_SUB(&jobs, 1);
peers->flags &= ~PEERS_F_DONOTSTOP; peers->flags &= ~PEERS_F_DONOTSTOP;
for (st = ps->tables; st ; st = st->next) for (st = ps->tables; st ; st = st->next)
st->table->syncing--; st->table->syncing--;

View File

@ -1744,7 +1744,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
proxy_inc_fe_req_ctr(sess->fe); proxy_inc_fe_req_ctr(sess->fe);
sess->fe->fe_counters.failed_req++; sess->fe->fe_counters.failed_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->failed_req++; HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
if (!(s->flags & SF_FINST_MASK)) if (!(s->flags & SF_FINST_MASK))
s->flags |= SF_FINST_R; s->flags |= SF_FINST_R;
@ -1777,7 +1777,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
proxy_inc_fe_req_ctr(sess->fe); proxy_inc_fe_req_ctr(sess->fe);
sess->fe->fe_counters.failed_req++; sess->fe->fe_counters.failed_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->failed_req++; HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
if (!(s->flags & SF_FINST_MASK)) if (!(s->flags & SF_FINST_MASK))
s->flags |= SF_FINST_R; s->flags |= SF_FINST_R;
@ -1807,7 +1807,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
proxy_inc_fe_req_ctr(sess->fe); proxy_inc_fe_req_ctr(sess->fe);
sess->fe->fe_counters.failed_req++; sess->fe->fe_counters.failed_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->failed_req++; HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
if (!(s->flags & SF_FINST_MASK)) if (!(s->flags & SF_FINST_MASK))
s->flags |= SF_FINST_R; s->flags |= SF_FINST_R;
@ -2177,7 +2177,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
sess->fe->fe_counters.failed_req++; sess->fe->fe_counters.failed_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->failed_req++; HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
return_prx_cond: return_prx_cond:
if (!(s->flags & SF_ERR_MASK)) if (!(s->flags & SF_ERR_MASK))
@ -3545,7 +3545,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
if (sess->fe != s->be) if (sess->fe != s->be)
s->be->be_counters.denied_req++; s->be->be_counters.denied_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->denied_req++; HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
goto done_without_exp; goto done_without_exp;
deny: /* this request was blocked (denied) */ deny: /* this request was blocked (denied) */
@ -3564,7 +3564,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
if (sess->fe != s->be) if (sess->fe != s->be)
s->be->be_counters.denied_req++; s->be->be_counters.denied_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->denied_req++; HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
goto return_prx_cond; goto return_prx_cond;
return_bad_req: return_bad_req:
@ -3583,7 +3583,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
sess->fe->fe_counters.failed_req++; sess->fe->fe_counters.failed_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->failed_req++; HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
return_prx_cond: return_prx_cond:
if (!(s->flags & SF_ERR_MASK)) if (!(s->flags & SF_ERR_MASK))
@ -3921,7 +3921,7 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit)
sess->fe->fe_counters.failed_req++; sess->fe->fe_counters.failed_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->failed_req++; HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
if (!(s->flags & SF_ERR_MASK)) if (!(s->flags & SF_ERR_MASK))
s->flags |= SF_ERR_PRXCOND; s->flags |= SF_ERR_PRXCOND;
@ -4127,7 +4127,7 @@ int http_wait_for_request_body(struct stream *s, struct channel *req, int an_bit
req->analysers &= AN_REQ_FLT_END; req->analysers &= AN_REQ_FLT_END;
sess->fe->fe_counters.failed_req++; sess->fe->fe_counters.failed_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->failed_req++; HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
return 0; return 0;
} }
@ -4912,7 +4912,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
return_bad_req: /* let's centralize all bad requests */ return_bad_req: /* let's centralize all bad requests */
sess->fe->fe_counters.failed_req++; sess->fe->fe_counters.failed_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->failed_req++; HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
return_bad_req_stats_ok: return_bad_req_stats_ok:
txn->req.err_state = txn->req.msg_state; txn->req.err_state = txn->req.msg_state;
@ -5700,7 +5700,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
s->be->be_counters.denied_resp++; s->be->be_counters.denied_resp++;
sess->fe->fe_counters.denied_resp++; sess->fe->fe_counters.denied_resp++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->denied_resp++; HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
goto return_srv_prx_502; goto return_srv_prx_502;
} }
@ -5850,7 +5850,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
s->be->be_counters.denied_resp++; s->be->be_counters.denied_resp++;
sess->fe->fe_counters.denied_resp++; sess->fe->fe_counters.denied_resp++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->denied_resp++; HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
Alert("Blocking cacheable cookie in response from instance %s, server %s.\n", Alert("Blocking cacheable cookie in response from instance %s, server %s.\n",
s->be->id, objt_server(s->target) ? objt_server(s->target)->id : "<dispatch>"); s->be->id, objt_server(s->target) ? objt_server(s->target)->id : "<dispatch>");

View File

@ -1377,7 +1377,7 @@ static enum act_return tcp_exec_action_silent_drop(struct act_rule *rule, struct
sess->fe->fe_counters.denied_req++; sess->fe->fe_counters.denied_req++;
if (sess->listener->counters) if (sess->listener->counters)
sess->listener->counters->denied_req++; HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
return ACT_RET_STOP; return ACT_RET_STOP;
} }

View File

@ -57,8 +57,8 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
fe->fe_counters.conn_max = fe->feconn; fe->fe_counters.conn_max = fe->feconn;
if (li) if (li)
proxy_inc_fe_conn_ctr(li, fe); proxy_inc_fe_conn_ctr(li, fe);
totalconn++; HA_ATOMIC_ADD(&totalconn, 1);
jobs++; HA_ATOMIC_ADD(&jobs, 1);
} }
return sess; return sess;
} }
@ -69,7 +69,7 @@ void session_free(struct session *sess)
session_store_counters(sess); session_store_counters(sess);
vars_prune_per_sess(&sess->vars); vars_prune_per_sess(&sess->vars);
pool_free2(pool2_session, sess); pool_free2(pool2_session, sess);
jobs--; HA_ATOMIC_SUB(&jobs, 1);
} }
/* perform minimal intializations, report 0 in case of error, 1 if OK. */ /* perform minimal intializations, report 0 in case of error, 1 if OK. */

View File

@ -427,7 +427,7 @@ static void ssl_async_fd_free(int fd)
/* Now we can safely call SSL_free, no more pending job in engines */ /* Now we can safely call SSL_free, no more pending job in engines */
SSL_free(ssl); SSL_free(ssl);
sslconns--; sslconns--;
jobs--; HA_ATOMIC_SUB(&jobs, 1);
} }
/* /*
* function used to manage a returned SSL_ERROR_WANT_ASYNC * function used to manage a returned SSL_ERROR_WANT_ASYNC
@ -5487,7 +5487,7 @@ static void ssl_sock_close(struct connection *conn) {
fd_cant_recv(afd); fd_cant_recv(afd);
} }
conn->xprt_ctx = NULL; conn->xprt_ctx = NULL;
jobs++; HA_ATOMIC_ADD(&jobs, 1);
return; return;
} }
/* Else we can remove the fds from the fdtab /* Else we can remove the fds from the fdtab

View File

@ -19,6 +19,7 @@
#include <common/buffer.h> #include <common/buffer.h>
#include <common/debug.h> #include <common/debug.h>
#include <common/memory.h> #include <common/memory.h>
#include <common/hathreads.h>
#include <types/applet.h> #include <types/applet.h>
#include <types/capture.h> #include <types/capture.h>
@ -477,7 +478,7 @@ void stream_process_counters(struct stream *s)
objt_server(s->target)->counters.bytes_in += bytes; objt_server(s->target)->counters.bytes_in += bytes;
if (sess->listener && sess->listener->counters) if (sess->listener && sess->listener->counters)
sess->listener->counters->bytes_in += bytes; HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);
for (i = 0; i < MAX_SESS_STKCTR; i++) { for (i = 0; i < MAX_SESS_STKCTR; i++) {
struct stkctr *stkctr = &s->stkctr[i]; struct stkctr *stkctr = &s->stkctr[i];
@ -514,7 +515,7 @@ void stream_process_counters(struct stream *s)
objt_server(s->target)->counters.bytes_out += bytes; objt_server(s->target)->counters.bytes_out += bytes;
if (sess->listener && sess->listener->counters) if (sess->listener && sess->listener->counters)
sess->listener->counters->bytes_out += bytes; HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);
for (i = 0; i < MAX_SESS_STKCTR; i++) { for (i = 0; i < MAX_SESS_STKCTR; i++) {
struct stkctr *stkctr = &s->stkctr[i]; struct stkctr *stkctr = &s->stkctr[i];
@ -986,7 +987,7 @@ static void sess_set_term_flags(struct stream *s)
strm_fe(s)->fe_counters.failed_req++; strm_fe(s)->fe_counters.failed_req++;
if (strm_li(s) && strm_li(s)->counters) if (strm_li(s) && strm_li(s)->counters)
strm_li(s)->counters->failed_req++; HA_ATOMIC_ADD(&strm_li(s)->counters->failed_req, 1);
s->flags |= SF_FINST_R; s->flags |= SF_FINST_R;
} }

View File

@ -169,7 +169,7 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit)
s->be->be_counters.denied_req++; s->be->be_counters.denied_req++;
sess->fe->fe_counters.denied_req++; sess->fe->fe_counters.denied_req++;
if (sess->listener && sess->listener->counters) if (sess->listener && sess->listener->counters)
sess->listener->counters->denied_req++; HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
if (!(s->flags & SF_ERR_MASK)) if (!(s->flags & SF_ERR_MASK))
s->flags |= SF_ERR_PRXCOND; s->flags |= SF_ERR_PRXCOND;
@ -347,7 +347,7 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit)
s->be->be_counters.denied_resp++; s->be->be_counters.denied_resp++;
sess->fe->fe_counters.denied_resp++; sess->fe->fe_counters.denied_resp++;
if (sess->listener && sess->listener->counters) if (sess->listener && sess->listener->counters)
sess->listener->counters->denied_resp++; HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
if (!(s->flags & SF_ERR_MASK)) if (!(s->flags & SF_ERR_MASK))
s->flags |= SF_ERR_PRXCOND; s->flags |= SF_ERR_PRXCOND;
@ -429,7 +429,7 @@ int tcp_exec_l4_rules(struct session *sess)
else if (rule->action == ACT_ACTION_DENY) { else if (rule->action == ACT_ACTION_DENY) {
sess->fe->fe_counters.denied_conn++; sess->fe->fe_counters.denied_conn++;
if (sess->listener && sess->listener->counters) if (sess->listener && sess->listener->counters)
sess->listener->counters->denied_conn++; HA_ATOMIC_ADD(&sess->listener->counters->denied_conn, 1);
result = 0; result = 0;
break; break;
@ -516,7 +516,7 @@ int tcp_exec_l5_rules(struct session *sess)
else if (rule->action == ACT_ACTION_DENY) { else if (rule->action == ACT_ACTION_DENY) {
sess->fe->fe_counters.denied_sess++; sess->fe->fe_counters.denied_sess++;
if (sess->listener && sess->listener->counters) if (sess->listener && sess->listener->counters)
sess->listener->counters->denied_sess++; HA_ATOMIC_ADD(&sess->listener->counters->denied_sess, 1);
result = 0; result = 0;
break; break;