diff --git a/include/haproxy/dynbuf-t.h b/include/haproxy/dynbuf-t.h
index b64cda144..b93c6e0a4 100644
--- a/include/haproxy/dynbuf-t.h
+++ b/include/haproxy/dynbuf-t.h
@@ -48,6 +48,9 @@
  *   snd_buf() handler to encode the outgoing channel's data.
  * - buffer permanently allocated at boot (e.g. temporary compression
  *   buffers). If these fail, we can't boot.
+ *
+ * Please DO NOT CHANGE THESE LEVELS without first getting a full understanding
+ * of how all this works and touching the DB_CRIT_TO_QUEUE() macro below!
  */
 enum dynbuf_crit {
         DB_GROW_RING = 0,   // used to grow an existing buffer ring
@@ -61,6 +64,29 @@ enum dynbuf_crit {
         DB_PERMANENT,       // buffers permanently allocated.
 };
 
+/* We'll deal with 4 queues, with indexes numbered from 0 to 3 based on the
+ * criticality of the allocation. All criticality levels are mapped to a 2-bit
+ * queue index. While some levels never use the queue (the first two), some of
+ * the others will share a same queue, and all levels will define a ratio of
+ * allocated emergency buffers below which we refrain from trying to allocate.
+ * In practice, for now the thresholds will just be the queue number times 33%
+ * so that queue 0 is allowed to deplete emergency buffers and queue 3 not at
+ * all. This gives us: queue idx=3 for DB_MUX_RX and below, 2 for DB_SE_RX,
+ * 1 for DB_CHANNEL, 0 for DB_MUX_TX and above. This must match the DYNBUF_NBQ
+ * in tinfo-t.h.
+ */
+
+#define DB_CRIT_TO_QUEUE(crit)   ((0x000001BF >> ((crit) * 2)) & 3)
+
+#define DB_GROW_RING_Q   DB_CRIT_TO_QUEUE(DB_GROW_RING)
+#define DB_UNLIKELY_Q    DB_CRIT_TO_QUEUE(DB_UNLIKELY)
+#define DB_MUX_RX_Q      DB_CRIT_TO_QUEUE(DB_MUX_RX)
+#define DB_SE_RX_Q       DB_CRIT_TO_QUEUE(DB_SE_RX)
+#define DB_CHANNEL_Q     DB_CRIT_TO_QUEUE(DB_CHANNEL)
+#define DB_MUX_TX_Q      DB_CRIT_TO_QUEUE(DB_MUX_TX)
+#define DB_PERMANENT_Q   DB_CRIT_TO_QUEUE(DB_PERMANENT)
+
+
 /* an element of the list. It represents an object that need to
  * acquire a buffer to continue its process. */
 struct buffer_wait {
diff --git a/include/haproxy/dynbuf.h b/include/haproxy/dynbuf.h
index 4541281ee..d8a7552fc 100644
--- a/include/haproxy/dynbuf.h
+++ b/include/haproxy/dynbuf.h
@@ -117,8 +117,19 @@ void __offer_buffers(void *from, unsigned int count);
 
 static inline void offer_buffers(void *from, unsigned int count)
 {
-        if (!LIST_ISEMPTY(&th_ctx->buffer_wq))
+        int q;
+
+        if (likely(!th_ctx->bufq_map))
+                return;
+
+        for (q = 0; q < DYNBUF_NBQ; q++) {
+                if (!(th_ctx->bufq_map & (1 << q)))
+                        continue;
+
+                BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
                 __offer_buffers(from, count);
+                break;
+        }
 }
 
 /* Queues a buffer request for the current thread via <bw>, and returns
@@ -130,6 +141,8 @@ static inline void offer_buffers(void *from, unsigned int count)
  */
 static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
 {
+        int q = DB_CRIT_TO_QUEUE(crit);
+
         if (LIST_INLIST(&bw->list))
                 return 1;
 
@@ -137,7 +150,8 @@ static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
         if (crit < DB_MUX_RX)
                 return 0;
 
-        LIST_APPEND(&th_ctx->buffer_wq, &bw->list);
+        th_ctx->bufq_map |= 1 << q;
+        LIST_APPEND(&th_ctx->buffer_wq[q], &bw->list);
         return 1;
 }
diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h
index 8e7638e2b..4ed78756a 100644
--- a/include/haproxy/tinfo-t.h
+++ b/include/haproxy/tinfo-t.h
@@ -65,6 +65,8 @@ enum {
 #define TH_FL_STARTED       0x00000010  /* set once the thread starts */
 #define TH_FL_IN_LOOP       0x00000020  /* set only inside the polling loop */
 
+/* we have 4 buffer-wait queues, in highest to lowest emergency order */
+#define DYNBUF_NBQ 4
 
 /* Thread group information. This defines a base and a count of global thread
  * IDs which belong to it, and which can be looked up into thread_info/ctx. It
@@ -133,14 +135,15 @@ struct thread_ctx {
         int current_queue;              /* points to current tasklet list being run, -1 if none */
         unsigned int nb_tasks;          /* number of tasks allocated on this thread */
         uint8_t tl_class_mask;          /* bit mask of non-empty tasklets classes */
+        uint8_t bufq_map;               /* one bit per non-empty buffer_wq */
 
-        // 7 bytes hole here
+        // 6 bytes hole here
         struct list pool_lru_head;      /* oldest objects in thread-local pool caches */
-        struct list buffer_wq;          /* buffer waiters */
         struct list streams;            /* list of streams attached to this thread */
         struct list quic_conns;         /* list of active quic-conns attached to this thread */
         struct list quic_conns_clo;     /* list of closing quic-conns attached to this thread */
         struct list queued_checks;      /* checks waiting for a connection slot */
+        struct list buffer_wq[DYNBUF_NBQ]; /* buffer waiters, 4 criticality-based queues */
         unsigned int nb_rhttp_conns;    /* count of current conns used for active reverse HTTP */
 
         ALWAYS_ALIGN(2*sizeof(void*));
diff --git a/src/dynbuf.c b/src/dynbuf.c
index 7a9deb886..a849aeeb9 100644
--- a/src/dynbuf.c
+++ b/src/dynbuf.c
@@ -30,13 +30,24 @@ int init_buffer()
         void *buffer;
         int thr;
         int done;
+        int i;
 
         pool_head_buffer = create_pool("buffer", global.tune.bufsize, MEM_F_SHARED|MEM_F_EXACT);
         if (!pool_head_buffer)
                 return 0;
 
-        for (thr = 0; thr < MAX_THREADS; thr++)
-                LIST_INIT(&ha_thread_ctx[thr].buffer_wq);
+        /* make sure any change to the queues assignment isn't overlooked */
+        BUG_ON(DB_PERMANENT - DB_UNLIKELY - 1 != DYNBUF_NBQ);
+        BUG_ON(DB_MUX_RX_Q  < DB_SE_RX_Q   || DB_MUX_RX_Q  >= DYNBUF_NBQ);
+        BUG_ON(DB_SE_RX_Q   < DB_CHANNEL_Q || DB_SE_RX_Q   >= DYNBUF_NBQ);
+        BUG_ON(DB_CHANNEL_Q < DB_MUX_TX_Q  || DB_CHANNEL_Q >= DYNBUF_NBQ);
+        BUG_ON(DB_MUX_TX_Q >= DYNBUF_NBQ);
+
+        for (thr = 0; thr < MAX_THREADS; thr++) {
+                for (i = 0; i < DYNBUF_NBQ; i++)
+                        LIST_INIT(&ha_thread_ctx[thr].buffer_wq[i]);
+                ha_thread_ctx[thr].bufq_map = 0;
+        }
 
         /* The reserved buffer is what we leave behind us. Thus we always need
@@ -104,6 +115,7 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
 void __offer_buffers(void *from, unsigned int count)
 {
         struct buffer_wait *wait, *wait_back;
+        int q;
 
         /* For now, we consider that all objects need 1 buffer, so we can stop
          * waking up them once we have enough of them to eat all the available
@@ -111,15 +123,23 @@ void __offer_buffers(void *from, unsigned int count)
          * other tasks, but that's a rough estimate. Similarly, for each cached
          * event we'll need 1 buffer.
          */
-        list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq, list) {
-                if (!count)
-                        break;
-
-                if (wait->target == from || !wait->wakeup_cb(wait->target))
+        for (q = 0; q < DYNBUF_NBQ; q++) {
+                if (!(th_ctx->bufq_map & (1 << q)))
                         continue;
+                BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
 
-                LIST_DEL_INIT(&wait->list);
-                count--;
+                list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq[q], list) {
+                        if (!count)
+                                break;
+
+                        if (wait->target == from || !wait->wakeup_cb(wait->target))
+                                continue;
+
+                        LIST_DEL_INIT(&wait->list);
+                        count--;
+                }
+                if (LIST_ISEMPTY(&th_ctx->buffer_wq[q]))
+                        th_ctx->bufq_map &= ~(1 << q);
         }
 }
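
The DB_CRIT_TO_QUEUE() macro in the patch packs one 2-bit queue index per criticality level into the constant 0x000001BF. The following minimal standalone sketch is not part of the patch: the enum members, their order and the constant are copied from it, while main(), the names[] table and the per-member comments are illustration-only scaffolding. It decodes the constant and prints the assignment the header comment describes (queue 3 for DB_MUX_RX and below, 2 for DB_SE_RX, 1 for DB_CHANNEL, 0 for DB_MUX_TX and above).

/* Standalone sketch: decode the DB_CRIT_TO_QUEUE() bit-field by hand. */
#include <stdio.h>

enum dynbuf_crit {
        DB_GROW_RING = 0,  /* maps to queue 3, but never actually queued */
        DB_UNLIKELY,       /* maps to queue 3, but never actually queued */
        DB_MUX_RX,         /* queue 3 */
        DB_SE_RX,          /* queue 2 */
        DB_CHANNEL,        /* queue 1 */
        DB_MUX_TX,         /* queue 0 */
        DB_PERMANENT,      /* queue 0 */
};

/* each criticality level owns 2 bits in 0x000001BF:
 * crit 0..2 -> 3, crit 3 -> 2, crit 4 -> 1, crit 5..6 -> 0
 */
#define DB_CRIT_TO_QUEUE(crit) ((0x000001BF >> ((crit) * 2)) & 3)

int main(void)
{
        static const char *names[] = {
                "DB_GROW_RING", "DB_UNLIKELY", "DB_MUX_RX", "DB_SE_RX",
                "DB_CHANNEL", "DB_MUX_TX", "DB_PERMANENT",
        };
        int crit;

        for (crit = DB_GROW_RING; crit <= DB_PERMANENT; crit++)
                printf("%-13s -> queue %d\n", names[crit], DB_CRIT_TO_QUEUE(crit));
        return 0;
}

Compiled and run on its own, this prints the queue index for every level, which is a quick way to re-check the constant whenever a level is added or moved; the BUG_ON() checks added to init_buffer() perform the equivalent sanity check at boot time.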
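
On the waiter side, bufq_map acts as a bitmap of non-empty queues: b_requeue() sets the queue's bit when it appends a waiter, offer_buffers() returns immediately when the map is zero, and __offer_buffers() clears a bit once the corresponding queue has drained. The sketch below loosely mirrors that pattern with invented names and a fixed-size array in place of HAProxy's intrusive struct list queues; it is only meant to show the bitmap bookkeeping and the fact that queue 0 (the most critical levels) is served first.

/* Toy model of the bufq_map pattern: one bit per non-empty waiter queue. */
#include <stdio.h>

#define NBQ  4   /* number of queues, like DYNBUF_NBQ */
#define QCAP 8   /* toy capacity; the real code uses linked lists */

struct waiter_queues {
        unsigned int map;        /* one bit per non-empty queue */
        int len[NBQ];            /* number of waiters per queue */
        int waiter[NBQ][QCAP];   /* stand-in for the buffer_wait lists */
};

/* register a waiter on queue <q>, like b_requeue() setting the bit */
static void enqueue(struct waiter_queues *wq, int q, int id)
{
        if (wq->len[q] >= QCAP)
                return;
        wq->waiter[q][wq->len[q]++] = id;
        wq->map |= 1u << q;
}

/* hand out up to <count> freed buffers, most critical queue (0) first,
 * clearing a queue's bit once it drains, like __offer_buffers()
 */
static void offer(struct waiter_queues *wq, unsigned int count)
{
        int q;

        if (!wq->map)            /* fast path: nobody is waiting */
                return;

        for (q = 0; q < NBQ && count; q++) {
                if (!(wq->map & (1u << q)))
                        continue;

                while (wq->len[q] && count) {
                        printf("waking waiter %d from queue %d\n",
                               wq->waiter[q][--wq->len[q]], q);
                        count--;
                }
                if (!wq->len[q])
                        wq->map &= ~(1u << q); /* queue drained: clear its bit */
        }
}

int main(void)
{
        struct waiter_queues wq = { 0 };

        enqueue(&wq, 3, 101); /* low-criticality waiter */
        enqueue(&wq, 0, 102); /* high-criticality waiter */
        offer(&wq, 1);        /* one buffer freed: queue 0 is served first */
        offer(&wq, 4);        /* remaining waiters get served */
        return 0;
}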