diff --git a/tsdb/head.go b/tsdb/head.go index e26e788aa7..4840f0cae5 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "runtime" + "slices" "strconv" "sync" stdatomic "sync/atomic" //nolint:depguard @@ -2638,6 +2639,22 @@ func (mc *memChunk) len() (count int) { return count } +func collectHeadChunks(head *memChunk, buf []*memChunk) []*memChunk { + if head == nil { + return buf + } + // Single walk: append newest-to-oldest (following prev pointers), then + // reverse to oldest-to-newest. Pointer-chasing the linked list is the + // expensive part; slices.Reverse on a contiguous array is essentially + // free by comparison. + hc := buf + for elem := head; elem != nil; elem = elem.prev { + hc = append(hc, elem) + } + slices.Reverse(hc) + return hc +} + // oldest returns the oldest element on the list. // For single element list this will be the same memChunk oldest() was called on. func (mc *memChunk) oldest() (elem *memChunk) { diff --git a/tsdb/head_read.go b/tsdb/head_read.go index f0a1331fbb..7ce90feb47 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -29,6 +29,11 @@ import ( "github.com/prometheus/prometheus/tsdb/index" ) +// headChunksBufMaxCap is the maximum capacity for the reusable headChunksBuf +// slice. If the buffer grows beyond this, it is released to avoid holding +// oversized backing arrays across many series iterations. +const headChunksBufMaxCap = 256 + func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { return h.exemplars.ExemplarQuerier(ctx) } @@ -45,9 +50,14 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { return &headIndexReader{head: h, mint: mint, maxt: maxt} } +// headIndexReader provides index reading for the head block. +// Not safe for concurrent use from multiple goroutines. type headIndexReader struct { head *Head mint, maxt int64 + // Reusable buffer for collectHeadChunks inside appendSeriesChunks, + // avoiding a per-series allocation during iteration. + headChunksBuf []*memChunk } func (*headIndexReader) Close() error { @@ -197,7 +207,10 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB defer s.Unlock() *chks = (*chks)[:0] - *chks = appendSeriesChunks(s, h.mint, h.maxt, *chks) + *chks, h.headChunksBuf = appendSeriesChunks(s, h.mint, h.maxt, *chks, h.headChunksBuf) + if cap(h.headChunksBuf) > headChunksBufMaxCap { + h.headChunksBuf = nil + } return nil } @@ -308,7 +321,10 @@ func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.Se return h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v)) } -func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta { +// appendSeriesChunks appends chunk metadata for s to chks. +// headChunksBuf is a reusable buffer for collectHeadChunks; the (possibly grown) buffer is returned +// so callers can pass it back on the next call to avoid per-series allocations. +func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta, headChunksBuf []*memChunk) ([]chunks.Meta, []*memChunk) { for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. 
if !c.OverlapsClosedInterval(mint, maxt) { @@ -321,28 +337,39 @@ func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []ch }) } - if s.headChunks != nil { - var maxTime int64 - var i, j int - for i = s.headChunks.len() - 1; i >= 0; i-- { - chk := s.headChunks.atOffset(i) - if i == 0 { - // Set the head chunk as open (being appended to) for the first headChunk. - maxTime = math.MaxInt64 - } else { - maxTime = chk.maxTime - } - if chk.OverlapsClosedInterval(mint, maxt) { - chks = append(chks, chunks.Meta{ - MinTime: chk.minTime, - MaxTime: maxTime, - Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))), - }) - } - j++ + if s.headChunks == nil { + return chks, headChunksBuf + } + + // Fast path: single head chunk — no allocation, no linked-list walk. + if s.headChunks.prev == nil { + if s.headChunks.OverlapsClosedInterval(mint, maxt) { + chks = append(chks, chunks.Meta{ + MinTime: s.headChunks.minTime, + MaxTime: math.MaxInt64, + Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))), + }) + } + return chks, headChunksBuf + } + + // Multiple head chunks: collect once O(N), iterate O(N). + headChunksBuf = collectHeadChunks(s.headChunks, headChunksBuf[:0]) + clear(headChunksBuf[len(headChunksBuf):cap(headChunksBuf)]) + for i, chk := range headChunksBuf { + maxTime := chk.maxTime + if i == len(headChunksBuf)-1 { + maxTime = math.MaxInt64 // Open (newest) chunk. + } + if chk.OverlapsClosedInterval(mint, maxt) { + chks = append(chks, chunks.Meta{ + MinTime: chk.minTime, + MaxTime: maxTime, + Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+i))), + }) } } - return chks + return chks, headChunksBuf } // headChunkID returns the HeadChunkID referred to by the given position. @@ -421,10 +448,22 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkRead }, nil } +// headChunkReader provides chunk reading for the head block. +// Not safe for concurrent use from multiple goroutines. type headChunkReader struct { head *Head mint, maxt int64 isoState *isolationState + // When true, enables the head-chunks cache. Range queries benefit from + // caching because they look up every chunk of a series; instant queries + // only need one chunk per series, so the cache is wasted overhead. + enableCache bool + // Cache for head chunks — avoids O(n²) linked-list walks when + // iterating all chunks of a series oldest-to-newest. + cachedSeriesRef storage.SeriesRef + cachedHeadChunks []*memChunk + cachedHeadChunksHead *memChunk // Head pointer at collection time; detects head-chunk replacement. + cachedMmapLen int // len(s.mmappedChunks) at collection time; detects mmap events. } func (h *headChunkReader) Close() error { @@ -434,6 +473,30 @@ func (h *headChunkReader) Close() error { return nil } +func (h *headChunkReader) getOrCollectHeadChunks(s *memSeries) []*memChunk { + // Skip if the cache is disabled (instant queries) or there are no head chunks or there's only one. 
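+	// Returning nil makes memSeries.chunk fall back to its linked-list walk.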
+ if !h.enableCache || s.headChunks == nil || s.headChunks.prev == nil { + return nil + } + + ref := storage.SeriesRef(s.ref) + if ref == h.cachedSeriesRef && s.headChunks == h.cachedHeadChunksHead && h.cachedMmapLen == len(s.mmappedChunks) { + return h.cachedHeadChunks + } + + var buf []*memChunk + if h.cachedHeadChunks != nil { + buf = h.cachedHeadChunks[:0] + } + h.cachedHeadChunks = collectHeadChunks(s.headChunks, buf) + // Allow GC of *memChunk pointers left over from a previous, longer collection. + clear(h.cachedHeadChunks[len(h.cachedHeadChunks):cap(h.cachedHeadChunks)]) + h.cachedSeriesRef = ref + h.cachedHeadChunksHead = s.headChunks + h.cachedMmapLen = len(s.mmappedChunks) + return h.cachedHeadChunks +} + // ChunkOrIterable returns the chunk for the reference number. func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { chk, _, err := h.chunk(meta, false) @@ -465,7 +528,11 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. s.Lock() defer s.Unlock() - return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk) + var headChunks []*memChunk + if !isOOO { + headChunks = h.getOrCollectHeadChunks(s) + } + return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk, headChunks) } // Dumb thing to defeat chunk pool. @@ -474,12 +541,12 @@ type wrapOOOHeadChunk struct { } // Call with s locked. -func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool) (chunkenc.Chunk, int64, error) { +func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool, headChunks []*memChunk) (chunkenc.Chunk, int64, error) { if isOOO { chk, maxTime, err := s.oooChunk(cid, h.chunkDiskMapper, &h.memChunkPool) return wrapOOOHeadChunk{chk}, maxTime, err } - c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool) + c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool, headChunks) if err != nil { return nil, 0, err } @@ -523,7 +590,7 @@ func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, // If headChunk is false, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. // if isOpen is true, it means that the returned *memChunk is used for appends. -func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk, isOpen bool, err error) { +func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, headChunks []*memChunk) (chunk *memChunk, headChunk, isOpen bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. 
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix @@ -539,7 +606,9 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi ix := int(id) - int(s.firstChunkID) var headChunksLen int - if s.headChunks != nil { + if headChunks != nil { + headChunksLen = len(headChunks) + } else if s.headChunks != nil { headChunksLen = s.headChunks.len() } @@ -563,8 +632,19 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi return mc, false, false, nil } + // Head chunk lookup. ix -= len(s.mmappedChunks) + // Fast path: use pre-collected slice for O(1) indexed lookup. + if headChunks != nil { + if ix >= len(headChunks) { + return nil, false, false, storage.ErrNotFound + } + return headChunks[ix], true, ix == len(headChunks)-1, nil + } + + // Fallback: walk the linked list. + offset := headChunksLen - ix - 1 // headChunks is a linked list where first element is the most recent one and the last one is the oldest. // This order is reversed when compared with mmappedChunks, since mmappedChunks[0] is the oldest chunk, diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index 0849c257b5..ae2b360411 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "strconv" "sync" "testing" @@ -388,7 +389,7 @@ func TestMemSeries_chunk(t *testing.T) { tc.setup(t, series, chunkDiskMapper) } - chk, headChunk, isOpen, err := series.chunk(tc.inputID, chunkDiskMapper, memChunkPool) + chk, headChunk, isOpen, err := series.chunk(tc.inputID, chunkDiskMapper, memChunkPool, nil) switch tc.expected { case outOpenHeadChunk: require.NoError(t, err, "unexpected error") @@ -409,6 +410,71 @@ func TestMemSeries_chunk(t *testing.T) { } } +// TestMemSeries_chunk_FastPath verifies that the O(1) indexed lookup via a +// pre-collected headChunks slice returns identical results to the linked-list +// fallback (headChunks=nil) for every chunk in a series with mixed mmapped +// and head chunks. +func TestMemSeries_chunk_FastPath(t *testing.T) { + const chunkRange int64 = 100 + const chunkStep int64 = 5 + appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) { + for i := start; i < end; i += chunkStep { + ok, _ := s.append(0, i, float64(i), 0, chunkOpts{ + chunkDiskMapper: cdm, + chunkRange: chunkRange, + samplesPerChunk: DefaultSamplesPerChunk, + }) + require.True(t, ok, "sample append failed") + } + } + + dir := t.TempDir() + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(t, err) + defer func() { + require.NoError(t, chunkDiskMapper.Close()) + }() + memChunkPool := &sync.Pool{New: func() any { return &memChunk{} }} + + series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false) + + // Build 3 mmapped + 3 head chunks. 
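+	// Appending t in [0, 400) at step 5 with chunkRange=100 cuts 4 chunks; mmapChunks
+	// moves the 3 closed ones to disk, and [400, 600) regrows the head list to 3 chunks.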
+ appendSamples(t, series, 0, chunkRange*4, chunkDiskMapper) + series.mmapChunks(chunkDiskMapper) + require.Len(t, series.mmappedChunks, 3) + require.Equal(t, 1, series.headChunks.len()) + appendSamples(t, series, chunkRange*4, chunkRange*6, chunkDiskMapper) + require.Equal(t, 3, series.headChunks.len()) + require.Len(t, series.mmappedChunks, 3) + + hc := collectHeadChunks(series.headChunks, nil) + + headChunkCount := int(series.headChunkCount.Load()) + require.Equal(t, 3, headChunkCount, "expected 3 head chunks") + totalChunks := len(series.mmappedChunks) + headChunkCount + for ix := range totalChunks { + id := chunks.HeadChunkID(ix) + + // Linked-list fallback. + chkLL, headChunkLL, isOpenLL, errLL := series.chunk(id, chunkDiskMapper, memChunkPool, nil) + // Fast path with pre-collected slice. + chkFP, headChunkFP, isOpenFP, errFP := series.chunk(id, chunkDiskMapper, memChunkPool, hc) + + require.Equal(t, errLL, errFP, "ix=%d: error mismatch", ix) + require.Equal(t, headChunkLL, headChunkFP, "ix=%d: headChunk mismatch", ix) + require.Equal(t, isOpenLL, isOpenFP, "ix=%d: isOpen mismatch", ix) + if ix < len(series.mmappedChunks) { + // Mmapped chunks are loaded from disk into fresh memChunks, so + // pointer equality is not expected — compare the time range instead. + require.Equal(t, chkLL.minTime, chkFP.minTime, "ix=%d: minTime mismatch", ix) + require.Equal(t, chkLL.maxTime, chkFP.maxTime, "ix=%d: maxTime mismatch", ix) + } else { + // Head chunks should be pointer-identical. + require.Same(t, chkLL, chkFP, "ix=%d: head chunk pointer mismatch", ix) + } + } +} + func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) { testPostingsForLabelMatching(t, 0, func(t *testing.T, series []labels.Labels) IndexReader { opts := DefaultHeadOptions() @@ -430,3 +496,239 @@ func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) { return ir }) } + +func TestHeadChunkReaderCache(t *testing.T) { + t.Run("cache_hit", func(t *testing.T) { + // Verify that a second chunk lookup for the same (unchanged) series + // returns the cached head-chunks slice without re-collecting. + opts := DefaultHeadOptions() + opts.ChunkRange = 100 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, h.Close()) }) + + app := h.Appender(t.Context()) + lbls := labels.FromStrings("__name__", "test") + for i := int64(0); i < 500; i += 5 { + _, err := app.Append(0, lbls, i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + s := h.series.getByID(1) + require.NotNil(t, s) + s.Lock() + require.Greater(t, s.headChunks.len(), 1, "need multiple head chunks for the test") + newestCID := s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1 + newestRef := chunks.NewHeadChunkRef(s.ref, newestCID) + s.Unlock() + + cr, err := h.chunksRange(0, 10000, nil) + require.NoError(t, err) + cr.enableCache = true + + // First call: populates the cache. + _, _, err = cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false) + require.NoError(t, err) + require.NotNil(t, cr.cachedHeadChunks) + cachedSlice := cr.cachedHeadChunks + + // Second call (same series, no changes): must reuse the cached slice. 
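+		// A hit requires the cached series ref, head-chunk pointer, and mmapped-chunk
+		// count to all match; nothing has changed since the first call, so all three do.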
+ _, _, err = cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false) + require.NoError(t, err) + require.Same(t, &cachedSlice[0], &cr.cachedHeadChunks[0], "expected cache hit — slice backing array should be identical") + }) + + t.Run("invalidated_after_mmap", func(t *testing.T) { + // Regression test: after mmapChunks(), the head-chunks cache must be + // invalidated even though s.headChunks (the pointer) doesn't change. + // mmapChunks severs the linked list (prev=nil, len=1) but + // keeps the same head pointer, so a pointer-only check would return + // a stale cache with chunks that have been mmapped. + + opts := DefaultHeadOptions() + opts.ChunkRange = 100 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, h.Close()) }) + + // Append enough samples to create multiple head chunks. + // With ChunkRange=100 and DefaultSamplesPerChunk=120, each chunk + // holds ~20 samples (range/step = 100/5). We want >=3 head chunks. + app := h.Appender(t.Context()) + lbls := labels.FromStrings("__name__", "test") + for i := int64(0); i < 500; i += 5 { + _, err := app.Append(0, lbls, i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // Look up the series and verify we have multiple head chunks. + s := h.series.getByID(1) + require.NotNil(t, s) + s.Lock() + require.Greater(t, s.headChunks.len(), 1, "need multiple head chunks for the test") + headChunksLenBefore := s.headChunks.len() + newestChunkMinTime := s.headChunks.minTime + // The chunk ID for the newest head chunk: + // firstChunkID + len(mmapped) + headChunksLen - 1 + newestCID := s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1 + newestRef := chunks.NewHeadChunkRef(s.ref, newestCID) + s.Unlock() + + // Create a headChunkReader with cache enabled and query the newest head chunk to populate the cache. + cr, err := h.chunksRange(0, 10000, nil) + require.NoError(t, err) + cr.enableCache = true + + chk1, _, err := cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false) + require.NoError(t, err) + require.NotNil(t, chk1) + + // Verify cache is populated. + require.NotNil(t, cr.cachedHeadChunks) + require.Len(t, cr.cachedHeadChunks, headChunksLenBefore) + + // Now mmap all but the newest head chunk — this severs the linked list. + s.Lock() + headPtrBefore := s.headChunks + s.mmapChunks(h.chunkDiskMapper) + require.Equal(t, 1, s.headChunks.len(), "after mmap, should have exactly 1 head chunk") + require.Same(t, headPtrBefore, s.headChunks, "head pointer should not change (the bug scenario)") + require.Equal(t, newestChunkMinTime, s.headChunks.minTime, "newest chunk should be the remaining head chunk") + + // Recompute the newest chunk ID after mmap: more mmapped chunks now. + newestCID = s.firstChunkID + chunks.HeadChunkID(len(s.mmappedChunks)) + chunks.HeadChunkID(s.headChunks.len()) - 1 + newestRef = chunks.NewHeadChunkRef(s.ref, newestCID) + s.Unlock() + + // Query the newest head chunk again. With the bug, the stale cache + // would be used and return a wrong (mmapped) chunk. + chk2, _, err := cr.chunk(chunks.Meta{Ref: chunks.ChunkRef(newestRef)}, false) + require.NoError(t, err) + require.NotNil(t, chk2) + + // After mmap, only 1 head chunk remains. The prev==nil guard skips + // the cache entirely, so the stale slice is preserved but not consulted. 
+ require.Len(t, cr.cachedHeadChunks, headChunksLenBefore, "stale cache not cleared, but also not used") + + // Verify the returned chunk is actually the newest head chunk, not a stale cached entry. + it := chk2.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, _ := it.At() + require.Equal(t, newestChunkMinTime, ts, "returned chunk should be the newest head chunk, not a stale cached entry") + }) + + t.Run("buffer_cap_release", func(t *testing.T) { + // Test that headChunksBuf is released when its capacity exceeds + // headChunksBufMaxCap, preventing unbounded memory retention. + opts := DefaultHeadOptions() + opts.ChunkRange = 1 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, h.Close()) }) + + // Append enough samples to create >headChunksBufMaxCap head chunks. + // With ChunkRange=1, each sample goes to a new chunk. + app := h.Appender(t.Context()) + lbls := labels.FromStrings("__name__", "cap_test") + for i := range int64(headChunksBufMaxCap) + 10 { + _, err := app.Append(0, lbls, i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + s := h.series.getByID(1) + require.NotNil(t, s) + s.Lock() + require.Greater(t, s.headChunks.len(), headChunksBufMaxCap, "need >%d head chunks", headChunksBufMaxCap) + s.Unlock() + + // Call Series() via headIndexReader — this populates headChunksBuf. + ir := h.indexRange(0, int64(headChunksBufMaxCap)+10) + var builder labels.ScratchBuilder + var chks []chunks.Meta + require.NoError(t, ir.Series(1, &builder, &chks)) + require.Greater(t, len(chks), headChunksBufMaxCap) + + // Buffer should have been released because cap exceeded threshold. + require.Nil(t, ir.headChunksBuf, "headChunksBuf should be released when cap > headChunksBufMaxCap") + }) + + t.Run("buffer_cap_release_ooo_index_reader", func(t *testing.T) { + // Same as buffer_cap_release but exercises HeadAndOOOIndexReader.Series, + // which has its own headChunksBufMaxCap check. + opts := DefaultHeadOptions() + opts.ChunkRange = 1 + opts.ChunkDirRoot = t.TempDir() + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, h.Close()) }) + + app := h.Appender(t.Context()) + lbls := labels.FromStrings("__name__", "cap_test_ooo") + for i := range int64(headChunksBufMaxCap) + 10 { + _, err := app.Append(0, lbls, i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + s := h.series.getByID(1) + require.NotNil(t, s) + s.Lock() + require.Greater(t, s.headChunks.len(), headChunksBufMaxCap, "need >%d head chunks", headChunksBufMaxCap) + s.Unlock() + + // Call Series() via HeadAndOOOIndexReader (non-OOO series, so the else branch is taken). + maxt := int64(headChunksBufMaxCap) + 10 + ir := NewHeadAndOOOIndexReader(h, 0, 0, maxt, 0) + var builder labels.ScratchBuilder + var chks []chunks.Meta + require.NoError(t, ir.Series(1, &builder, &chks)) + require.Greater(t, len(chks), headChunksBufMaxCap) + + // Buffer should have been released because cap exceeded threshold. + require.Nil(t, ir.headChunksBuf, "headChunksBuf should be released when cap > headChunksBufMaxCap") + }) +} + +var benchSink *memChunk + +// BenchmarkSeriesChunkIteration measures iterating all N head chunks of a series +// oldest-to-newest (the real query pattern) using the cached head-chunks slice. 
+func BenchmarkSeriesChunkIteration(b *testing.B) { + for _, n := range []int{1, 4, 16, 64, 256} { + b.Run(strconv.Itoa(n), func(b *testing.B) { + s := &memSeries{ + ref: 1, + firstChunkID: 0, + headChunks: buildHeadChunksLight(n), + } + hc := collectHeadChunks(s.headChunks, nil) + b.ReportAllocs() + for b.Loop() { + for i := range n { + benchSink, _, _, _ = s.chunk(chunks.HeadChunkID(i), nil, nil, hc) + } + } + }) + } +} + +// buildHeadChunksLight creates a memChunk linked list without allocating chunk +// encodings. Suitable for benchmarks that only need the linked-list structure +// and time ranges. +func buildHeadChunksLight(n int) *memChunk { + var head *memChunk + for i := range n { + head = &memChunk{ + minTime: int64(i) * 1000, + maxTime: int64(i)*1000 + 999, + prev: head, + } + } + return head +} diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ac1c339f09..0bbdc4c12c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -793,7 +793,7 @@ func TestHead_ReadWAL(t *testing.T) { } // Verify samples and exemplar for series 10. - c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil) require.NoError(t, err) require.Equal(t, []sample{{0, 100, 2, nil, nil}, {0, 101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) @@ -806,14 +806,14 @@ func TestHead_ReadWAL(t *testing.T) { require.True(t, exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("trace_id", "asdf")}.Equals(e[0].Exemplars[0])) // Verify samples for series 50 - c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil) require.NoError(t, err) require.Equal(t, []sample{{0, 101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) // Verify records for series 100 and its duplicate, series 101. // The samples before the new series record should be discarded since a duplicate record // is only possible when old samples were compacted. - c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool, nil) require.NoError(t, err) require.Equal(t, []sample{{0, 101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) @@ -1570,21 +1570,21 @@ func TestMemSeries_truncateChunks(t *testing.T) { // that the ID of the last chunk still gives us the same chunk afterwards. countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. lastID := s.headChunkID(countBefore - 1) - lastChunk, _, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool) + lastChunk, _, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool, nil) require.NoError(t, err) require.NotNil(t, lastChunk) - chk, _, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool) + chk, _, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool, nil) require.NotNil(t, chk) require.NoError(t, err) s.truncateChunksBefore(2000, 0) require.Equal(t, int64(2000), s.mmappedChunks[0].minTime) - _, _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool) + _, _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool, nil) require.Equal(t, storage.ErrNotFound, err, "first chunks not gone") require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk. 
- chk, _, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool) + chk, _, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool, nil) require.NoError(t, err) require.Equal(t, lastChunk, chk) } @@ -3840,7 +3840,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { New: func() any { return &memChunk{} }, - }) + }, nil) require.NoError(t, err) it := c.chunk.Iterator(nil) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index ed3e7baeb5..0aa381a673 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -77,18 +77,23 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S *chks = (*chks)[:0] if s.ooo != nil { - return getOOOSeriesChunks(s, oh.head.opts.EnableXOR2Encoding.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks) + oh.headChunksBuf = getOOOSeriesChunks(s, oh.head.opts.EnableXOR2Encoding.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks, oh.headChunksBuf) + } else { + *chks, oh.headChunksBuf = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks, oh.headChunksBuf) + } + if cap(oh.headChunksBuf) > headChunksBufMaxCap { + oh.headChunksBuf = nil } - *chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks) return nil } +// getOOOSeriesChunks collects chunk metadata for OOO (and optionally in-order) data. // lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so // any chunk at or before this ref will not be considered. 0 disables this check. -// -// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then +// maxMmapRef tells up to what max m-map chunk that we can consider. If it is non-0, then // the oooHeadChunk will not be considered. -func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error { +// headChunksBuf is a reusable buffer for collectHeadChunks; the (possibly grown) buffer is returned. +func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta, headChunksBuf []*memChunk) []*memChunk { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -109,7 +114,7 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime, useXOR2) if err != nil { handleChunkWriteError(err) - return nil + return headChunksBuf } for _, chk := range chks { addChunk(chk.minTime, chk.maxTime, ref, chk.chunk) @@ -129,12 +134,12 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag } if includeInOrder { - tmpChks = appendSeriesChunks(s, inoMint, maxt, tmpChks) + tmpChks, headChunksBuf = appendSeriesChunks(s, inoMint, maxt, tmpChks, headChunksBuf) } // There is nothing to do if we did not collect any chunk. if len(tmpChks) == 0 { - return nil + return headChunksBuf } // Next we want to sort all the collected chunks by min time so we can find @@ -165,7 +170,7 @@ func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbag } *chks = append(*chks, toBeMerged) - return nil + return headChunksBuf } // Fake Chunk object to pass a set of Metas inside Meta.Chunk. 
@@ -207,11 +212,12 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int { } type HeadAndOOOChunkReader struct { - head *Head - mint, maxt int64 - cr *headChunkReader // If nil, only read OOO chunks. - maxMmapRef chunks.ChunkDiskMapperRef - oooIsoState *oooIsolationState + head *Head + mint, maxt int64 + cr *headChunkReader // If nil, only read OOO chunks. + maxMmapRef chunks.ChunkDiskMapperRef + oooIsoState *oooIsolationState + headChunksBuf []*memChunk // Reusable buffer for collectHeadChunks when cr is nil. } func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { @@ -252,7 +258,11 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk defer s.Unlock() if meta.Chunk == nil { - c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk) + var headChunks []*memChunk + if !isOOO { + headChunks = cr.collectOrGetHeadChunks(s) + } + c, maxt, err := cr.head.chunkFromSeries(s, cid, isOOO, meta.MinTime, meta.MaxTime, isoState, copyLastChunk, headChunks) return c, nil, maxt, err } mm, ok := meta.Chunk.(*multiMeta) @@ -261,13 +271,17 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk } // We have a composite meta: construct a composite iterable. mc := &mergedOOOChunks{} + var headChunks []*memChunk for _, m := range mm.metas { switch { case m.Chunk != nil: mc.chunkIterables = append(mc.chunkIterables, m.Chunk) default: _, cid, isOOO := unpackHeadChunkRef(m.Ref) - iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk) + if !isOOO && headChunks == nil { + headChunks = cr.collectOrGetHeadChunks(s) + } + iterable, _, err := cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk, headChunks) if err != nil { return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err) } @@ -277,6 +291,36 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk return nil, mc, meta.MaxTime, nil } +// collectOrGetHeadChunks returns the pre-collected head chunks for s. When the +// underlying headChunkReader exists, it delegates to the per-series cache there. +// Otherwise it collects directly into cr.headChunksBuf — this does not cache +// across series, it only reuses the backing array to avoid per-call allocations. +func (cr *HeadAndOOOChunkReader) collectOrGetHeadChunks(s *memSeries) []*memChunk { + if cr.cr != nil { + return cr.cr.getOrCollectHeadChunks(s) + } + + // Defensive: in current callers, cr.cr is nil only for OOO compaction + // (OOOCompactionHead.Chunks), which only requests OOO chunks, so this + // branch is not reached. It is kept for correctness if callers change. + cr.headChunksBuf = collectHeadChunks(s.headChunks, cr.headChunksBuf[:0]) + clear(cr.headChunksBuf[len(cr.headChunksBuf):cap(cr.headChunksBuf)]) + hc := cr.headChunksBuf + if cap(cr.headChunksBuf) > headChunksBufMaxCap { + cr.headChunksBuf = nil + } + return hc +} + +// EnableChunkCache enables the head-chunk cache on the underlying headChunkReader. +// This should only be called for range queries where the cache provides O(1) lookups +// across multiple chunk accesses for the same series. 
+func (cr *HeadAndOOOChunkReader) EnableChunkCache() { + if cr.cr != nil { + cr.cr.enableCache = true + } +} + func (cr *HeadAndOOOChunkReader) Close() error { if cr.cr != nil && cr.cr.isoState != nil { cr.cr.isoState.Close() @@ -481,7 +525,8 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l return nil } - return getOOOSeriesChunks(s, ir.ch.head.opts.EnableXOR2Encoding.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks) + getOOOSeriesChunks(s, ir.ch.head.opts.EnableXOR2Encoding.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks, nil) + return nil } func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) { diff --git a/tsdb/querier.go b/tsdb/querier.go index 6d0cf36db4..82936acdb9 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -117,12 +117,25 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora return selectSeriesSet(ctx, sortSeries, hints, ms, q.index, q.chunks, q.tombstones, q.mint, q.maxt) } +// chunkCacheToggler is an optional interface implemented by chunk readers that +// support an in-memory head-chunk cache. The cache is only beneficial for range +// queries (Step > 0) where every chunk of a series is accessed. +type chunkCacheToggler interface { + EnableChunkCache() +} + func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64, ) storage.SeriesSet { disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 + if hints != nil && hints.Step > 0 { + if toggler, ok := chunks.(chunkCacheToggler); ok { + toggler.EnableChunkCache() + } + } + p, err := PostingsForMatchers(ctx, index, ms...) if err != nil { return storage.ErrSeriesSet(err) @@ -171,6 +184,12 @@ func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.S disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 + if hints != nil && hints.Step > 0 { + if toggler, ok := chunks.(chunkCacheToggler); ok { + toggler.EnableChunkCache() + } + } + if hints != nil { mint = hints.Start maxt = hints.End