diff --git a/include/haproxy/applet-t.h b/include/haproxy/applet-t.h index f43f121c2..49c8ab499 100644 --- a/include/haproxy/applet-t.h +++ b/include/haproxy/applet-t.h @@ -134,6 +134,7 @@ struct appctx { struct { struct bref bref; /* back-reference from the session being dumped */ void *target; /* session we want to dump, or NULL for all */ + unsigned int thr; /* the thread number being explored (0..MAX_THREADS-1) */ unsigned int uid; /* if non-null, the uniq_id of the session being dumped */ int section; /* section of the session being dumped */ int pos; /* last position of the current session's buffer */ diff --git a/include/haproxy/stream-t.h b/include/haproxy/stream-t.h index 429d99244..447dbc713 100644 --- a/include/haproxy/stream-t.h +++ b/include/haproxy/stream-t.h @@ -139,7 +139,7 @@ struct stream { int16_t priority_class; /* priority class of the stream for the pending queue */ int32_t priority_offset; /* priority offset of the stream for the pending queue */ - struct list list; /* position in global streams list */ + struct list list; /* position in the thread's streams list */ struct mt_list by_srv; /* position in server stream list */ struct list back_refs; /* list of users tracking this stream */ struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */ diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h index f35ee3dba..d4d202b2f 100644 --- a/include/haproxy/stream.h +++ b/include/haproxy/stream.h @@ -54,7 +54,6 @@ extern struct trace_source trace_strm; extern struct pool_head *pool_head_stream; extern struct pool_head *pool_head_uniqueid; -extern struct list streams; extern struct data_cb sess_conn_cb; diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h index 4242a3f7b..89fde4f17 100644 --- a/include/haproxy/tinfo-t.h +++ b/include/haproxy/tinfo-t.h @@ -47,6 +47,9 @@ struct thread_info { #endif struct list buffer_wq; /* buffer waiters */ + struct list streams; /* list of streams attached to this thread */ + __decl_thread(HA_SPINLOCK_T streams_lock); /* shared with "show sess" */ + /* pad to cache line (64B) */ char __pad[0]; /* unused except to check remaining room */ char __end[0] __attribute__((aligned(64))); diff --git a/src/proxy.c b/src/proxy.c index ea7fc7f6f..5edbbfe23 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -1639,9 +1639,13 @@ struct task *hard_stop(struct task *t, void *context, unsigned short state) } thread_isolate(); - list_for_each_entry(s, &streams, list) { - stream_shutdown(s, SF_ERR_KILLED); + + for (thr = 0; thr < global.nbthread; thr++) { + list_for_each_entry(s, &ha_thread_info[thr].streams, list) { + stream_shutdown(s, SF_ERR_KILLED); + } } + thread_release(); killed = 1; diff --git a/src/stream.c b/src/stream.c index d04581a33..83c934575 100644 --- a/src/stream.c +++ b/src/stream.c @@ -66,8 +66,6 @@ DECLARE_POOL(pool_head_uniqueid, "uniqueid", UNIQUEID_LEN); /* incremented by each "show sess" to fix a delimiter between streams */ unsigned stream_epoch = 0; -struct list streams = LIST_HEAD_INIT(streams); -__decl_spinlock(streams_lock); /* List of all use-service keywords. */ static struct list service_keywords = LIST_HEAD_INIT(service_keywords); @@ -542,9 +540,9 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu s->tunnel_timeout = TICK_ETERNITY; - HA_SPIN_LOCK(STRMS_LOCK, &streams_lock); - LIST_ADDQ(&streams, &s->list); - HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock); + HA_SPIN_LOCK(STRMS_LOCK, &ti->streams_lock); + LIST_ADDQ(&ti->streams, &s->list); + HA_SPIN_UNLOCK(STRMS_LOCK, &ti->streams_lock); if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0) goto out_fail_accept; @@ -713,19 +711,19 @@ static void stream_free(struct stream *s) stream_store_counters(s); - HA_SPIN_LOCK(STRMS_LOCK, &streams_lock); + HA_SPIN_LOCK(STRMS_LOCK, &ti->streams_lock); list_for_each_entry_safe(bref, back, &s->back_refs, users) { /* we have to unlink all watchers. We must not relink them if * this stream was the last one in the list. */ LIST_DEL(&bref->users); LIST_INIT(&bref->users); - if (s->list.n != &streams) + if (s->list.n != &ti->streams) LIST_ADDQ(&LIST_ELEM(s->list.n, struct stream *, list)->back_refs, &bref->users); bref->ref = s->list.n; } LIST_DEL(&s->list); - HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock); + HA_SPIN_UNLOCK(STRMS_LOCK, &ti->streams_lock); /* applets do not release session yet */ must_free_sess = objt_appctx(sess->origin) && sess->origin == s->si[0].end; @@ -2739,6 +2737,16 @@ void stream_dump_and_crash(enum obj_type *obj, int rate) abort(); } +/* initialize the require structures */ +static void init_stream() +{ + int thr; + + for (thr = 0; thr < MAX_THREADS; thr++) + LIST_INIT(&ha_thread_info[thr].streams); +} +INITCALL0(STG_INIT, init_stream); + /* Generates a unique ID based on the given , stores it in the given and * returns the unique ID. @@ -3187,6 +3195,7 @@ static int cli_parse_show_sess(char **args, char *payload, struct appctx *appctx appctx->ctx.sess.target = NULL; appctx->ctx.sess.section = 0; /* start with stream status */ appctx->ctx.sess.pos = 0; + appctx->ctx.sess.thr = 0; /* let's set our own stream's epoch to the current one and increment * it so that we know which streams were already there before us. @@ -3232,7 +3241,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) * pointer points back to the head of the streams list. */ LIST_INIT(&appctx->ctx.sess.bref.users); - appctx->ctx.sess.bref.ref = streams.n; + appctx->ctx.sess.bref.ref = ha_thread_info[appctx->ctx.sess.thr].streams.n; appctx->st2 = STAT_ST_LIST; /* fall through */ @@ -3244,15 +3253,27 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) } /* and start from where we stopped */ - while (appctx->ctx.sess.bref.ref != &streams) { + while (1) { char pn[INET6_ADDRSTRLEN]; struct stream *curr_strm; + int done= 0; - curr_strm = LIST_ELEM(appctx->ctx.sess.bref.ref, struct stream *, list); + if (appctx->ctx.sess.bref.ref == &ha_thread_info[appctx->ctx.sess.thr].streams) + done = 1; + else { + /* check if we've found a stream created after issuing the "show sess" */ + curr_strm = LIST_ELEM(appctx->ctx.sess.bref.ref, struct stream *, list); + if ((int)(curr_strm->stream_epoch - si_strm(appctx->owner)->stream_epoch) > 0) + done = 1; + } - /* check if we've found a stream created after issuing the "show sess" */ - if ((int)(curr_strm->stream_epoch - si_strm(appctx->owner)->stream_epoch) > 0) - break; + if (done) { + appctx->ctx.sess.thr++; + if (appctx->ctx.sess.thr >= global.nbthread) + break; + appctx->ctx.sess.bref.ref = ha_thread_info[appctx->ctx.sess.thr].streams.n; + continue; + } if (appctx->ctx.sess.target) { if (appctx->ctx.sess.target != (void *)-1 && appctx->ctx.sess.target != curr_strm) @@ -3425,11 +3446,11 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) static void cli_release_show_sess(struct appctx *appctx) { - if (appctx->st2 == STAT_ST_LIST) { - HA_SPIN_LOCK(STRMS_LOCK, &streams_lock); + if (appctx->st2 == STAT_ST_LIST && appctx->ctx.sess.thr < global.nbthread) { + HA_SPIN_LOCK(STRMS_LOCK, &ha_thread_info[appctx->ctx.sess.thr].streams_lock); if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users)) LIST_DEL(&appctx->ctx.sess.bref.users); - HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock); + HA_SPIN_UNLOCK(STRMS_LOCK, &ha_thread_info[appctx->ctx.sess.thr].streams_lock); } } @@ -3437,6 +3458,7 @@ static void cli_release_show_sess(struct appctx *appctx) static int cli_parse_shutdown_session(char **args, char *payload, struct appctx *appctx, void *private) { struct stream *strm, *ptr; + int thr; if (!cli_has_level(appctx, ACCESS_LVL_ADMIN)) return 1; @@ -3445,21 +3467,24 @@ static int cli_parse_shutdown_session(char **args, char *payload, struct appctx return cli_err(appctx, "Session pointer expected (use 'show sess').\n"); ptr = (void *)strtoul(args[2], NULL, 0); + strm = NULL; thread_isolate(); /* first, look for the requested stream in the stream table */ - list_for_each_entry(strm, &streams, list) { - if (strm == ptr) { - stream_shutdown(strm, SF_ERR_KILLED); - break; + for (thr = 0; !strm && thr < global.nbthread; thr++) { + list_for_each_entry(strm, &ha_thread_info[thr].streams, list) { + if (strm == ptr) { + stream_shutdown(strm, SF_ERR_KILLED); + break; + } } } thread_release(); /* do we have the stream ? */ - if (strm != ptr) + if (!strm) return cli_err(appctx, "No such session (use 'show sess').\n"); return 1;