tsdb: cache collected head chunks on ChunkReader for O(1) lookup (#18302)

tsdb: cache collected head chunks on ChunkReader for O(1) lookup

The query path calls s.chunk() once per chunk meta via
ChunkOrIterableWithCopy. Each call walks the head chunks linked list
from the head to the target position. For a series with N head chunks
iterated oldest-first, total work is O(N²).

Cache the collected []*memChunk slice on headChunkReader, keyed by
series ref, head pointer, and mmapped chunks length. Collected once
per series under lock; reused on subsequent chunk lookups for the same
series. The backing array is reused across series (zero alloc after
first use).

Series with 0 or 1 head chunks skip the cache entirely to avoid
per-series overhead that dominates for typical workloads where most
series have a single head chunk.

The cache is gated behind an enableCache flag, toggled via an optional
chunkCacheToggler interface only when hints.Step > 0 (range queries).
Instant queries only need one chunk per series, so the cache overhead
is not recouped.

Also replace O(N²) linked-list traversals in appendSeriesChunks with
O(N) collectHeadChunks + slice iteration, and thread reusable
headChunksBuf through the index reader paths to avoid per-series
allocations.


---------

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
This commit is contained in:
Arve Knudsen 2026-04-17 18:34:41 +02:00 committed by GitHub
parent ff144f16fb
commit c7b2210ac3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 516 additions and 53 deletions

View File

@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"runtime"
"slices"
"strconv"
"sync"
stdatomic "sync/atomic" //nolint:depguard
@ -2638,6 +2639,22 @@ func (mc *memChunk) len() (count int) {
return count
}
// collectHeadChunks returns every head chunk reachable from head, ordered
// oldest-to-newest, appended to buf. buf must be empty (nil or sliced to
// [:0]): the final slices.Reverse spans the whole returned slice, so any
// pre-existing elements in buf would be reordered along with the collected
// ones. Returns buf unchanged when head is nil.
func collectHeadChunks(head *memChunk, buf []*memChunk) []*memChunk {
if head == nil {
return buf
}
// Single walk: append newest-to-oldest (following prev pointers), then
// reverse to oldest-to-newest. Pointer-chasing the linked list is the
// expensive part; slices.Reverse on a contiguous array is essentially
// free by comparison.
hc := buf
for elem := head; elem != nil; elem = elem.prev {
hc = append(hc, elem)
}
slices.Reverse(hc)
return hc
}
// oldest returns the oldest element on the list.
// For single element list this will be the same memChunk oldest() was called on.
func (mc *memChunk) oldest() (elem *memChunk) {

View File

@ -29,6 +29,11 @@ import (
"github.com/prometheus/prometheus/tsdb/index"
)
// headChunksBufMaxCap is the maximum capacity for the reusable headChunksBuf
// slice. If the buffer grows beyond this, it is released to avoid holding
// oversized backing arrays across many series iterations.
const headChunksBufMaxCap = 256
func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
return h.exemplars.ExemplarQuerier(ctx)
}
@ -45,9 +50,14 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
return &headIndexReader{head: h, mint: mint, maxt: maxt}
}
// headIndexReader provides index reading for the head block.
// Not safe for concurrent use from multiple goroutines.
type headIndexReader struct {
head *Head
mint, maxt int64
// Reusable buffer for collectHeadChunks inside appendSeriesChunks,
// avoiding a per-series allocation during iteration.
headChunksBuf []*memChunk
}
func (*headIndexReader) Close() error {
@ -197,7 +207,10 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
defer s.Unlock()
*chks = (*chks)[:0]
*chks = appendSeriesChunks(s, h.mint, h.maxt, *chks)
*chks, h.headChunksBuf = appendSeriesChunks(s, h.mint, h.maxt, *chks, h.headChunksBuf)
if cap(h.headChunksBuf) > headChunksBufMaxCap {
h.headChunksBuf = nil
}
return nil
}
@ -308,7 +321,10 @@ func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.Se
return h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v))
}
func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
// appendSeriesChunks appends chunk metadata for s to chks.
// headChunksBuf is a reusable buffer for collectHeadChunks; the (possibly grown) buffer is returned
// so callers can pass it back on the next call to avoid per-series allocations.
func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta, headChunksBuf []*memChunk) ([]chunks.Meta, []*memChunk) {
for i, c := range s.mmappedChunks {
// Do not expose chunks that are outside of the specified range.
if !c.OverlapsClosedInterval(mint, maxt) {
@ -321,28 +337,39 @@ func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []ch
})
}
if s.headChunks != nil {
var maxTime int64
var i, j int
for i = s.headChunks.len() - 1; i >= 0; i-- {
chk := s.headChunks.atOffset(i)
if i == 0 {
// Set the head chunk as open (being appended to) for the first headChunk.
maxTime = math.MaxInt64
} else {
maxTime = chk.maxTime
}
if chk.OverlapsClosedInterval(mint, maxt) {
chks = append(chks, chunks.Meta{
MinTime: chk.minTime,
MaxTime: maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))),
})
}
j++
if s.headChunks == nil {
return chks, headChunksBuf
}
// Fast path: single head chunk — no allocation, no linked-list walk.
if s.headChunks.prev == nil {
if s.headChunks.OverlapsClosedInterval(mint, maxt) {
chks = append(chks, chunks.Meta{
MinTime: s.headChunks.minTime,
MaxTime: math.MaxInt64,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))),
})
}
return chks, headChunksBuf
}
// Multiple head chunks: collect once O(N), iterate O(N).
headChunksBuf = collectHeadChunks(s.headChunks, headChunksBuf[:0])
clear(headChunksBuf[len(headChunksBuf):cap(headChunksBuf)])
for i, chk := range headChunksBuf {
maxTime := chk.maxTime
if i == len(headChunksBuf)-1 {
maxTime = math.MaxInt64 // Open (newest) chunk.
}
if chk.OverlapsClosedInterval(mint, maxt) {
chks = append(chks, chunks.Meta{
MinTime: chk.minTime,
MaxTime: maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+i))),
})
}
}
return chks
return chks, headChunksBuf
}
// headChunkID returns the HeadChunkID referred to by the given position.
@ -421,10 +448,22 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkRead
}, nil
}
// headChunkReader provides chunk reading for the head block.
// Not safe for concurrent use from multiple goroutines.
type headChunkReader struct {
head *Head
mint, maxt int64
isoState *isolationState
// When true, enables the head-chunks cache. Range queries benefit from
// caching because they look up every chunk of a series; instant queries
// only need one chunk per series, so the cache is wasted overhead.
enableCache bool
// Cache for head chunks — avoids O(n²) linked-list walks when
// iterating all chunks of a series oldest-to-newest.
cachedSeriesRef storage.SeriesRef
cachedHeadChunks []*memChunk
cachedHeadChunksHead *memChunk // Head pointer at collection time; detects head-chunk replacement.
cachedMmapLen int // len(s.mmappedChunks) at collection time; detects mmap events.
}
func (h *headChunkReader) Close() error {
@ -434,6 +473,30 @@ func (h *headChunkReader) Close() error {
return nil
}
// getOrCollectHeadChunks returns the cached oldest-to-newest head-chunks slice
// for s, (re)collecting it when the cache is stale. The cache is keyed by the
// series ref, the head-chunk pointer, and len(s.mmappedChunks); a change in any
// of them invalidates the entry. The mmapped-chunks length check matters because
// mmapping can shorten the linked list while leaving the head pointer unchanged,
// so a pointer-only comparison would serve stale data.
// Returns nil when the cache is disabled or s has fewer than two head chunks;
// callers then fall back to the linked-list walk.
// Callers hold s's lock when invoking this (see headChunkReader.chunk).
func (h *headChunkReader) getOrCollectHeadChunks(s *memSeries) []*memChunk {
// Skip if the cache is disabled (instant queries) or there are no head chunks or there's only one.
if !h.enableCache || s.headChunks == nil || s.headChunks.prev == nil {
return nil
}
ref := storage.SeriesRef(s.ref)
if ref == h.cachedSeriesRef && s.headChunks == h.cachedHeadChunksHead && h.cachedMmapLen == len(s.mmappedChunks) {
return h.cachedHeadChunks
}
// Reuse the previous backing array to avoid a per-series allocation.
var buf []*memChunk
if h.cachedHeadChunks != nil {
buf = h.cachedHeadChunks[:0]
}
h.cachedHeadChunks = collectHeadChunks(s.headChunks, buf)
// Allow GC of *memChunk pointers left over from a previous, longer collection.
clear(h.cachedHeadChunks[len(h.cachedHeadChunks):cap(h.cachedHeadChunks)])
h.cachedSeriesRef = ref
h.cachedHeadChunksHead = s.headChunks
h.cachedMmapLen = len(s.mmappedChunks)
return h.cachedHeadChunks
}
// ChunkOrIterable returns the chunk for the reference number.
func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
chk, _, err := h.chunk(meta, false)
@ -465,7 +528,11 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
s.Lock()
defer s.Unlock()
return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk)
var headChunks []*memChunk
if !isOOO {
headChunks = h.getOrCollectHeadChunks(s)
}
return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk, headChunks)
}
// Dumb thing to defeat chunk pool.
@ -474,12 +541,12 @@ type wrapOOOHeadChunk struct {
}
// Call with s locked.
func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool, headChunks []*memChunk) (chunkenc.Chunk, int64, error) {
if isOOO {
chk, maxTime, err := s.oooChunk(cid, h.chunkDiskMapper, &h.memChunkPool)
return wrapOOOHeadChunk{chk}, maxTime, err
}
c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool)
c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool, headChunks)
if err != nil {
return nil, 0, err
}
@ -523,7 +590,7 @@ func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool,
// If headChunk is false, it means that the returned *memChunk
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
// if isOpen is true, it means that the returned *memChunk is used for appends.
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk, isOpen bool, err error) {
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, headChunks []*memChunk) (chunk *memChunk, headChunk, isOpen bool, err error) {
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
@ -539,7 +606,9 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
ix := int(id) - int(s.firstChunkID)
var headChunksLen int
if s.headChunks != nil {
if headChunks != nil {
headChunksLen = len(headChunks)
} else if s.headChunks != nil {
headChunksLen = s.headChunks.len()
}
@ -563,8 +632,19 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
return mc, false, false, nil
}
// Head chunk lookup.
ix -= len(s.mmappedChunks)
// Fast path: use pre-collected slice for O(1) indexed lookup.
if headChunks != nil {
if ix >= len(headChunks) {
return nil, false, false, storage.ErrNotFound
}
return headChunks[ix], true, ix == len(headChunks)-1, nil
}
// Fallback: walk the linked list.
offset := headChunksLen - ix - 1
// headChunks is a linked list where first element is the most recent one and the last one is the oldest.
// This order is reversed when compared with mmappedChunks, since mmappedChunks[0] is the oldest chunk,

View File

@ -15,6 +15,7 @@ package tsdb
import (
"context"
"strconv"
"sync"
"testing"
@ -388,7 +389,7 @@ func TestMemSeries_chunk(t *testing.T) {
tc.setup(t, series, chunkDiskMapper)
}
chk, headChunk, isOpen, err := series.chunk(tc.inputID, chunkDiskMapper, memChunkPool)
chk, headChunk, isOpen, err := series.chunk(tc.inputID, chunkDiskMapper, memChunkPool, nil)
switch tc.expected {
case outOpenHeadChunk:
require.NoError(t, err, "unexpected error")
@ -409,6 +410,71 @@ func TestMemSeries_chunk(t *testing.T) {
}
}
// TestMemSeries_chunk_FastPath verifies that the O(1) indexed lookup via a
// pre-collected headChunks slice returns identical results to the linked-list
// fallback (headChunks=nil) for every chunk in a series with mixed mmapped
// and head chunks.
func TestMemSeries_chunk_FastPath(t *testing.T) {
const chunkRange int64 = 100
const chunkStep int64 = 5
// appendSamples appends one float sample every chunkStep in [start, end).
appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) {
for i := start; i < end; i += chunkStep {
ok, _ := s.append(0, i, float64(i), 0, chunkOpts{
chunkDiskMapper: cdm,
chunkRange: chunkRange,
samplesPerChunk: DefaultSamplesPerChunk,
})
require.True(t, ok, "sample append failed")
}
}
dir := t.TempDir()
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
memChunkPool := &sync.Pool{New: func() any { return &memChunk{} }}
series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false)
// Build 3 mmapped + 3 head chunks.
appendSamples(t, series, 0, chunkRange*4, chunkDiskMapper)
series.mmapChunks(chunkDiskMapper)
require.Len(t, series.mmappedChunks, 3)
require.Equal(t, 1, series.headChunks.len())
appendSamples(t, series, chunkRange*4, chunkRange*6, chunkDiskMapper)
require.Equal(t, 3, series.headChunks.len())
require.Len(t, series.mmappedChunks, 3)
// Collect the head chunks once, as the cache would.
hc := collectHeadChunks(series.headChunks, nil)
headChunkCount := int(series.headChunkCount.Load())
require.Equal(t, 3, headChunkCount, "expected 3 head chunks")
totalChunks := len(series.mmappedChunks) + headChunkCount
// Compare fast path against fallback for every chunk ID, mmapped and head.
for ix := range totalChunks {
id := chunks.HeadChunkID(ix)
// Linked-list fallback.
chkLL, headChunkLL, isOpenLL, errLL := series.chunk(id, chunkDiskMapper, memChunkPool, nil)
// Fast path with pre-collected slice.
chkFP, headChunkFP, isOpenFP, errFP := series.chunk(id, chunkDiskMapper, memChunkPool, hc)
require.Equal(t, errLL, errFP, "ix=%d: error mismatch", ix)
require.Equal(t, headChunkLL, headChunkFP, "ix=%d: headChunk mismatch", ix)
require.Equal(t, isOpenLL, isOpenFP, "ix=%d: isOpen mismatch", ix)
if ix < len(series.mmappedChunks) {
// Mmapped chunks are loaded from disk into fresh memChunks, so
// pointer equality is not expected — compare the time range instead.
require.Equal(t, chkLL.minTime, chkFP.minTime, "ix=%d: minTime mismatch", ix)
require.Equal(t, chkLL.maxTime, chkFP.maxTime, "ix=%d: maxTime mismatch", ix)
} else {
// Head chunks should be pointer-identical.
require.Same(t, chkLL, chkFP, "ix=%d: head chunk pointer mismatch", ix)
}
}
}
func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) {
testPostingsForLabelMatching(t, 0, func(t *testing.T, series []labels.Labels) IndexReader {
opts := DefaultHeadOptions()
@ -430,3 +496,239 @@ func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) {
return ir
})
}
// TestHeadChunkReaderCache exercises the headChunkReader head-chunks cache:
// hit behavior, invalidation after mmapping, and release of oversized
// reusable buffers in both index-reader paths.
func TestHeadChunkReaderCache(t *testing.T) {
t.Run("cache_hit", func(t *testing.T) {
// Verify that a second chunk lookup for the same (unchanged) series
// returns the cached head-chunks slice without re-collecting.
opts := DefaultHeadOptions()
opts.ChunkRange = 100
opts.ChunkDirRoot = t.TempDir()
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, h.Close()) })
// Append enough samples to produce multiple head chunks.
app := h.Appender(t.Context())
lbls := labels.FromStrings("__name__", "test")
for i := int64(0); i < 500; i += 5 {
_, err := app.Append(0, lbls, i, float64(i))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
s := h.series.getByID(1)
require.NotNil(t, s)
s.Lock()
require.Greater(t, s.headChunks.len(), 1, "need multiple head chunks for the test")
// Newest head chunk ID: firstChunkID + len(mmapped) + headChunks.len() - 1.
newestCID := s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1
newestRef := chunks.NewHeadChunkRef(s.ref, newestCID)
s.Unlock()
cr, err := h.chunksRange(0, 10000, nil)
require.NoError(t, err)
cr.enableCache = true
// First call: populates the cache.
_, _, err = cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false)
require.NoError(t, err)
require.NotNil(t, cr.cachedHeadChunks)
cachedSlice := cr.cachedHeadChunks
// Second call (same series, no changes): must reuse the cached slice.
_, _, err = cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false)
require.NoError(t, err)
require.Same(t, &cachedSlice[0], &cr.cachedHeadChunks[0], "expected cache hit — slice backing array should be identical")
})
t.Run("invalidated_after_mmap", func(t *testing.T) {
// Regression test: after mmapChunks(), the head-chunks cache must be
// invalidated even though s.headChunks (the pointer) doesn't change.
// mmapChunks severs the linked list (prev=nil, len=1) but
// keeps the same head pointer, so a pointer-only check would return
// a stale cache with chunks that have been mmapped.
opts := DefaultHeadOptions()
opts.ChunkRange = 100
opts.ChunkDirRoot = t.TempDir()
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, h.Close()) })
// Append enough samples to create multiple head chunks.
// With ChunkRange=100 and DefaultSamplesPerChunk=120, each chunk
// holds ~20 samples (range/step = 100/5). We want >=3 head chunks.
app := h.Appender(t.Context())
lbls := labels.FromStrings("__name__", "test")
for i := int64(0); i < 500; i += 5 {
_, err := app.Append(0, lbls, i, float64(i))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Look up the series and verify we have multiple head chunks.
s := h.series.getByID(1)
require.NotNil(t, s)
s.Lock()
require.Greater(t, s.headChunks.len(), 1, "need multiple head chunks for the test")
headChunksLenBefore := s.headChunks.len()
newestChunkMinTime := s.headChunks.minTime
// The chunk ID for the newest head chunk:
// firstChunkID + len(mmapped) + headChunksLen - 1
newestCID := s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1
newestRef := chunks.NewHeadChunkRef(s.ref, newestCID)
s.Unlock()
// Create a headChunkReader with cache enabled and query the newest head chunk to populate the cache.
cr, err := h.chunksRange(0, 10000, nil)
require.NoError(t, err)
cr.enableCache = true
chk1, _, err := cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false)
require.NoError(t, err)
require.NotNil(t, chk1)
// Verify cache is populated.
require.NotNil(t, cr.cachedHeadChunks)
require.Len(t, cr.cachedHeadChunks, headChunksLenBefore)
// Now mmap all but the newest head chunk — this severs the linked list.
s.Lock()
headPtrBefore := s.headChunks
s.mmapChunks(h.chunkDiskMapper)
require.Equal(t, 1, s.headChunks.len(), "after mmap, should have exactly 1 head chunk")
require.Same(t, headPtrBefore, s.headChunks, "head pointer should not change (the bug scenario)")
require.Equal(t, newestChunkMinTime, s.headChunks.minTime, "newest chunk should be the remaining head chunk")
// Recompute the newest chunk ID after mmap: more mmapped chunks now.
newestCID = s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1
newestRef = chunks.NewHeadChunkRef(s.ref, newestCID)
s.Unlock()
// Query the newest head chunk again. With the bug, the stale cache
// would be used and return a wrong (mmapped) chunk.
chk2, _, err := cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false)
require.NoError(t, err)
require.NotNil(t, chk2)
// After mmap, only 1 head chunk remains. The prev==nil guard skips
// the cache entirely, so the stale slice is preserved but not consulted.
require.Len(t, cr.cachedHeadChunks, headChunksLenBefore, "stale cache not cleared, but also not used")
// Verify the returned chunk is actually the newest head chunk, not a stale cached entry.
it := chk2.Iterator(nil)
require.Equal(t, chunkenc.ValFloat, it.Next())
ts, _ := it.At()
require.Equal(t, newestChunkMinTime, ts, "returned chunk should be the newest head chunk, not a stale cached entry")
})
t.Run("buffer_cap_release", func(t *testing.T) {
// Test that headChunksBuf is released when its capacity exceeds
// headChunksBufMaxCap, preventing unbounded memory retention.
opts := DefaultHeadOptions()
opts.ChunkRange = 1
opts.ChunkDirRoot = t.TempDir()
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, h.Close()) })
// Append enough samples to create >headChunksBufMaxCap head chunks.
// With ChunkRange=1, each sample goes to a new chunk.
app := h.Appender(t.Context())
lbls := labels.FromStrings("__name__", "cap_test")
for i := range int64(headChunksBufMaxCap) + 10 {
_, err := app.Append(0, lbls, i, float64(i))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
s := h.series.getByID(1)
require.NotNil(t, s)
s.Lock()
require.Greater(t, s.headChunks.len(), headChunksBufMaxCap, "need >%d head chunks", headChunksBufMaxCap)
s.Unlock()
// Call Series() via headIndexReader — this populates headChunksBuf.
ir := h.indexRange(0, int64(headChunksBufMaxCap)+10)
var builder labels.ScratchBuilder
var chks []chunks.Meta
require.NoError(t, ir.Series(1, &builder, &chks))
require.Greater(t, len(chks), headChunksBufMaxCap)
// Buffer should have been released because cap exceeded threshold.
require.Nil(t, ir.headChunksBuf, "headChunksBuf should be released when cap > headChunksBufMaxCap")
})
t.Run("buffer_cap_release_ooo_index_reader", func(t *testing.T) {
// Same as buffer_cap_release but exercises HeadAndOOOIndexReader.Series,
// which has its own headChunksBufMaxCap check.
opts := DefaultHeadOptions()
opts.ChunkRange = 1
opts.ChunkDirRoot = t.TempDir()
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, h.Close()) })
app := h.Appender(t.Context())
lbls := labels.FromStrings("__name__", "cap_test_ooo")
for i := range int64(headChunksBufMaxCap) + 10 {
_, err := app.Append(0, lbls, i, float64(i))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
s := h.series.getByID(1)
require.NotNil(t, s)
s.Lock()
require.Greater(t, s.headChunks.len(), headChunksBufMaxCap, "need >%d head chunks", headChunksBufMaxCap)
s.Unlock()
// Call Series() via HeadAndOOOIndexReader (non-OOO series, so the else branch is taken).
maxt := int64(headChunksBufMaxCap) + 10
ir := NewHeadAndOOOIndexReader(h, 0, 0, maxt, 0)
var builder labels.ScratchBuilder
var chks []chunks.Meta
require.NoError(t, ir.Series(1, &builder, &chks))
require.Greater(t, len(chks), headChunksBufMaxCap)
// Buffer should have been released because cap exceeded threshold.
require.Nil(t, ir.headChunksBuf, "headChunksBuf should be released when cap > headChunksBufMaxCap")
})
}
// benchSink keeps the benchmarked result observable so the compiler cannot
// eliminate the s.chunk call as dead code.
var benchSink *memChunk

