mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-09 00:27:08 +02:00
MEDIUM: buffer: remove the buffer_wq lock
This lock was only needed to protect the buffer_wq list, but now we have the mt_list for this. This patch simply turns the buffer_wq list to an mt_list and gets rid of the lock. It's worth noting that the whole buffer_wait thing still looks totally wrong especially in a threaded context: the wakeup_cb() callback is called synchronously from any thread and may end up calling some connection code that was not expected to run on a given thread. The whole thing should probably be reworked to use tasklets instead and be a bit more centralized.
This commit is contained in:
parent
32bf97fb60
commit
2104659cd5
@ -40,11 +40,11 @@
|
||||
struct buffer_wait {
|
||||
void *target; /* The waiting object that should be woken up */
|
||||
int (*wakeup_cb)(void *); /* The function used to wake up the <target>, passed as argument */
|
||||
struct list list; /* Next element in the <buffer_wq> list */
|
||||
struct mt_list list; /* Next element in the <buffer_wq> list */
|
||||
};
|
||||
|
||||
extern struct pool_head *pool_head_buffer;
|
||||
extern struct list buffer_wq;
|
||||
extern struct mt_list buffer_wq;
|
||||
__decl_hathreads(extern HA_SPINLOCK_T buffer_wq_lock);
|
||||
|
||||
int init_buffer();
|
||||
@ -203,13 +203,8 @@ void __offer_buffer(void *from, unsigned int threshold);
|
||||
|
||||
static inline void offer_buffers(void *from, unsigned int threshold)
|
||||
{
|
||||
if (LIST_ISEMPTY(&buffer_wq))
|
||||
return;
|
||||
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
if (!LIST_ISEMPTY(&buffer_wq))
|
||||
if (!MT_LIST_ISEMPTY(&buffer_wq))
|
||||
__offer_buffer(from, threshold);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
|
||||
|
||||
|
@ -542,7 +542,6 @@ enum lock_label {
|
||||
STK_SESS_LOCK,
|
||||
APPLETS_LOCK,
|
||||
PEER_LOCK,
|
||||
BUF_WQ_LOCK,
|
||||
STRMS_LOCK,
|
||||
SSL_LOCK,
|
||||
SSL_GEN_CERTS_LOCK,
|
||||
@ -661,7 +660,6 @@ static inline const char *lock_label(enum lock_label label)
|
||||
case STK_SESS_LOCK: return "STK_SESS";
|
||||
case APPLETS_LOCK: return "APPLETS";
|
||||
case PEER_LOCK: return "PEER";
|
||||
case BUF_WQ_LOCK: return "BUF_WQ";
|
||||
case STRMS_LOCK: return "STRMS";
|
||||
case SSL_LOCK: return "SSL";
|
||||
case SSL_GEN_CERTS_LOCK: return "SSL_GEN_CERTS";
|
||||
|
@ -75,7 +75,7 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thr
|
||||
}
|
||||
appctx->t->process = task_run_applet;
|
||||
appctx->t->context = appctx;
|
||||
LIST_INIT(&appctx->buffer_wait.list);
|
||||
MT_LIST_INIT(&appctx->buffer_wait.list);
|
||||
appctx->buffer_wait.target = appctx;
|
||||
appctx->buffer_wait.wakeup_cb = appctx_buf_available;
|
||||
_HA_ATOMIC_ADD(&nb_applets, 1);
|
||||
@ -87,12 +87,8 @@ static inline struct appctx *appctx_new(struct applet *applet, unsigned long thr
|
||||
static inline void __appctx_free(struct appctx *appctx)
|
||||
{
|
||||
task_destroy(appctx->t);
|
||||
if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_DEL(&appctx->buffer_wait.list);
|
||||
LIST_INIT(&appctx->buffer_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (MT_LIST_ADDED(&appctx->buffer_wait.list))
|
||||
MT_LIST_DEL(&appctx->buffer_wait.list);
|
||||
|
||||
pool_free(pool_head_appctx, appctx);
|
||||
_HA_ATOMIC_SUB(&nb_applets, 1);
|
||||
|
@ -855,11 +855,8 @@ static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait *
|
||||
if (b_alloc_margin(&chn->buf, margin) != NULL)
|
||||
return 1;
|
||||
|
||||
if (LIST_ISEMPTY(&wait->list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_ADDQ(&buffer_wq, &wait->list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (!MT_LIST_ADDED(&wait->list))
|
||||
MT_LIST_ADDQ(&buffer_wq, &wait->list);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
11
src/buffer.c
11
src/buffer.c
@ -23,7 +23,7 @@
|
||||
struct pool_head *pool_head_buffer;
|
||||
|
||||
/* list of objects waiting for at least one buffer */
|
||||
struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
|
||||
struct mt_list buffer_wq = LIST_HEAD_INIT(buffer_wq);
|
||||
__decl_aligned_spinlock(buffer_wq_lock);
|
||||
|
||||
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
|
||||
@ -98,7 +98,8 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
|
||||
/* see offer_buffer() for details */
|
||||
void __offer_buffer(void *from, unsigned int threshold)
|
||||
{
|
||||
struct buffer_wait *wait, *bak;
|
||||
struct buffer_wait *wait;
|
||||
struct mt_list *elt1, elt2;
|
||||
int avail;
|
||||
|
||||
/* For now, we consider that all objects need 1 buffer, so we can stop
|
||||
@ -112,16 +113,14 @@ void __offer_buffer(void *from, unsigned int threshold)
|
||||
*/
|
||||
avail = pool_head_buffer->allocated - pool_head_buffer->used - global.tune.reserved_bufs / 2;
|
||||
|
||||
list_for_each_entry_safe(wait, bak, &buffer_wq, list) {
|
||||
mt_list_for_each_entry_safe(wait, &buffer_wq, list, elt1, elt2) {
|
||||
if (avail <= threshold)
|
||||
break;
|
||||
|
||||
if (wait->target == from || !wait->wakeup_cb(wait->target))
|
||||
continue;
|
||||
|
||||
LIST_DEL(&wait->list);
|
||||
LIST_INIT(&wait->list);
|
||||
|
||||
MT_LIST_DEL_SAFE(&wait->list);
|
||||
avail--;
|
||||
}
|
||||
}
|
||||
|
@ -1988,7 +1988,7 @@ spoe_create_appctx(struct spoe_config *conf)
|
||||
SPOE_APPCTX(appctx)->buffer = BUF_NULL;
|
||||
SPOE_APPCTX(appctx)->cur_fpa = 0;
|
||||
|
||||
LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
|
||||
MT_LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
|
||||
SPOE_APPCTX(appctx)->buffer_wait.target = appctx;
|
||||
SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_appctx;
|
||||
|
||||
@ -2834,31 +2834,21 @@ spoe_acquire_buffer(struct buffer *buf, struct buffer_wait *buffer_wait)
|
||||
if (buf->size)
|
||||
return 1;
|
||||
|
||||
if (!LIST_ISEMPTY(&buffer_wait->list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_DEL(&buffer_wait->list);
|
||||
LIST_INIT(&buffer_wait->list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (MT_LIST_ADDED(&buffer_wait->list))
|
||||
MT_LIST_DEL(&buffer_wait->list);
|
||||
|
||||
if (b_alloc_margin(buf, global.tune.reserved_bufs))
|
||||
return 1;
|
||||
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_ADDQ(&buffer_wq, &buffer_wait->list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
MT_LIST_ADDQ(&buffer_wq, &buffer_wait->list);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait)
|
||||
{
|
||||
if (!LIST_ISEMPTY(&buffer_wait->list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_DEL(&buffer_wait->list);
|
||||
LIST_INIT(&buffer_wait->list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (MT_LIST_ADDED(&buffer_wait->list))
|
||||
MT_LIST_DEL(&buffer_wait->list);
|
||||
|
||||
/* Release the buffer if needed */
|
||||
if (buf->size) {
|
||||
@ -2892,7 +2882,7 @@ spoe_create_context(struct stream *s, struct filter *filter)
|
||||
ctx->events = conf->agent->events;
|
||||
ctx->groups = &conf->agent->groups;
|
||||
ctx->buffer = BUF_NULL;
|
||||
LIST_INIT(&ctx->buffer_wait.list);
|
||||
MT_LIST_INIT(&ctx->buffer_wait.list);
|
||||
ctx->buffer_wait.target = ctx;
|
||||
ctx->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_context;
|
||||
LIST_INIT(&ctx->list);
|
||||
|
@ -601,13 +601,11 @@ static inline struct buffer *fcgi_get_buf(struct fcgi_conn *fconn, struct buffer
|
||||
{
|
||||
struct buffer *buf = NULL;
|
||||
|
||||
if (likely(!LIST_ADDED(&fconn->buf_wait.list)) &&
|
||||
if (likely(!MT_LIST_ADDED(&fconn->buf_wait.list)) &&
|
||||
unlikely((buf = b_alloc_margin(bptr, 0)) == NULL)) {
|
||||
fconn->buf_wait.target = fconn;
|
||||
fconn->buf_wait.wakeup_cb = fcgi_buf_available;
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_ADDQ(&buffer_wq, &fconn->buf_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
MT_LIST_ADDQ(&buffer_wq, &fconn->buf_wait.list);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
@ -759,7 +757,7 @@ static int fcgi_init(struct connection *conn, struct proxy *px, struct session *
|
||||
br_init(fconn->mbuf, sizeof(fconn->mbuf) / sizeof(fconn->mbuf[0]));
|
||||
fconn->streams_by_id = EB_ROOT;
|
||||
LIST_INIT(&fconn->send_list);
|
||||
LIST_INIT(&fconn->buf_wait.list);
|
||||
MT_LIST_INIT(&fconn->buf_wait.list);
|
||||
|
||||
conn->ctx = fconn;
|
||||
|
||||
@ -838,11 +836,8 @@ static void fcgi_release(struct fcgi_conn *fconn)
|
||||
|
||||
TRACE_DEVEL("freeing fconn", FCGI_EV_FCONN_END, conn);
|
||||
|
||||
if (LIST_ADDED(&fconn->buf_wait.list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_DEL(&fconn->buf_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (MT_LIST_ADDED(&fconn->buf_wait.list))
|
||||
MT_LIST_DEL(&fconn->buf_wait.list);
|
||||
|
||||
fcgi_release_buf(fconn, &fconn->dbuf);
|
||||
fcgi_release_mbuf(fconn);
|
||||
|
16
src/mux_h1.c
16
src/mux_h1.c
@ -415,13 +415,11 @@ static inline struct buffer *h1_get_buf(struct h1c *h1c, struct buffer *bptr)
|
||||
{
|
||||
struct buffer *buf = NULL;
|
||||
|
||||
if (likely(LIST_ISEMPTY(&h1c->buf_wait.list)) &&
|
||||
if (likely(!MT_LIST_ADDED(&h1c->buf_wait.list)) &&
|
||||
unlikely((buf = b_alloc_margin(bptr, 0)) == NULL)) {
|
||||
h1c->buf_wait.target = h1c;
|
||||
h1c->buf_wait.wakeup_cb = h1_buf_available;
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_ADDQ(&buffer_wq, &h1c->buf_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
MT_LIST_ADDQ(&buffer_wq, &h1c->buf_wait.list);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
@ -659,7 +657,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
|
||||
h1c->h1s = NULL;
|
||||
h1c->task = NULL;
|
||||
|
||||
LIST_INIT(&h1c->buf_wait.list);
|
||||
MT_LIST_INIT(&h1c->buf_wait.list);
|
||||
h1c->wait_event.tasklet = tasklet_new();
|
||||
if (!h1c->wait_event.tasklet)
|
||||
goto fail;
|
||||
@ -747,12 +745,8 @@ static void h1_release(struct h1c *h1c)
|
||||
}
|
||||
|
||||
|
||||
if (!LIST_ISEMPTY(&h1c->buf_wait.list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_DEL(&h1c->buf_wait.list);
|
||||
LIST_INIT(&h1c->buf_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (MT_LIST_ADDED(&h1c->buf_wait.list))
|
||||
MT_LIST_DEL(&h1c->buf_wait.list);
|
||||
|
||||
h1_release_buf(h1c, &h1c->ibuf);
|
||||
h1_release_buf(h1c, &h1c->obuf);
|
||||
|
15
src/mux_h2.c
15
src/mux_h2.c
@ -679,13 +679,11 @@ static inline struct buffer *h2_get_buf(struct h2c *h2c, struct buffer *bptr)
|
||||
{
|
||||
struct buffer *buf = NULL;
|
||||
|
||||
if (likely(!LIST_ADDED(&h2c->buf_wait.list)) &&
|
||||
if (likely(!MT_LIST_ADDED(&h2c->buf_wait.list)) &&
|
||||
unlikely((buf = b_alloc_margin(bptr, 0)) == NULL)) {
|
||||
h2c->buf_wait.target = h2c;
|
||||
h2c->buf_wait.wakeup_cb = h2_buf_available;
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_ADDQ(&buffer_wq, &h2c->buf_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
MT_LIST_ADDQ(&buffer_wq, &h2c->buf_wait.list);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
@ -856,7 +854,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s
|
||||
LIST_INIT(&h2c->send_list);
|
||||
LIST_INIT(&h2c->fctl_list);
|
||||
LIST_INIT(&h2c->blocked_list);
|
||||
LIST_INIT(&h2c->buf_wait.list);
|
||||
MT_LIST_INIT(&h2c->buf_wait.list);
|
||||
|
||||
conn->ctx = h2c;
|
||||
|
||||
@ -940,11 +938,8 @@ static void h2_release(struct h2c *h2c)
|
||||
TRACE_DEVEL("freeing h2c", H2_EV_H2C_END, conn);
|
||||
hpack_dht_free(h2c->ddht);
|
||||
|
||||
if (LIST_ADDED(&h2c->buf_wait.list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_DEL(&h2c->buf_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (MT_LIST_ADDED(&h2c->buf_wait.list))
|
||||
MT_LIST_DEL(&h2c->buf_wait.list);
|
||||
|
||||
h2_release_buf(h2c, &h2c->dbuf);
|
||||
h2_release_mbuf(h2c);
|
||||
|
25
src/stream.c
25
src/stream.c
@ -391,7 +391,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
||||
/* OK, we're keeping the stream, so let's properly initialize the stream */
|
||||
LIST_INIT(&s->back_refs);
|
||||
|
||||
LIST_INIT(&s->buffer_wait.list);
|
||||
MT_LIST_INIT(&s->buffer_wait.list);
|
||||
s->buffer_wait.target = s;
|
||||
s->buffer_wait.wakeup_cb = stream_buf_available;
|
||||
|
||||
@ -595,12 +595,9 @@ static void stream_free(struct stream *s)
|
||||
put_pipe(s->res.pipe);
|
||||
|
||||
/* We may still be present in the buffer wait queue */
|
||||
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_DEL(&s->buffer_wait.list);
|
||||
LIST_INIT(&s->buffer_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (MT_LIST_ADDED(&s->buffer_wait.list))
|
||||
MT_LIST_DEL(&s->buffer_wait.list);
|
||||
|
||||
if (s->req.buf.size || s->res.buf.size) {
|
||||
b_free(&s->req.buf);
|
||||
b_free(&s->res.buf);
|
||||
@ -727,19 +724,13 @@ static void stream_free(struct stream *s)
|
||||
*/
|
||||
static int stream_alloc_work_buffer(struct stream *s)
|
||||
{
|
||||
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_DEL(&s->buffer_wait.list);
|
||||
LIST_INIT(&s->buffer_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
}
|
||||
if (MT_LIST_ADDED(&s->buffer_wait.list))
|
||||
MT_LIST_DEL(&s->buffer_wait.list);
|
||||
|
||||
if (b_alloc_margin(&s->res.buf, 0))
|
||||
return 1;
|
||||
|
||||
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
|
||||
HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
|
||||
MT_LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -2788,7 +2779,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
|
||||
chunk_appendf(&trash,
|
||||
" flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p waiting=%d\n",
|
||||
strm->flags, strm->si[1].conn_retries, strm->srv_conn, strm->pend_pos,
|
||||
!LIST_ISEMPTY(&strm->buffer_wait.list));
|
||||
MT_LIST_ADDED(&strm->buffer_wait.list));
|
||||
|
||||
chunk_appendf(&trash,
|
||||
" frontend=%s (id=%u mode=%s), listener=%s (id=%u)",
|
||||
|
Loading…
Reference in New Issue
Block a user