mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-11-04 02:11:01 +01:00 
			
		
		
		
	If the query overlaps the range currently undergoing compaction, we should only fetch chunks up to that time. Need to store that min time in `HeadAndOOOIndexReader`. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
		
			
				
	
	
		
			1139 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1139 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2022 The Prometheus Authors
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
// http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package tsdb
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"math"
 | 
						|
	"slices"
 | 
						|
	"sort"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/stretchr/testify/require"
 | 
						|
 | 
						|
	"github.com/prometheus/prometheus/model/labels"
 | 
						|
	"github.com/prometheus/prometheus/storage"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/chunks"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/wlog"
 | 
						|
)
 | 
						|
 | 
						|
type chunkInterval struct {
 | 
						|
	// because we permutate the order of chunks, we cannot determine at test declaration time which chunkRefs we expect in the Output.
 | 
						|
	// This ID matches expected output chunks against test input chunks, the test runner will assert the chunkRef for the matching chunk
 | 
						|
	ID   int
 | 
						|
	mint int64
 | 
						|
	maxt int64
 | 
						|
}
 | 
						|
 | 
						|
type expChunk struct {
 | 
						|
	c chunkInterval
 | 
						|
	m []chunkInterval
 | 
						|
}
 | 
						|
 | 
						|
// permutateChunkIntervals returns all possible orders of the given chunkIntervals.
 | 
						|