// BenchmarkSeriesChunkIteration measures iterating all N head chunks of a series
// oldest-to-newest (the real query pattern) using the cached head-chunks slice.
func BenchmarkSeriesChunkIteration(b *testing.B) {
for _, n := range []int{1, 4, 16, 64, 256} {
b.Run(strconv.Itoa(n), func(b *testing.B) {
s := &memSeries{
ref: 1,
firstChunkID: 0,
headChunks: buildHeadChunksLight(n),
}
// Pre-collect once outside the timed loop, as the reader cache would.
hc := collectHeadChunks(s.headChunks, nil)
b.ReportAllocs()
for b.Loop() {
for i := range n {
benchSink, _, _, _ = s.chunk(chunks.HeadChunkID(i), nil, nil, hc)
}
}
})
}
}
// buildHeadChunksLight constructs a linked list of n memChunks that carry only
// time ranges and prev pointers — no chunk encodings are allocated. Intended
// for benchmarks that exercise the list structure alone. The returned element
// is the newest chunk (the list head); each node's prev points at an older one.
func buildHeadChunksLight(n int) *memChunk {
	var newest *memChunk
	for i := 0; i < n; i++ {
		base := int64(i) * 1000
		newest = &memChunk{
			minTime: base,
			maxTime: base + 999,
			prev:    newest,
		}
	}
	return newest
}

