diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 7b631000a2..e6df9b78cf 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -16,6 +16,7 @@ package main import ( "bufio" "context" + "errors" "fmt" "io" "os" @@ -643,10 +644,15 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. for _, chk := range chks { // Load the actual data of the chunk. - chk, err := chunkr.Chunk(chk) + chk, iterable, err := chunkr.ChunkOrIterable(chk) if err != nil { return err } + // Chunks within blocks should not need to be re-written, so an + // iterable is not expected to be returned from the chunk reader. + if iterable != nil { + return errors.New("ChunkOrIterable should not return an iterable when reading a block") + } switch chk.Encoding() { case chunkenc.EncXOR: floatChunkSamplesCount = append(floatChunkSamplesCount, chk.NumSamples()) diff --git a/storage/merge.go b/storage/merge.go index 501e8db09d..b4ebb440f3 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -473,10 +473,10 @@ func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunke return csi } -func ChainSampleIteratorFromMetas(it chunkenc.Iterator, chunks []chunks.Meta) chunkenc.Iterator { - csi := getChainSampleIterator(it, len(chunks)) - for i, c := range chunks { - csi.iterators[i] = c.Chunk.Iterator(csi.iterators[i]) +func ChainSampleIteratorFromIterables(it chunkenc.Iterator, iterables []chunkenc.Iterable) chunkenc.Iterator { + csi := getChainSampleIterator(it, len(iterables)) + for i, c := range iterables { + csi.iterators[i] = c.Iterator(csi.iterators[i]) } return csi } diff --git a/tsdb/block.go b/tsdb/block.go index b995e4fd59..a586536b15 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -117,8 +117,19 @@ type ChunkWriter interface { // ChunkReader provides reading access of serialized time series data. type ChunkReader interface { - // Chunk returns the series data chunk with the given reference. - Chunk(meta chunks.Meta) (chunkenc.Chunk, error) + // ChunkOrIterable returns the series data for the given chunks.Meta. + // Either a single chunk will be returned, or an iterable. + // A single chunk should be returned if chunks.Meta maps to a chunk that + // already exists and doesn't need modifications. + // An iterable should be returned if chunks.Meta maps to a subset of the + // samples in a stored chunk, or multiple chunks. (E.g. OOOHeadChunkReader + // could return an iterable where multiple histogram samples have counter + // resets. There can only be one counter reset per histogram chunk so + // multiple chunks would be created from the iterable in this case.) + // Only one of chunk or iterable should be returned. In some cases you may + // always expect a chunk to be returned. You can check that iterable is nil + // in those cases. + ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) // Close releases all underlying resources of the reader. Close() error diff --git a/tsdb/block_test.go b/tsdb/block_test.go index c49c1d5a8d..46e6ecf844 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -501,6 +501,19 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { return filepath.Join(dir, ulid.String()) } +func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string { + compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil) + require.NoError(tb, err) + + require.NoError(tb, os.MkdirAll(dir, 0o777)) + + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) + require.NoError(tb, err) + return filepath.Join(dir, ulid.String()) +} + func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string) *Head { opts := DefaultHeadOptions() opts.ChunkDirRoot = chunkDir diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index f4d11986c4..0126f1fbdb 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -67,6 +67,8 @@ const ( // Chunk holds a sequence of sample pairs that can be iterated over and appended to. type Chunk interface { + Iterable + // Bytes returns the underlying byte slice of the chunk. Bytes() []byte @@ -76,11 +78,6 @@ type Chunk interface { // Appender returns an appender to append samples to the chunk. Appender() (Appender, error) - // The iterator passed as argument is for re-use. - // Depending on implementation, the iterator can - // be re-used or a new iterator can be allocated. - Iterator(Iterator) Iterator - // NumSamples returns the number of samples in the chunk. NumSamples() int @@ -92,6 +89,13 @@ type Chunk interface { Compact() } +type Iterable interface { + // The iterator passed as argument is for re-use. + // Depending on implementation, the iterator can + // be re-used or a new iterator can be allocated. + Iterator(Iterator) Iterator +} + // Appender adds sample pairs to a chunk. type Appender interface { Append(int64, float64) @@ -184,6 +188,19 @@ func (v ValueType) ChunkEncoding() Encoding { } } +func (v ValueType) NewChunk() (Chunk, error) { + switch v { + case ValFloat: + return NewXORChunk(), nil + case ValHistogram: + return NewHistogramChunk(), nil + case ValFloatHistogram: + return NewFloatHistogramChunk(), nil + default: + return nil, fmt.Errorf("value type %v unsupported", v) + } +} + // MockSeriesIterator returns an iterator for a mock series with custom timeStamps and values. func MockSeriesIterator(timestamps []int64, values []float64) Iterator { return &mockSeriesIterator{ diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index c4c4e3c933..f22285a0ca 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -117,11 +117,16 @@ func (b BlockChunkRef) Unpack() (int, int) { return sgmIndex, chkStart } -// Meta holds information about a chunk of data. +// Meta holds information about one or more chunks. +// For examples of when chunks.Meta could refer to multiple chunks, see +// ChunkReader.ChunkOrIterable(). type Meta struct { // Ref and Chunk hold either a reference that can be used to retrieve // chunk data or the data itself. - // If Chunk is nil, call ChunkReader.Chunk(Meta.Ref) to get the chunk and assign it to the Chunk field + // If Chunk is nil, call ChunkReader.ChunkOrIterable(Meta.Ref) to get the + // chunk and assign it to the Chunk field. If an iterable is returned from + // that method, then it may not be possible to set Chunk as the iterable + // might form several chunks. Ref ChunkRef Chunk chunkenc.Chunk @@ -667,24 +672,24 @@ func (s *Reader) Size() int64 { } // Chunk returns a chunk from a given reference. -func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) { +func (s *Reader) ChunkOrIterable(meta Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { sgmIndex, chkStart := BlockChunkRef(meta.Ref).Unpack() if sgmIndex >= len(s.bs) { - return nil, fmt.Errorf("segment index %d out of range", sgmIndex) + return nil, nil, fmt.Errorf("segment index %d out of range", sgmIndex) } sgmBytes := s.bs[sgmIndex] if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() { - return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len()) + return nil, nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len()) } // With the minimum chunk length this should never cause us reading // over the end of the slice. c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize) chkDataLen, n := binary.Uvarint(c) if n <= 0 { - return nil, fmt.Errorf("reading chunk length failed with %d", n) + return nil, nil, fmt.Errorf("reading chunk length failed with %d", n) } chkEncStart := chkStart + n @@ -693,17 +698,18 @@ func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) { chkDataEnd := chkEnd - crc32.Size if chkEnd > sgmBytes.Len() { - return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len()) + return nil, nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len()) } sum := sgmBytes.Range(chkDataEnd, chkEnd) if err := checkCRC32(sgmBytes.Range(chkEncStart, chkDataEnd), sum); err != nil { - return nil, err + return nil, nil, err } chkData := sgmBytes.Range(chkDataStart, chkDataEnd) chkEnc := sgmBytes.Range(chkEncStart, chkEncStart+ChunkEncodingSize)[0] - return s.pool.Get(chunkenc.Encoding(chkEnc), chkData) + chk, err := s.pool.Get(chunkenc.Encoding(chkEnc), chkData) + return chk, nil, err } func nextSequenceFile(dir string) (string, int, error) { diff --git a/tsdb/chunks/chunks_test.go b/tsdb/chunks/chunks_test.go index affaa4b9f1..86f2fbddbd 100644 --- a/tsdb/chunks/chunks_test.go +++ b/tsdb/chunks/chunks_test.go @@ -23,6 +23,6 @@ func TestReaderWithInvalidBuffer(t *testing.T) { b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) r := &Reader{bs: []ByteSlice{b}} - _, err := r.Chunk(Meta{Ref: 0}) + _, _, err := r.ChunkOrIterable(Meta{Ref: 0}) require.Error(t, err) } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index cf8c2439ac..94b35e3b4c 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1144,6 +1144,46 @@ func BenchmarkCompactionFromHead(b *testing.B) { } } +func BenchmarkCompactionFromOOOHead(b *testing.B) { + dir := b.TempDir() + totalSeries := 100000 + totalSamples := 100 + for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { + labelValues := totalSeries / labelNames + b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { + chunkDir := b.TempDir() + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = chunkDir + opts.OutOfOrderTimeWindow.Store(int64(totalSamples)) + h, err := NewHead(nil, nil, nil, nil, opts, nil) + require.NoError(b, err) + for ln := 0; ln < labelNames; ln++ { + app := h.Appender(context.Background()) + for lv := 0; lv < labelValues; lv++ { + lbls := labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)) + _, err = app.Append(0, lbls, int64(totalSamples), 0) + require.NoError(b, err) + for ts := 0; ts < totalSamples; ts++ { + _, err = app.Append(0, lbls, int64(ts), float64(ts)) + require.NoError(b, err) + } + } + require.NoError(b, app.Commit()) + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + oooHead, err := NewOOOCompactionHead(context.TODO(), h) + require.NoError(b, err) + createBlockFromOOOHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), oooHead) + } + h.Close() + }) + } +} + // TestDisableAutoCompactions checks that we can // disable and enable the auto compaction. // This is needed for unit tests that rely on diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 5728b49bd0..f602f5ee9d 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -2909,8 +2909,9 @@ func TestChunkWriter_ReadAfterWrite(t *testing.T) { for _, chks := range test.chks { for _, chkExp := range chks { - chkAct, err := r.Chunk(chkExp) + chkAct, iterable, err := r.ChunkOrIterable(chkExp) require.NoError(t, err) + require.Nil(t, iterable) require.Equal(t, chkExp.Chunk.Bytes(), chkAct.Bytes()) } } @@ -2969,8 +2970,9 @@ func TestChunkReader_ConcurrentReads(t *testing.T) { go func(chunk chunks.Meta) { defer wg.Done() - chkAct, err := r.Chunk(chunk) + chkAct, iterable, err := r.ChunkOrIterable(chunk) require.NoError(t, err) + require.Nil(t, iterable) require.Equal(t, chunk.Chunk.Bytes(), chkAct.Bytes()) }(chk) } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 32a23499a3..35ef26a58a 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -289,10 +289,10 @@ func (h *headChunkReader) Close() error { return nil } -// Chunk returns the chunk for the reference number. -func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { +// ChunkOrIterable returns the chunk for the reference number. +func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { chk, _, err := h.chunk(meta, false) - return chk, err + return chk, nil, err } // ChunkWithCopy returns the chunk for the reference number. @@ -416,13 +416,13 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi return elem, true, offset == 0, nil } -// oooMergedChunk returns the requested chunk based on the given chunks.Meta -// reference from memory or by m-mapping it from the disk. The returned chunk -// might be a merge of all the overlapping chunks, if any, amongst all the -// chunks in the OOOHead. +// oooMergedChunks return an iterable over one or more OOO chunks for the given +// chunks.Meta reference from memory or by m-mapping it from the disk. The +// returned iterable will be a merge of all the overlapping chunks, if any, +// amongst all the chunks in the OOOHead. // This function is not thread safe unless the caller holds a lock. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (chunk *mergedOOOChunks, err error) { +func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (*mergedOOOChunks, error) { _, cid := chunks.HeadChunkRef(meta.Ref).Unpack() // ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are @@ -499,11 +499,13 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper mc := &mergedOOOChunks{} absoluteMax := int64(math.MinInt64) for _, c := range tmpChks { - if c.meta.Ref != meta.Ref && (len(mc.chunks) == 0 || c.meta.MinTime > absoluteMax) { + if c.meta.Ref != meta.Ref && (len(mc.chunkIterables) == 0 || c.meta.MinTime > absoluteMax) { continue } + var iterable chunkenc.Iterable if c.meta.Ref == oooHeadRef { var xor *chunkenc.XORChunk + var err error // If head chunk min and max time match the meta OOO markers // that means that the chunk has not expanded so we can append // it as it is. @@ -516,7 +518,7 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper if err != nil { return nil, errors.Wrap(err, "failed to convert ooo head chunk to xor chunk") } - c.meta.Chunk = xor + iterable = xor } else { chk, err := cdm.Chunk(c.ref) if err != nil { @@ -531,12 +533,12 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper // wrap the chunk within a chunk that doesnt allows us to iterate // through samples out of the OOOLastMinT and OOOLastMaxT // markers. - c.meta.Chunk = boundedChunk{chk, meta.OOOLastMinTime, meta.OOOLastMaxTime} + iterable = boundedIterable{chk, meta.OOOLastMinTime, meta.OOOLastMaxTime} } else { - c.meta.Chunk = chk + iterable = chk } } - mc.chunks = append(mc.chunks, c.meta) + mc.chunkIterables = append(mc.chunkIterables, iterable) if c.meta.MaxTime > absoluteMax { absoluteMax = c.meta.MaxTime } @@ -545,77 +547,30 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper return mc, nil } -var _ chunkenc.Chunk = &mergedOOOChunks{} +var _ chunkenc.Iterable = &mergedOOOChunks{} -// mergedOOOChunks holds the list of overlapping chunks. This struct satisfies -// chunkenc.Chunk. +// mergedOOOChunks holds the list of iterables for overlapping chunks. type mergedOOOChunks struct { - chunks []chunks.Meta -} - -// Bytes is a very expensive method because its calling the iterator of all the -// chunks in the mergedOOOChunk and building a new chunk with the samples. -func (o mergedOOOChunks) Bytes() []byte { - xc := chunkenc.NewXORChunk() - app, err := xc.Appender() - if err != nil { - panic(err) - } - it := o.Iterator(nil) - for it.Next() == chunkenc.ValFloat { - t, v := it.At() - app.Append(t, v) - } - - return xc.Bytes() -} - -func (o mergedOOOChunks) Encoding() chunkenc.Encoding { - return chunkenc.EncXOR -} - -func (o mergedOOOChunks) Appender() (chunkenc.Appender, error) { - return nil, errors.New("can't append to mergedOOOChunks") + chunkIterables []chunkenc.Iterable } func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { - return storage.ChainSampleIteratorFromMetas(iterator, o.chunks) + return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables) } -func (o mergedOOOChunks) NumSamples() int { - samples := 0 - for _, c := range o.chunks { - samples += c.Chunk.NumSamples() - } - return samples -} +var _ chunkenc.Iterable = &boundedIterable{} -func (o mergedOOOChunks) Compact() {} - -var _ chunkenc.Chunk = &boundedChunk{} - -// boundedChunk is an implementation of chunkenc.Chunk that uses a +// boundedIterable is an implementation of chunkenc.Iterable that uses a // boundedIterator that only iterates through samples which timestamps are // >= minT and <= maxT. -type boundedChunk struct { - chunkenc.Chunk - minT int64 - maxT int64 +type boundedIterable struct { + chunk chunkenc.Chunk + minT int64 + maxT int64 } -func (b boundedChunk) Bytes() []byte { - xor := chunkenc.NewXORChunk() - a, _ := xor.Appender() - it := b.Iterator(nil) - for it.Next() == chunkenc.ValFloat { - t, v := it.At() - a.Append(t, v) - } - return xor.Bytes() -} - -func (b boundedChunk) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { - it := b.Chunk.Iterator(iterator) +func (b boundedIterable) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { + it := b.chunk.Iterator(iterator) if it == nil { panic("iterator shouldn't be nil") } diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index ad0a59d34e..4f19db4c15 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -129,21 +129,10 @@ func TestBoundedChunk(t *testing.T) { } for _, tc := range tests { t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { - chunk := boundedChunk{tc.inputChunk, tc.inputMinT, tc.inputMaxT} - - // Testing Bytes() - expChunk := chunkenc.NewXORChunk() - if tc.inputChunk.NumSamples() > 0 { - app, err := expChunk.Appender() - require.NoError(t, err) - for ts := tc.inputMinT; ts <= tc.inputMaxT; ts++ { - app.Append(ts, float64(ts)) - } - } - require.Equal(t, expChunk.Bytes(), chunk.Bytes()) + iterable := boundedIterable{tc.inputChunk, tc.inputMinT, tc.inputMaxT} var samples []sample - it := chunk.Iterator(nil) + it := iterable.Iterator(nil) if tc.initialSeek != 0 { // Testing Seek() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 64237e76a7..7adfab1c9f 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1835,16 +1835,16 @@ func TestGCChunkAccess(t *testing.T) { cr, err := h.chunksRange(0, 1500, nil) require.NoError(t, err) - _, err = cr.Chunk(chunks[0]) + _, _, err = cr.ChunkOrIterable(chunks[0]) require.NoError(t, err) - _, err = cr.Chunk(chunks[1]) + _, _, err = cr.ChunkOrIterable(chunks[1]) require.NoError(t, err) require.NoError(t, h.Truncate(1500)) // Remove a chunk. - _, err = cr.Chunk(chunks[0]) + _, _, err = cr.ChunkOrIterable(chunks[0]) require.Equal(t, storage.ErrNotFound, err) - _, err = cr.Chunk(chunks[1]) + _, _, err = cr.ChunkOrIterable(chunks[1]) require.NoError(t, err) } @@ -1894,18 +1894,18 @@ func TestGCSeriesAccess(t *testing.T) { cr, err := h.chunksRange(0, 2000, nil) require.NoError(t, err) - _, err = cr.Chunk(chunks[0]) + _, _, err = cr.ChunkOrIterable(chunks[0]) require.NoError(t, err) - _, err = cr.Chunk(chunks[1]) + _, _, err = cr.ChunkOrIterable(chunks[1]) require.NoError(t, err) require.NoError(t, h.Truncate(2000)) // Remove the series. require.Equal(t, (*memSeries)(nil), h.series.getByID(1)) - _, err = cr.Chunk(chunks[0]) + _, _, err = cr.ChunkOrIterable(chunks[0]) require.Equal(t, storage.ErrNotFound, err) - _, err = cr.Chunk(chunks[1]) + _, _, err = cr.ChunkOrIterable(chunks[1]) require.Equal(t, storage.ErrNotFound, err) } @@ -5406,8 +5406,9 @@ func TestCuttingNewHeadChunks(t *testing.T) { require.Len(t, chkMetas, len(tc.expectedChks)) for i, expected := range tc.expectedChks { - chk, err := chkReader.Chunk(chkMetas[i]) + chk, iterable, err := chkReader.ChunkOrIterable(chkMetas[i]) require.NoError(t, err) + require.Nil(t, iterable) require.Equal(t, expected.numSamples, chk.NumSamples()) require.Len(t, chk.Bytes(), expected.numBytes) @@ -5455,8 +5456,9 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) { storedSampleCount := 0 for _, chunkMeta := range chunks { - chunk, err := chunkReader.Chunk(chunkMeta) + chunk, iterable, err := chunkReader.ChunkOrIterable(chunkMeta) require.NoError(t, err) + require.Nil(t, iterable) storedSampleCount += chunk.NumSamples() } diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index ace2326576..440130f7db 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -247,33 +247,33 @@ func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationS } } -func (cr OOOHeadChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { +func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { sid, _ := chunks.HeadChunkRef(meta.Ref).Unpack() s := cr.head.series.getByID(sid) // This means that the series has been garbage collected. if s == nil { - return nil, storage.ErrNotFound + return nil, nil, storage.ErrNotFound } s.Lock() if s.ooo == nil { // There is no OOO data for this series. s.Unlock() - return nil, storage.ErrNotFound + return nil, nil, storage.ErrNotFound } - c, err := s.oooMergedChunk(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt) + mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt) s.Unlock() if err != nil { - return nil, err + return nil, nil, err } // This means that the query range did not overlap with the requested chunk. - if len(c.chunks) == 0 { - return nil, storage.ErrNotFound + if len(mc.chunkIterables) == 0 { + return nil, nil, storage.ErrNotFound } - return c, nil + return nil, mc, nil } func (cr OOOHeadChunkReader) Close() error { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 3f4b9bae70..64577872f5 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -486,9 +486,10 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil) defer cr.Close() - c, err := cr.Chunk(chunks.Meta{ + c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, }) + require.Nil(t, iterable) require.Equal(t, err, fmt.Errorf("not found")) require.Equal(t, c, nil) }) @@ -853,11 +854,12 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil) defer cr.Close() for i := 0; i < len(chks); i++ { - c, err := cr.Chunk(chks[i]) + c, iterable, err := cr.ChunkOrIterable(chks[i]) require.NoError(t, err) + require.Nil(t, c) var resultSamples chunks.SampleSlice - it := c.Iterator(nil) + it := iterable.Iterator(nil) for it.Next() == chunkenc.ValFloat { t, v := it.At() resultSamples = append(resultSamples, sample{t: t, f: v}) @@ -1025,11 +1027,12 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil) defer cr.Close() for i := 0; i < len(chks); i++ { - c, err := cr.Chunk(chks[i]) + c, iterable, err := cr.ChunkOrIterable(chks[i]) require.NoError(t, err) + require.Nil(t, c) var resultSamples chunks.SampleSlice - it := c.Iterator(nil) + it := iterable.Iterator(nil) for it.Next() == chunkenc.ValFloat { ts, v := it.At() resultSamples = append(resultSamples, sample{t: ts, f: v}) diff --git a/tsdb/querier.go b/tsdb/querier.go index a832c0d1ba..a32c7dd961 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -632,36 +632,42 @@ func (b *blockBaseSeriesSet) Warnings() annotations.Annotations { return nil } // populateWithDelGenericSeriesIterator assumes that chunks that would be fully // removed by intervals are filtered out in previous phase. // -// On each iteration currChkMeta is available. If currDelIter is not nil, it -// means that the chunk iterator in currChkMeta is invalid and a chunk rewrite -// is needed, for which currDelIter should be used. +// On each iteration currMeta is available. If currDelIter is not nil, it +// means that the chunk in currMeta is invalid and a chunk rewrite is needed, +// for which currDelIter should be used. type populateWithDelGenericSeriesIterator struct { blockID ulid.ULID - chunks ChunkReader - // chks are expected to be sorted by minTime and should be related to + cr ChunkReader + // metas are expected to be sorted by minTime and should be related to // the same, single series. - chks []chunks.Meta + // It's possible for a single chunks.Meta to refer to multiple chunks. + // cr.ChunkOrIterator() would return an iterable and a nil chunk in this + // case. + metas []chunks.Meta - i int // Index into chks; -1 if not started yet. + i int // Index into metas; -1 if not started yet. err error bufIter DeletedIterator // Retained for memory re-use. currDelIter may point here. intervals tombstones.Intervals currDelIter chunkenc.Iterator - currChkMeta chunks.Meta + // currMeta is the current chunks.Meta from metas. currMeta.Chunk is set to + // the chunk returned from cr.ChunkOrIterable(). As that can return a nil + // chunk, currMeta.Chunk is not always guaranteed to be set. + currMeta chunks.Meta } func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) { p.blockID = blockID - p.chunks = cr - p.chks = chks + p.cr = cr + p.metas = chks p.i = -1 p.err = nil // Note we don't touch p.bufIter.Iter; it is holding on to an iterator we might reuse in next(). p.bufIter.Intervals = p.bufIter.Intervals[:0] p.intervals = intervals p.currDelIter = nil - p.currChkMeta = chunks.Meta{} + p.currMeta = chunks.Meta{} } // If copyHeadChunk is true, then the head chunk (i.e. the in-memory chunk of the TSDB) @@ -669,43 +675,54 @@ func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr Chunk // However, if the deletion intervals overlaps with the head chunk, then the head chunk is // not copied irrespective of copyHeadChunk because it will be re-encoded later anyway. func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { - if p.err != nil || p.i >= len(p.chks)-1 { + if p.err != nil || p.i >= len(p.metas)-1 { return false } p.i++ - p.currChkMeta = p.chks[p.i] + p.currMeta = p.metas[p.i] p.bufIter.Intervals = p.bufIter.Intervals[:0] for _, interval := range p.intervals { - if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) { + if p.currMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) { p.bufIter.Intervals = p.bufIter.Intervals.Add(interval) } } - hcr, ok := p.chunks.(*headChunkReader) + hcr, ok := p.cr.(*headChunkReader) + var iterable chunkenc.Iterable if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 { // ChunkWithCopy will copy the head chunk. var maxt int64 - p.currChkMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currChkMeta) + p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta) // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. - p.currChkMeta.MaxTime = maxt + p.currMeta.MaxTime = maxt } else { - p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta) + p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta) } + if p.err != nil { - p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currChkMeta.Ref, p.blockID.String()) + p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currMeta.Ref, p.blockID.String()) return false } - if len(p.bufIter.Intervals) == 0 { - // If there is no overlap with deletion intervals, we can take chunk as it is. - p.currDelIter = nil + // Use the single chunk if possible. + if p.currMeta.Chunk != nil { + if len(p.bufIter.Intervals) == 0 { + // If there is no overlap with deletion intervals and a single chunk is + // returned, we can take chunk as it is. + p.currDelIter = nil + return true + } + // Otherwise we need to iterate over the samples in the single chunk + // and create new chunks. + p.bufIter.Iter = p.currMeta.Chunk.Iterator(p.bufIter.Iter) + p.currDelIter = &p.bufIter return true } - // We don't want the full chunk, take just a part of it. - p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(p.bufIter.Iter) + // Otherwise, use the iterable to create an iterator. + p.bufIter.Iter = iterable.Iterator(p.bufIter.Iter) p.currDelIter = &p.bufIter return true } @@ -765,7 +782,7 @@ func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType { if p.currDelIter != nil { p.curr = p.currDelIter } else { - p.curr = p.currChkMeta.Chunk.Iterator(p.curr) + p.curr = p.currMeta.Chunk.Iterator(p.curr) } if valueType := p.curr.Next(); valueType != chunkenc.ValNone { return valueType @@ -817,22 +834,61 @@ func (p *populateWithDelSeriesIterator) Err() error { type populateWithDelChunkSeriesIterator struct { populateWithDelGenericSeriesIterator - curr chunks.Meta + // currMetaWithChunk is current meta with its chunk field set. This meta + // is guaranteed to map to a single chunk. This differs from + // populateWithDelGenericSeriesIterator.currMeta as that + // could refer to multiple chunks. + currMetaWithChunk chunks.Meta + + // chunksFromIterable stores the chunks created from iterating through + // the iterable returned by cr.ChunkOrIterable() (with deleted samples + // removed). + chunksFromIterable []chunks.Meta + chunksFromIterableIdx int } func (p *populateWithDelChunkSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) { p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals) - p.curr = chunks.Meta{} + p.currMetaWithChunk = chunks.Meta{} + p.chunksFromIterable = p.chunksFromIterable[:0] + p.chunksFromIterableIdx = -1 } func (p *populateWithDelChunkSeriesIterator) Next() bool { + if p.currMeta.Chunk == nil { + // If we've been creating chunks from the iterable, check if there are + // any more chunks to iterate through. + if p.chunksFromIterableIdx < len(p.chunksFromIterable)-1 { + p.chunksFromIterableIdx++ + p.currMetaWithChunk = p.chunksFromIterable[p.chunksFromIterableIdx] + return true + } + } + + // Move to the next chunk/deletion iterator. if !p.next(true) { return false } - p.curr = p.currChkMeta - if p.currDelIter == nil { - return true + + if p.currMeta.Chunk != nil { + if p.currDelIter == nil { + p.currMetaWithChunk = p.currMeta + return true + } + // If ChunkOrIterable() returned a non-nil chunk, the samples in + // p.currDelIter will only form one chunk, as the only change + // p.currDelIter might make is deleting some samples. + return p.populateCurrForSingleChunk() } + + // If ChunkOrIterable() returned an iterable, multiple chunks may be + // created from the samples in p.currDelIter. + return p.populateChunksFromIterable() +} + +// populateCurrForSingleChunk sets the fields within p.currMetaWithChunk. This +// should be called if the samples in p.currDelIter only form one chunk. +func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { valueType := p.currDelIter.Next() if valueType == chunkenc.ValNone { if err := p.currDelIter.Err(); err != nil { @@ -840,9 +896,9 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { } return false } - p.curr.MinTime = p.currDelIter.AtT() + p.currMetaWithChunk.MinTime = p.currDelIter.AtT() - // Re-encode the chunk if iterator is provider. This means that it has + // Re-encode the chunk if iterator is provided. This means that it has // some samples to be deleted or chunk is opened. var ( newChunk chunkenc.Chunk @@ -900,7 +956,7 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { } } default: - err = fmt.Errorf("populateWithDelChunkSeriesIterator: value type %v unsupported", valueType) + err = fmt.Errorf("populateCurrForSingleChunk: value type %v unsupported", valueType) } if err != nil { @@ -912,12 +968,127 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { return false } - p.curr.Chunk = newChunk - p.curr.MaxTime = t + p.currMetaWithChunk.Chunk = newChunk + p.currMetaWithChunk.MaxTime = t return true } -func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr } +// populateChunksFromIterable reads the samples from currDelIter to create +// chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first +// chunk. +func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { + p.chunksFromIterable = p.chunksFromIterable[:0] + p.chunksFromIterableIdx = -1 + + firstValueType := p.currDelIter.Next() + if firstValueType == chunkenc.ValNone { + if err := p.currDelIter.Err(); err != nil { + p.err = errors.Wrap(err, "populateChunksFromIterable: no samples could be read") + return false + } + return false + } + + var ( + // t is the timestamp for the current sample. + t int64 + cmint int64 + cmaxt int64 + + currentChunk chunkenc.Chunk + + app chunkenc.Appender + + newChunk chunkenc.Chunk + recoded bool + + err error + ) + + prevValueType := chunkenc.ValNone + + for currentValueType := firstValueType; currentValueType != chunkenc.ValNone; currentValueType = p.currDelIter.Next() { + // Check if the encoding has changed (i.e. we need to create a new + // chunk as chunks can't have multiple encoding types). + // For the first sample, the following condition will always be true as + // ValNoneNone != ValFloat | ValHistogram | ValFloatHistogram. + if currentValueType != prevValueType { + if prevValueType != chunkenc.ValNone { + p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) + } + cmint = p.currDelIter.AtT() + if currentChunk, err = currentValueType.NewChunk(); err != nil { + break + } + if app, err = currentChunk.Appender(); err != nil { + break + } + } + + switch currentValueType { + case chunkenc.ValFloat: + { + var v float64 + t, v = p.currDelIter.At() + app.Append(t, v) + } + case chunkenc.ValHistogram: + { + var v *histogram.Histogram + t, v = p.currDelIter.AtHistogram() + // No need to set prevApp as AppendHistogram will set the + // counter reset header for the appender that's returned. + newChunk, recoded, app, err = app.AppendHistogram(nil, t, v, false) + } + case chunkenc.ValFloatHistogram: + { + var v *histogram.FloatHistogram + t, v = p.currDelIter.AtFloatHistogram() + // No need to set prevApp as AppendHistogram will set the + // counter reset header for the appender that's returned. + newChunk, recoded, app, err = app.AppendFloatHistogram(nil, t, v, false) + } + } + + if err != nil { + break + } + + if newChunk != nil { + if !recoded { + p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) + } + currentChunk = newChunk + cmint = t + } + + cmaxt = t + prevValueType = currentValueType + } + + if err != nil { + p.err = errors.Wrap(err, "populateChunksFromIterable: error when writing new chunks") + return false + } + if err = p.currDelIter.Err(); err != nil { + p.err = errors.Wrap(err, "populateChunksFromIterable: currDelIter error when writing new chunks") + return false + } + + if prevValueType != chunkenc.ValNone { + p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) + } + + if len(p.chunksFromIterable) == 0 { + return false + } + + p.currMetaWithChunk = p.chunksFromIterable[0] + p.chunksFromIterableIdx = 0 + return true +} + +func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.currMetaWithChunk } // blockSeriesSet allows to iterate over sorted, populated series with applied tombstones. // Series with all deleted chunks are still present as Series with no samples. @@ -1117,8 +1288,8 @@ func newNopChunkReader() ChunkReader { } } -func (cr nopChunkReader) Chunk(chunks.Meta) (chunkenc.Chunk, error) { - return cr.emptyChunk, nil +func (cr nopChunkReader) ChunkOrIterable(chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { + return cr.emptyChunk, nil, nil } func (cr nopChunkReader) Close() error { return nil } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 7260d9d8bd..ce46a5c129 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -685,12 +685,14 @@ func TestBlockQuerierDelete(t *testing.T) { type fakeChunksReader struct { ChunkReader - chks map[chunks.ChunkRef]chunkenc.Chunk + chks map[chunks.ChunkRef]chunkenc.Chunk + iterables map[chunks.ChunkRef]chunkenc.Iterable } func createFakeReaderAndNotPopulatedChunks(s ...[]chunks.Sample) (*fakeChunksReader, []chunks.Meta) { f := &fakeChunksReader{ - chks: map[chunks.ChunkRef]chunkenc.Chunk{}, + chks: map[chunks.ChunkRef]chunkenc.Chunk{}, + iterables: map[chunks.ChunkRef]chunkenc.Iterable{}, } chks := make([]chunks.Meta, 0, len(s)) @@ -707,21 +709,102 @@ func createFakeReaderAndNotPopulatedChunks(s ...[]chunks.Sample) (*fakeChunksRea return f, chks } -func (r *fakeChunksReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { - chk, ok := r.chks[meta.Ref] - if !ok { - return nil, fmt.Errorf("chunk not found at ref %v", meta.Ref) +// Samples in each slice are assumed to be sorted. +func createFakeReaderAndIterables(s ...[]chunks.Sample) (*fakeChunksReader, []chunks.Meta) { + f := &fakeChunksReader{ + chks: map[chunks.ChunkRef]chunkenc.Chunk{}, + iterables: map[chunks.ChunkRef]chunkenc.Iterable{}, } - return chk, nil + chks := make([]chunks.Meta, 0, len(s)) + + for ref, samples := range s { + f.iterables[chunks.ChunkRef(ref)] = &mockIterable{s: samples} + + var minTime, maxTime int64 + if len(samples) > 0 { + minTime = samples[0].T() + maxTime = samples[len(samples)-1].T() + } + chks = append(chks, chunks.Meta{ + Ref: chunks.ChunkRef(ref), + MinTime: minTime, + MaxTime: maxTime, + }) + } + return f, chks } +func (r *fakeChunksReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { + if chk, ok := r.chks[meta.Ref]; ok { + return chk, nil, nil + } + + if it, ok := r.iterables[meta.Ref]; ok { + return nil, it, nil + } + return nil, nil, fmt.Errorf("chunk or iterable not found at ref %v", meta.Ref) +} + +type mockIterable struct { + s []chunks.Sample +} + +func (it *mockIterable) Iterator(chunkenc.Iterator) chunkenc.Iterator { + return &mockSampleIterator{ + s: it.s, + idx: -1, + } +} + +type mockSampleIterator struct { + s []chunks.Sample + idx int +} + +func (it *mockSampleIterator) Seek(t int64) chunkenc.ValueType { + for ; it.idx < len(it.s); it.idx++ { + if it.idx != -1 && it.s[it.idx].T() >= t { + return it.s[it.idx].Type() + } + } + + return chunkenc.ValNone +} + +func (it *mockSampleIterator) At() (int64, float64) { + return it.s[it.idx].T(), it.s[it.idx].F() +} + +func (it *mockSampleIterator) AtHistogram() (int64, *histogram.Histogram) { + return it.s[it.idx].T(), it.s[it.idx].H() +} + +func (it *mockSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { + return it.s[it.idx].T(), it.s[it.idx].FH() +} + +func (it *mockSampleIterator) AtT() int64 { + return it.s[it.idx].T() +} + +func (it *mockSampleIterator) Next() chunkenc.ValueType { + if it.idx < len(it.s)-1 { + it.idx++ + return it.s[it.idx].Type() + } + + return chunkenc.ValNone +} + +func (it *mockSampleIterator) Err() error { return nil } + func TestPopulateWithTombSeriesIterators(t *testing.T) { type minMaxTimes struct { minTime, maxTime int64 } cases := []struct { - name string - chks [][]chunks.Sample + name string + samples [][]chunks.Sample expected []chunks.Sample expectedChks []chunks.Meta @@ -732,23 +815,38 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { // Seek being zero means do not test seek. seek int64 seekSuccess bool + + // Set this to true if a sample slice will form multiple chunks. + skipChunkTest bool + + skipIterableTest bool }{ { - name: "no chunk", - chks: [][]chunks.Sample{}, + name: "no chunk", + samples: [][]chunks.Sample{}, }, { - name: "one empty chunk", // This should never happen. - chks: [][]chunks.Sample{{}}, + name: "one empty chunk", // This should never happen. + samples: [][]chunks.Sample{{}}, expectedChks: []chunks.Meta{ assureChunkFromSamples(t, []chunks.Sample{}), }, expectedMinMaxTimes: []minMaxTimes{{0, 0}}, + // iterables with no samples will return no chunks instead of empty chunks + skipIterableTest: true, }, { - name: "three empty chunks", // This should never happen. - chks: [][]chunks.Sample{{}, {}, {}}, + name: "one empty iterable", + samples: [][]chunks.Sample{{}}, + + // iterables with no samples will return no chunks + expectedChks: nil, + skipChunkTest: true, + }, + { + name: "three empty chunks", // This should never happen. + samples: [][]chunks.Sample{{}, {}, {}}, expectedChks: []chunks.Meta{ assureChunkFromSamples(t, []chunks.Sample{}), @@ -756,10 +854,20 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { assureChunkFromSamples(t, []chunks.Sample{}), }, expectedMinMaxTimes: []minMaxTimes{{0, 0}, {0, 0}, {0, 0}}, + // iterables with no samples will return no chunks instead of empty chunks + skipIterableTest: true, + }, + { + name: "three empty iterables", + samples: [][]chunks.Sample{{}, {}, {}}, + + // iterables with no samples will return no chunks + expectedChks: nil, + skipChunkTest: true, }, { name: "one chunk", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, }, @@ -775,7 +883,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "two full chunks", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, }, @@ -795,7 +903,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "three full chunks", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, {sample{10, 22, nil, nil}, sample{203, 3493, nil, nil}}, @@ -819,15 +927,15 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, // Seek cases. { - name: "three empty chunks and seek", // This should never happen. - chks: [][]chunks.Sample{{}, {}, {}}, - seek: 1, + name: "three empty chunks and seek", // This should never happen. + samples: [][]chunks.Sample{{}, {}, {}}, + seek: 1, seekSuccess: false, }, { name: "two chunks and seek beyond chunks", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, }, @@ -837,7 +945,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "two chunks and seek on middle of first chunk", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, }, @@ -850,7 +958,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "two chunks and seek before first chunk", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, }, @@ -864,12 +972,12 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { // Deletion / Trim cases. { name: "no chunk with deletion interval", - chks: [][]chunks.Sample{}, + samples: [][]chunks.Sample{}, intervals: tombstones.Intervals{{Mint: 20, Maxt: 21}}, }, { name: "two chunks with trimmed first and last samples from edge chunks", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, }, @@ -890,7 +998,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "two chunks with trimmed middle sample of first chunk", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, }, @@ -911,7 +1019,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "two chunks with deletion across two chunks", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, }, @@ -933,7 +1041,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { // Deletion with seek. { name: "two chunks with trimmed first and last samples from edge chunks, seek from middle of first chunk", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, }, @@ -945,9 +1053,20 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { sample{3, 5, nil, nil}, sample{6, 1, nil, nil}, sample{7, 89, nil, nil}, }, }, + { + name: "one chunk where all samples are trimmed", + samples: [][]chunks.Sample{ + {sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, + {sample{7, 89, nil, nil}, sample{9, 8, nil, nil}}, + }, + intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: 3}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}), + + expected: nil, + expectedChks: nil, + }, { name: "one histogram chunk", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, tsdbutil.GenerateTestHistogram(1), nil}, sample{2, 0, tsdbutil.GenerateTestHistogram(2), nil}, @@ -973,7 +1092,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one histogram chunk intersect with earlier deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, tsdbutil.GenerateTestHistogram(1), nil}, sample{2, 0, tsdbutil.GenerateTestHistogram(2), nil}, @@ -996,7 +1115,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one histogram chunk intersect with later deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, tsdbutil.GenerateTestHistogram(1), nil}, sample{2, 0, tsdbutil.GenerateTestHistogram(2), nil}, @@ -1021,7 +1140,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one float histogram chunk", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, nil, tsdbutil.GenerateTestFloatHistogram(1)}, sample{2, 0, nil, tsdbutil.GenerateTestFloatHistogram(2)}, @@ -1047,7 +1166,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one float histogram chunk intersect with earlier deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, nil, tsdbutil.GenerateTestFloatHistogram(1)}, sample{2, 0, nil, tsdbutil.GenerateTestFloatHistogram(2)}, @@ -1070,7 +1189,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one float histogram chunk intersect with later deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, nil, tsdbutil.GenerateTestFloatHistogram(1)}, sample{2, 0, nil, tsdbutil.GenerateTestFloatHistogram(2)}, @@ -1095,7 +1214,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one gauge histogram chunk", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, tsdbutil.GenerateTestGaugeHistogram(1), nil}, sample{2, 0, tsdbutil.GenerateTestGaugeHistogram(2), nil}, @@ -1121,7 +1240,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one gauge histogram chunk intersect with earlier deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, tsdbutil.GenerateTestGaugeHistogram(1), nil}, sample{2, 0, tsdbutil.GenerateTestGaugeHistogram(2), nil}, @@ -1144,7 +1263,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one gauge histogram chunk intersect with later deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, tsdbutil.GenerateTestGaugeHistogram(1), nil}, sample{2, 0, tsdbutil.GenerateTestGaugeHistogram(2), nil}, @@ -1169,7 +1288,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one gauge float histogram", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(1)}, sample{2, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(2)}, @@ -1195,7 +1314,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one gauge float histogram chunk intersect with earlier deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(1)}, sample{2, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(2)}, @@ -1218,7 +1337,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "one gauge float histogram chunk intersect with later deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{1, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(1)}, sample{2, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(2)}, @@ -1243,7 +1362,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "three full mixed chunks", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ {sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}}, { sample{7, 0, tsdbutil.GenerateTestGaugeHistogram(89), nil}, @@ -1275,7 +1394,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "three full mixed chunks in different order", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{7, 0, tsdbutil.GenerateTestGaugeHistogram(89), nil}, sample{9, 0, tsdbutil.GenerateTestGaugeHistogram(8), nil}, @@ -1307,7 +1426,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "three full mixed chunks in different order intersect with deletion interval", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{7, 0, tsdbutil.GenerateTestGaugeHistogram(89), nil}, sample{9, 0, tsdbutil.GenerateTestGaugeHistogram(8), nil}, @@ -1338,7 +1457,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, { name: "three full mixed chunks overlapping", - chks: [][]chunks.Sample{ + samples: [][]chunks.Sample{ { sample{7, 0, tsdbutil.GenerateTestGaugeHistogram(89), nil}, sample{12, 0, tsdbutil.GenerateTestGaugeHistogram(8), nil}, @@ -1368,11 +1487,237 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { }, expectedMinMaxTimes: []minMaxTimes{{7, 12}, {11, 16}, {10, 203}}, }, + { + // This case won't actually happen until OOO native histograms is implemented. + // Issue: https://github.com/prometheus/prometheus/issues/11220. + name: "int histogram iterables with counter resets", + samples: [][]chunks.Sample{ + { + sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil}, + sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil}, + // Counter reset should be detected when chunks are created from the iterable. + sample{12, 0, tsdbutil.GenerateTestHistogram(5), nil}, + sample{15, 0, tsdbutil.GenerateTestHistogram(6), nil}, + sample{16, 0, tsdbutil.GenerateTestHistogram(7), nil}, + // Counter reset should be detected when chunks are created from the iterable. + sample{17, 0, tsdbutil.GenerateTestHistogram(5), nil}, + }, + { + sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil}, + sample{19, 0, tsdbutil.GenerateTestHistogram(7), nil}, + // Counter reset should be detected when chunks are created from the iterable. + sample{20, 0, tsdbutil.GenerateTestHistogram(5), nil}, + sample{21, 0, tsdbutil.GenerateTestHistogram(6), nil}, + }, + }, + + expected: []chunks.Sample{ + sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil}, + sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil}, + sample{12, 0, tsdbutil.GenerateTestHistogram(5), nil}, + sample{15, 0, tsdbutil.GenerateTestHistogram(6), nil}, + sample{16, 0, tsdbutil.GenerateTestHistogram(7), nil}, + sample{17, 0, tsdbutil.GenerateTestHistogram(5), nil}, + sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil}, + sample{19, 0, tsdbutil.GenerateTestHistogram(7), nil}, + sample{20, 0, tsdbutil.GenerateTestHistogram(5), nil}, + sample{21, 0, tsdbutil.GenerateTestHistogram(6), nil}, + }, + expectedChks: []chunks.Meta{ + assureChunkFromSamples(t, []chunks.Sample{ + sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil}, + sample{8, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(9)), nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{12, 0, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(5)), nil}, + sample{15, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(6)), nil}, + sample{16, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(7)), nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{17, 0, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(5)), nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil}, + sample{19, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(7)), nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{20, 0, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(5)), nil}, + sample{21, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(6)), nil}, + }), + }, + expectedMinMaxTimes: []minMaxTimes{ + {7, 8}, + {12, 16}, + {17, 17}, + {18, 19}, + {20, 21}, + }, + + // Skipping chunk test - can't create a single chunk for each + // sample slice since there are counter resets in the middle of + // the slices. + skipChunkTest: true, + }, + { + // This case won't actually happen until OOO native histograms is implemented. + // Issue: https://github.com/prometheus/prometheus/issues/11220. + name: "float histogram iterables with counter resets", + samples: [][]chunks.Sample{ + { + sample{7, 0, nil, tsdbutil.GenerateTestFloatHistogram(8)}, + sample{8, 0, nil, tsdbutil.GenerateTestFloatHistogram(9)}, + // Counter reset should be detected when chunks are created from the iterable. + sample{12, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)}, + sample{15, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)}, + sample{16, 0, nil, tsdbutil.GenerateTestFloatHistogram(7)}, + // Counter reset should be detected when chunks are created from the iterable. + sample{17, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)}, + }, + { + sample{18, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)}, + sample{19, 0, nil, tsdbutil.GenerateTestFloatHistogram(7)}, + // Counter reset should be detected when chunks are created from the iterable. + sample{20, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)}, + sample{21, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)}, + }, + }, + + expected: []chunks.Sample{ + sample{7, 0, nil, tsdbutil.GenerateTestFloatHistogram(8)}, + sample{8, 0, nil, tsdbutil.GenerateTestFloatHistogram(9)}, + sample{12, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)}, + sample{15, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)}, + sample{16, 0, nil, tsdbutil.GenerateTestFloatHistogram(7)}, + sample{17, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)}, + sample{18, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)}, + sample{19, 0, nil, tsdbutil.GenerateTestFloatHistogram(7)}, + sample{20, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)}, + sample{21, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)}, + }, + expectedChks: []chunks.Meta{ + assureChunkFromSamples(t, []chunks.Sample{ + sample{7, 0, nil, tsdbutil.GenerateTestFloatHistogram(8)}, + sample{8, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(9))}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{12, 0, nil, tsdbutil.SetFloatHistogramCounterReset(tsdbutil.GenerateTestFloatHistogram(5))}, + sample{15, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(6))}, + sample{16, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(7))}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{17, 0, nil, tsdbutil.SetFloatHistogramCounterReset(tsdbutil.GenerateTestFloatHistogram(5))}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{18, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)}, + sample{19, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(7))}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{20, 0, nil, tsdbutil.SetFloatHistogramCounterReset(tsdbutil.GenerateTestFloatHistogram(5))}, + sample{21, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(6))}, + }), + }, + expectedMinMaxTimes: []minMaxTimes{ + {7, 8}, + {12, 16}, + {17, 17}, + {18, 19}, + {20, 21}, + }, + + // Skipping chunk test - can't create a single chunk for each + // sample slice since there are counter resets in the middle of + // the slices. + skipChunkTest: true, + }, + { + // This case won't actually happen until OOO native histograms is implemented. + // Issue: https://github.com/prometheus/prometheus/issues/11220. + name: "iterables with mixed encodings and counter resets", + samples: [][]chunks.Sample{ + { + sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil}, + sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil}, + sample{9, 0, nil, tsdbutil.GenerateTestFloatHistogram(10)}, + sample{10, 0, nil, tsdbutil.GenerateTestFloatHistogram(11)}, + sample{11, 0, nil, tsdbutil.GenerateTestFloatHistogram(12)}, + sample{12, 13, nil, nil}, + sample{13, 14, nil, nil}, + sample{14, 0, tsdbutil.GenerateTestHistogram(8), nil}, + // Counter reset should be detected when chunks are created from the iterable. + sample{15, 0, tsdbutil.GenerateTestHistogram(7), nil}, + }, + { + sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil}, + sample{19, 45, nil, nil}, + }, + }, + + expected: []chunks.Sample{ + sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil}, + sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil}, + sample{9, 0, nil, tsdbutil.GenerateTestFloatHistogram(10)}, + sample{10, 0, nil, tsdbutil.GenerateTestFloatHistogram(11)}, + sample{11, 0, nil, tsdbutil.GenerateTestFloatHistogram(12)}, + sample{12, 13, nil, nil}, + sample{13, 14, nil, nil}, + sample{14, 0, tsdbutil.GenerateTestHistogram(8), nil}, + sample{15, 0, tsdbutil.GenerateTestHistogram(7), nil}, + sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil}, + sample{19, 45, nil, nil}, + }, + expectedChks: []chunks.Meta{ + assureChunkFromSamples(t, []chunks.Sample{ + sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil}, + sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{9, 0, nil, tsdbutil.GenerateTestFloatHistogram(10)}, + sample{10, 0, nil, tsdbutil.GenerateTestFloatHistogram(11)}, + sample{11, 0, nil, tsdbutil.GenerateTestFloatHistogram(12)}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{12, 13, nil, nil}, + sample{13, 14, nil, nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{14, 0, tsdbutil.GenerateTestHistogram(8), nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{15, 0, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(7)), nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil}, + }), + assureChunkFromSamples(t, []chunks.Sample{ + sample{19, 45, nil, nil}, + }), + }, + expectedMinMaxTimes: []minMaxTimes{ + {7, 8}, + {9, 11}, + {12, 13}, + {14, 14}, + {15, 15}, + {18, 18}, + {19, 19}, + }, + + skipChunkTest: true, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { t.Run("sample", func(t *testing.T) { - f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...) + var f *fakeChunksReader + var chkMetas []chunks.Meta + // If the test case wants to skip the chunks test, it probably + // means you can't create valid chunks from sample slices, + // therefore create iterables over the samples instead. + if tc.skipChunkTest { + f, chkMetas = createFakeReaderAndIterables(tc.samples...) + } else { + f, chkMetas = createFakeReaderAndNotPopulatedChunks(tc.samples...) + } it := &populateWithDelSeriesIterator{} it.reset(ulid.ULID{}, f, chkMetas, tc.intervals) @@ -1393,7 +1738,35 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) { require.Equal(t, tc.expected, r) }) t.Run("chunk", func(t *testing.T) { - f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...) + if tc.skipChunkTest { + t.Skip() + } + f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.samples...) + it := &populateWithDelChunkSeriesIterator{} + it.reset(ulid.ULID{}, f, chkMetas, tc.intervals) + + if tc.seek != 0 { + // Chunk iterator does not have Seek method. + return + } + expandedResult, err := storage.ExpandChunks(it) + require.NoError(t, err) + + // We don't care about ref IDs for comparison, only chunk's samples matters. + rmChunkRefs(expandedResult) + rmChunkRefs(tc.expectedChks) + require.Equal(t, tc.expectedChks, expandedResult) + + for i, meta := range expandedResult { + require.Equal(t, tc.expectedMinMaxTimes[i].minTime, meta.MinTime) + require.Equal(t, tc.expectedMinMaxTimes[i].maxTime, meta.MaxTime) + } + }) + t.Run("iterables", func(t *testing.T) { + if tc.skipIterableTest { + t.Skip() + } + f, chkMetas := createFakeReaderAndIterables(tc.samples...) it := &populateWithDelChunkSeriesIterator{} it.reset(ulid.ULID{}, f, chkMetas, tc.intervals) @@ -1686,13 +2059,13 @@ func BenchmarkMergedSeriesSet(b *testing.B) { type mockChunkReader map[chunks.ChunkRef]chunkenc.Chunk -func (cr mockChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { +func (cr mockChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { chk, ok := cr[meta.Ref] if ok { - return chk, nil + return chk, nil, nil } - return nil, errors.New("Chunk with ref not found") + return nil, nil, errors.New("Chunk with ref not found") } func (cr mockChunkReader) Close() error { @@ -3020,7 +3393,7 @@ func TestBlockBaseSeriesSet(t *testing.T) { idx := tc.expIdxs[i] require.Equal(t, tc.series[idx].lset, bcs.curr.labels) - require.Equal(t, tc.series[idx].chunks, si.chks) + require.Equal(t, tc.series[idx].chunks, si.metas) i++ } diff --git a/tsdb/tsdbutil/histogram.go b/tsdb/tsdbutil/histogram.go index 0327815c48..0847f81a8a 100644 --- a/tsdb/tsdbutil/histogram.go +++ b/tsdb/tsdbutil/histogram.go @@ -116,7 +116,17 @@ func SetHistogramNotCounterReset(h *histogram.Histogram) *histogram.Histogram { return h } +func SetHistogramCounterReset(h *histogram.Histogram) *histogram.Histogram { + h.CounterResetHint = histogram.CounterReset + return h +} + func SetFloatHistogramNotCounterReset(h *histogram.FloatHistogram) *histogram.FloatHistogram { h.CounterResetHint = histogram.NotCounterReset return h } + +func SetFloatHistogramCounterReset(h *histogram.FloatHistogram) *histogram.FloatHistogram { + h.CounterResetHint = histogram.CounterReset + return h +}