From c7b2210ac3f619feeefdc0075eca3dc81ae757c9 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 17 Apr 2026 18:34:41 +0200 Subject: [PATCH] tsdb: cache collected head chunks on ChunkReader for O(1) lookup (#18302) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tsdb: cache collected head chunks on ChunkReader for O(1) lookup The query path calls s.chunk() once per chunk meta via ChunkOrIterableWithCopy. Each call walks the head chunks linked list from the head to the target position. For a series with N head chunks iterated oldest-first, total work is O(N²). Cache the collected []*memChunk slice on headChunkReader, keyed by series ref, head pointer, and mmapped chunks length. Collected once per series under lock; reused on subsequent chunk lookups for the same series. The backing array is reused across series (zero alloc after first use). Series with 0 or 1 head chunks skip the cache entirely to avoid per-series overhead that dominates for typical workloads where most series have a single head chunk. The cache is gated behind an enableCache flag, toggled via an optional chunkCacheToggler interface only when hints.Step > 0 (range queries). Instant queries only need one chunk per series, so the cache overhead is not recouped. Also replace O(N²) linked-list traversals in appendSeriesChunks with O(N) collectHeadChunks + slice iteration, and thread reusable headChunksBuf through the index reader paths to avoid per-series allocations. --------- Signed-off-by: Arve Knudsen Co-authored-by: George Krajcsovits --- tsdb/head.go | 17 +++ tsdb/head_read.go | 134 ++++++++++++++---- tsdb/head_read_test.go | 304 ++++++++++++++++++++++++++++++++++++++++- tsdb/head_test.go | 16 +-- tsdb/ooo_head_read.go | 79 ++++++++--- tsdb/querier.go | 19 +++ 6 files changed, 516 insertions(+), 53 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index e26e788aa7..4840f0cae5 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "runtime" + "slices" "strconv" "sync" stdatomic "sync/atomic" //nolint:depguard @@ -2638,6 +2639,22 @@ func (mc *memChunk) len() (count int) { return count } +func collectHeadChunks(head *memChunk, buf []*memChunk) []*memChunk { + if head == nil { + return buf + } + // Single walk: append newest-to-oldest (following prev pointers), then + // reverse to oldest-to-newest. Pointer-chasing the linked list is the + // expensive part; slices.Reverse on a contiguous array is essentially + // free by comparison. + hc := buf + for elem := head; elem != nil; elem = elem.prev { + hc = append(hc, elem) + } + slices.Reverse(hc) + return hc +} + // oldest returns the oldest element on the list. // For single element list this will be the same memChunk oldest() was called on. func (mc *memChunk) oldest() (elem *memChunk) { diff --git a/tsdb/head_read.go b/tsdb/head_read.go index f0a1331fbb..7ce90feb47 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -29,6 +29,11 @@ import ( "github.com/prometheus/prometheus/tsdb/index" ) +// headChunksBufMaxCap is the maximum capacity for the reusable headChunksBuf +// slice. If the buffer grows beyond this, it is released to avoid holding +// oversized backing arrays across many series iterations. +const headChunksBufMaxCap = 256 + func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { return h.exemplars.ExemplarQuerier(ctx) } @@ -45,9 +50,14 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { return &headIndexReader{head: h, mint: mint, maxt: maxt} } +// headIndexReader provides index reading for the head block. +// Not safe for concurrent use from multiple goroutines. type headIndexReader struct { head *Head mint, maxt int64 + // Reusable buffer for collectHeadChunks inside appendSeriesChunks, + // avoiding a per-series allocation during iteration. + headChunksBuf []*memChunk } func (*headIndexReader) Close() error { @@ -197,7 +207,10 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB defer s.Unlock() *chks = (*chks)[:0] - *chks = appendSeriesChunks(s, h.mint, h.maxt, *chks) + *chks, h.headChunksBuf = appendSeriesChunks(s, h.mint, h.maxt, *chks, h.headChunksBuf) + if cap(h.headChunksBuf) > headChunksBufMaxCap { + h.headChunksBuf = nil + } return nil } @@ -308,7 +321,10 @@ func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.Se return h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v)) } -func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta { +// appendSeriesChunks appends chunk metadata for s to chks. +// headChunksBuf is a reusable buffer for collectHeadChunks; the (possibly grown) buffer is returned +// so callers can pass it back on the next call to avoid per-series allocations. +func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta, headChunksBuf []*memChunk) ([]chunks.Meta, []*memChunk) { for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. if !c.OverlapsClosedInterval(mint, maxt) { @@ -321,28 +337,39 @@ func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []ch }) } - if s.headChunks != nil { - var maxTime int64 - var i, j int - for i = s.headChunks.len() - 1; i >= 0; i-- { - chk := s.headChunks.atOffset(i) - if i == 0 { - // Set the head chunk as open (being appended to) for the first headChunk. - maxTime = math.MaxInt64 - } else { - maxTime = chk.maxTime - } - if chk.OverlapsClosedInterval(mint, maxt) { - chks = append(chks, chunks.Meta{ - MinTime: chk.minTime, - MaxTime: maxTime, - Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))), - }) - } - j++ + if s.headChunks == nil { + return chks, headChunksBuf + } + + // Fast path: single head chunk — no allocation, no linked-list walk. + if s.headChunks.prev == nil { + if s.headChunks.OverlapsClosedInterval(mint, maxt) { + chks = append(chks, chunks.Meta{ + MinTime: s.headChunks.minTime, + MaxTime: math.MaxInt64, + Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))), + }) + } + return chks, headChunksBuf + } + + // Multiple head chunks: collect once O(N), iterate O(N). + headChunksBuf = collectHeadChunks(s.headChunks, headChunksBuf[:0]) + clear(headChunksBuf[len(headChunksBuf):cap(headChunksBuf)]) + for i, chk := range headChunksBuf { + maxTime := chk.maxTime + if i == len(headChunksBuf)-1 { + maxTime = math.MaxInt64 // Open (newest) chunk. + } + if chk.OverlapsClosedInterval(mint, maxt) { + chks = append(chks, chunks.Meta{ + MinTime: chk.minTime, + MaxTime: maxTime, + Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+i))), + }) } } - return chks + return chks, headChunksBuf } // headChunkID returns the HeadChunkID referred to by the given position. @@ -421,10 +448,22 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkRead }, nil } +// headChunkReader provides chunk reading for the head block. +// Not safe for concurrent use from multiple goroutines. type headChunkReader struct { head *Head mint, maxt int64 isoState *isolationState + // When true, enables the head-chunks cache. Range queries benefit from + // caching because they look up every chunk of a series; instant queries + // only need one chunk per series, so the cache is wasted overhead. + enableCache bool + // Cache for head chunks — avoids O(n²) linked-list walks when + // iterating all chunks of a series oldest-to-newest. + cachedSeriesRef storage.SeriesRef + cachedHeadChunks []*memChunk + cachedHeadChunksHead *memChunk // Head pointer at collection time; detects head-chunk replacement. + cachedMmapLen int // len(s.mmappedChunks) at collection time; detects mmap events. } func (h *headChunkReader) Close() error { @@ -434,6 +473,30 @@ func (h *headChunkReader) Close() error { return nil } +func (h *headChunkReader) getOrCollectHeadChunks(s *memSeries) []*memChunk { + // Skip if the cache is disabled (instant queries) or there are no head chunks or there's only one. + if !h.enableCache || s.headChunks == nil || s.headChunks.prev == nil { + return nil + } + + ref := storage.SeriesRef(s.ref) + if ref == h.cachedSeriesRef && s.headChunks == h.cachedHeadChunksHead && h.cachedMmapLen == len(s.mmappedChunks) { + return h.cachedHeadChunks + } + + var buf []*memChunk + if h.cachedHeadChunks != nil { + buf = h.cachedHeadChunks[:0] + } + h.cachedHeadChunks = collectHeadChunks(s.headChunks, buf) + // Allow GC of *memChunk pointers left over from a previous, longer collection. + clear(h.cachedHeadChunks[len(h.cachedHeadChunks):cap(h.cachedHeadChunks)]) + h.cachedSeriesRef = ref + h.cachedHeadChunksHead = s.headChunks + h.cachedMmapLen = len(s.mmappedChunks) + return h.cachedHeadChunks +} + // ChunkOrIterable returns the chunk for the reference number. func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { chk, _, err := h.chunk(meta, false) @@ -465,7 +528,11 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. s.Lock() defer s.Unlock() - return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk) + var headChunks []*memChunk + if !isOOO { + headChunks = h.getOrCollectHeadChunks(s) + } + return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk, headChunks) } // Dumb thing to defeat chunk pool. @@ -474,12 +541,12 @@ type wrapOOOHeadChunk struct { } // Call with s locked. -func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) { +func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool, headChunks []*memChunk) (chunkenc.Chunk, int64, error) { if isOOO { chk, maxTime, err := s.oooChunk(cid, h.chunkDiskMapper, &h.memChunkPool) return wrapOOOHeadChunk{chk}, maxTime, err } - c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool) + c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool, headChunks) if err != nil { return nil, 0, err } @@ -523,7 +590,7 @@ func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, // If headChunk is false, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. // if isOpen is true, it means that the returned *memChunk is used for appends. -func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk, isOpen bool, err error) { +func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, headChunks []*memChunk) (chunk *memChunk, headChunk, isOpen bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix @@ -539,7 +606,9 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi ix := int(id) - int(s.firstChunkID) var headChunksLen int - if s.headChunks != nil { + if headChunks != nil { + headChunksLen = len(headChunks) + } else if s.headChunks != nil { headChunksLen = s.headChunks.len() } @@ -563,8 +632,19 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi return mc, false, false, nil } + // Head chunk lookup. ix -= len(s.mmappedChunks) + // Fast path: use pre-collected slice for O(1) indexed lookup. + if headChunks != nil { + if ix >= len(headChunks) { + return nil, false, false, storage.ErrNotFound + } + return headChunks[ix], true, ix == len(headChunks)-1, nil + } + + // Fallback: walk the linked list. + offset := headChunksLen - ix - 1 // headChunks is a linked list where first element is the most recent one and the last one is the oldest. // This order is reversed when compared with mmappedChunks, since mmappedChunks[0] is the oldest chunk, diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index 0849c257b5..ae2b360411 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "strconv" "sync" "testing" @@ -388,7 +389,7 @@ func TestMemSeries_chunk(t *testing.T) { tc.setup(t, series, chunkDiskMapper) } - chk, headChunk, isOpen, err := series.chunk(tc.inputID, chunkDiskMapper, memChunkPool) + chk, headChunk, isOpen, err := series.chunk(tc.inputID, chunkDiskMapper, memChunkPool, nil) switch tc.expected { case outOpenHeadChunk: require.NoError(t, err, "unexpected error") @@ -409,6 +410,71 @@ func TestMemSeries_chunk(t *testing.T) { } } +// TestMemSeries_chunk_FastPath verifies that the O(1) indexed lookup via a +// pre-collected headChunks slice returns identical results to the linked-list +// fallback (headChunks=nil) for every chunk in a series with mixed mmapped +// and head chunks. +func TestMemSeries_chunk_FastPath(t *testing.T) { + const chunkRange int64 = 100 + const chunkStep int64 = 5 + appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) { + for i := start; i < end; i += chunkStep { + ok, _ := s.append(0, i, float64(i), 0, chunkOpts{ + chunkDiskMapper: cdm, + chunkRange: chunkRange, + samplesPerChunk: DefaultSamplesPerChunk, + }) + require.True(t, ok, "sample append failed") + } + } + + dir := t.TempDir() + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(t, err) + defer func() { + require.NoError(t, chunkDiskMapper.Close()) + }() + memChunkPool := &sync.Pool{New: func() any { return &memChunk{} }} + + series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false) + + // Build 3 mmapped + 3 head chunks. + appendSamples(t, series, 0, chunkRange*4, chunkDiskMapper) + series.mmapChunks(chunkDiskMapper) + require.Len(t, series.mmappedChunks, 3) + require.Equal(t, 1, series.headChunks.len()) + appendSamples(t, series, chunkRange*4, chunkRange*6, chunkDiskMapper) + require.Equal(t, 3, series.headChunks.len()) + require.Len(t, series.mmappedChunks, 3) + + hc := collectHeadChunks(series.headChunks, nil) + + headChunkCount := int(series.headChunkCount.Load()) + require.Equal(t, 3, headChunkCount, "expected 3 head chunks") + totalChunks := len(series.mmappedChunks) + headChunkCount + for ix := range totalChunks { + id := chunks.HeadChunkID(ix) + + // Linked-list fallback. + chkLL, headChunkLL, isOpenLL, errLL := series.chunk(id, chunkDiskMapper, memChunkPool, nil) + // Fast path with pre-collected slice. + chkFP, headChunkFP, isOpenFP, errFP := series.chunk(id, chunkDiskMapper, memChunkPool, hc) + + require.Equal(t, errLL, errFP, "ix=%d: error mismatch", ix) + require.Equal(t, headChunkLL, headChunkFP, "ix=%d: headChunk mismatch", ix) + require.Equal(t, isOpenLL, isOpenFP, "ix=%d: isOpen mismatch", ix) + if ix < len(series.mmappedChunks) { + // Mmapped chunks are loaded from disk into fresh memChunks, so + // pointer equality is not expected — compare the time range instead. + require.Equal(t, chkLL.minTime, chkFP.minTime, "ix=%d: minTime mismatch", ix) + require.Equal(t, chkLL.maxTime, chkFP.maxTime, "ix=%d: maxTime mismatch", ix) + } else { + // Head chunks should be pointer-identical. + require.Same(t, chkLL, chkFP, "ix=%d: head chunk pointer mismatch", ix) + } + } +} + func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) { testPostingsForLabelMatching(t, 0, func(t *testing.T, series []labels.Labels) IndexReader { opts := DefaultHeadOptions() @@ -430,3 +496,239 @@ func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) { return ir }) } + +func TestHeadChunkReaderCache(t *testing.T) { + t.Run("cache_hit", func(t *testing.T) { + // Verify that a second chunk lookup for the same (unchanged) series + // returns the cached head-chunks slice without re-collecting. + opts := DefaultHeadOptions() + opts.ChunkRange = 100 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, h.Close()) }) + + app := h.Appender(t.Context()) + lbls := labels.FromStrings("__name__", "test") + for i := int64(0); i < 500; i += 5 { + _, err := app.Append(0, lbls, i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + s := h.series.getByID(1) + require.NotNil(t, s) + s.Lock() + require.Greater(t, s.headChunks.len(), 1, "need multiple head chunks for the test") + newestCID := s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1 + newestRef := chunks.NewHeadChunkRef(s.ref, newestCID) + s.Unlock() + + cr, err := h.chunksRange(0, 10000, nil) + require.NoError(t, err) + cr.enableCache = true + + // First call: populates the cache. + _, _, err = cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false) + require.NoError(t, err) + require.NotNil(t, cr.cachedHeadChunks) + cachedSlice := cr.cachedHeadChunks + + // Second call (same series, no changes): must reuse the cached slice. + _, _, err = cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false) + require.NoError(t, err) + require.Same(t, &cachedSlice[0], &cr.cachedHeadChunks[0], "expected cache hit — slice backing array should be identical") + }) + + t.Run("invalidated_after_mmap", func(t *testing.T) { + // Regression test: after mmapChunks(), the head-chunks cache must be + // invalidated even though s.headChunks (the pointer) doesn't change. + // mmapChunks severs the linked list (prev=nil, len=1) but + // keeps the same head pointer, so a pointer-only check would return + // a stale cache with chunks that have been mmapped. + + opts := DefaultHeadOptions() + opts.ChunkRange = 100 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, h.Close()) }) + + // Append enough samples to create multiple head chunks. + // With ChunkRange=100 and DefaultSamplesPerChunk=120, each chunk + // holds ~20 samples (range/step = 100/5). We want >=3 head chunks. + app := h.Appender(t.Context()) + lbls := labels.FromStrings("__name__", "test") + for i := int64(0); i < 500; i += 5 { + _, err := app.Append(0, lbls, i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // Look up the series and verify we have multiple head chunks. + s := h.series.getByID(1) + require.NotNil(t, s) + s.Lock() + require.Greater(t, s.headChunks.len(), 1, "need multiple head chunks for the test") + headChunksLenBefore := s.headChunks.len() + newestChunkMinTime := s.headChunks.minTime + // The chunk ID for the newest head chunk: + // firstChunkID + len(mmapped) + headChunksLen - 1 + newestCID := s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1 + newestRef := chunks.NewHeadChunkRef(s.ref, newestCID) + s.Unlock() + + // Create a headChunkReader with cache enabled and query the newest head chunk to populate the cache. + cr, err := h.chunksRange(0, 10000, nil) + require.NoError(t, err) + cr.enableCache = true + + chk1, _, err := cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false) + require.NoError(t, err) + require.NotNil(t, chk1) + + // Verify cache is populated. + require.NotNil(t, cr.cachedHeadChunks) + require.Len(t, cr.cachedHeadChunks, headChunksLenBefore) + + // Now mmap all but the newest head chunk — this severs the linked list. + s.Lock() + headPtrBefore := s.headChunks + s.mmapChunks(h.chunkDiskMapper) + require.Equal(t, 1, s.headChunks.len(), "after mmap, should have exactly 1 head chunk") + require.Same(t, headPtrBefore, s.headChunks, "head pointer should not change (the bug scenario)") + require.Equal(t, newestChunkMinTime, s.headChunks.minTime, "newest chunk should be the remaining head chunk") + + // Recompute the newest chunk ID after mmap: more mmapped chunks now. + newestCID = s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1 + newestRef = chunks.NewHeadChunkRef(s.ref, newestCID) + s.Unlock() + + // Query the newest head chunk again. With the bug, the stale cache + // would be used and return a wrong (mmapped) chunk. + chk2, _, err := cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false) + require.NoError(t, err) + require.NotNil(t, chk2) + + // After mmap, only 1 head chunk remains. The prev==nil guard skips + // the cache entirely, so the stale slice is preserved but not consulted. + require.Len(t, cr.cachedHeadChunks, headChunksLenBefore, "stale cache not cleared, but also not used") + + // Verify the returned chunk is actually the newest head chunk, not a stale cached entry. + it := chk2.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, _ := it.At() + require.Equal(t, newestChunkMinTime, ts, "returned chunk should be the newest head chunk, not a stale cached entry") + }) + + t.Run("buffer_cap_release", func(t *testing.T) { + // Test that headChunksBuf is released when its capacity exceeds + // headChunksBufMaxCap, preventing unbounded memory retention. + opts := DefaultHeadOptions() + opts.ChunkRange = 1 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, h.Close()) }) + + // Append enough samples to create >headChunksBufMaxCap head chunks. + // With ChunkRange=1, each sample goes to a new chunk. + app := h.Appender(t.Context()) + lbls := labels.FromStrings("__name__", "cap_test") + for i := range int64(headChunksBufMaxCap) + 10 { + _, err := app.Append(0, lbls, i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + s := h.series.getByID(1) + require.NotNil(t, s) + s.Lock() + require.Greater(t, s.headChunks.len(), headChunksBufMaxCap, "need >%d head chunks", headChunksBufMaxCap) + s.Unlock() + + // Call Series() via headIndexReader — this populates headChunksBuf. + ir := h.indexRange(0, int64(headChunksBufMaxCap)+10) + var builder labels.ScratchBuilder + var chks []chunks.Meta + require.NoError(t, ir.Series(1, &builder, &chks)) + require.Greater(t, len(chks), headChunksBufMaxCap) + + // Buffer should have been released because cap exceeded threshold. + require.Nil(t, ir.headChunksBuf, "headChunksBuf should be released when cap > headChunksBufMaxCap") + }) + + t.Run("buffer_cap_release_ooo_index_reader", func(t *testing.T) { + // Same as buffer_cap_release but exercises HeadAndOOOIndexReader.Series, + // which has its own headChunksBufMaxCap check. + opts := DefaultHeadOptions() + opts.ChunkRange = 1 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, h.Close()) }) + + app := h.Appender(t.Context()) + lbls := labels.FromStrings("__name__", "cap_test_ooo") + for i := range int64(headChunksBufMaxCap) + 10 { + _, err := app.Append(0, lbls, i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + s := h.series.getByID(1) + require.NotNil(t, s) + s.Lock() + require.Greater(t, s.headChunks.len(), headChunksBufMaxCap, "need >%d head chunks", headChunksBufMaxCap) + s.Unlock() + + // Call Series() via HeadAndOOOIndexReader (non-OOO series, so the else branch is taken). + maxt := int64(headChunksBufMaxCap) + 10 + ir := NewHeadAndOOOIndexReader(h, 0, 0, maxt, 0) + var builder labels.ScratchBuilder + var chks []chunks.Meta + require.NoError(t, ir.Series(1, &builder, &chks)) + require.Greater(t, len(chks), headChunksBufMaxCap) + + // Buffer should have been released because cap exceeded threshold. + require.Nil(t, ir.headChunksBuf, "headChunksBuf should be released when cap > headChunksBufMaxCap") + }) +} + +var benchSink *memChunk + +// BenchmarkSeriesChunkIteration measures iterating all N head chunks of a series +// oldest-to-newest (the real query pattern) using the cached head-chunks slice. +func BenchmarkSeriesChunkIteration(b *testing.B) { + for _, n := range []int{1, 4, 16, 64, 256} { + b.Run(strconv.Itoa(n), func(b *testing.B) { + s := &memSeries{ + ref: 1, + firstChunkID: 0, + headChunks: buildHeadChunksLight(n), + } + hc := collectHeadChunks(s.headChunks, nil) + b.ReportAllocs() + for b.Loop() { + for i := range n { + benchSink, _, _, _ = s.chunk(chunks.HeadChunkID(i), nil, nil, hc) + } + } + }) + } +} + +// buildHeadChunksLight creates a memChunk linked list without allocating chunk +// encodings. Suitable for benchmarks that only need the linked-list structure +// and time ranges. +func buildHeadChunksLight(n int) *memChunk { + var head *memChunk + for i := range n { + head = &memChunk{ + minTime: int64(i) * 1000, + maxTime: int64(i)*1000 + 999, + prev: head, + } + } + return head +} diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ac1c339f09..0bbdc4c12c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -793,7 +793,7 @@ func TestHead_ReadWAL(t *testing.T) { } // Verify samples and exemplar for series 10. - c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil) require.NoError(t, err) require.Equal(t, []sample{{0, 100, 2, nil, nil}, {0, 101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) @@ -806,14 +806,14 @@ func TestHead_ReadWAL(t *testing.T) { require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0])) // Verify samples for series 50 - c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil) require.NoError(t, err) require.Equal(t, []sample{{0, 101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) // Verify records for series 100 and its duplicate, series 101. // The samples before the new series record should be discarded since a duplicate record // is only possible when old samples were compacted. - c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil) require.NoError(t, err) require.Equal(t, []sample{{0, 101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) @@ -1570,21 +1570,21 @@ func TestMemSeries_truncateChunks(t *testing.T) { // that the ID of the last chunk still gives us the same chunk afterwards. countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. lastID := s.headChunkID(countBefore - 1) - lastChunk, _, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool) + lastChunk, _, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool, nil) require.NoError(t, err) require.NotNil(t, lastChunk) - chk, _, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool) + chk, _, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool, nil) require.NotNil(t, chk) require.NoError(t, err) s.truncateChunksBefore(2000, 0) require.Equal(t, int64(2000), s.mmappedChunks[0].minTime) - _, _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool) + _, _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool, nil) require.Equal(t, storage.ErrNotFound, err, "first chunks not gone") require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk. - chk, _, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool) + chk, _, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool, nil) require.NoError(t, err) require.Equal(t, lastChunk, chk) } @@ -3840,7 +3840,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { New: func() any { return &memChunk{} }, - }) + }, nil) require.NoError(t, err) it := c.chunk.Iterator(nil) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index ed3e7baeb5..0aa381a673 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -77,18 +77,23 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S *chks = (*chks)[:0] if s.ooo != nil { - return getOOOSeriesChunks(s, oh.head.opts.EnableXOR2Encoding.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks) + oh.headChunksBuf = getOOOSeriesChunks(s, oh.head.opts.EnableXOR2Encoding.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks, oh.headChunksBuf) + } else { + *chks, oh.headChunksBuf = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks, oh.headChunksBuf) + } + if cap(oh.headChunksBuf) > headChunksBufMaxCap { + oh.headChunksBuf = nil } - *chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks) return nil } +// getOOOSeriesChunks collects chunk metadata for OOO (and optionally in-order) data. // lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so // any chunk at or before this ref will not be considered. 0 disables this check. -// -// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then +// maxMmapRef tells up to what max m-map chunk that we can consider. If it is non-0, then // the oooHeadChunk will not be considered. -func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error { +// headChunksBuf is a reusable buffer for collectHeadChunks; the (possibly grown) buffer is returned. +func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta, headChunksBuf []*memChunk) []*memChunk { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -109,7 +114,7 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime, useXOR2) if err != nil { handleChunkWriteError(err) - return nil + return headChunksBuf } for _, chk := range chks { addChunk(chk.minTime, chk.maxTime, ref, chk.chunk) @@ -129,12 +134,12 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag } if includeInOrder { - tmpChks = appendSeriesChunks(s, inoMint, maxt, tmpChks) + tmpChks, headChunksBuf = appendSeriesChunks(s, inoMint, maxt, tmpChks, headChunksBuf) } // There is nothing to do if we did not collect any chunk. if len(tmpChks) == 0 { - return nil + return headChunksBuf } // Next we want to sort all the collected chunks by min time so we can find @@ -165,7 +170,7 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag } *chks = append(*chks, toBeMerged) - return nil + return headChunksBuf } // Fake Chunk object to pass a set of Metas inside Meta.Chunk. @@ -207,11 +212,12 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int { } type HeadAndOOOChunkReader struct { - head *Head - mint, maxt int64 - cr *headChunkReader // If nil, only read OOO chunks. - maxMmapRef chunks.ChunkDiskMapperRef - oooIsoState *oooIsolationState + head *Head + mint, maxt int64 + cr *headChunkReader // If nil, only read OOO chunks. + maxMmapRef chunks.ChunkDiskMapperRef + oooIsoState *oooIsolationState + headChunksBuf []*memChunk // Reusable buffer for collectHeadChunks when cr is nil. } func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { @@ -252,7 +258,11 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk defer s.Unlock() if meta.Chunk == nil { - c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk) + var headChunks []*memChunk + if !isOOO { + headChunks = cr.collectOrGetHeadChunks(s) + } + c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk, headChunks) return c, nil, maxt, err } mm, ok := meta.Chunk.(*multiMeta) @@ -261,13 +271,17 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk } // We have a composite meta: construct a composite iterable. mc := &mergedOOOChunks{} + var headChunks []*memChunk for _, m := range mm.metas { switch { case m.Chunk != nil: mc.chunkIterables = append(mc.chunkIterables, m.Chunk) default: _, cid, isOOO := unpackHeadChunkRef(m.Ref) - iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk) + if !isOOO && headChunks == nil { + headChunks = cr.collectOrGetHeadChunks(s) + } + iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk, headChunks) if err != nil { return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err) } @@ -277,6 +291,36 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk return nil, mc, meta.MaxTime, nil } +// collectOrGetHeadChunks returns the pre-collected head chunks for s. When the +// underlying headChunkReader exists, it delegates to the per-series cache there. +// Otherwise it collects directly into cr.headChunksBuf — this does not cache +// across series, it only reuses the backing array to avoid per-call allocations. +func (cr *HeadAndOOOChunkReader) collectOrGetHeadChunks(s *memSeries) []*memChunk { + if cr.cr != nil { + return cr.cr.getOrCollectHeadChunks(s) + } + + // Defensive: in current callers, cr.cr is nil only for OOO compaction + // (OOOCompactionHead.Chunks), which only requests OOO chunks, so this + // branch is not reached. It is kept for correctness if callers change. + cr.headChunksBuf = collectHeadChunks(s.headChunks, cr.headChunksBuf[:0]) + clear(cr.headChunksBuf[len(cr.headChunksBuf):cap(cr.headChunksBuf)]) + hc := cr.headChunksBuf + if cap(cr.headChunksBuf) > headChunksBufMaxCap { + cr.headChunksBuf = nil + } + return hc +} + +// EnableChunkCache enables the head-chunk cache on the underlying headChunkReader. +// This should only be called for range queries where the cache provides O(1) lookups +// across multiple chunk accesses for the same series. +func (cr *HeadAndOOOChunkReader) EnableChunkCache() { + if cr.cr != nil { + cr.cr.enableCache = true + } +} + func (cr *HeadAndOOOChunkReader) Close() error { if cr.cr != nil && cr.cr.isoState != nil { cr.cr.isoState.Close() @@ -481,7 +525,8 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l return nil } - return getOOOSeriesChunks(s, ir.ch.head.opts.EnableXOR2Encoding.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks) + getOOOSeriesChunks(s, ir.ch.head.opts.EnableXOR2Encoding.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks, nil) + return nil } func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) { diff --git a/tsdb/querier.go b/tsdb/querier.go index 6d0cf36db4..82936acdb9 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -117,12 +117,25 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora return selectSeriesSet(ctx, sortSeries, hints, ms, q.index, q.chunks, q.tombstones, q.mint, q.maxt) } +// chunkCacheToggler is an optional interface implemented by chunk readers that +// support an in-memory head-chunk cache. The cache is only beneficial for range +// queries (Step > 0) where every chunk of a series is accessed. +type chunkCacheToggler interface { + EnableChunkCache() +} + func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64, ) storage.SeriesSet { disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 + if hints != nil && hints.Step > 0 { + if toggler, ok := chunks.(chunkCacheToggler); ok { + toggler.EnableChunkCache() + } + } + p, err := PostingsForMatchers(ctx, index, ms...) if err != nil { return storage.ErrSeriesSet(err) @@ -171,6 +184,12 @@ func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.S disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 + if hints != nil && hints.Step > 0 { + if toggler, ok := chunks.(chunkCacheToggler); ok { + toggler.EnableChunkCache() + } + } + if hints != nil { mint = hints.Start maxt = hints.End