View File

@ -793,7 +793,7 @@ func TestHead_ReadWAL(t *testing.T) {
}
// Verify samples and exemplar for series 10.
c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil)
require.NoError(t, err)
require.Equal(t, []sample{{0, 100, 2, nil, nil}, {0, 101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
@ -806,14 +806,14 @@ func TestHead_ReadWAL(t *testing.T) {
require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0]))
// Verify samples for series 50
c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil)
require.NoError(t, err)
require.Equal(t, []sample{{0, 101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
// Verify records for series 100 and its duplicate, series 101.
// The samples before the new series record should be discarded since a duplicate record
// is only possible when old samples were compacted.
c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil)
require.NoError(t, err)
require.Equal(t, []sample{{0, 101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
@ -1570,21 +1570,21 @@ func TestMemSeries_truncateChunks(t *testing.T) {
// that the ID of the last chunk still gives us the same chunk afterwards.
countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk.
lastID := s.headChunkID(countBefore - 1)
lastChunk, _, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool)
lastChunk, _, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool, nil)
require.NoError(t, err)
require.NotNil(t, lastChunk)
chk, _, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool)
chk, _, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool, nil)
require.NotNil(t, chk)
require.NoError(t, err)
s.truncateChunksBefore(2000, 0)
require.Equal(t, int64(2000), s.mmappedChunks[0].minTime)
_, _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool)
_, _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool, nil)
require.Equal(t, storage.ErrNotFound, err, "first chunks not gone")
require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk.
chk, _, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool)
chk, _, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool, nil)
require.NoError(t, err)
require.Equal(t, lastChunk, chk)
}
@ -3840,7 +3840,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
New: func() any {
return &memChunk{}
},
})
}, nil)
require.NoError(t, err)
it := c.chunk.Iterator(nil)

