mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-06 15:17:01 +02:00
MEDIUM: threads/stick-tables: handle multithreads on stick tables
The stick table API was slightly reworked: A global spin lock on stick table was added to perform lookup and insert in a thread safe way. The handling of refcount on entries is now handled directly by stick tables functions under protection of this lock and was removed from the code of callers. The "stktable_store" function is no more externalized and users should now use "stktable_set_entry" in any case of insertion. This last one performs a lookup followed by a store if not found. So the code using "stktable_store" was re-worked. Lookup, and set_entry functions automatically increase the refcount of the returned/stored entry. The function "sticktable_touch" was renamed "sticktable_touch_local" and is now able to decrease the refcount if last arg is set to true. It is allowing to release the entry without taking the lock twice. A new function "sticktable_touch_remote" is now used to insert entries coming from remote peers at the right place in the update tree. The code of peer update was re-worked to use this new function. This function is also able to decrease the refcount if wanted. The function "stksess_kill" also handle a parameter to decrease the refcount on the entry. A read/write lock is added on each entry to protect the data content updates of the entry.
This commit is contained in:
parent
5b51755aef
commit
819fc6f563
@ -152,6 +152,8 @@ enum lock_label {
|
|||||||
UPDATED_SERVERS_LOCK,
|
UPDATED_SERVERS_LOCK,
|
||||||
LBPRM_LOCK,
|
LBPRM_LOCK,
|
||||||
SIGNALS_LOCK,
|
SIGNALS_LOCK,
|
||||||
|
STK_TABLE_LOCK,
|
||||||
|
STK_SESS_LOCK,
|
||||||
LOCK_LABELS
|
LOCK_LABELS
|
||||||
};
|
};
|
||||||
struct lock_stat {
|
struct lock_stat {
|
||||||
@ -237,7 +239,7 @@ static inline void show_lock_stats()
|
|||||||
const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
|
const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
|
||||||
"TASK_RQ", "TASK_WQ", "POOL",
|
"TASK_RQ", "TASK_WQ", "POOL",
|
||||||
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
|
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
|
||||||
"UPDATED_SERVERS", "LBPRM", "SIGNALS" };
|
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS" };
|
||||||
int lbl;
|
int lbl;
|
||||||
|
|
||||||
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
|
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
|
||||||
|
@ -46,19 +46,26 @@ static inline void session_store_counters(struct session *sess)
|
|||||||
{
|
{
|
||||||
void *ptr;
|
void *ptr;
|
||||||
int i;
|
int i;
|
||||||
|
struct stksess *ts;
|
||||||
|
|
||||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||||
struct stkctr *stkctr = &sess->stkctr[i];
|
struct stkctr *stkctr = &sess->stkctr[i];
|
||||||
|
|
||||||
if (!stkctr_entry(stkctr))
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_CONN_CUR);
|
ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_CONN_CUR);
|
||||||
if (ptr)
|
if (ptr) {
|
||||||
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
stktable_data_cast(ptr, conn_cur)--;
|
stktable_data_cast(ptr, conn_cur)--;
|
||||||
stkctr_entry(stkctr)->ref_cnt--;
|
|
||||||
stksess_kill_if_expired(stkctr->table, stkctr_entry(stkctr));
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
}
|
||||||
|
|
||||||
stkctr_set_entry(stkctr, NULL);
|
stkctr_set_entry(stkctr, NULL);
|
||||||
|
stksess_kill_if_expired(stkctr->table, ts, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,17 +34,15 @@
|
|||||||
struct stksess *stksess_new(struct stktable *t, struct stktable_key *key);
|
struct stksess *stksess_new(struct stktable *t, struct stktable_key *key);
|
||||||
void stksess_setkey(struct stktable *t, struct stksess *ts, struct stktable_key *key);
|
void stksess_setkey(struct stktable *t, struct stksess *ts, struct stktable_key *key);
|
||||||
void stksess_free(struct stktable *t, struct stksess *ts);
|
void stksess_free(struct stktable *t, struct stksess *ts);
|
||||||
void stksess_kill(struct stktable *t, struct stksess *ts);
|
int stksess_kill(struct stktable *t, struct stksess *ts, int decrefcount);
|
||||||
|
|
||||||
int stktable_init(struct stktable *t);
|
int stktable_init(struct stktable *t);
|
||||||
int stktable_parse_type(char **args, int *idx, unsigned long *type, size_t *key_size);
|
int stktable_parse_type(char **args, int *idx, unsigned long *type, size_t *key_size);
|
||||||
struct stksess *stktable_get_entry(struct stktable *table, struct stktable_key *key);
|
struct stksess *stktable_get_entry(struct stktable *table, struct stktable_key *key);
|
||||||
struct stksess *stktable_store(struct stktable *t, struct stksess *ts, int local);
|
struct stksess *stktable_set_entry(struct stktable *table, struct stksess *nts);
|
||||||
struct stksess *stktable_store_with_exp(struct stktable *t, struct stksess *ts,
|
void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int decrefcount, int expire);
|
||||||
int local, int expire);
|
void stktable_touch_remote(struct stktable *t, struct stksess *ts, int decrefcnt);
|
||||||
struct stksess *stktable_touch_with_exp(struct stktable *t, struct stksess *ts,
|
void stktable_touch_local(struct stktable *t, struct stksess *ts, int decrefccount);
|
||||||
int local, int expire);
|
|
||||||
struct stksess *stktable_touch(struct stktable *t, struct stksess *ts, int local);
|
|
||||||
struct stksess *stktable_lookup(struct stktable *t, struct stksess *ts);
|
struct stksess *stktable_lookup(struct stktable *t, struct stksess *ts);
|
||||||
struct stksess *stktable_lookup_key(struct stktable *t, struct stktable_key *key);
|
struct stksess *stktable_lookup_key(struct stktable *t, struct stktable_key *key);
|
||||||
struct stksess *stktable_update_key(struct stktable *table, struct stktable_key *key);
|
struct stksess *stktable_update_key(struct stktable *table, struct stktable_key *key);
|
||||||
@ -52,12 +50,13 @@ struct stktable_key *smp_to_stkey(struct sample *smp, struct stktable *t);
|
|||||||
struct stktable_key *stktable_fetch_key(struct stktable *t, struct proxy *px, struct session *sess,
|
struct stktable_key *stktable_fetch_key(struct stktable *t, struct proxy *px, struct session *sess,
|
||||||
struct stream *strm, unsigned int opt,
|
struct stream *strm, unsigned int opt,
|
||||||
struct sample_expr *expr, struct sample *smp);
|
struct sample_expr *expr, struct sample *smp);
|
||||||
struct stkctr *smp_fetch_sc_stkctr(struct session *sess, struct stream *strm, const struct arg *args, const char *kw);
|
struct stkctr *smp_fetch_sc_stkctr(struct session *sess, struct stream *strm, const struct arg *args, const char *kw, struct stkctr *stkctr);
|
||||||
struct stkctr *smp_create_src_stkctr(struct session *sess, struct stream *strm, const struct arg *args, const char *kw);
|
struct stkctr *smp_create_src_stkctr(struct session *sess, struct stream *strm, const struct arg *args, const char *kw, struct stkctr *stkctr);
|
||||||
int stktable_compatible_sample(struct sample_expr *expr, unsigned long table_type);
|
int stktable_compatible_sample(struct sample_expr *expr, unsigned long table_type);
|
||||||
int stktable_register_data_store(int idx, const char *name, int std_type, int arg_type);
|
int stktable_register_data_store(int idx, const char *name, int std_type, int arg_type);
|
||||||
int stktable_get_data_type(char *name);
|
int stktable_get_data_type(char *name);
|
||||||
int stktable_trash_oldest(struct stktable *t, int to_batch);
|
int stktable_trash_oldest(struct stktable *t, int to_batch);
|
||||||
|
int __stksess_kill(struct stktable *t, struct stksess *ts);
|
||||||
|
|
||||||
/* return allocation size for standard data type <type> */
|
/* return allocation size for standard data type <type> */
|
||||||
static inline int stktable_type_size(int type)
|
static inline int stktable_type_size(int type)
|
||||||
@ -132,10 +131,29 @@ static inline void *stktable_data_ptr(struct stktable *t, struct stksess *ts, in
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* kill an entry if it's expired and its ref_cnt is zero */
|
/* kill an entry if it's expired and its ref_cnt is zero */
|
||||||
static inline void stksess_kill_if_expired(struct stktable *t, struct stksess *ts)
|
static inline int __stksess_kill_if_expired(struct stktable *t, struct stksess *ts)
|
||||||
{
|
{
|
||||||
if (t->expire != TICK_ETERNITY && tick_is_expired(ts->expire, now_ms))
|
if (t->expire != TICK_ETERNITY && tick_is_expired(ts->expire, now_ms))
|
||||||
stksess_kill(t, ts);
|
return __stksess_kill(t, ts);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int stksess_kill_if_expired(struct stktable *t, struct stksess *ts, int decrefcnt)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &t->lock);
|
||||||
|
|
||||||
|
if (decrefcnt)
|
||||||
|
ts->ref_cnt--;
|
||||||
|
|
||||||
|
if (t->expire != TICK_ETERNITY && tick_is_expired(ts->expire, now_ms))
|
||||||
|
ret = __stksess_kill_if_expired(t, ts);
|
||||||
|
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &t->lock);
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* sets the stick counter's entry pointer */
|
/* sets the stick counter's entry pointer */
|
||||||
|
@ -90,20 +90,26 @@ static inline void stream_store_counters(struct stream *s)
|
|||||||
{
|
{
|
||||||
void *ptr;
|
void *ptr;
|
||||||
int i;
|
int i;
|
||||||
|
struct stksess *ts;
|
||||||
|
|
||||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||||
if (!stkctr_entry(&s->stkctr[i]))
|
ts = stkctr_entry(&s->stkctr[i]);
|
||||||
|
if (!ts)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (stkctr_entry(&s->sess->stkctr[i]))
|
if (stkctr_entry(&s->sess->stkctr[i]))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
ptr = stktable_data_ptr(s->stkctr[i].table, stkctr_entry(&s->stkctr[i]), STKTABLE_DT_CONN_CUR);
|
ptr = stktable_data_ptr(s->stkctr[i].table, ts, STKTABLE_DT_CONN_CUR);
|
||||||
if (ptr)
|
if (ptr) {
|
||||||
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
stktable_data_cast(ptr, conn_cur)--;
|
stktable_data_cast(ptr, conn_cur)--;
|
||||||
stkctr_entry(&s->stkctr[i])->ref_cnt--;
|
|
||||||
stksess_kill_if_expired(s->stkctr[i].table, stkctr_entry(&s->stkctr[i]));
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
}
|
||||||
stkctr_set_entry(&s->stkctr[i], NULL);
|
stkctr_set_entry(&s->stkctr[i], NULL);
|
||||||
|
stksess_kill_if_expired(s->stkctr[i].table, ts, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,11 +120,13 @@ static inline void stream_store_counters(struct stream *s)
|
|||||||
*/
|
*/
|
||||||
static inline void stream_stop_content_counters(struct stream *s)
|
static inline void stream_stop_content_counters(struct stream *s)
|
||||||
{
|
{
|
||||||
|
struct stksess *ts;
|
||||||
void *ptr;
|
void *ptr;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||||
if (!stkctr_entry(&s->stkctr[i]))
|
ts = stkctr_entry(&s->stkctr[i]);
|
||||||
|
if (!ts)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (stkctr_entry(&s->sess->stkctr[i]))
|
if (stkctr_entry(&s->sess->stkctr[i]))
|
||||||
@ -127,12 +135,16 @@ static inline void stream_stop_content_counters(struct stream *s)
|
|||||||
if (!(stkctr_flags(&s->stkctr[i]) & STKCTR_TRACK_CONTENT))
|
if (!(stkctr_flags(&s->stkctr[i]) & STKCTR_TRACK_CONTENT))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
ptr = stktable_data_ptr(s->stkctr[i].table, stkctr_entry(&s->stkctr[i]), STKTABLE_DT_CONN_CUR);
|
ptr = stktable_data_ptr(s->stkctr[i].table, ts, STKTABLE_DT_CONN_CUR);
|
||||||
if (ptr)
|
if (ptr) {
|
||||||
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
stktable_data_cast(ptr, conn_cur)--;
|
stktable_data_cast(ptr, conn_cur)--;
|
||||||
stkctr_entry(&s->stkctr[i])->ref_cnt--;
|
|
||||||
stksess_kill_if_expired(s->stkctr[i].table, stkctr_entry(&s->stkctr[i]));
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
}
|
||||||
stkctr_set_entry(&s->stkctr[i], NULL);
|
stkctr_set_entry(&s->stkctr[i], NULL);
|
||||||
|
stksess_kill_if_expired(s->stkctr[i].table, ts, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -144,6 +156,8 @@ static inline void stream_start_counters(struct stktable *t, struct stksess *ts)
|
|||||||
{
|
{
|
||||||
void *ptr;
|
void *ptr;
|
||||||
|
|
||||||
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
ptr = stktable_data_ptr(t, ts, STKTABLE_DT_CONN_CUR);
|
ptr = stktable_data_ptr(t, ts, STKTABLE_DT_CONN_CUR);
|
||||||
if (ptr)
|
if (ptr)
|
||||||
stktable_data_cast(ptr, conn_cur)++;
|
stktable_data_cast(ptr, conn_cur)++;
|
||||||
@ -158,6 +172,8 @@ static inline void stream_start_counters(struct stktable *t, struct stksess *ts)
|
|||||||
t->data_arg[STKTABLE_DT_CONN_RATE].u, 1);
|
t->data_arg[STKTABLE_DT_CONN_RATE].u, 1);
|
||||||
if (tick_isset(t->expire))
|
if (tick_isset(t->expire))
|
||||||
ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
|
ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
|
||||||
|
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Enable tracking of stream counters as <stkctr> on stksess <ts>. The caller is
|
/* Enable tracking of stream counters as <stkctr> on stksess <ts>. The caller is
|
||||||
@ -166,10 +182,10 @@ static inline void stream_start_counters(struct stktable *t, struct stksess *ts)
|
|||||||
*/
|
*/
|
||||||
static inline void stream_track_stkctr(struct stkctr *ctr, struct stktable *t, struct stksess *ts)
|
static inline void stream_track_stkctr(struct stkctr *ctr, struct stktable *t, struct stksess *ts)
|
||||||
{
|
{
|
||||||
|
/* Why this test ???? */
|
||||||
if (stkctr_entry(ctr))
|
if (stkctr_entry(ctr))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
ts->ref_cnt++;
|
|
||||||
ctr->table = t;
|
ctr->table = t;
|
||||||
stkctr_set_entry(ctr, ts);
|
stkctr_set_entry(ctr, ts);
|
||||||
stream_start_counters(t, ts);
|
stream_start_counters(t, ts);
|
||||||
@ -178,26 +194,33 @@ static inline void stream_track_stkctr(struct stkctr *ctr, struct stktable *t, s
|
|||||||
/* Increase the number of cumulated HTTP requests in the tracked counters */
|
/* Increase the number of cumulated HTTP requests in the tracked counters */
|
||||||
static void inline stream_inc_http_req_ctr(struct stream *s)
|
static void inline stream_inc_http_req_ctr(struct stream *s)
|
||||||
{
|
{
|
||||||
|
struct stksess *ts;
|
||||||
void *ptr;
|
void *ptr;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||||
struct stkctr *stkctr = &s->stkctr[i];
|
struct stkctr *stkctr = &s->stkctr[i];
|
||||||
|
|
||||||
if (!stkctr_entry(stkctr)) {
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts) {
|
||||||
stkctr = &s->sess->stkctr[i];
|
stkctr = &s->sess->stkctr[i];
|
||||||
if (!stkctr_entry(stkctr))
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_REQ_CNT);
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
|
ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_REQ_CNT);
|
||||||
if (ptr)
|
if (ptr)
|
||||||
stktable_data_cast(ptr, http_req_cnt)++;
|
stktable_data_cast(ptr, http_req_cnt)++;
|
||||||
|
|
||||||
ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_REQ_RATE);
|
ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_REQ_RATE);
|
||||||
if (ptr)
|
if (ptr)
|
||||||
update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
|
update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
|
||||||
stkctr->table->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
|
stkctr->table->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
|
||||||
|
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,26 +229,32 @@ static void inline stream_inc_http_req_ctr(struct stream *s)
|
|||||||
*/
|
*/
|
||||||
static void inline stream_inc_be_http_req_ctr(struct stream *s)
|
static void inline stream_inc_be_http_req_ctr(struct stream *s)
|
||||||
{
|
{
|
||||||
|
struct stksess *ts;
|
||||||
void *ptr;
|
void *ptr;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||||
struct stkctr *stkctr = &s->stkctr[i];
|
struct stkctr *stkctr = &s->stkctr[i];
|
||||||
|
|
||||||
if (!stkctr_entry(stkctr))
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (!(stkctr_flags(&s->stkctr[i]) & STKCTR_TRACK_BACKEND))
|
if (!(stkctr_flags(&s->stkctr[i]) & STKCTR_TRACK_BACKEND))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_REQ_CNT);
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
|
ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_REQ_CNT);
|
||||||
if (ptr)
|
if (ptr)
|
||||||
stktable_data_cast(ptr, http_req_cnt)++;
|
stktable_data_cast(ptr, http_req_cnt)++;
|
||||||
|
|
||||||
ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_REQ_RATE);
|
ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_REQ_RATE);
|
||||||
if (ptr)
|
if (ptr)
|
||||||
update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
|
update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
|
||||||
stkctr->table->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
|
stkctr->table->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
|
||||||
|
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -237,26 +266,33 @@ static void inline stream_inc_be_http_req_ctr(struct stream *s)
|
|||||||
*/
|
*/
|
||||||
static void inline stream_inc_http_err_ctr(struct stream *s)
|
static void inline stream_inc_http_err_ctr(struct stream *s)
|
||||||
{
|
{
|
||||||
|
struct stksess *ts;
|
||||||
void *ptr;
|
void *ptr;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||||
struct stkctr *stkctr = &s->stkctr[i];
|
struct stkctr *stkctr = &s->stkctr[i];
|
||||||
|
|
||||||
if (!stkctr_entry(stkctr)) {
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts) {
|
||||||
stkctr = &s->sess->stkctr[i];
|
stkctr = &s->sess->stkctr[i];
|
||||||
if (!stkctr_entry(stkctr))
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_ERR_CNT);
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
|
ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_ERR_CNT);
|
||||||
if (ptr)
|
if (ptr)
|
||||||
stktable_data_cast(ptr, http_err_cnt)++;
|
stktable_data_cast(ptr, http_err_cnt)++;
|
||||||
|
|
||||||
ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_ERR_RATE);
|
ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_ERR_RATE);
|
||||||
if (ptr)
|
if (ptr)
|
||||||
update_freq_ctr_period(&stktable_data_cast(ptr, http_err_rate),
|
update_freq_ctr_period(&stktable_data_cast(ptr, http_err_rate),
|
||||||
stkctr->table->data_arg[STKTABLE_DT_HTTP_ERR_RATE].u, 1);
|
stkctr->table->data_arg[STKTABLE_DT_HTTP_ERR_RATE].u, 1);
|
||||||
|
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,6 +129,9 @@ extern struct stktable_type stktable_types[];
|
|||||||
struct stksess {
|
struct stksess {
|
||||||
unsigned int expire; /* session expiration date */
|
unsigned int expire; /* session expiration date */
|
||||||
unsigned int ref_cnt; /* reference count, can only purge when zero */
|
unsigned int ref_cnt; /* reference count, can only purge when zero */
|
||||||
|
#ifdef USE_THREAD
|
||||||
|
HA_RWLOCK_T lock; /* lock related to the table entry */
|
||||||
|
#endif
|
||||||
struct eb32_node exp; /* ebtree node used to hold the session in expiration tree */
|
struct eb32_node exp; /* ebtree node used to hold the session in expiration tree */
|
||||||
struct eb32_node upd; /* ebtree node used to hold the update sequence tree */
|
struct eb32_node upd; /* ebtree node used to hold the update sequence tree */
|
||||||
struct ebmb_node key; /* ebtree node used to hold the session in table */
|
struct ebmb_node key; /* ebtree node used to hold the session in table */
|
||||||
@ -143,6 +146,9 @@ struct stktable {
|
|||||||
struct eb_root exps; /* head of sticky session expiration tree */
|
struct eb_root exps; /* head of sticky session expiration tree */
|
||||||
struct eb_root updates; /* head of sticky updates sequence tree */
|
struct eb_root updates; /* head of sticky updates sequence tree */
|
||||||
struct pool_head *pool; /* pool used to allocate sticky sessions */
|
struct pool_head *pool; /* pool used to allocate sticky sessions */
|
||||||
|
#ifdef USE_THREAD
|
||||||
|
HA_SPINLOCK_T lock; /* spin lock related to the table */
|
||||||
|
#endif
|
||||||
struct task *exp_task; /* expiration task */
|
struct task *exp_task; /* expiration task */
|
||||||
struct task *sync_task; /* sync task */
|
struct task *sync_task; /* sync task */
|
||||||
unsigned int update;
|
unsigned int update;
|
||||||
|
121
src/peers.c
121
src/peers.c
@ -270,7 +270,7 @@ static inline void peer_set_update_msg_type(char *msg_type, int use_identifier,
|
|||||||
* If function returns 0, the caller should consider we were unable to encode this message (TODO:
|
* If function returns 0, the caller should consider we were unable to encode this message (TODO:
|
||||||
* check size)
|
* check size)
|
||||||
*/
|
*/
|
||||||
static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier, int use_timed)
|
static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, unsigned int updateid, char *msg, size_t size, int use_identifier, int use_timed)
|
||||||
{
|
{
|
||||||
uint32_t netinteger;
|
uint32_t netinteger;
|
||||||
unsigned short datalen;
|
unsigned short datalen;
|
||||||
@ -283,13 +283,13 @@ static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, c
|
|||||||
/* construct message */
|
/* construct message */
|
||||||
|
|
||||||
/* check if we need to send the update identifer */
|
/* check if we need to send the update identifer */
|
||||||
if (!st->last_pushed || ts->upd.key < st->last_pushed || ((ts->upd.key - st->last_pushed) != 1)) {
|
if (!st->last_pushed || updateid < st->last_pushed || ((updateid - st->last_pushed) != 1)) {
|
||||||
use_identifier = 1;
|
use_identifier = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* encode update identifier if needed */
|
/* encode update identifier if needed */
|
||||||
if (use_identifier) {
|
if (use_identifier) {
|
||||||
netinteger = htonl(ts->upd.key);
|
netinteger = htonl(updateid);
|
||||||
memcpy(cursor, &netinteger, sizeof(netinteger));
|
memcpy(cursor, &netinteger, sizeof(netinteger));
|
||||||
cursor += sizeof(netinteger);
|
cursor += sizeof(netinteger);
|
||||||
}
|
}
|
||||||
@ -318,6 +318,7 @@ static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, c
|
|||||||
cursor += st->table->key_size;
|
cursor += st->table->key_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RWLOCK_RDLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
/* encode values */
|
/* encode values */
|
||||||
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
|
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
|
||||||
|
|
||||||
@ -357,6 +358,7 @@ static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, c
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
RWLOCK_RDUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
/* Compute datalen */
|
/* Compute datalen */
|
||||||
datalen = (cursor - datamsg);
|
datalen = (cursor - datamsg);
|
||||||
@ -1152,7 +1154,9 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
newts = stksess_new(st->table, NULL);
|
newts = stksess_new(st->table, NULL);
|
||||||
if (!newts)
|
if (!newts)
|
||||||
goto ignore_msg;
|
goto ignore_msg;
|
||||||
|
/* Force expiratiion to remote date
|
||||||
|
in case of first insert */
|
||||||
|
newts->expire = tick_add(now_ms, expire);
|
||||||
if (st->table->type == SMP_T_STR) {
|
if (st->table->type == SMP_T_STR) {
|
||||||
unsigned int to_read, to_store;
|
unsigned int to_read, to_store;
|
||||||
|
|
||||||
@ -1201,27 +1205,13 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* lookup for existing entry */
|
/* lookup for existing entry */
|
||||||
ts = stktable_lookup(st->table, newts);
|
ts = stktable_set_entry(st->table, newts);
|
||||||
if (ts) {
|
if (ts != newts) {
|
||||||
/* the entry already exist, we can free ours */
|
|
||||||
stktable_touch_with_exp(st->table, ts, 0, tick_add(now_ms, expire));
|
|
||||||
stksess_free(st->table, newts);
|
stksess_free(st->table, newts);
|
||||||
newts = NULL;
|
newts = NULL;
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
struct eb32_node *eb;
|
|
||||||
|
|
||||||
/* create new entry */
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
ts = stktable_store_with_exp(st->table, newts, 0, tick_add(now_ms, expire));
|
|
||||||
newts = NULL; /* don't reuse it */
|
|
||||||
|
|
||||||
ts->upd.key= (++st->table->update)+(2147483648U);
|
|
||||||
eb = eb32_insert(&st->table->updates, &ts->upd);
|
|
||||||
if (eb != &ts->upd) {
|
|
||||||
eb32_delete(eb);
|
|
||||||
eb32_insert(&st->table->updates, &ts->upd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
|
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
|
||||||
|
|
||||||
@ -1233,6 +1223,8 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
data = intdecode(&msg_cur, msg_end);
|
data = intdecode(&msg_cur, msg_end);
|
||||||
if (!msg_cur) {
|
if (!msg_cur) {
|
||||||
/* malformed message */
|
/* malformed message */
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
stktable_touch_remote(st->table, ts, 1);
|
||||||
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
@ -1248,6 +1240,8 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
data = intdecode(&msg_cur, msg_end);
|
data = intdecode(&msg_cur, msg_end);
|
||||||
if (!msg_cur) {
|
if (!msg_cur) {
|
||||||
/* malformed message */
|
/* malformed message */
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
stktable_touch_remote(st->table, ts, 1);
|
||||||
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
@ -1263,6 +1257,8 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
data = intdecode(&msg_cur, msg_end);
|
data = intdecode(&msg_cur, msg_end);
|
||||||
if (!msg_cur) {
|
if (!msg_cur) {
|
||||||
/* malformed message */
|
/* malformed message */
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
stktable_touch_remote(st->table, ts, 1);
|
||||||
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
@ -1278,18 +1274,24 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
data.curr_tick = tick_add(now_ms, -intdecode(&msg_cur, msg_end));
|
data.curr_tick = tick_add(now_ms, -intdecode(&msg_cur, msg_end));
|
||||||
if (!msg_cur) {
|
if (!msg_cur) {
|
||||||
/* malformed message */
|
/* malformed message */
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
stktable_touch_remote(st->table, ts, 1);
|
||||||
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
data.curr_ctr = intdecode(&msg_cur, msg_end);
|
data.curr_ctr = intdecode(&msg_cur, msg_end);
|
||||||
if (!msg_cur) {
|
if (!msg_cur) {
|
||||||
/* malformed message */
|
/* malformed message */
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
stktable_touch_remote(st->table, ts, 1);
|
||||||
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
data.prev_ctr = intdecode(&msg_cur, msg_end);
|
data.prev_ctr = intdecode(&msg_cur, msg_end);
|
||||||
if (!msg_cur) {
|
if (!msg_cur) {
|
||||||
/* malformed message */
|
/* malformed message */
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
stktable_touch_remote(st->table, ts, 1);
|
||||||
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
@ -1302,6 +1304,10 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
stktable_touch_remote(st->table, ts, 1);
|
||||||
|
|
||||||
}
|
}
|
||||||
else if (msg_head[1] == PEER_MSG_STKT_ACK) {
|
else if (msg_head[1] == PEER_MSG_STKT_ACK) {
|
||||||
/* ack message */
|
/* ack message */
|
||||||
@ -1441,12 +1447,14 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
|
|
||||||
/* We force new pushed to 1 to force identifier in update message */
|
/* We force new pushed to 1 to force identifier in update message */
|
||||||
new_pushed = 1;
|
new_pushed = 1;
|
||||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
while (1) {
|
while (1) {
|
||||||
uint32_t msglen;
|
uint32_t msglen;
|
||||||
struct stksess *ts;
|
struct stksess *ts;
|
||||||
|
unsigned updateid;
|
||||||
|
|
||||||
/* push local updates */
|
/* push local updates */
|
||||||
|
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
||||||
if (!eb) {
|
if (!eb) {
|
||||||
eb = eb32_first(&st->table->updates);
|
eb = eb32_first(&st->table->updates);
|
||||||
if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
|
if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
|
||||||
@ -1461,9 +1469,16 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
ts = eb32_entry(eb, struct stksess, upd);
|
ts = eb32_entry(eb, struct stksess, upd);
|
||||||
msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, 0);
|
updateid = ts->upd.key;
|
||||||
|
ts->ref_cnt++;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
|
||||||
|
msglen = peer_prepare_updatemsg(ts, st, updateid, trash.str, trash.size, new_pushed, 0);
|
||||||
if (!msglen) {
|
if (!msglen) {
|
||||||
/* internal error: message does not fit in trash */
|
/* internal error: message does not fit in trash */
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
appctx->st0 = PEER_SESS_ST_END;
|
appctx->st0 = PEER_SESS_ST_END;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
@ -1472,20 +1487,25 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
repl = ci_putblk(si_ic(si), trash.str, msglen);
|
repl = ci_putblk(si_ic(si), trash.str, msglen);
|
||||||
if (repl <= 0) {
|
if (repl <= 0) {
|
||||||
/* no more write possible */
|
/* no more write possible */
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
if (repl == -1) {
|
if (repl == -1) {
|
||||||
goto full;
|
goto full;
|
||||||
}
|
}
|
||||||
appctx->st0 = PEER_SESS_ST_END;
|
appctx->st0 = PEER_SESS_ST_END;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
st->last_pushed = ts->upd.key;
|
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
st->last_pushed = updateid;
|
||||||
if ((int)(st->last_pushed - st->table->commitupdate) > 0)
|
if ((int)(st->last_pushed - st->table->commitupdate) > 0)
|
||||||
st->table->commitupdate = st->last_pushed;
|
st->table->commitupdate = st->last_pushed;
|
||||||
/* identifier may not needed in next update message */
|
/* identifier may not needed in next update message */
|
||||||
new_pushed = 0;
|
new_pushed = 0;
|
||||||
|
|
||||||
eb = eb32_next(eb);
|
|
||||||
}
|
}
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@ -1518,13 +1538,15 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
|
|
||||||
/* We force new pushed to 1 to force identifier in update message */
|
/* We force new pushed to 1 to force identifier in update message */
|
||||||
new_pushed = 1;
|
new_pushed = 1;
|
||||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
while (1) {
|
while (1) {
|
||||||
uint32_t msglen;
|
uint32_t msglen;
|
||||||
struct stksess *ts;
|
struct stksess *ts;
|
||||||
int use_timed;
|
int use_timed;
|
||||||
|
unsigned updateid;
|
||||||
|
|
||||||
/* push local updates */
|
/* push local updates */
|
||||||
|
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
||||||
if (!eb) {
|
if (!eb) {
|
||||||
st->flags |= SHTABLE_F_TEACH_STAGE1;
|
st->flags |= SHTABLE_F_TEACH_STAGE1;
|
||||||
eb = eb32_first(&st->table->updates);
|
eb = eb32_first(&st->table->updates);
|
||||||
@ -1534,10 +1556,17 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
ts = eb32_entry(eb, struct stksess, upd);
|
ts = eb32_entry(eb, struct stksess, upd);
|
||||||
|
updateid = ts->upd.key;
|
||||||
|
ts->ref_cnt++;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
|
||||||
use_timed = !(curpeer->flags & PEER_F_DWNGRD);
|
use_timed = !(curpeer->flags & PEER_F_DWNGRD);
|
||||||
msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, use_timed);
|
msglen = peer_prepare_updatemsg(ts, st, updateid, trash.str, trash.size, new_pushed, use_timed);
|
||||||
if (!msglen) {
|
if (!msglen) {
|
||||||
/* internal error: message does not fit in trash */
|
/* internal error: message does not fit in trash */
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
appctx->st0 = PEER_SESS_ST_END;
|
appctx->st0 = PEER_SESS_ST_END;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
@ -1546,18 +1575,22 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
repl = ci_putblk(si_ic(si), trash.str, msglen);
|
repl = ci_putblk(si_ic(si), trash.str, msglen);
|
||||||
if (repl <= 0) {
|
if (repl <= 0) {
|
||||||
/* no more write possible */
|
/* no more write possible */
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
if (repl == -1) {
|
if (repl == -1) {
|
||||||
goto full;
|
goto full;
|
||||||
}
|
}
|
||||||
appctx->st0 = PEER_SESS_ST_END;
|
appctx->st0 = PEER_SESS_ST_END;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
st->last_pushed = ts->upd.key;
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
st->last_pushed = updateid;
|
||||||
/* identifier may not needed in next update message */
|
/* identifier may not needed in next update message */
|
||||||
new_pushed = 0;
|
new_pushed = 0;
|
||||||
|
|
||||||
eb = eb32_next(eb);
|
|
||||||
}
|
}
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
|
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
|
||||||
@ -1589,11 +1622,15 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
|
|
||||||
/* We force new pushed to 1 to force identifier in update message */
|
/* We force new pushed to 1 to force identifier in update message */
|
||||||
new_pushed = 1;
|
new_pushed = 1;
|
||||||
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
while (1) {
|
while (1) {
|
||||||
uint32_t msglen;
|
uint32_t msglen;
|
||||||
struct stksess *ts;
|
struct stksess *ts;
|
||||||
int use_timed;
|
int use_timed;
|
||||||
|
unsigned updateid;
|
||||||
|
|
||||||
|
/* push local updates */
|
||||||
|
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
||||||
|
|
||||||
/* push local updates */
|
/* push local updates */
|
||||||
if (!eb || eb->key > st->teaching_origin) {
|
if (!eb || eb->key > st->teaching_origin) {
|
||||||
@ -1602,10 +1639,17 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
ts = eb32_entry(eb, struct stksess, upd);
|
ts = eb32_entry(eb, struct stksess, upd);
|
||||||
|
updateid = ts->upd.key;
|
||||||
|
ts->ref_cnt++;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
|
||||||
use_timed = !(curpeer->flags & PEER_F_DWNGRD);
|
use_timed = !(curpeer->flags & PEER_F_DWNGRD);
|
||||||
msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, use_timed);
|
msglen = peer_prepare_updatemsg(ts, st, updateid, trash.str, trash.size, new_pushed, use_timed);
|
||||||
if (!msglen) {
|
if (!msglen) {
|
||||||
/* internal error: message does not fit in trash */
|
/* internal error: message does not fit in trash */
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
appctx->st0 = PEER_SESS_ST_END;
|
appctx->st0 = PEER_SESS_ST_END;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
@ -1614,18 +1658,23 @@ static void peer_io_handler(struct appctx *appctx)
|
|||||||
repl = ci_putblk(si_ic(si), trash.str, msglen);
|
repl = ci_putblk(si_ic(si), trash.str, msglen);
|
||||||
if (repl <= 0) {
|
if (repl <= 0) {
|
||||||
/* no more write possible */
|
/* no more write possible */
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
if (repl == -1) {
|
if (repl == -1) {
|
||||||
goto full;
|
goto full;
|
||||||
}
|
}
|
||||||
appctx->st0 = PEER_SESS_ST_END;
|
appctx->st0 = PEER_SESS_ST_END;
|
||||||
goto switchstate;
|
goto switchstate;
|
||||||
}
|
}
|
||||||
st->last_pushed = ts->upd.key;
|
|
||||||
|
SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
|
ts->ref_cnt--;
|
||||||
|
st->last_pushed = updateid;
|
||||||
/* identifier may not needed in next update message */
|
/* identifier may not needed in next update message */
|
||||||
new_pushed = 0;
|
new_pushed = 0;
|
||||||
|
|
||||||
eb = eb32_next(eb);
|
|
||||||
}
|
}
|
||||||
|
SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2721,7 +2721,7 @@ http_req_get_intercept_rule(struct proxy *px, struct list *rules, struct stream
|
|||||||
struct stktable *t;
|
struct stktable *t;
|
||||||
struct stksess *ts;
|
struct stksess *ts;
|
||||||
struct stktable_key *key;
|
struct stktable_key *key;
|
||||||
void *ptr;
|
void *ptr1, *ptr2;
|
||||||
|
|
||||||
t = rule->arg.trk_ctr.table.t;
|
t = rule->arg.trk_ctr.table.t;
|
||||||
key = stktable_fetch_key(t, s->be, sess, s, SMP_OPT_DIR_REQ | SMP_OPT_FINAL, rule->arg.trk_ctr.expr, NULL);
|
key = stktable_fetch_key(t, s->be, sess, s, SMP_OPT_DIR_REQ | SMP_OPT_FINAL, rule->arg.trk_ctr.expr, NULL);
|
||||||
@ -2730,14 +2730,20 @@ http_req_get_intercept_rule(struct proxy *px, struct list *rules, struct stream
|
|||||||
stream_track_stkctr(&s->stkctr[trk_idx(rule->action)], t, ts);
|
stream_track_stkctr(&s->stkctr[trk_idx(rule->action)], t, ts);
|
||||||
|
|
||||||
/* let's count a new HTTP request as it's the first time we do it */
|
/* let's count a new HTTP request as it's the first time we do it */
|
||||||
ptr = stktable_data_ptr(t, ts, STKTABLE_DT_HTTP_REQ_CNT);
|
ptr1 = stktable_data_ptr(t, ts, STKTABLE_DT_HTTP_REQ_CNT);
|
||||||
if (ptr)
|
ptr2 = stktable_data_ptr(t, ts, STKTABLE_DT_HTTP_REQ_RATE);
|
||||||
stktable_data_cast(ptr, http_req_cnt)++;
|
if (ptr1 || ptr2) {
|
||||||
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
ptr = stktable_data_ptr(t, ts, STKTABLE_DT_HTTP_REQ_RATE);
|
if (ptr1)
|
||||||
if (ptr)
|
stktable_data_cast(ptr1, http_req_cnt)++;
|
||||||
update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
|
|
||||||
t->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
|
if (ptr2)
|
||||||
|
update_freq_ctr_period(&stktable_data_cast(ptr2, http_req_rate),
|
||||||
|
t->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
|
||||||
|
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
}
|
||||||
|
|
||||||
stkctr_set_flags(&s->stkctr[trk_idx(rule->action)], STKCTR_TRACK_CONTENT);
|
stkctr_set_flags(&s->stkctr[trk_idx(rule->action)], STKCTR_TRACK_CONTENT);
|
||||||
if (sess->fe != s->be)
|
if (sess->fe != s->be)
|
||||||
@ -3002,6 +3008,8 @@ http_res_get_intercept_rule(struct proxy *px, struct list *rules, struct stream
|
|||||||
if (key && (ts = stktable_get_entry(t, key))) {
|
if (key && (ts = stktable_get_entry(t, key))) {
|
||||||
stream_track_stkctr(&s->stkctr[trk_idx(rule->action)], t, ts);
|
stream_track_stkctr(&s->stkctr[trk_idx(rule->action)], t, ts);
|
||||||
|
|
||||||
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
/* let's count a new HTTP request as it's the first time we do it */
|
/* let's count a new HTTP request as it's the first time we do it */
|
||||||
ptr = stktable_data_ptr(t, ts, STKTABLE_DT_HTTP_REQ_CNT);
|
ptr = stktable_data_ptr(t, ts, STKTABLE_DT_HTTP_REQ_CNT);
|
||||||
if (ptr)
|
if (ptr)
|
||||||
@ -3012,10 +3020,6 @@ http_res_get_intercept_rule(struct proxy *px, struct list *rules, struct stream
|
|||||||
update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
|
update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
|
||||||
t->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
|
t->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
|
||||||
|
|
||||||
stkctr_set_flags(&s->stkctr[trk_idx(rule->action)], STKCTR_TRACK_CONTENT);
|
|
||||||
if (sess->fe != s->be)
|
|
||||||
stkctr_set_flags(&s->stkctr[trk_idx(rule->action)], STKCTR_TRACK_BACKEND);
|
|
||||||
|
|
||||||
/* When the client triggers a 4xx from the server, it's most often due
|
/* When the client triggers a 4xx from the server, it's most often due
|
||||||
* to a missing object or permission. These events should be tracked
|
* to a missing object or permission. These events should be tracked
|
||||||
* because if they happen often, it may indicate a brute force or a
|
* because if they happen often, it may indicate a brute force or a
|
||||||
@ -3033,6 +3037,13 @@ http_res_get_intercept_rule(struct proxy *px, struct list *rules, struct stream
|
|||||||
update_freq_ctr_period(&stktable_data_cast(ptr, http_err_rate),
|
update_freq_ctr_period(&stktable_data_cast(ptr, http_err_rate),
|
||||||
t->data_arg[STKTABLE_DT_HTTP_ERR_RATE].u, 1);
|
t->data_arg[STKTABLE_DT_HTTP_ERR_RATE].u, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
|
stkctr_set_flags(&s->stkctr[trk_idx(rule->action)], STKCTR_TRACK_CONTENT);
|
||||||
|
if (sess->fe != s->be)
|
||||||
|
stkctr_set_flags(&s->stkctr[trk_idx(rule->action)], STKCTR_TRACK_BACKEND);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
File diff suppressed because it is too large
Load Diff
45
src/stream.c
45
src/stream.c
@ -465,6 +465,7 @@ void stream_process_counters(struct stream *s)
|
|||||||
struct session *sess = s->sess;
|
struct session *sess = s->sess;
|
||||||
unsigned long long bytes;
|
unsigned long long bytes;
|
||||||
void *ptr1,*ptr2;
|
void *ptr1,*ptr2;
|
||||||
|
struct stksess *ts;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
bytes = s->req.total - s->logs.bytes_in;
|
bytes = s->req.total - s->logs.bytes_in;
|
||||||
@ -482,24 +483,28 @@ void stream_process_counters(struct stream *s)
|
|||||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||||
struct stkctr *stkctr = &s->stkctr[i];
|
struct stkctr *stkctr = &s->stkctr[i];
|
||||||
|
|
||||||
if (!stkctr_entry(stkctr)) {
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts) {
|
||||||
stkctr = &sess->stkctr[i];
|
stkctr = &sess->stkctr[i];
|
||||||
if (!stkctr_entry(stkctr))
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ptr1 = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_BYTES_IN_CNT);
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
ptr1 = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_BYTES_IN_CNT);
|
||||||
if (ptr1)
|
if (ptr1)
|
||||||
stktable_data_cast(ptr1, bytes_in_cnt) += bytes;
|
stktable_data_cast(ptr1, bytes_in_cnt) += bytes;
|
||||||
|
|
||||||
ptr2 = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_BYTES_IN_RATE);
|
ptr2 = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_BYTES_IN_RATE);
|
||||||
if (ptr2)
|
if (ptr2)
|
||||||
update_freq_ctr_period(&stktable_data_cast(ptr2, bytes_in_rate),
|
update_freq_ctr_period(&stktable_data_cast(ptr2, bytes_in_rate),
|
||||||
stkctr->table->data_arg[STKTABLE_DT_BYTES_IN_RATE].u, bytes);
|
stkctr->table->data_arg[STKTABLE_DT_BYTES_IN_RATE].u, bytes);
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
/* If data was modified, we need to touch to re-schedule sync */
|
/* If data was modified, we need to touch to re-schedule sync */
|
||||||
if (ptr1 || ptr2)
|
if (ptr1 || ptr2)
|
||||||
stktable_touch(stkctr->table, stkctr_entry(stkctr), 1);
|
stktable_touch_local(stkctr->table, ts, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -518,24 +523,28 @@ void stream_process_counters(struct stream *s)
|
|||||||
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
for (i = 0; i < MAX_SESS_STKCTR; i++) {
|
||||||
struct stkctr *stkctr = &s->stkctr[i];
|
struct stkctr *stkctr = &s->stkctr[i];
|
||||||
|
|
||||||
if (!stkctr_entry(stkctr)) {
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts) {
|
||||||
stkctr = &sess->stkctr[i];
|
stkctr = &sess->stkctr[i];
|
||||||
if (!stkctr_entry(stkctr))
|
ts = stkctr_entry(stkctr);
|
||||||
|
if (!ts)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ptr1 = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_BYTES_OUT_CNT);
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
ptr1 = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_BYTES_OUT_CNT);
|
||||||
if (ptr1)
|
if (ptr1)
|
||||||
stktable_data_cast(ptr1, bytes_out_cnt) += bytes;
|
stktable_data_cast(ptr1, bytes_out_cnt) += bytes;
|
||||||
|
|
||||||
ptr2 = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_BYTES_OUT_RATE);
|
ptr2 = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_BYTES_OUT_RATE);
|
||||||
if (ptr2)
|
if (ptr2)
|
||||||
update_freq_ctr_period(&stktable_data_cast(ptr2, bytes_out_rate),
|
update_freq_ctr_period(&stktable_data_cast(ptr2, bytes_out_rate),
|
||||||
stkctr->table->data_arg[STKTABLE_DT_BYTES_OUT_RATE].u, bytes);
|
stkctr->table->data_arg[STKTABLE_DT_BYTES_OUT_RATE].u, bytes);
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
|
||||||
/* If data was modified, we need to touch to re-schedule sync */
|
/* If data was modified, we need to touch to re-schedule sync */
|
||||||
if (ptr1 || ptr2)
|
if (ptr1 || ptr2)
|
||||||
stktable_touch(stkctr->table, stkctr_entry(stkctr), 1);
|
stktable_touch_local(stkctr->table, stkctr_entry(stkctr), 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1383,8 +1392,10 @@ static int process_sticking_rules(struct stream *s, struct channel *req, int an_
|
|||||||
void *ptr;
|
void *ptr;
|
||||||
|
|
||||||
/* srv found in table */
|
/* srv found in table */
|
||||||
|
RWLOCK_RDLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
ptr = stktable_data_ptr(rule->table.t, ts, STKTABLE_DT_SERVER_ID);
|
ptr = stktable_data_ptr(rule->table.t, ts, STKTABLE_DT_SERVER_ID);
|
||||||
node = eb32_lookup(&px->conf.used_server_id, stktable_data_cast(ptr, server_id));
|
node = eb32_lookup(&px->conf.used_server_id, stktable_data_cast(ptr, server_id));
|
||||||
|
RWLOCK_RDUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
if (node) {
|
if (node) {
|
||||||
struct server *srv;
|
struct server *srv;
|
||||||
|
|
||||||
@ -1397,7 +1408,7 @@ static int process_sticking_rules(struct stream *s, struct channel *req, int an_
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stktable_touch(rule->table.t, ts, 1);
|
stktable_touch_local(rule->table.t, ts, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (rule->flags & STK_IS_STORE) {
|
if (rule->flags & STK_IS_STORE) {
|
||||||
@ -1501,18 +1512,18 @@ static int process_store_rules(struct stream *s, struct channel *rep, int an_bit
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ts = stktable_lookup(s->store[i].table, s->store[i].ts);
|
ts = stktable_set_entry(s->store[i].table, s->store[i].ts);
|
||||||
if (ts) {
|
if (ts != s->store[i].ts) {
|
||||||
/* the entry already existed, we can free ours */
|
/* the entry already existed, we can free ours */
|
||||||
stktable_touch(s->store[i].table, ts, 1);
|
|
||||||
stksess_free(s->store[i].table, s->store[i].ts);
|
stksess_free(s->store[i].table, s->store[i].ts);
|
||||||
}
|
}
|
||||||
else
|
|
||||||
ts = stktable_store(s->store[i].table, s->store[i].ts, 1);
|
|
||||||
|
|
||||||
s->store[i].ts = NULL;
|
s->store[i].ts = NULL;
|
||||||
|
|
||||||
|
RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
ptr = stktable_data_ptr(s->store[i].table, ts, STKTABLE_DT_SERVER_ID);
|
ptr = stktable_data_ptr(s->store[i].table, ts, STKTABLE_DT_SERVER_ID);
|
||||||
stktable_data_cast(ptr, server_id) = objt_server(s->target)->puid;
|
stktable_data_cast(ptr, server_id) = objt_server(s->target)->puid;
|
||||||
|
RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
|
||||||
|
stktable_touch_local(s->store[i].table, ts, 1);
|
||||||
}
|
}
|
||||||
s->store_count = 0; /* everything is stored */
|
s->store_count = 0; /* everything is stored */
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user