diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 6d04998e80..9817fe47a1 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -85,13 +85,21 @@ func (p HeadChunkRef) Unpack() (HeadSeriesRef, HeadChunkID) { // - less than the above, but >= memSeries.firstID, then it's // memSeries.mmappedChunks[i] where i = HeadChunkID - memSeries.firstID. // +// If memSeries.headChunks is non-nil it points to a *memChunk that holds the current +// "open" (accepting appends) instance. *memChunk is a linked list and memChunk.next pointer +// might link to the older *memChunk instance. +// If there are multiple *memChunk instances linked to each other from memSeries.headChunks +// they will be m-mapped as soon as possible leaving only "open" *memChunk instance. +// // Example: // assume a memSeries.firstChunkID=7 and memSeries.mmappedChunks=[p5,p6,p7,p8,p9]. // | HeadChunkID value | refers to ... | // |-------------------|----------------------------------------------------------------------------------------| // | 0-6 | chunks that have been compacted to blocks, these won't return data for queries in Head | // | 7-11 | memSeries.mmappedChunks[i] where i is 0 to 4. | -// | 12 | memSeries.headChunk | +// | 12 | *memChunk{next: nil} +// | 13 | *memChunk{next: ^} +// | 14 | memSeries.headChunks -> *memChunk{next: ^} type HeadChunkID uint64 // BlockChunkRef refers to a chunk within a persisted block. diff --git a/tsdb/db.go b/tsdb/db.go index 2ca6034a03..0c69ae6e69 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -973,6 +973,8 @@ func (db *DB) run() { case db.compactc <- struct{}{}: default: } + // We attempt mmapping of head chunks regularly. + db.head.mmapHeadChunks() case <-db.compactc: db.metrics.compactionsTriggered.Inc() diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 772fcf9d12..0eb361db7e 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1535,6 +1535,7 @@ func TestSizeRetention(t *testing.T) { } } require.NoError(t, headApp.Commit()) + db.Head().mmapHeadChunks() require.Eventually(t, func() bool { return db.Head().chunkDiskMapper.IsQueueEmpty() @@ -6049,12 +6050,14 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) { // Check that m-map files gets deleted properly after compactions. + db.head.mmapHeadChunks() checkMmapFileContents([]string{"000001", "000002"}, nil) require.NoError(t, db.Compact()) checkMmapFileContents([]string{"000002"}, []string{"000001"}) require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted") addSamples(501, 650) + db.head.mmapHeadChunks() checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"}) require.NoError(t, db.Compact()) checkMmapFileContents(nil, []string{"000001", "000002", "000003"}) diff --git a/tsdb/head.go b/tsdb/head.go index 499be067ad..e18bd55a5c 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -344,6 +344,7 @@ type headMetrics struct { mmapChunkCorruptionTotal prometheus.Counter snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1. oooHistogram prometheus.Histogram + mmapChunksTotal prometheus.Counter } const ( @@ -468,6 +469,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { 60 * 60 * 12, // 12h }, }), + mmapChunksTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_mmap_chunks_total", + Help: "Total number of chunks that were memory-mapped.", + }), } if r != nil { @@ -495,6 +500,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.checkpointDeleteTotal, m.checkpointCreationFail, m.checkpointCreationTotal, + m.mmapChunksTotal, m.mmapChunkCorruptionTotal, m.snapshotReplayErrorTotal, // Metrics bound to functions and not needed in tests @@ -880,11 +886,11 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) numSamples: numSamples, }) h.updateMinMaxTime(mint, maxt) - if ms.headChunk != nil && maxt >= ms.headChunk.minTime { + if ms.headChunks != nil && maxt >= ms.headChunks.minTime { // The head chunk was completed and was m-mapped after taking the snapshot. // Hence remove this chunk. ms.nextAt = 0 - ms.headChunk = nil + ms.headChunks = nil ms.app = nil } return nil @@ -1574,6 +1580,10 @@ func (h *Head) Close() error { defer h.closedMtx.Unlock() h.closed = true + // mmap all but last chunk in case we're performing snapshot since that only + // takes samples from most recent head chunk. + h.mmapHeadChunks() + errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close()) if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown { errs.Add(h.performChunkSnapshot()) @@ -1630,6 +1640,37 @@ func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labe return s, true, nil } +// mmapHeadChunks will iterate all memSeries stored on Head and call mmapHeadChunks() on each of them. +// +// There are two types of chunks that store samples for each memSeries: +// A) Head chunk - stored on Go heap, when new samples are appended they go there. +// B) M-mapped chunks - memory mapped chunks, kernel manages the memory for us on-demand, these chunks +// +// are read-only. +// +// Calling mmapHeadChunks() will iterate all memSeries and m-mmap all chunks that should be m-mapped. +// The m-mapping operation is needs to be serialised and so it goes via central lock. +// If there are multiple concurrent memSeries that need to m-map some chunk then they can block each-other. +// +// To minimise the effect of locking on TSDB operations m-mapping is serialised and done away from +// sample append path, since waiting on a lock inside an append would lock the entire memSeries for +// (potentially) a long time, since that could eventually delay next scrape and/or cause query timeouts. +func (h *Head) mmapHeadChunks() { + var count int + for i := 0; i < h.series.size; i++ { + h.series.locks[i].RLock() + for _, all := range h.series.hashes[i] { + for _, series := range all { + series.Lock() + count += series.mmapChunks(h.chunkDiskMapper) + series.Unlock() + } + } + h.series.locks[i].RUnlock() + } + h.metrics.mmapChunksTotal.Add(float64(count)) +} + // seriesHashmap is a simple hashmap for memSeries by their label set. It is built // on top of a regular hashmap and holds a slice of series to resolve hash collisions. // Its methods require the hash to be submitted with it to avoid re-computations throughout @@ -1760,7 +1801,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) ( minOOOTime = series.ooo.oooHeadChunk.minTime } } - if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit || + if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit || (series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) { seriesMint := series.minTime() if seriesMint < actualMint { @@ -1915,8 +1956,11 @@ type memSeries struct { // // pN is the pointer to the mmappedChunk referered to by HeadChunkID=N mmappedChunks []*mmappedChunk - headChunk *memChunk // Most recent chunk in memory that's still being built. - firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0] + // Most recent chunks in memory that are still being built or waiting to be mmapped. + // This is a linked list, headChunks points to the most recent chunk, headChunks.next points + // to older chunk and so on. + headChunks *memChunk + firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0] ooo *memSeriesOOOFields @@ -1932,7 +1976,7 @@ type memSeries struct { lastFloatHistogramValue *histogram.FloatHistogram // Current appender for the head chunk. Set when a new head chunk is cut. - // It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit + // It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit // (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series). app chunkenc.Appender @@ -1966,17 +2010,16 @@ func (s *memSeries) minTime() int64 { if len(s.mmappedChunks) > 0 { return s.mmappedChunks[0].minTime } - if s.headChunk != nil { - return s.headChunk.minTime + if s.headChunks != nil { + return s.headChunks.oldest().minTime } return math.MinInt64 } func (s *memSeries) maxTime() int64 { // The highest timestamps will always be in the regular (non-OOO) chunks, even if OOO is enabled. - c := s.head() - if c != nil { - return c.maxTime + if s.headChunks != nil { + return s.headChunks.maxTime } if len(s.mmappedChunks) > 0 { return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime @@ -1989,12 +2032,29 @@ func (s *memSeries) maxTime() int64 { // Chunk IDs remain unchanged. func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) int { var removedInOrder int - if s.headChunk != nil && s.headChunk.maxTime < mint { - // If head chunk is truncated, we can truncate all mmapped chunks. - removedInOrder = 1 + len(s.mmappedChunks) - s.firstChunkID += chunks.HeadChunkID(removedInOrder) - s.headChunk = nil - s.mmappedChunks = nil + if s.headChunks != nil { + var i int + var nextChk *memChunk + chk := s.headChunks + for chk != nil { + if chk.maxTime < mint { + // If any head chunk is truncated, we can truncate all mmapped chunks. + removedInOrder = chk.len() + len(s.mmappedChunks) + s.firstChunkID += chunks.HeadChunkID(removedInOrder) + if i == 0 { + // This is the first chunk on the list so we need to remove the entire list. + s.headChunks = nil + } else { + // This is NOT the first chunk, unlink it from parent. + nextChk.prev = nil + } + s.mmappedChunks = nil + break + } + nextChk = chk + chk = chk.prev + i++ + } } if len(s.mmappedChunks) > 0 { for i, c := range s.mmappedChunks { @@ -2034,13 +2094,52 @@ func (s *memSeries) cleanupAppendIDsBelow(bound uint64) { } } -func (s *memSeries) head() *memChunk { - return s.headChunk -} - type memChunk struct { chunk chunkenc.Chunk minTime, maxTime int64 + prev *memChunk // Link to the previous element on the list. +} + +// len returns the length of memChunk list, including the element it was called on. +func (mc *memChunk) len() (count int) { + elem := mc + for elem != nil { + count++ + elem = elem.prev + } + return count +} + +// oldest returns the oldest element on the list. +// For single element list this will be the same memChunk oldest() was called on. +func (mc *memChunk) oldest() (elem *memChunk) { + elem = mc + for elem.prev != nil { + elem = elem.prev + } + return elem +} + +// atOffset returns a memChunk that's Nth element on the linked list. +func (mc *memChunk) atOffset(offset int) (elem *memChunk) { + if offset == 0 { + return mc + } + if offset < 0 { + return nil + } + + var i int + elem = mc + for i < offset { + i++ + elem = elem.prev + if elem == nil { + break + } + } + + return elem } type oooHeadChunk struct { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index cbbb60f031..b981831648 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -395,7 +395,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTimeWindow int64) (isOOO bool, oooDelta int64, err error) { // Check if we can append in the in-order chunk. if t >= minValidTime { - if s.head() == nil { + if s.headChunks == nil { // The series has no sample and was freshly created. return false, 0, nil } @@ -433,15 +433,14 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi // appendableHistogram checks whether the given histogram is valid for appending to the series. func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error { - c := s.head() - if c == nil { + if s.headChunks == nil { return nil } - if t > c.maxTime { + if t > s.headChunks.maxTime { return nil } - if t < c.maxTime { + if t < s.headChunks.maxTime { return storage.ErrOutOfOrderSample } @@ -455,15 +454,14 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error { // appendableFloatHistogram checks whether the given float histogram is valid for appending to the series. func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram) error { - c := s.head() - if c == nil { + if s.headChunks == nil { return nil } - if t > c.maxTime { + if t > s.headChunks.maxTime { return nil } - if t < c.maxTime { + if t < s.headChunks.maxTime { return storage.ErrOutOfOrderSample } @@ -1200,12 +1198,11 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui return true, false } - // This is a brand new chunk, switch out the head chunk (based on cutNewHeadChunk). - s.mmapCurrentHeadChunk(o.chunkDiskMapper) - s.headChunk = &memChunk{ + s.headChunks = &memChunk{ chunk: newChunk, minTime: t, maxTime: t, + prev: s.headChunks, } s.nextAt = rangeForTimestamp(t, o.chunkRange) return true, true @@ -1258,12 +1255,11 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, return true, false } - // This is a brand new chunk, switch out the head chunk (based on cutNewHeadChunk). - s.mmapCurrentHeadChunk(o.chunkDiskMapper) - s.headChunk = &memChunk{ + s.headChunks = &memChunk{ chunk: newChunk, minTime: t, maxTime: t, + prev: s.headChunks, } s.nextAt = rangeForTimestamp(t, o.chunkRange) return true, true @@ -1273,7 +1269,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { - c = s.head() + c = s.headChunks if c == nil { if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { @@ -1281,7 +1277,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts return c, false, false } // There is no head chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -1293,8 +1289,9 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding. - c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true + } numSamples := c.chunk.NumSamples() @@ -1318,7 +1315,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 { - c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -1338,36 +1335,37 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/n } -func (s *memSeries) cutNewHeadChunk( - mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, -) *memChunk { - s.mmapCurrentHeadChunk(chunkDiskMapper) - - s.headChunk = &memChunk{ +func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk { + // When cutting a new head chunk we create a new memChunk instance with .prev + // pointing at the current .headChunks, so it forms a linked list. + // All but first headChunks list elements will be m-mapped as soon as possible + // so this is a single element list most of the time. + s.headChunks = &memChunk{ minTime: mint, maxTime: math.MinInt64, + prev: s.headChunks, } if chunkenc.IsValidEncoding(e) { var err error - s.headChunk.chunk, err = chunkenc.NewEmptyChunk(e) + s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e) if err != nil { panic(err) // This should never happen. } } else { - s.headChunk.chunk = chunkenc.NewXORChunk() + s.headChunks.chunk = chunkenc.NewXORChunk() } // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. s.nextAt = rangeForTimestamp(mint, chunkRange) - app, err := s.headChunk.chunk.Appender() + app, err := s.headChunks.chunk.Appender() if err != nil { panic(err) } s.app = app - return s.headChunk + return s.headChunks } // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk. @@ -1401,19 +1399,32 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap return chunkRef } -func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { - if s.headChunk == nil || s.headChunk.chunk.NumSamples() == 0 { - // There is no head chunk, so nothing to m-map here. +// mmapChunks will m-map all but first chunk on s.headChunks list. +func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count int) { + if s.headChunks == nil || s.headChunks.prev == nil { + // There is none or only one head chunk, so nothing to m-map here. return } - chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, false, handleChunkWriteError) - s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ - ref: chunkRef, - numSamples: uint16(s.headChunk.chunk.NumSamples()), - minTime: s.headChunk.minTime, - maxTime: s.headChunk.maxTime, - }) + // Write chunks starting from the oldest one and stop before we get to current s.headChunk. + // If we have this chain: s.headChunk{t4} -> t3 -> t2 -> t1 -> t0 + // then we need to write chunks t0 to t3, but skip s.headChunks. + for i := s.headChunks.len() - 1; i > 0; i-- { + chk := s.headChunks.atOffset(i) + chunkRef := chunkDiskMapper.WriteChunk(s.ref, chk.minTime, chk.maxTime, chk.chunk, false, handleChunkWriteError) + s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ + ref: chunkRef, + numSamples: uint16(chk.chunk.NumSamples()), + minTime: chk.minTime, + maxTime: chk.maxTime, + }) + count++ + } + + // Once we've written out all chunks except s.headChunks we need to unlink these from s.headChunk. + s.headChunks.prev = nil + + return count } func handleChunkWriteError(err error) { diff --git a/tsdb/head_read.go b/tsdb/head_read.go index b2af74ace9..f27d4ef762 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -174,12 +174,27 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))), }) } - if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) { - *chks = append(*chks, chunks.Meta{ - MinTime: s.headChunk.minTime, - MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to). - Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))), - }) + + if s.headChunks != nil { + var maxTime int64 + var i, j int + for i = s.headChunks.len() - 1; i >= 0; i-- { + chk := s.headChunks.atOffset(i) + if i == 0 { + // Set the head chunk as open (being appended to) for the first headChunk. + maxTime = math.MaxInt64 + } else { + maxTime = chk.maxTime + } + if chk.OverlapsClosedInterval(h.mint, h.maxt) { + *chks = append(*chks, chunks.Meta{ + MinTime: chk.minTime, + MaxTime: maxTime, + Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))), + }) + } + j++ + } } return nil @@ -187,7 +202,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB // headChunkID returns the HeadChunkID referred to by the given position. // * 0 <= pos < len(s.mmappedChunks) refer to s.mmappedChunks[pos] -// * pos == len(s.mmappedChunks) refers to s.headChunk +// * pos >= len(s.mmappedChunks) refers to s.headChunks linked list func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID { return chunks.HeadChunkID(pos) + s.firstChunkID } @@ -296,7 +311,7 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. } s.Lock() - c, headChunk, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) + c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) if err != nil { s.Unlock() return nil, 0, err @@ -305,6 +320,7 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. if !headChunk { // Set this to nil so that Go GC can collect it after it has been used. c.chunk = nil + c.prev = nil h.head.memChunkPool.Put(c) } }() @@ -316,14 +332,14 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. } chk, maxTime := c.chunk, c.maxTime - if headChunk && copyLastChunk { + if headChunk && isOpen && copyLastChunk { // The caller may ask to copy the head chunk in order to take the // bytes of the chunk without causing the race between read and append. - b := s.headChunk.chunk.Bytes() + b := s.headChunks.chunk.Bytes() newB := make([]byte, len(b)) copy(newB, b) // TODO(codesome): Use bytes.Clone() when we upgrade to Go 1.20. // TODO(codesome): Put back in the pool (non-trivial). - chk, err = h.head.opts.ChunkPool.Get(s.headChunk.chunk.Encoding(), newB) + chk, err = h.head.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB) if err != nil { return nil, 0, err } @@ -341,34 +357,60 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. // If headChunk is false, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. -func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk bool, err error) { +// if isOpen is true, it means that the returned *memChunk is used for appends. +func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk, isOpen bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix - // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. + // is >= len(s.mmappedChunks), it represents one of the chunks on s.headChunks linked list. + // The order of elemens is different for slice and linked list. + // For s.mmappedChunks slice newer chunks are appended to it. + // For s.headChunks list newer chunks are prepended to it. + // + // memSeries { + // mmappedChunks: [t0, t1, t2] + // headChunk: {t5}->{t4}->{t3} + // } ix := int(id) - int(s.firstChunkID) - if ix < 0 || ix > len(s.mmappedChunks) { - return nil, false, storage.ErrNotFound + + var headChunksLen int + if s.headChunks != nil { + headChunksLen = s.headChunks.len() } - if ix == len(s.mmappedChunks) { - if s.headChunk == nil { - return nil, false, errors.New("invalid head chunk") - } - return s.headChunk, true, nil + if ix < 0 || ix > len(s.mmappedChunks)+headChunksLen-1 { + return nil, false, false, storage.ErrNotFound } - chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) - if err != nil { - if _, ok := err.(*chunks.CorruptionErr); ok { - panic(err) + + if ix < len(s.mmappedChunks) { + chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) + if err != nil { + if _, ok := err.(*chunks.CorruptionErr); ok { + panic(err) + } + return nil, false, false, err } - return nil, false, err + mc := memChunkPool.Get().(*memChunk) + mc.chunk = chk + mc.minTime = s.mmappedChunks[ix].minTime + mc.maxTime = s.mmappedChunks[ix].maxTime + return mc, false, false, nil } - mc := memChunkPool.Get().(*memChunk) - mc.chunk = chk - mc.minTime = s.mmappedChunks[ix].minTime - mc.maxTime = s.mmappedChunks[ix].maxTime - return mc, false, nil + + ix -= len(s.mmappedChunks) + + offset := headChunksLen - ix - 1 + // headChunks is a linked list where first element is the most recent one and the last one is the oldest. + // This order is reversed when compared with mmappedChunks, since mmappedChunks[0] is the oldest chunk, + // while headChunk.atOffset(0) would give us the most recent chunk. + // So when calling headChunk.atOffset() we need to reverse the value of ix. + elem := s.headChunks.atOffset(offset) + if elem == nil { + // This should never really happen and would mean that headChunksLen value is NOT equal + // to the length of the headChunks list. + return nil, false, false, storage.ErrNotFound + } + return elem, true, offset == 0, nil } // oooMergedChunk returns the requested chunk based on the given chunks.Meta @@ -660,8 +702,21 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState * } } - if s.headChunk != nil { - totalSamples += s.headChunk.chunk.NumSamples() + ix -= len(s.mmappedChunks) + if s.headChunks != nil { + // Iterate all head chunks from the oldest to the newest. + headChunksLen := s.headChunks.len() + for j := headChunksLen - 1; j >= 0; j-- { + chk := s.headChunks.atOffset(j) + chkSamples := chk.chunk.NumSamples() + totalSamples += chkSamples + // Chunk ID is len(s.mmappedChunks) + $(headChunks list position). + // Where $(headChunks list position) is zero for the oldest chunk and $(s.headChunks.len() - 1) + // for the newest (open) chunk. + if headChunksLen-1-j < ix { + previousSamples += chkSamples + } + } } // Removing the extra transactionIDs that are relevant for samples that diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index 2712bcd1a1..ad0a59d34e 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -15,11 +15,14 @@ package tsdb import ( "fmt" + "sync" "testing" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" ) func TestBoundedChunk(t *testing.T) { @@ -176,3 +179,387 @@ func newTestChunk(numSamples int) chunkenc.Chunk { } return xor } + +// TestMemSeries_chunk runs a series of tests on memSeries.chunk() calls. +// It will simulate various conditions to ensure all code paths in that function are covered. +func TestMemSeries_chunk(t *testing.T) { + const chunkRange int64 = 100 + const chunkStep int64 = 5 + + appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) { + for i := start; i < end; i += chunkStep { + ok, _ := s.append(i, float64(i), 0, chunkOpts{ + chunkDiskMapper: cdm, + chunkRange: chunkRange, + samplesPerChunk: DefaultSamplesPerChunk, + }) + require.True(t, ok, "sample append failed") + } + } + + type setupFn func(*testing.T, *memSeries, *chunks.ChunkDiskMapper) + + type callOutput uint8 + const ( + outOpenHeadChunk callOutput = iota // memSeries.chunk() call returned memSeries.headChunks with headChunk=true & isOpen=true + outClosedHeadChunk // memSeries.chunk() call returned memSeries.headChunks with headChunk=true & isOpen=false + outMmappedChunk // memSeries.chunk() call returned a chunk from memSeries.mmappedChunks with headChunk=false + outErr // memSeries.chunk() call returned an error + ) + + tests := []struct { + name string + setup setupFn // optional function called just before the test memSeries.chunk() call + inputID chunks.HeadChunkID // requested chunk id for memSeries.chunk() call + expected callOutput + }{ + { + name: "call ix=0 on empty memSeries", + inputID: 0, + expected: outErr, + }, + { + name: "call ix=1 on empty memSeries", + inputID: 1, + expected: outErr, + }, + { + name: "firstChunkID > ix", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange, cdm) + require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, chunkRange-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + s.firstChunkID = 5 + }, + inputID: 1, + expected: outErr, + }, + { + name: "call ix=0 on memSeries with no mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange, cdm) + require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, chunkRange-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 0, + expected: outOpenHeadChunk, + }, + { + name: "call ix=1 on memSeries with no mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange, cdm) + require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, chunkRange-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 1, + expected: outErr, + }, + { + name: "call ix=10 on memSeries with no mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange, cdm) + require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, chunkRange-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 10, + expected: outErr, + }, + { + name: "call ix=0 on memSeries with 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 0, + expected: outMmappedChunk, + }, + { + name: "call ix=1 on memSeries with 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 1, + expected: outMmappedChunk, + }, + { + name: "call ix=3 on memSeries with 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 3, + expected: outOpenHeadChunk, + }, + { + name: "call ix=0 on memSeries with 3 mmapped chunks and no headChunk", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + s.headChunks = nil + }, + inputID: 0, + expected: outMmappedChunk, + }, + { + name: "call ix=2 on memSeries with 3 mmapped chunks and no headChunk", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + s.headChunks = nil + }, + inputID: 2, + expected: outMmappedChunk, + }, + { + name: "call ix=3 on memSeries with 3 mmapped chunks and no headChunk", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + s.headChunks = nil + }, + inputID: 3, + expected: outErr, + }, + { + name: "call ix=1 on memSeries with 3 mmapped chunks and closed ChunkDiskMapper", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + cdm.Close() + }, + inputID: 1, + expected: outErr, + }, + { + name: "call ix=3 on memSeries with 3 mmapped chunks and closed ChunkDiskMapper", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + cdm.Close() + }, + inputID: 3, + expected: outOpenHeadChunk, + }, + { + name: "call ix=0 on memSeries with 3 head chunks and no mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*3, cdm) + require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*3)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 0, + expected: outClosedHeadChunk, + }, + { + name: "call ix=1 on memSeries with 3 head chunks and no mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*3, cdm) + require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*3)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 1, + expected: outClosedHeadChunk, + }, + { + name: "call ix=10 on memSeries with 3 head chunks and no mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*3, cdm) + require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*3)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 10, + expected: outErr, + }, + { + name: "call ix=0 on memSeries with 3 head chunks and 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + + appendSamples(t, s, chunkRange*4, chunkRange*6, cdm) + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 0, + expected: outMmappedChunk, + }, + { + name: "call ix=2 on memSeries with 3 head chunks and 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + + appendSamples(t, s, chunkRange*4, chunkRange*6, cdm) + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 2, + expected: outMmappedChunk, + }, + { + name: "call ix=3 on memSeries with 3 head chunks and 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + + appendSamples(t, s, chunkRange*4, chunkRange*6, cdm) + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 3, + expected: outClosedHeadChunk, + }, + { + name: "call ix=5 on memSeries with 3 head chunks and 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + + appendSamples(t, s, chunkRange*4, chunkRange*6, cdm) + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 5, + expected: outOpenHeadChunk, + }, + { + name: "call ix=6 on memSeries with 3 head chunks and 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + + appendSamples(t, s, chunkRange*4, chunkRange*6, cdm) + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 6, + expected: outErr, + }, + + { + name: "call ix=10 on memSeries with 3 head chunks and 3 mmapped chunks", + setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) { + appendSamples(t, s, 0, chunkRange*4, cdm) + s.mmapChunks(cdm) + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks") + + appendSamples(t, s, chunkRange*4, chunkRange*6, cdm) + require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks") + require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks") + require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element") + require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element") + }, + inputID: 10, + expected: outErr, + }, + } + + memChunkPool := &sync.Pool{ + New: func() interface{} { + return &memChunk{} + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(t, err) + defer func() { + require.NoError(t, chunkDiskMapper.Close()) + }() + + series := newMemSeries(labels.EmptyLabels(), 1, true) + + if tc.setup != nil { + tc.setup(t, series, chunkDiskMapper) + } + + chk, headChunk, isOpen, err := series.chunk(tc.inputID, chunkDiskMapper, memChunkPool) + switch tc.expected { + case outOpenHeadChunk: + require.NoError(t, err, "unexpected error") + require.True(t, headChunk, "expected a chunk with headChunk=true but got headChunk=%v", headChunk) + require.True(t, isOpen, "expected a chunk with isOpen=true but got isOpen=%v", isOpen) + case outClosedHeadChunk: + require.NoError(t, err, "unexpected error") + require.True(t, headChunk, "expected a chunk with headChunk=true but got headChunk=%v", headChunk) + require.False(t, isOpen, "expected a chunk with isOpen=false but got isOpen=%v", isOpen) + case outMmappedChunk: + require.NoError(t, err, "unexpected error") + require.False(t, headChunk, "expected a chunk with headChunk=false but got gc=%v", headChunk) + case outErr: + require.Nil(t, chk, "got a non-nil chunk reference returned with an error") + require.Error(t, err) + } + }) + } +} diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 0a6eef66ca..9b49aca034 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -292,7 +292,7 @@ func BenchmarkLoadWAL(b *testing.B) { // Create one mmapped chunk per series, with one sample at the given time. s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled) s.append(c.mmappedChunkT, 42, 0, cOpts) - s.mmapCurrentHeadChunk(chunkDiskMapper) + s.mmapChunks(chunkDiskMapper) } require.NoError(b, chunkDiskMapper.Close()) } @@ -587,15 +587,15 @@ func TestHead_ReadWAL(t *testing.T) { return x } - c, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) require.NoError(t, err) require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) - c, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool) require.NoError(t, err) require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) // The samples before the new series record should be discarded since a duplicate record // is only possible when old samples were compacted. - c, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool) require.NoError(t, err) require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) @@ -822,30 +822,200 @@ func TestMemSeries_truncateChunks(t *testing.T) { ok, _ := s.append(int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } + s.mmapChunks(chunkDiskMapper) // Check that truncate removes half of the chunks and afterwards // that the ID of the last chunk still gives us the same chunk afterwards. countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. lastID := s.headChunkID(countBefore - 1) - lastChunk, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool) + lastChunk, _, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool) require.NoError(t, err) require.NotNil(t, lastChunk) - chk, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool) + chk, _, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool) require.NotNil(t, chk) require.NoError(t, err) s.truncateChunksBefore(2000, 0) require.Equal(t, int64(2000), s.mmappedChunks[0].minTime) - _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool) + _, _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool) require.Equal(t, storage.ErrNotFound, err, "first chunks not gone") require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk. - chk, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool) + chk, _, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool) require.NoError(t, err) require.Equal(t, lastChunk, chk) } +func TestMemSeries_truncateChunks_scenarios(t *testing.T) { + const chunkRange = 100 + const chunkStep = 5 + + tests := []struct { + name string + headChunks int // the number of head chubks to create on memSeries by appending enough samples + mmappedChunks int // the number of mmapped chunks to create on memSeries by appending enough samples + truncateBefore int64 // the mint to pass to truncateChunksBefore() + expectedTruncated int // the number of chunks that we're expecting be truncated and returned by truncateChunksBefore() + expectedHead int // the expected number of head chunks after truncation + expectedMmap int // the expected number of mmapped chunks after truncation + expectedFirstChunkID chunks.HeadChunkID // the expected series.firstChunkID after truncation + }{ + { + name: "empty memSeries", + truncateBefore: chunkRange * 10, + }, + { + name: "single head chunk, not truncated", + headChunks: 1, + expectedHead: 1, + }, + { + name: "single head chunk, truncated", + headChunks: 1, + truncateBefore: chunkRange, + expectedTruncated: 1, + expectedHead: 0, + expectedFirstChunkID: 1, + }, + { + name: "2 head chunks, not truncated", + headChunks: 2, + expectedHead: 2, + }, + { + name: "2 head chunks, first truncated", + headChunks: 2, + truncateBefore: chunkRange, + expectedTruncated: 1, + expectedHead: 1, + expectedFirstChunkID: 1, + }, + { + name: "2 head chunks, everything truncated", + headChunks: 2, + truncateBefore: chunkRange * 2, + expectedTruncated: 2, + expectedHead: 0, + expectedFirstChunkID: 2, + }, + { + name: "no head chunks, 3 mmap chunks, second mmap truncated", + headChunks: 0, + mmappedChunks: 3, + truncateBefore: chunkRange * 2, + expectedTruncated: 2, + expectedHead: 0, + expectedMmap: 1, + expectedFirstChunkID: 2, + }, + { + name: "single head chunk, single mmap chunk, not truncated", + headChunks: 1, + mmappedChunks: 1, + expectedHead: 1, + expectedMmap: 1, + }, + { + name: "single head chunk, single mmap chunk, mmap truncated", + headChunks: 1, + mmappedChunks: 1, + truncateBefore: chunkRange, + expectedTruncated: 1, + expectedHead: 1, + expectedMmap: 0, + expectedFirstChunkID: 1, + }, + { + name: "5 head chunk, 5 mmap chunk, third head truncated", + headChunks: 5, + mmappedChunks: 5, + truncateBefore: chunkRange * 7, + expectedTruncated: 7, + expectedHead: 3, + expectedMmap: 0, + expectedFirstChunkID: 7, + }, + { + name: "2 head chunks, 3 mmap chunks, second mmap truncated", + headChunks: 2, + mmappedChunks: 3, + truncateBefore: chunkRange * 2, + expectedTruncated: 2, + expectedHead: 2, + expectedMmap: 1, + expectedFirstChunkID: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(t, err) + defer func() { + require.NoError(t, chunkDiskMapper.Close()) + }() + + series := newMemSeries(labels.EmptyLabels(), 1, true) + + cOpts := chunkOpts{ + chunkDiskMapper: chunkDiskMapper, + chunkRange: chunkRange, + samplesPerChunk: DefaultSamplesPerChunk, + } + + var headStart int + if tc.mmappedChunks > 0 { + headStart = (tc.mmappedChunks + 1) * chunkRange + for i := 0; i < (tc.mmappedChunks+1)*chunkRange; i += chunkStep { + ok, _ := series.append(int64(i), float64(i), 0, cOpts) + require.True(t, ok, "sample append failed") + } + series.mmapChunks(chunkDiskMapper) + } + + if tc.headChunks == 0 { + series.headChunks = nil + } else { + for i := headStart; i < chunkRange*(tc.mmappedChunks+tc.headChunks); i += chunkStep { + ok, _ := series.append(int64(i), float64(i), 0, cOpts) + require.True(t, ok, "sample append failed: %d", i) + } + } + + if tc.headChunks > 0 { + require.NotNil(t, series.headChunks, "head chunk is missing") + require.Equal(t, tc.headChunks, series.headChunks.len(), "wrong number of head chunks") + } else { + require.Nil(t, series.headChunks, "head chunk is present") + } + require.Equal(t, tc.mmappedChunks, len(series.mmappedChunks), "wrong number of mmapped chunks") + + truncated := series.truncateChunksBefore(tc.truncateBefore, 0) + require.Equal(t, tc.expectedTruncated, truncated, "wrong number of truncated chunks returned") + + require.Equal(t, tc.expectedMmap, len(series.mmappedChunks), "wrong number of mmappedChunks after truncation") + + if tc.expectedHead > 0 { + require.NotNil(t, series.headChunks, "headChunks should is nil after truncation") + require.Equal(t, tc.expectedHead, series.headChunks.len(), "wrong number of head chunks after truncation") + require.Nil(t, series.headChunks.oldest().prev, "last head chunk cannot have any next chunk set") + } else { + require.Nil(t, series.headChunks, "headChunks should is non-nil after truncation") + } + + if series.headChunks != nil || len(series.mmappedChunks) > 0 { + require.GreaterOrEqual(t, series.maxTime(), tc.truncateBefore, "wrong value of series.maxTime() after truncation") + } else { + require.Equal(t, int64(math.MinInt64), series.maxTime(), "wrong value of series.maxTime() after truncation") + } + + require.Equal(t, tc.expectedFirstChunkID, series.firstChunkID, "wrong firstChunkID after truncation") + }) + } +} + func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { @@ -1363,6 +1533,7 @@ func TestMemSeries_append(t *testing.T) { ok, chunkCreated = s.append(999, 2, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") + s.mmapChunks(chunkDiskMapper) ok, chunkCreated = s.append(1000, 3, 0, cOpts) require.True(t, ok, "append failed") @@ -1372,11 +1543,12 @@ func TestMemSeries_append(t *testing.T) { require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") + s.mmapChunks(chunkDiskMapper) require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk") require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range") require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range") - require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") - require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range") + require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range") + require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range") // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. @@ -1384,6 +1556,7 @@ func TestMemSeries_append(t *testing.T) { ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts) require.True(t, ok, "append failed") } + s.mmapChunks(chunkDiskMapper) require.Greater(t, len(s.mmappedChunks)+1, 7, "expected intermediate chunks") @@ -1437,21 +1610,23 @@ func TestMemSeries_appendHistogram(t *testing.T) { require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") + s.mmapChunks(chunkDiskMapper) require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk") require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range") require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range") - require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") - require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range") + require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range") + require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range") ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk") + s.mmapChunks(chunkDiskMapper) require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk") require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range") require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range") - require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") - require.Equal(t, int64(1002), s.headChunk.maxTime, "wrong chunk range") + require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range") + require.Equal(t, int64(1002), s.headChunks.maxTime, "wrong chunk range") } func TestMemSeries_append_atVariableRate(t *testing.T) { @@ -1495,6 +1670,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { require.True(t, ok, "new chunk sample was not appended") require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") + s.mmapChunks(chunkDiskMapper) var totalSamplesInChunks int for i, c := range s.mmappedChunks { totalSamplesInChunks += int(c.numSamples) @@ -1841,6 +2017,7 @@ func TestHeadReadWriterRepair(t *testing.T) { require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") h.chunkDiskMapper.CutNewFile() + s.mmapChunks(h.chunkDiskMapper) } require.NoError(t, h.Close()) @@ -1985,6 +2162,7 @@ func TestMemSeriesIsolation(t *testing.T) { _, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err) require.NoError(t, app.Commit()) + h.mmapHeadChunks() } return i } @@ -2666,7 +2844,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { require.True(t, ok, "sample append failed") } - c, _, err := s.chunk(0, chunkDiskMapper, &sync.Pool{ + c, _, _, err := s.chunk(0, chunkDiskMapper, &sync.Pool{ New: func() interface{} { return &memChunk{} }, @@ -3092,6 +3270,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } } require.NoError(t, app.Commit()) + head.mmapHeadChunks() } // There should be 11 mmap chunks in s1. @@ -3103,7 +3282,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { cpy := *mmap expMmapChunks = append(expMmapChunks, &cpy) } - expHeadChunkSamples := ms.headChunk.chunk.NumSamples() + expHeadChunkSamples := ms.headChunks.chunk.NumSamples() require.Greater(t, expHeadChunkSamples, 0) // Series with mix of histograms and float. @@ -3199,7 +3378,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { // Checking contents of s1. ms = head.series.getByHash(s1.Hash(), s1) require.Equal(t, expMmapChunks, ms.mmappedChunks) - require.Equal(t, expHeadChunkSamples, ms.headChunk.chunk.NumSamples()) + require.Equal(t, expHeadChunkSamples, ms.headChunks.chunk.NumSamples()) testQuery := func() { q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) @@ -3738,6 +3917,8 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { // Only 1 chunk in the memory, no m-mapped chunk. s := head.series.getByHash(l.Hash(), l) require.NotNil(t, s) + require.NotNil(t, s.headChunks) + require.Equal(t, s.headChunks.len(), 1) require.Equal(t, 0, len(s.mmappedChunks)) testQuery(1) @@ -3766,10 +3947,13 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { expHistograms = append(expHistograms, timedHistogram{t: 100*int64(len(expHistograms)) + 1, h: &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}}) } require.NoError(t, app.Commit()) + head.mmapHeadChunks() // Total 2 chunks, 1 m-mapped. s = head.series.getByHash(l.Hash(), l) require.NotNil(t, s) + require.NotNil(t, s.headChunks) + require.Equal(t, s.headChunks.len(), 1) require.Equal(t, 1, len(s.mmappedChunks)) testQuery(2) } @@ -3804,6 +3988,7 @@ func TestHistogramCounterResetHeader(t *testing.T) { ms, _, err := head.getOrCreate(l.Hash(), l) require.NoError(t, err) + ms.mmapChunks(head.chunkDiskMapper) require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk. for i, mmapChunk := range ms.mmappedChunks { @@ -3816,9 +4001,9 @@ func TestHistogramCounterResetHeader(t *testing.T) { } } if floatHisto { - require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunks.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) } else { - require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunks.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) } } @@ -3909,7 +4094,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { require.NoError(t, err) require.False(t, created) require.NotNil(t, ms) - require.Len(t, ms.mmappedChunks, count-1) // One will be the head chunk. + require.Equal(t, count, ms.headChunks.len()) } appends := []struct { @@ -4350,6 +4535,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) { require.False(t, created, "should already exist") require.NotNil(t, series, "should return the series we created above") + series.mmapChunks(h.chunkDiskMapper) expChunks := make([]*mmappedChunk, len(series.mmappedChunks)) copy(expChunks, series.mmappedChunks) @@ -4507,6 +4693,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) { require.NoError(t, f.Close()) openHead() + h.mmapHeadChunks() // There should be less m-map files due to corruption. files, err = os.ReadDir(filepath.Join(dir, "chunks_head")) @@ -4697,7 +4884,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) { require.False(t, created) require.NotNil(t, ms) - require.Nil(t, ms.headChunk) + require.Nil(t, ms.headChunks) require.NotNil(t, ms.ooo.oooHeadChunk) require.Equal(t, expSamples, ms.ooo.oooHeadChunk.chunk.NumSamples()) } @@ -4709,8 +4896,8 @@ func TestOOOAppendWithNoSeries(t *testing.T) { require.NotNil(t, ms) require.Nil(t, ms.ooo) - require.NotNil(t, ms.headChunk) - require.Equal(t, expSamples, ms.headChunk.chunk.NumSamples()) + require.NotNil(t, ms.headChunks) + require.Equal(t, expSamples, ms.headChunks.chunk.NumSamples()) } newLabels := func(idx int) labels.Labels { return labels.FromStrings("foo", fmt.Sprintf("%d", idx)) } @@ -4821,6 +5008,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { appendHistogram(hists[4]) checkHeaders := func() { + head.mmapHeadChunks() ms, _, err := head.getOrCreate(l.Hash(), l) require.NoError(t, err) require.Len(t, ms.mmappedChunks, 3) @@ -4835,7 +5023,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, err) require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) } - require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunks.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) } checkHeaders() @@ -4898,6 +5086,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { checkHeaders := func() { ms, _, err := head.getOrCreate(l.Hash(), l) require.NoError(t, err) + head.mmapHeadChunks() require.Len(t, ms.mmappedChunks, 3) expHeaders := []chunkenc.CounterResetHeader{ chunkenc.UnknownCounterReset, @@ -4910,7 +5099,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, err) require.Equal(t, expHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) } - require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunks.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) } checkHeaders() diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 2397a9ec97..3ed95887ec 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -503,7 +503,7 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m // Any samples replayed till now would already be compacted. Resetting the head chunk. mSeries.nextAt = 0 - mSeries.headChunk = nil + mSeries.headChunks = nil mSeries.app = nil return } @@ -595,6 +595,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() + _ = ms.mmapChunks(h.chunkDiskMapper) } if s.T > maxt { maxt = s.T @@ -960,15 +961,15 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused. s.Lock() - if s.headChunk == nil { + if s.headChunks == nil { buf.PutUvarint(0) } else { - enc := s.headChunk.chunk.Encoding() + enc := s.headChunks.chunk.Encoding() buf.PutUvarint(1) - buf.PutBE64int64(s.headChunk.minTime) - buf.PutBE64int64(s.headChunk.maxTime) + buf.PutBE64int64(s.headChunks.minTime) + buf.PutBE64int64(s.headChunks.maxTime) buf.PutByte(byte(enc)) - buf.PutUvarintBytes(s.headChunk.chunk.Bytes()) + buf.PutUvarintBytes(s.headChunks.chunk.Bytes()) switch enc { case chunkenc.EncXOR: @@ -1414,12 +1415,12 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie continue } series.nextAt = csr.mc.maxTime // This will create a new chunk on append. - series.headChunk = csr.mc + series.headChunks = csr.mc series.lastValue = csr.lastValue series.lastHistogramValue = csr.lastHistogramValue series.lastFloatHistogramValue = csr.lastFloatHistogramValue - app, err := series.headChunk.chunk.Appender() + app, err := series.headChunks.chunk.Appender() if err != nil { errChan <- err return