View File

@ -77,18 +77,23 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
*chks = (*chks)[:0]
if s.ooo != nil {
return getOOOSeriesChunks(s, oh.head.opts.EnableXOR2Encoding.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
oh.headChunksBuf = getOOOSeriesChunks(s, oh.head.opts.EnableXOR2Encoding.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks, oh.headChunksBuf)
} else {
*chks, oh.headChunksBuf = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks, oh.headChunksBuf)
}
if cap(oh.headChunksBuf) > headChunksBufMaxCap {
oh.headChunksBuf = nil
}
*chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
return nil
}
// getOOOSeriesChunks collects chunk metadata for OOO (and optionally in-order) data.
// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
// any chunk at or before this ref will not be considered. 0 disables this check.
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// maxMmapRef tells up to what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
// headChunksBuf is a reusable buffer for collectHeadChunks; the (possibly grown) buffer is returned.
func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta, headChunksBuf []*memChunk) []*memChunk {
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
@ -109,7 +114,7 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime, useXOR2)
if err != nil {
handleChunkWriteError(err)
return nil
return headChunksBuf
}
for _, chk := range chks {
addChunk(chk.minTime, chk.maxTime, ref, chk.chunk)
@ -129,12 +134,12 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag
}
if includeInOrder {
tmpChks = appendSeriesChunks(s, inoMint, maxt, tmpChks)
tmpChks, headChunksBuf = appendSeriesChunks(s, inoMint, maxt, tmpChks, headChunksBuf)
}
// There is nothing to do if we did not collect any chunk.
if len(tmpChks) == 0 {
return nil
return headChunksBuf
}
// Next we want to sort all the collected chunks by min time so we can find
@ -165,7 +170,7 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag
}
*chks = append(*chks, toBeMerged)
return nil
return headChunksBuf
}
// Fake Chunk object to pass a set of Metas inside Meta.Chunk.
@ -207,11 +212,12 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
}
type HeadAndOOOChunkReader struct {
head *Head
mint, maxt int64
cr *headChunkReader // If nil, only read OOO chunks.
maxMmapRef chunks.ChunkDiskMapperRef
oooIsoState *oooIsolationState
head *Head
mint, maxt int64
cr *headChunkReader // If nil, only read OOO chunks.
maxMmapRef chunks.ChunkDiskMapperRef
oooIsoState *oooIsolationState
headChunksBuf []*memChunk // Reusable buffer for collectHeadChunks when cr is nil.
}
func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader {
@ -252,7 +258,11 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk
defer s.Unlock()
if meta.Chunk == nil {
c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk)
var headChunks []*memChunk
if !isOOO {
headChunks = cr.collectOrGetHeadChunks(s)
}
c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk, headChunks)
return c, nil, maxt, err
}
mm, ok := meta.Chunk.(*multiMeta)
@ -261,13 +271,17 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk
}
// We have a composite meta: construct a composite iterable.
mc := &mergedOOOChunks{}
var headChunks []*memChunk
for _, m := range mm.metas {
switch {
case m.Chunk != nil:
mc.chunkIterables = append(mc.chunkIterables, m.Chunk)
default:
_, cid, isOOO := unpackHeadChunkRef(m.Ref)
iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk)
if !isOOO && headChunks == nil {
headChunks = cr.collectOrGetHeadChunks(s)
}
iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk, headChunks)
if err != nil {
return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err)
}
@ -277,6 +291,36 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk
return nil, mc, meta.MaxTime, nil
}
// collectOrGetHeadChunks returns the pre-collected head chunks for s, ordered
// oldest-to-newest. When an underlying headChunkReader is present, the lookup
// is delegated to its per-series cache. Otherwise the chunks are collected
// directly into cr.headChunksBuf — no cross-series caching, just reuse of the
// backing array to avoid a fresh allocation per call.
func (cr *HeadAndOOOChunkReader) collectOrGetHeadChunks(s *memSeries) []*memChunk {
	if cr.cr != nil {
		return cr.cr.getOrCollectHeadChunks(s)
	}
	// Defensive: with current callers, cr.cr is nil only during OOO compaction
	// (OOOCompactionHead.Chunks), which requests OOO chunks exclusively, so
	// this path should not be reached. Kept for correctness if callers change.
	collected := collectHeadChunks(s.headChunks, cr.headChunksBuf[:0])
	// Zero the unused tail so *memChunk pointers from an earlier, longer
	// collection do not keep chunks alive.
	clear(collected[len(collected):cap(collected)])
	// Retain the backing array for the next call unless it has grown past the
	// cap threshold, in which case release it to the GC.
	if cap(collected) > headChunksBufMaxCap {
		cr.headChunksBuf = nil
	} else {
		cr.headChunksBuf = collected
	}
	return collected
}
// EnableChunkCache enables the head-chunk cache on the underlying headChunkReader.
// This should only be called for range queries where the cache provides O(1) lookups
// across multiple chunk accesses for the same series.
func (cr *HeadAndOOOChunkReader) EnableChunkCache() {
	if cr.cr == nil {
		// OOO-only reader: there is no headChunkReader to enable caching on.
		return
	}
	cr.cr.enableCache = true
}
func (cr *HeadAndOOOChunkReader) Close() error {
if cr.cr != nil && cr.cr.isoState != nil {
cr.cr.isoState.Close()
@ -481,7 +525,8 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
return nil
}
return getOOOSeriesChunks(s, ir.ch.head.opts.EnableXOR2Encoding.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
getOOOSeriesChunks(s, ir.ch.head.opts.EnableXOR2Encoding.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks, nil)
return nil
}
func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) {

View File

@ -117,12 +117,25 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
return selectSeriesSet(ctx, sortSeries, hints, ms, q.index, q.chunks, q.tombstones, q.mint, q.maxt)
}
// chunkCacheToggler is an optional interface implemented by chunk readers that
// support an in-memory head-chunk cache. The cache is only beneficial for range
// queries (Step > 0) where every chunk of a series is accessed.
type chunkCacheToggler interface {
// EnableChunkCache turns the head-chunk cache on for this reader.
EnableChunkCache()
}
func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
) storage.SeriesSet {
disableTrimming := false
sharded := hints != nil && hints.ShardCount > 0
if hints != nil && hints.Step > 0 {
if toggler, ok := chunks.(chunkCacheToggler); ok {
toggler.EnableChunkCache()
}
}
p, err := PostingsForMatchers(ctx, index, ms...)
if err != nil {
return storage.ErrSeriesSet(err)
@ -171,6 +184,12 @@ func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.S
disableTrimming := false
sharded := hints != nil && hints.ShardCount > 0
if hints != nil && hints.Step > 0 {
if toggler, ok := chunks.(chunkCacheToggler); ok {
toggler.EnableChunkCache()
}
}
if hints != nil {
mint = hints.Start
maxt = hints.End