func permutateChunkIntervals(in []chunkInterval, out [][]chunkInterval, left, right int) [][]chunkInterval {
 | 
						|
	if left == right {
 | 
						|
		inCopy := make([]chunkInterval, len(in))
 | 
						|
		copy(inCopy, in)
 | 
						|
		return append(out, inCopy)
 | 
						|
	}
 | 
						|
	for i := left; i <= right; i++ {
 | 
						|
		in[left], in[i] = in[i], in[left]
 | 
						|
		out = permutateChunkIntervals(in, out, left+1, right)
 | 
						|
		in[left], in[i] = in[i], in[left]
 | 
						|
	}
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
// TestOOOHeadIndexReader_Series tests that the Series method works as expected.
 | 
						|
// However it does so by creating chunks and memory mapping them unlike other
 | 
						|
// tests of the head where samples are appended and we let the head memory map.
 | 
						|
// We do this because the ingestion path and the appender for out of order
 | 
						|
// samples are not ready yet.
 | 
						|
func TestOOOHeadIndexReader_Series(t *testing.T) {
 | 
						|
	tests := []struct {
 | 
						|
		name                string
 | 
						|
		queryMinT           int64
 | 
						|
		queryMaxT           int64
 | 
						|
		inputChunkIntervals []chunkInterval
 | 
						|
		expChunks           []expChunk
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:      "Empty result and no error when head is empty",
 | 
						|
			queryMinT: 0,
 | 
						|
			queryMaxT: 100,
 | 
						|
			expChunks: nil,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "If query interval is bigger than the existing chunks nothing is returned",
 | 
						|
			queryMinT: 500,
 | 
						|
			queryMaxT: 700,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 100, 400},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval                                                                                                  [---------------------------------------]
 | 
						|
			// Chunk 0                         [-----------------------------------------------------------]
 | 
						|
			expChunks: nil,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "If query interval is smaller than the existing chunks nothing is returned",
 | 
						|
			queryMinT: 100,
 | 
						|
			queryMaxT: 400,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 500, 700},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval                [-----------------------------------------------------------]
 | 
						|
			// Chunk 0:                                                                                                        [---------------------------------------]
 | 
						|
			expChunks: nil,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "If query interval exceeds the existing chunk, it is returned",
 | 
						|
			queryMinT: 100,
 | 
						|
			queryMaxT: 400,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 150, 350},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval                [-----------------------------------------------------------]
 | 
						|
			// Chunk 0:                                 [---------------------------------------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{0, 150, 350}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "If chunk exceeds the query interval, it is returned",
 | 
						|
			queryMinT: 150,
 | 
						|
			queryMaxT: 350,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 100, 400},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval:                          [---------------------------------------]
 | 
						|
			// Chunk 0:                       [-----------------------------------------------------------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{0, 100, 400}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "Pairwise overlaps should return the references of the first of each pair",
 | 
						|
			queryMinT: 0,
 | 
						|
			queryMaxT: 700,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 100, 200},
 | 
						|
				{1, 500, 600},
 | 
						|
				{2, 150, 250},
 | 
						|
				{3, 550, 650},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval        [---------------------------------------------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0:                        [-------------------]
 | 
						|
			// Chunk 1:                                                                                                        [-------------------]
 | 
						|
			// Chunk 2:                                  [-------------------]
 | 
						|
			// Chunk 3:                                                                                                                  [-------------------]
 | 
						|
			// Output Graphically              [-----------------------------]                                                 [-----------------------------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}},
 | 
						|
				{c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "If all chunks overlap, single big chunk is returned",
 | 
						|
			queryMinT: 0,
 | 
						|
			queryMaxT: 700,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 100, 200},
 | 
						|
				{1, 200, 300},
 | 
						|
				{2, 300, 400},
 | 
						|
				{3, 400, 500},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval        [---------------------------------------------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0:                        [-------------------]
 | 
						|
			// Chunk 1:                                            [-------------------]
 | 
						|
			// Chunk 2:                                                                [-------------------]
 | 
						|
			// Chunk 3:                                                                                    [------------------]
 | 
						|
			// Output Graphically              [------------------------------------------------------------------------------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{0, 100, 500}, m: []chunkInterval{{0, 100, 200}, {1, 200, 300}, {2, 300, 400}, {3, 400, 500}}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "If no chunks overlap, all chunks are returned",
 | 
						|
			queryMinT: 0,
 | 
						|
			queryMaxT: 700,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 100, 199},
 | 
						|
				{1, 200, 299},
 | 
						|
				{2, 300, 399},
 | 
						|
				{3, 400, 499},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval        [---------------------------------------------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0:                        [------------------]
 | 
						|
			// Chunk 1:                                            [------------------]
 | 
						|
			// Chunk 2:                                                                [------------------]
 | 
						|
			// Chunk 3:                                                                                    [------------------]
 | 
						|
			// Output Graphically              [------------------][------------------][------------------][------------------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{0, 100, 199}},
 | 
						|
				{c: chunkInterval{1, 200, 299}},
 | 
						|
				{c: chunkInterval{2, 300, 399}},
 | 
						|
				{c: chunkInterval{3, 400, 499}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "Triplet with pairwise overlaps, query range covers all, and distractor extra chunk",
 | 
						|
			queryMinT: 0,
 | 
						|
			queryMaxT: 400,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 100, 200},
 | 
						|
				{1, 150, 300},
 | 
						|
				{2, 250, 350},
 | 
						|
				{3, 450, 550},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval        [--------------------------------------------------------------------]
 | 
						|
			// Chunk 0:                        [------------------]
 | 
						|
			// Chunk 1:                                 [-----------------------------]
 | 
						|
			// Chunk 2:                                                     [------------------]
 | 
						|
			// Chunk 3:                                                                                             [------------------]
 | 
						|
			// Output Graphically              [-----------------------------------------------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 200}, {1, 150, 300}, {2, 250, 350}}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "Query interval partially overlaps some chunks",
 | 
						|
			queryMinT: 100,
 | 
						|
			queryMaxT: 400,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 250, 500},
 | 
						|
				{1, 0, 200},
 | 
						|
				{2, 150, 300},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700
 | 
						|
			// Query Interval                [------------------------------------------------------------]
 | 
						|
			// Chunk 0:                                                     [-------------------------------------------------]
 | 
						|
			// Chunk 1:             [-----------------------------]
 | 
						|
			// Chunk 2:                                [------------------------------]
 | 
						|
			// Output Graphically   [-----------------------------------------------------------------------------------------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{1, 0, 500}, m: []chunkInterval{{1, 0, 200}, {2, 150, 300}, {0, 250, 500}}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "A full overlap pair and disjointed triplet",
 | 
						|
			queryMinT: 0,
 | 
						|
			queryMaxT: 900,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 100, 300},
 | 
						|
				{1, 770, 850},
 | 
						|
				{2, 150, 250},
 | 
						|
				{3, 650, 750},
 | 
						|
				{4, 600, 800},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700       750       800       850
 | 
						|
			// Query Interval        [---------------------------------------------------------------------------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0:                        [---------------------------------------]
 | 
						|
			// Chunk 1:                                                                                                                                                               [--------------]
 | 
						|
			// Chunk 2:                                  [-------------------]
 | 
						|
			// Chunk 3:                                                                                                                                      [-------------------]
 | 
						|
			// Chunk 4:                                                                                                                             [---------------------------------------]
 | 
						|
			// Output Graphically              [---------------------------------------]                                                            [------------------------------------------------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}},
 | 
						|
				{c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:      "Query range covers 3 disjoint chunks",
 | 
						|
			queryMinT: 0,
 | 
						|
			queryMaxT: 650,
 | 
						|
			inputChunkIntervals: []chunkInterval{
 | 
						|
				{0, 100, 150},
 | 
						|
				{1, 300, 350},
 | 
						|
				{2, 200, 250},
 | 
						|
			},
 | 
						|
			// ts                    0       100       150       200       250       300       350       400       450       500       550       600       650       700       750       800       850
 | 
						|
			// Query Interval        [----------------------------------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0:                        [-------]
 | 
						|
			// Chunk 1:                                                              [----------]
 | 
						|
			// Chunk 2:                                           [--------]
 | 
						|
			// Output Graphically              [-------]          [--------]         [----------]
 | 
						|
			expChunks: []expChunk{
 | 
						|
				{c: chunkInterval{0, 100, 150}},
 | 
						|
				{c: chunkInterval{2, 200, 250}},
 | 
						|
				{c: chunkInterval{1, 300, 350}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	s1Lset := labels.FromStrings("foo", "bar")
 | 
						|
	s1ID := uint64(1)
 | 
						|
 | 
						|
	for _, tc := range tests {
 | 
						|
		var permutations [][]chunkInterval
 | 
						|
		if len(tc.inputChunkIntervals) == 0 {
 | 
						|
			// handle special case
 | 
						|
			permutations = [][]chunkInterval{
 | 
						|
				nil,
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			permutations = permutateChunkIntervals(tc.inputChunkIntervals, nil, 0, len(tc.inputChunkIntervals)-1)
 | 
						|
		}
 | 
						|
		for perm, intervals := range permutations {
 | 
						|
			for _, headChunk := range []bool{false, true} {
 | 
						|
				t.Run(fmt.Sprintf("name=%s, permutation=%d, headChunk=%t", tc.name, perm, headChunk), func(t *testing.T) {
 | 
						|
					h, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
 | 
						|
					defer func() {
 | 
						|
						require.NoError(t, h.Close())
 | 
						|
					}()
 | 
						|
					require.NoError(t, h.Init(0))
 | 
						|
 | 
						|
					s1, _, _ := h.getOrCreate(s1ID, s1Lset)
 | 
						|
					s1.ooo = &memSeriesOOOFields{}
 | 
						|
 | 
						|
					// define our expected chunks, by looking at the expected ChunkIntervals and setting...
 | 
						|
					// Ref to whatever Ref the chunk has, that we refer to by ID
 | 
						|
					findID := func(id int) chunks.ChunkRef {
 | 
						|
						for ref, c := range intervals {
 | 
						|
							if c.ID == id {
 | 
						|
								return chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
 | 
						|
							}
 | 
						|
						}
 | 
						|
						return 0
 | 
						|
					}
 | 
						|
					var expChunks []chunks.Meta
 | 
						|
					for _, e := range tc.expChunks {
 | 
						|
						var chunk chunkenc.Chunk
 | 
						|
						if len(e.m) > 0 {
 | 
						|
							mm := &multiMeta{}
 | 
						|
							for _, x := range e.m {
 | 
						|
								meta := chunks.Meta{
 | 
						|
									MinTime: x.mint,
 | 
						|
									MaxTime: x.maxt,
 | 
						|
									Ref:     findID(x.ID),
 | 
						|
								}
 | 
						|
								mm.metas = append(mm.metas, meta)
 | 
						|
							}
 | 
						|
							chunk = mm
 | 
						|
						}
 | 
						|
						meta := chunks.Meta{
 | 
						|
							Chunk:   chunk,
 | 
						|
							MinTime: e.c.mint,
 | 
						|
							MaxTime: e.c.maxt,
 | 
						|
							Ref:     findID(e.c.ID),
 | 
						|
						}
 | 
						|
						expChunks = append(expChunks, meta)
 | 
						|
					}
 | 
						|
 | 
						|
					if headChunk && len(intervals) > 0 {
 | 
						|
						// Put the last interval in the head chunk
 | 
						|
						s1.ooo.oooHeadChunk = &oooHeadChunk{
 | 
						|
							chunk:   NewOOOChunk(),
 | 
						|
							minTime: intervals[len(intervals)-1].mint,
 | 
						|
							maxTime: intervals[len(intervals)-1].maxt,
 | 
						|
						}
 | 
						|
						intervals = intervals[:len(intervals)-1]
 | 
						|
					}
 | 
						|
 | 
						|
					for _, ic := range intervals {
 | 
						|
						s1.ooo.oooMmappedChunks = append(s1.ooo.oooMmappedChunks, &mmappedChunk{
 | 
						|
							minTime: ic.mint,
 | 
						|
							maxTime: ic.maxt,
 | 
						|
						})
 | 
						|
					}
 | 
						|
 | 
						|
					ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
 | 
						|
 | 
						|
					var chks []chunks.Meta
 | 
						|
					var b labels.ScratchBuilder
 | 
						|
					err := ir.Series(storage.SeriesRef(s1ID), &b, &chks)
 | 
						|
					require.NoError(t, err)
 | 
						|
					require.Equal(t, s1Lset, b.Labels())
 | 
						|
					require.Equal(t, expChunks, chks)
 | 
						|
 | 
						|
					err = ir.Series(storage.SeriesRef(s1ID+1), &b, &chks)
 | 
						|
					require.Equal(t, storage.ErrNotFound, err)
 | 
						|
				})
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
 | 
						|
	for name, scenario := range sampleTypeScenarios {
 | 
						|
		t.Run(name, func(t *testing.T) {
 | 
						|
			testOOOHeadChunkReader_LabelValues(t, scenario)
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
//nolint:revive // unexported-return.
 | 
						|
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
 | 
						|
	chunkRange := int64(2000)
 | 
						|
	head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
 | 
						|
	head.opts.EnableOOONativeHistograms.Store(true)
 | 
						|
	t.Cleanup(func() { require.NoError(t, head.Close()) })
 | 
						|
 | 
						|
	ctx := context.Background()
 | 
						|
 | 
						|
	app := head.Appender(context.Background())
 | 
						|
 | 
						|
	// Add in-order samples
 | 
						|
	_, _, err := scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 100, int64(1))
 | 
						|
	require.NoError(t, err)
 | 
						|
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 100, int64(2))
 | 
						|
	require.NoError(t, err)
 | 
						|
 | 
						|
	// Add ooo samples for those series
 | 
						|
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 90, int64(1))
 | 
						|
	require.NoError(t, err)
 | 
						|
	_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 90, int64(2))
 | 
						|
	require.NoError(t, err)
 | 
						|
 | 
						|
	require.NoError(t, app.Commit())
 | 
						|
 | 
						|
	cases := []struct {
 | 
						|
		name       string
 | 
						|
		queryMinT  int64
 | 
						|
		queryMaxT  int64
 | 
						|
		expValues1 []string
 | 
						|
		expValues2 []string
 | 
						|
		expValues3 []string
 | 
						|
		expValues4 []string
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:       "LabelValues calls when ooo head has max query range",
 | 
						|
			queryMinT:  math.MinInt64,
 | 
						|
			queryMaxT:  math.MaxInt64,
 | 
						|
			expValues1: []string{"bar1"},
 | 
						|
			expValues2: nil,
 | 
						|
			expValues3: []string{"bar1", "bar2"},
 | 
						|
			expValues4: []string{"bar1", "bar2"},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:       "LabelValues calls with ooo head query range not overlapping in-order data",
 | 
						|
			queryMinT:  90,
 | 
						|
			queryMaxT:  90,
 | 
						|
			expValues1: []string{"bar1"},
 | 
						|
			expValues2: nil,
 | 
						|
			expValues3: []string{"bar1", "bar2"},
 | 
						|
			expValues4: []string{"bar1", "bar2"},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:       "LabelValues calls with ooo head query range not overlapping out-of-order data",
 | 
						|
			queryMinT:  100,
 | 
						|
			queryMaxT:  100,
 | 
						|
			expValues1: []string{"bar1"},
 | 
						|
			expValues2: nil,
 | 
						|
			expValues3: []string{"bar1", "bar2"},
 | 
						|
			expValues4: []string{"bar1", "bar2"},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range cases {
 | 
						|
		t.Run(tc.name, func(t *testing.T) {
 | 
						|
			// We first want to test using a head index reader that covers the biggest query interval
 | 
						|
			oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
 | 
						|
			matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
 | 
						|
			values, err := oh.LabelValues(ctx, "foo", matchers...)
 | 
						|
			sort.Strings(values)
 | 
						|
			require.NoError(t, err)
 | 
						|
			require.Equal(t, tc.expValues1, values)
 | 
						|
 | 
						|
			matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")}
 | 
						|
			values, err = oh.LabelValues(ctx, "foo", matchers...)
 | 
						|
			sort.Strings(values)
 | 
						|
			require.NoError(t, err)
 | 
						|
			require.Equal(t, tc.expValues2, values)
 | 
						|
 | 
						|
			matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")}
 | 
						|
			values, err = oh.LabelValues(ctx, "foo", matchers...)
 | 
						|
			sort.Strings(values)
 | 
						|
			require.NoError(t, err)
 | 
						|
			require.Equal(t, tc.expValues3, values)
 | 
						|
 | 
						|
			values, err = oh.LabelValues(ctx, "foo")
 | 
						|
			sort.Strings(values)
 | 
						|
			require.NoError(t, err)
 | 
						|
			require.Equal(t, tc.expValues4, values)
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected.
 | 
						|
// It does so by appending out of order samples to the db and then initializing
 | 
						|
// an OOOHeadChunkReader to read chunks from it.
 | 
						|
func TestOOOHeadChunkReader_Chunk(t *testing.T) {
 | 
						|
	for name, scenario := range sampleTypeScenarios {
 | 
						|
		t.Run(name, func(t *testing.T) {
 | 
						|
			testOOOHeadChunkReader_Chunk(t, scenario)
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
//nolint:revive // unexported-return.
 | 
						|
func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
 | 
						|
	opts := DefaultOptions()
 | 
						|
	opts.OutOfOrderCapMax = 5
 | 
						|
	opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
 | 
						|
	opts.EnableNativeHistograms = true
 | 
						|
	opts.EnableOOONativeHistograms = true
 | 
						|
 | 
						|
	s1 := labels.FromStrings("l", "v1")
 | 
						|
	minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
 | 
						|
 | 
						|
	t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
 | 
						|
		db := newTestDBWithOpts(t, opts)
 | 
						|
 | 
						|
		cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
 | 
						|
		defer cr.Close()
 | 
						|
		c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
 | 
						|
			Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
 | 
						|
		})
 | 
						|
		require.Nil(t, iterable)
 | 
						|
		require.Equal(t, err, fmt.Errorf("not found"))
 | 
						|
		require.Nil(t, c)
 | 
						|
	})
 | 
						|
 | 
						|
	tests := []struct {
 | 
						|
		name                 string
 | 
						|
		queryMinT            int64
 | 
						|
		queryMaxT            int64
 | 
						|
		firstInOrderSampleAt int64
 | 
						|
		inputSamples         []testValue
 | 
						|
		expSingleChunks      bool
 | 
						|
		expChunkError        bool
 | 
						|
		expChunksSamples     []chunks.SampleSlice
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:                 "Getting the head when there are no overlapping chunks returns just the samples in the head",
 | 
						|
			queryMinT:            minutes(0),
 | 
						|
			queryMaxT:            minutes(100),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			inputSamples: []testValue{
 | 
						|
				{Ts: minutes(30), V: 0},
 | 
						|
				{Ts: minutes(40), V: 0},
 | 
						|
			},
 | 
						|
			expChunkError:   false,
 | 
						|
			expSingleChunks: true,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval          [------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0: Current Head                              [--------] (With 2 samples)
 | 
						|
			// Output Graphically                                 [--------] (With 2 samples)
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(30), 0),
 | 
						|
					scenario.sampleFunc(minutes(40), 0),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                 "Getting the head chunk when there are overlapping chunks returns all combined",
 | 
						|
			queryMinT:            minutes(0),
 | 
						|
			queryMaxT:            minutes(100),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			inputSamples:         []testValue{{Ts: minutes(41), V: 0}, {Ts: minutes(42), V: 0}, {Ts: minutes(43), V: 0}, {Ts: minutes(44), V: 0}, {Ts: minutes(45), V: 0}, {Ts: minutes(30), V: 1}, {Ts: minutes(50), V: 1}},
 | 
						|
			expChunkError:        false,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval          [------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0                                                     [---] (With 5 samples)
 | 
						|
			// Chunk 1: Current Head                              [-----------------] (With 2 samples)
 | 
						|
			// Output Graphically                                 [-----------------] (With 7 samples)
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(30), 1),
 | 
						|
					scenario.sampleFunc(minutes(41), 0),
 | 
						|
					scenario.sampleFunc(minutes(42), 0),
 | 
						|
					scenario.sampleFunc(minutes(43), 0),
 | 
						|
					scenario.sampleFunc(minutes(44), 0),
 | 
						|
					scenario.sampleFunc(minutes(45), 0),
 | 
						|
					scenario.sampleFunc(minutes(50), 1),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                 "Two windows of overlapping chunks get properly converged",
 | 
						|
			queryMinT:            minutes(0),
 | 
						|
			queryMaxT:            minutes(100),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			inputSamples: []testValue{
 | 
						|
				// Chunk 0
 | 
						|
				{Ts: minutes(10), V: 0},
 | 
						|
				{Ts: minutes(12), V: 0},
 | 
						|
				{Ts: minutes(14), V: 0},
 | 
						|
				{Ts: minutes(16), V: 0},
 | 
						|
				{Ts: minutes(20), V: 0},
 | 
						|
				// Chunk 1
 | 
						|
				{Ts: minutes(20), V: 1},
 | 
						|
				{Ts: minutes(22), V: 1},
 | 
						|
				{Ts: minutes(24), V: 1},
 | 
						|
				{Ts: minutes(26), V: 1},
 | 
						|
				{Ts: minutes(29), V: 1},
 | 
						|
				// Chunk 3
 | 
						|
				{Ts: minutes(30), V: 2},
 | 
						|
				{Ts: minutes(32), V: 2},
 | 
						|
				{Ts: minutes(34), V: 2},
 | 
						|
				{Ts: minutes(36), V: 2},
 | 
						|
				{Ts: minutes(40), V: 2},
 | 
						|
				// Head
 | 
						|
				{Ts: minutes(40), V: 3},
 | 
						|
				{Ts: minutes(50), V: 3},
 | 
						|
			},
 | 
						|
			expChunkError: false,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval          [------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0                          [--------]
 | 
						|
			// Chunk 1                                   [-------]
 | 
						|
			// Chunk 2                                            [--------]
 | 
						|
			// Chunk 3: Current Head                                       [--------]
 | 
						|
			// Output Graphically               [----------------][-----------------]
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(10), 0),
 | 
						|
					scenario.sampleFunc(minutes(12), 0),
 | 
						|
					scenario.sampleFunc(minutes(14), 0),
 | 
						|
					scenario.sampleFunc(minutes(16), 0),
 | 
						|
					scenario.sampleFunc(minutes(20), 1),
 | 
						|
					scenario.sampleFunc(minutes(22), 1),
 | 
						|
					scenario.sampleFunc(minutes(24), 1),
 | 
						|
					scenario.sampleFunc(minutes(26), 1),
 | 
						|
					scenario.sampleFunc(minutes(29), 1),
 | 
						|
				},
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(30), 2),
 | 
						|
					scenario.sampleFunc(minutes(32), 2),
 | 
						|
					scenario.sampleFunc(minutes(34), 2),
 | 
						|
					scenario.sampleFunc(minutes(36), 2),
 | 
						|
					scenario.sampleFunc(minutes(40), 3),
 | 
						|
					scenario.sampleFunc(minutes(50), 3),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                 "Two windows of overlapping chunks in descending order get properly converged",
 | 
						|
			queryMinT:            minutes(0),
 | 
						|
			queryMaxT:            minutes(100),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			inputSamples: []testValue{
 | 
						|
				// Chunk 0
 | 
						|
				{Ts: minutes(40), V: 0},
 | 
						|
				{Ts: minutes(42), V: 0},
 | 
						|
				{Ts: minutes(44), V: 0},
 | 
						|
				{Ts: minutes(46), V: 0},
 | 
						|
				{Ts: minutes(50), V: 0},
 | 
						|
				// Chunk 1
 | 
						|
				{Ts: minutes(30), V: 1},
 | 
						|
				{Ts: minutes(32), V: 1},
 | 
						|
				{Ts: minutes(34), V: 1},
 | 
						|
				{Ts: minutes(36), V: 1},
 | 
						|
				{Ts: minutes(40), V: 1},
 | 
						|
				// Chunk 3
 | 
						|
				{Ts: minutes(20), V: 2},
 | 
						|
				{Ts: minutes(22), V: 2},
 | 
						|
				{Ts: minutes(24), V: 2},
 | 
						|
				{Ts: minutes(26), V: 2},
 | 
						|
				{Ts: minutes(29), V: 2},
 | 
						|
				// Head
 | 
						|
				{Ts: minutes(10), V: 3},
 | 
						|
				{Ts: minutes(20), V: 3},
 | 
						|
			},
 | 
						|
			expChunkError: false,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval          [------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0                                                     [--------]
 | 
						|
			// Chunk 1                                            [--------]
 | 
						|
			// Chunk 2                                   [-------]
 | 
						|
			// Chunk 3: Current Head            [--------]
 | 
						|
			// Output Graphically               [----------------][-----------------]
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(10), 3),
 | 
						|
					scenario.sampleFunc(minutes(20), 2),
 | 
						|
					scenario.sampleFunc(minutes(22), 2),
 | 
						|
					scenario.sampleFunc(minutes(24), 2),
 | 
						|
					scenario.sampleFunc(minutes(26), 2),
 | 
						|
					scenario.sampleFunc(minutes(29), 2),
 | 
						|
				},
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(30), 1),
 | 
						|
					scenario.sampleFunc(minutes(32), 1),
 | 
						|
					scenario.sampleFunc(minutes(34), 1),
 | 
						|
					scenario.sampleFunc(minutes(36), 1),
 | 
						|
					scenario.sampleFunc(minutes(40), 0),
 | 
						|
					scenario.sampleFunc(minutes(42), 0),
 | 
						|
					scenario.sampleFunc(minutes(44), 0),
 | 
						|
					scenario.sampleFunc(minutes(46), 0),
 | 
						|
					scenario.sampleFunc(minutes(50), 0),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                 "If chunks are not overlapped they are not converged",
 | 
						|
			queryMinT:            minutes(0),
 | 
						|
			queryMaxT:            minutes(100),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			inputSamples: []testValue{
 | 
						|
				// Chunk 0
 | 
						|
				{Ts: minutes(10), V: 0},
 | 
						|
				{Ts: minutes(12), V: 0},
 | 
						|
				{Ts: minutes(14), V: 0},
 | 
						|
				{Ts: minutes(16), V: 0},
 | 
						|
				{Ts: minutes(18), V: 0},
 | 
						|
				// Chunk 1
 | 
						|
				{Ts: minutes(20), V: 1},
 | 
						|
				{Ts: minutes(22), V: 1},
 | 
						|
				{Ts: minutes(24), V: 1},
 | 
						|
				{Ts: minutes(26), V: 1},
 | 
						|
				{Ts: minutes(28), V: 1},
 | 
						|
				// Chunk 3
 | 
						|
				{Ts: minutes(30), V: 2},
 | 
						|
				{Ts: minutes(32), V: 2},
 | 
						|
				{Ts: minutes(34), V: 2},
 | 
						|
				{Ts: minutes(36), V: 2},
 | 
						|
				{Ts: minutes(38), V: 2},
 | 
						|
				// Head
 | 
						|
				{Ts: minutes(40), V: 3},
 | 
						|
				{Ts: minutes(42), V: 3},
 | 
						|
			},
 | 
						|
			expChunkError:   false,
 | 
						|
			expSingleChunks: true,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval          [------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0                          [-------]
 | 
						|
			// Chunk 1                                   [-------]
 | 
						|
			// Chunk 2                                            [-------]
 | 
						|
			// Chunk 3: Current Head                                       [-------]
 | 
						|
			// Output Graphically               [-------][-------][-------][--------]
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(10), 0),
 | 
						|
					scenario.sampleFunc(minutes(12), 0),
 | 
						|
					scenario.sampleFunc(minutes(14), 0),
 | 
						|
					scenario.sampleFunc(minutes(16), 0),
 | 
						|
					scenario.sampleFunc(minutes(18), 0),
 | 
						|
				},
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(20), 1),
 | 
						|
					scenario.sampleFunc(minutes(22), 1),
 | 
						|
					scenario.sampleFunc(minutes(24), 1),
 | 
						|
					scenario.sampleFunc(minutes(26), 1),
 | 
						|
					scenario.sampleFunc(minutes(28), 1),
 | 
						|
				},
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(30), 2),
 | 
						|
					scenario.sampleFunc(minutes(32), 2),
 | 
						|
					scenario.sampleFunc(minutes(34), 2),
 | 
						|
					scenario.sampleFunc(minutes(36), 2),
 | 
						|
					scenario.sampleFunc(minutes(38), 2),
 | 
						|
				},
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(40), 3),
 | 
						|
					scenario.sampleFunc(minutes(42), 3),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                 "Triplet of chunks overlapping returns a single merged chunk",
 | 
						|
			queryMinT:            minutes(0),
 | 
						|
			queryMaxT:            minutes(100),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			inputSamples: []testValue{
 | 
						|
				// Chunk 0
 | 
						|
				{Ts: minutes(10), V: 0},
 | 
						|
				{Ts: minutes(15), V: 0},
 | 
						|
				{Ts: minutes(20), V: 0},
 | 
						|
				{Ts: minutes(25), V: 0},
 | 
						|
				{Ts: minutes(30), V: 0},
 | 
						|
				// Chunk 1
 | 
						|
				{Ts: minutes(20), V: 1},
 | 
						|
				{Ts: minutes(25), V: 1},
 | 
						|
				{Ts: minutes(30), V: 1},
 | 
						|
				{Ts: minutes(35), V: 1},
 | 
						|
				{Ts: minutes(42), V: 1},
 | 
						|
				// Chunk 2 Head
 | 
						|
				{Ts: minutes(32), V: 2},
 | 
						|
				{Ts: minutes(50), V: 2},
 | 
						|
			},
 | 
						|
			expChunkError: false,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval          [------------------------------------------------------------------------------------------]
 | 
						|
			// Chunk 0                          [-----------------]
 | 
						|
			// Chunk 1                                   [--------------------]
 | 
						|
			// Chunk 2 Current Head                                  [--------------]
 | 
						|
			// Output Graphically               [-----------------------------------]
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(10), 0),
 | 
						|
					scenario.sampleFunc(minutes(15), 0),
 | 
						|
					scenario.sampleFunc(minutes(20), 1),
 | 
						|
					scenario.sampleFunc(minutes(25), 1),
 | 
						|
					scenario.sampleFunc(minutes(30), 1),
 | 
						|
					scenario.sampleFunc(minutes(32), 2),
 | 
						|
					scenario.sampleFunc(minutes(35), 1),
 | 
						|
					scenario.sampleFunc(minutes(42), 1),
 | 
						|
					scenario.sampleFunc(minutes(50), 2),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                 "Query interval partially overlaps with a triplet of chunks but still returns a single merged chunk",
 | 
						|
			queryMinT:            minutes(12),
 | 
						|
			queryMaxT:            minutes(33),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			inputSamples: []testValue{
 | 
						|
				// Chunk 0
 | 
						|
				{Ts: minutes(10), V: 0},
 | 
						|
				{Ts: minutes(15), V: 0},
 | 
						|
				{Ts: minutes(20), V: 0},
 | 
						|
				{Ts: minutes(25), V: 0},
 | 
						|
				{Ts: minutes(30), V: 0},
 | 
						|
				// Chunk 1
 | 
						|
				{Ts: minutes(20), V: 1},
 | 
						|
				{Ts: minutes(25), V: 1},
 | 
						|
				{Ts: minutes(30), V: 1},
 | 
						|
				{Ts: minutes(35), V: 1},
 | 
						|
				{Ts: minutes(42), V: 1},
 | 
						|
				// Chunk 2 Head
 | 
						|
				{Ts: minutes(32), V: 2},
 | 
						|
				{Ts: minutes(50), V: 2},
 | 
						|
			},
 | 
						|
			expChunkError: false,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval                     [------------------]
 | 
						|
			// Chunk 0                          [-----------------]
 | 
						|
			// Chunk 1                                   [--------------------]
 | 
						|
			// Chunk 2 Current Head                                  [--------------]
 | 
						|
			// Output Graphically               [-----------------------------------]
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(10), 0),
 | 
						|
					scenario.sampleFunc(minutes(15), 0),
 | 
						|
					scenario.sampleFunc(minutes(20), 1),
 | 
						|
					scenario.sampleFunc(minutes(25), 1),
 | 
						|
					scenario.sampleFunc(minutes(30), 1),
 | 
						|
					scenario.sampleFunc(minutes(32), 2),
 | 
						|
					scenario.sampleFunc(minutes(35), 1),
 | 
						|
					scenario.sampleFunc(minutes(42), 1),
 | 
						|
					scenario.sampleFunc(minutes(50), 2),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range tests {
 | 
						|
		t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
 | 
						|
			db := newTestDBWithOpts(t, opts)
 | 
						|
 | 
						|
			app := db.Appender(context.Background())
 | 
						|
			s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
 | 
						|
			require.NoError(t, err)
 | 
						|
			require.NoError(t, app.Commit())
 | 
						|
 | 
						|
			// OOO few samples for s1.
 | 
						|
			app = db.Appender(context.Background())
 | 
						|
			for _, s := range tc.inputSamples {
 | 
						|
				_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
 | 
						|
				require.NoError(t, err)
 | 
						|
			}
 | 
						|
			require.NoError(t, app.Commit())
 | 
						|
 | 
						|
			// The Series method populates the chunk metas, taking a copy of the
 | 
						|
			// head OOO chunk if necessary. These are then used by the ChunkReader.
 | 
						|
			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
 | 
						|
			var chks []chunks.Meta
 | 
						|
			var b labels.ScratchBuilder
 | 
						|
			err = ir.Series(s1Ref, &b, &chks)
 | 
						|
			require.NoError(t, err)
 | 
						|
			require.Equal(t, len(tc.expChunksSamples), len(chks))
 | 
						|
 | 
						|
			cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
 | 
						|
			defer cr.Close()
 | 
						|
			for i := 0; i < len(chks); i++ {
 | 
						|
				c, iterable, err := cr.ChunkOrIterable(chks[i])
 | 
						|
				require.NoError(t, err)
 | 
						|
				var it chunkenc.Iterator
 | 
						|
				if tc.expSingleChunks {
 | 
						|
					it = c.Iterator(nil)
 | 
						|
				} else {
 | 
						|
					require.Nil(t, c)
 | 
						|
					it = iterable.Iterator(nil)
 | 
						|
				}
 | 
						|
				resultSamples, err := storage.ExpandSamples(it, nil)
 | 
						|
				require.NoError(t, err)
 | 
						|
				requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding tests
 | 
						|
// that if a query comes and performs a Series() call followed by a Chunks() call
 | 
						|
// the response is consistent with the data seen by Series() even if the OOO
 | 
						|
// head receives more samples before Chunks() is called.
 | 
						|
// An example:
 | 
						|
//   - Response A comes from: Series() then Chunk()
 | 
						|
//   - Response B comes from : Series(), in parallel new samples added to the head, then Chunk()
 | 
						|
//   - A == B
 | 
						|
func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T) {
 | 
						|
	for name, scenario := range sampleTypeScenarios {
 | 
						|
		t.Run(name, func(t *testing.T) {
 | 
						|
			testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t, scenario)
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
//nolint:revive // unexported-return.
 | 
						|
func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T, scenario sampleTypeScenario) {
 | 
						|
	opts := DefaultOptions()
 | 
						|
	opts.OutOfOrderCapMax = 5
 | 
						|
	opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
 | 
						|
	opts.EnableNativeHistograms = true
 | 
						|
	opts.EnableOOONativeHistograms = true
 | 
						|
 | 
						|
	s1 := labels.FromStrings("l", "v1")
 | 
						|
	minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
 | 
						|
 | 
						|
	tests := []struct {
 | 
						|
		name                   string
 | 
						|
		queryMinT              int64
 | 
						|
		queryMaxT              int64
 | 
						|
		firstInOrderSampleAt   int64
 | 
						|
		initialSamples         []testValue
 | 
						|
		samplesAfterSeriesCall []testValue
 | 
						|
		expChunkError          bool
 | 
						|
		expChunksSamples       []chunks.SampleSlice
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:                 "Current head gets old, new and in between sample after Series call, they all should be omitted from the result",
 | 
						|
			queryMinT:            minutes(0),
 | 
						|
			queryMaxT:            minutes(100),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			initialSamples: []testValue{
 | 
						|
				// Chunk 0
 | 
						|
				{Ts: minutes(20), V: 0},
 | 
						|
				{Ts: minutes(22), V: 0},
 | 
						|
				{Ts: minutes(24), V: 0},
 | 
						|
				{Ts: minutes(26), V: 0},
 | 
						|
				{Ts: minutes(30), V: 0},
 | 
						|
				// Chunk 1 Head
 | 
						|
				{Ts: minutes(25), V: 1},
 | 
						|
				{Ts: minutes(35), V: 1},
 | 
						|
			},
 | 
						|
			samplesAfterSeriesCall: []testValue{
 | 
						|
				{Ts: minutes(10), V: 1},
 | 
						|
				{Ts: minutes(32), V: 1},
 | 
						|
				{Ts: minutes(50), V: 1},
 | 
						|
			},
 | 
						|
			expChunkError: false,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval          [-----------------------------------]
 | 
						|
			// Chunk 0:                                  [--------] (5 samples)
 | 
						|
			// Chunk 1: Current Head                          [-------] (2 samples)
 | 
						|
			// New samples added after Series()
 | 
						|
			// Chunk 1: Current Head            [-----------------------------------] (5 samples)
 | 
						|
			// Output Graphically                        [------------] (With 8 samples, samples newer than lastmint or older than lastmaxt are omitted but the ones in between are kept)
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(20), 0),
 | 
						|
					scenario.sampleFunc(minutes(22), 0),
 | 
						|
					scenario.sampleFunc(minutes(24), 0),
 | 
						|
					scenario.sampleFunc(minutes(25), 1),
 | 
						|
					scenario.sampleFunc(minutes(26), 0),
 | 
						|
					scenario.sampleFunc(minutes(30), 0),
 | 
						|
					scenario.sampleFunc(minutes(35), 1),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                 "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.",
 | 
						|
			queryMinT:            minutes(0),
 | 
						|
			queryMaxT:            minutes(100),
 | 
						|
			firstInOrderSampleAt: minutes(120),
 | 
						|
			initialSamples: []testValue{
 | 
						|
				// Chunk 0
 | 
						|
				{Ts: minutes(20), V: 0},
 | 
						|
				{Ts: minutes(22), V: 0},
 | 
						|
				{Ts: minutes(24), V: 0},
 | 
						|
				{Ts: minutes(26), V: 0},
 | 
						|
				{Ts: minutes(30), V: 0},
 | 
						|
				// Chunk 1 Head
 | 
						|
				{Ts: minutes(25), V: 1},
 | 
						|
				{Ts: minutes(35), V: 1},
 | 
						|
			},
 | 
						|
			samplesAfterSeriesCall: []testValue{
 | 
						|
				{Ts: minutes(10), V: 1},
 | 
						|
				{Ts: minutes(32), V: 1},
 | 
						|
				{Ts: minutes(50), V: 1},
 | 
						|
				// Chunk 1 gets mmapped and Chunk 2, the new head is born
 | 
						|
				{Ts: minutes(25), V: 2},
 | 
						|
				{Ts: minutes(31), V: 2},
 | 
						|
			},
 | 
						|
			expChunkError: false,
 | 
						|
			// ts (in minutes)         0       10       20       30       40       50       60       70       80       90       100
 | 
						|
			// Query Interval          [-----------------------------------]
 | 
						|
			// Chunk 0:                                  [--------] (5 samples)
 | 
						|
			// Chunk 1: Current Head                          [-------] (2 samples)
 | 
						|
			// New samples added after Series()
 | 
						|
			// Chunk 1 (mmapped)                     [-------------------------] (5 samples)
 | 
						|
			// Chunk 2: Current Head                    [-----------] (2 samples)
 | 
						|
			// Output Graphically                        [------------]  (8 samples) It has 5 from Chunk 0 and 3 from Chunk 1
 | 
						|
			expChunksSamples: []chunks.SampleSlice{
 | 
						|
				{
 | 
						|
					scenario.sampleFunc(minutes(20), 0),
 | 
						|
					scenario.sampleFunc(minutes(22), 0),
 | 
						|
					scenario.sampleFunc(minutes(24), 0),
 | 
						|
					scenario.sampleFunc(minutes(25), 1),
 | 
						|
					scenario.sampleFunc(minutes(26), 0),
 | 
						|
					scenario.sampleFunc(minutes(30), 0),
 | 
						|
					scenario.sampleFunc(minutes(35), 1),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range tests {
 | 
						|
		t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
 | 
						|
			db := newTestDBWithOpts(t, opts)
 | 
						|
 | 
						|
			app := db.Appender(context.Background())
 | 
						|
			s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
 | 
						|
			require.NoError(t, err)
 | 
						|
			require.NoError(t, app.Commit())
 | 
						|
 | 
						|
			// OOO few samples for s1.
 | 
						|
			app = db.Appender(context.Background())
 | 
						|
			for _, s := range tc.initialSamples {
 | 
						|
				_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
 | 
						|
				require.NoError(t, err)
 | 
						|
			}
 | 
						|
			require.NoError(t, app.Commit())
 | 
						|
 | 
						|
			// The Series method populates the chunk metas, taking a copy of the
 | 
						|
			// head OOO chunk if necessary. These are then used by the ChunkReader.
 | 
						|
			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
 | 
						|
			var chks []chunks.Meta
 | 
						|
			var b labels.ScratchBuilder
 | 
						|
			err = ir.Series(s1Ref, &b, &chks)
 | 
						|
			require.NoError(t, err)
 | 
						|
			require.Equal(t, len(tc.expChunksSamples), len(chks))
 | 
						|
 | 
						|
			// Now we keep receiving ooo samples
 | 
						|
			// OOO few samples for s1.
 | 
						|
			app = db.Appender(context.Background())
 | 
						|
			for _, s := range tc.samplesAfterSeriesCall {
 | 
						|
				_, _, err = scenario.appendFunc(app, s1, s.Ts, s.V)
 | 
						|
				require.NoError(t, err)
 | 
						|
			}
 | 
						|
			require.NoError(t, app.Commit())
 | 
						|
 | 
						|
			cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
 | 
						|
			defer cr.Close()
 | 
						|
			for i := 0; i < len(chks); i++ {
 | 
						|
				c, iterable, err := cr.ChunkOrIterable(chks[i])
 | 
						|
				require.NoError(t, err)
 | 
						|
				require.Nil(t, c)
 | 
						|
 | 
						|
				it := iterable.Iterator(nil)
 | 
						|
				resultSamples, err := storage.ExpandSamples(it, nil)
 | 
						|
				require.NoError(t, err)
 | 
						|
				requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TestSortMetaByMinTimeAndMinRef tests that the sort function for chunk metas does sort
 | 
						|
// by chunk meta MinTime and in case of same references by the lower reference.
 | 
						|
func TestSortMetaByMinTimeAndMinRef(t *testing.T) {
 | 
						|
	tests := []struct {
 | 
						|
		name       string
 | 
						|
		inputMetas []chunks.Meta
 | 
						|
		expMetas   []chunks.Meta
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name: "chunks are ordered by min time",
 | 
						|
			inputMetas: []chunks.Meta{
 | 
						|
				{
 | 
						|
					Ref:     0,
 | 
						|
					MinTime: 0,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					Ref:     1,
 | 
						|
					MinTime: 1,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expMetas: []chunks.Meta{
 | 
						|
				{
 | 
						|
					Ref:     0,
 | 
						|
					MinTime: 0,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					Ref:     1,
 | 
						|
					MinTime: 1,
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "if same mintime, lower reference goes first",
 | 
						|
			inputMetas: []chunks.Meta{
 | 
						|
				{
 | 
						|
					Ref:     10,
 | 
						|
					MinTime: 0,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					Ref:     5,
 | 
						|
					MinTime: 0,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expMetas: []chunks.Meta{
 | 
						|
				{
 | 
						|
					Ref:     5,
 | 
						|
					MinTime: 0,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					Ref:     10,
 | 
						|
					MinTime: 0,
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range tests {
 | 
						|
		t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
 | 
						|
			slices.SortFunc(tc.inputMetas, lessByMinTimeAndMinRef)
 | 
						|
			require.Equal(t, tc.expMetas, tc.inputMetas)
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func newTestDBWithOpts(t *testing.T, opts *Options) *DB {
 | 
						|
	dir := t.TempDir()
 | 
						|
 | 
						|
	db, err := Open(dir, nil, nil, opts, nil)
 | 
						|
	require.NoError(t, err)
 | 
						|
 | 
						|
	t.Cleanup(func() {
 | 
						|
		require.NoError(t, db.Close())
 | 
						|
	})
 | 
						|
 | 
						|
	return db
 | 
						|
}
 |