diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index ecfe1ab804..274739bae1 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -84,7 +84,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
 	tmpdir := t.TempDir()
 	var err error
 
-	if opts = nil {
+	if opts == nil {
 		opts = DefaultOptions()
 	}
 	opts.EnableNativeHistograms = true
@@ -101,8 +101,8 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
 	return db
 }
 
-// query runs a matcher query against the querier and fully expands its data.
-func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
+// queryHelper runs a matcher query against the querier and fully expands its data.
+func queryHelper(t testing.TB, q storage.Querier, withNaNReplacement bool, matchers ...*labels.Matcher) map[string][]chunks.Sample {
 	ss := q.Select(context.Background(), false, nil, matchers...)
 	defer func() {
 		require.NoError(t, q.Close())
@@ -114,7 +114,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
 		series := ss.At()
 
 		it = series.Iterator(it)
-		samples, err := storage.ExpandSamples(it, newSample)
+		var samples []chunks.Sample
+		var err error
+		if withNaNReplacement {
+			samples, err = storage.ExpandSamples(it, newSample)
+		} else {
+			samples, err = storage.ExpandSamplesWithoutReplacingNaNs(it, newSample)
+		}
 		require.NoError(t, err)
 		require.NoError(t, it.Err())
 
@@ -131,6 +137,16 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
 	return result
 }
 
+// query runs a matcher query against the querier and fully expands its data.
+func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
+	return queryHelper(t, q, true, matchers...)
+}
+
+// queryWithoutReplacingNaNs runs a matcher query against the querier and fully expands its data.
+func queryWithoutReplacingNaNs(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
+	return queryHelper(t, q, false, matchers...)
+}
+
 // queryAndExpandChunks runs a matcher query against the querier and fully expands its data into samples.
 func queryAndExpandChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][][]chunks.Sample {
 	s := queryChunks(t, q, matchers...)
@@ -9547,3 +9563,210 @@ func TestBlockClosingBlockedDuringRemoteRead(t *testing.T) {
 	case <-blockClosed:
 	}
 }
+
+func TestStaleSeriesCompaction(t *testing.T) {
+	opts := DefaultOptions()
+	opts.MinBlockDuration = 1000
+	opts.MaxBlockDuration = 1000
+	db := openTestDB(t, opts, nil)
+	db.DisableCompactions()
+	t.Cleanup(func() {
+		require.NoError(t, db.Close())
+	})
+
+	var (
+		nonStaleSeries, staleSeries,
+		nonStaleHist, staleHist,
+		nonStaleFHist, staleFHist,
+		staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary []labels.Labels
+		numSeriesPerCategory = 1
+	)
+	for i := 0; i < numSeriesPerCategory; i++ {
+		nonStaleSeries = append(nonStaleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 1000+i)))
+		nonStaleHist = append(nonStaleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 2000+i)))
+		nonStaleFHist = append(nonStaleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 3000+i)))
+
+		staleSeries = append(staleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 4000+i)))
+		staleHist = append(staleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 5000+i)))
+		staleFHist = append(staleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 6000+i)))
+
+		staleSeriesCrossingBoundary = append(staleSeriesCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 7000+i)))
+		staleHistCrossingBoundary = append(staleHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 8000+i)))
+		staleFHistCrossingBoundary = append(staleFHistCrossingBoundary,
+			labels.FromStrings("name", fmt.Sprintf("series%d", 9000+i)))
+	}
+
+	var (
+		v       = 10.0
+		staleV  = math.Float64frombits(value.StaleNaN)
+		h       = tsdbutil.GenerateTestHistograms(1)[0]
+		fh      = tsdbutil.GenerateTestFloatHistograms(1)[0]
+		staleH  = &histogram.Histogram{Sum: staleV}
+		staleFH = &histogram.FloatHistogram{Sum: staleV}
+	)
+
+	addNormalSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
+		app := db.Appender(context.Background())
+		for i := 0; i < len(floatSeries); i++ {
+			_, err := app.Append(0, floatSeries[i], ts, v)
+			require.NoError(t, err)
+			_, err = app.AppendHistogram(0, histSeries[i], ts, h, nil)
+			require.NoError(t, err)
+			_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, fh)
+			require.NoError(t, err)
+		}
+		require.NoError(t, app.Commit())
+	}
+	addStaleSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
+		app := db.Appender(context.Background())
+		for i := 0; i < len(floatSeries); i++ {
+			_, err := app.Append(0, floatSeries[i], ts, staleV)
+			require.NoError(t, err)
+			_, err = app.AppendHistogram(0, histSeries[i], ts, staleH, nil)
+			require.NoError(t, err)
+			_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, staleFH)
+			require.NoError(t, err)
+		}
+		require.NoError(t, app.Commit())
+	}
+
+	// Normal sample for all.
+	addNormalSamples(100, nonStaleSeries, nonStaleHist, nonStaleFHist)
+	addNormalSamples(100, staleSeries, staleHist, staleFHist)
+
+	// Stale sample for the stale series. Normal sample for the non-stale series.
+	addNormalSamples(200, nonStaleSeries, nonStaleHist, nonStaleFHist)
+	addStaleSamples(200, staleSeries, staleHist, staleFHist)
+
+	// Normal samples for the non-stale series later
+	addNormalSamples(300, nonStaleSeries, nonStaleHist, nonStaleFHist)
+
+	require.Equal(t, uint64(6*numSeriesPerCategory), db.Head().NumSeries())
+	require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumStaleSeries())
+
+	// Series crossing block boundary and gets stale.
+	addNormalSamples(300, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
+	addNormalSamples(700, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
+	addNormalSamples(1100, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
+	addStaleSamples(1200, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
+
+	require.NoError(t, db.CompactStaleHead())
+
+	require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumSeries())
+	require.Equal(t, uint64(0), db.Head().NumStaleSeries())
+
+	require.Len(t, db.Blocks(), 2)
+	m := db.Blocks()[0].Meta()
+	require.Equal(t, int64(0), m.MinTime)
+	require.Equal(t, int64(1000), m.MaxTime)
+	require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")
+	m = db.Blocks()[1].Meta()
+	require.Equal(t, int64(1000), m.MinTime)
+	require.Equal(t, int64(2000), m.MaxTime)
+	require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")
+
+	// To make sure that Head is not truncated based on stale series block.
+	require.NoError(t, db.reload())
+
+	nonFirstH := h.Copy()
+	nonFirstH.CounterResetHint = histogram.NotCounterReset
+	nonFirstFH := fh.Copy()
+	nonFirstFH.CounterResetHint = histogram.NotCounterReset
+
+	// Verify head block.
+	{
+
+		expHeadQuery := make(map[string][]chunks.Sample)
+		for i := 0; i < numSeriesPerCategory; i++ {
+			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleSeries[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, f: v}, sample{t: 200, f: v}, sample{t: 300, f: v},
+			}
+			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleHist[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, h: h}, sample{t: 200, h: nonFirstH}, sample{t: 300, h: nonFirstH},
+			}
+			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleFHist[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, fh: fh}, sample{t: 200, fh: nonFirstFH}, sample{t: 300, fh: nonFirstFH},
+			}
+		}
+
+		querier, err := NewBlockQuerier(NewRangeHead(db.head, 0, 300), 0, 300)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			querier.Close()
+		})
+		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
+		require.Equal(t, expHeadQuery, seriesSet)
+	}
+
+	// Verify blocks from stale series.
+	{
+		expBlockQuery := make(map[string][]chunks.Sample)
+		for i := 0; i < numSeriesPerCategory; i++ {
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, f: v}, sample{t: 200, f: staleV},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, h: h}, sample{t: 200, h: staleH},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, fh: fh}, sample{t: 200, fh: staleFH},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeriesCrossingBoundary[i].Get("name"))] = []chunks.Sample{
+				sample{t: 300, f: v}, sample{t: 700, f: v}, sample{t: 1100, f: v}, sample{t: 1200, f: staleV},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
+				sample{t: 300, h: h}, sample{t: 700, h: nonFirstH}, sample{t: 1100, h: h}, sample{t: 1200, h: staleH},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`,
+				staleFHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
+				sample{t: 300, fh: fh}, sample{t: 700, fh: nonFirstFH}, sample{t: 1100, fh: fh}, sample{t: 1200, fh: staleFH},
+			}
+		}
+
+		querier, err := NewBlockQuerier(db.Blocks()[0], 0, 1000)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			querier.Close()
+		})
+		seriesSet := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
+
+		querier, err = NewBlockQuerier(db.Blocks()[1], 1000, 2000)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			querier.Close()
+		})
+		seriesSet2 := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
+		for k, v := range seriesSet2 {
+			seriesSet[k] = append(seriesSet[k], v...)
+		}
+
+		require.Len(t, seriesSet, len(expBlockQuery))
+
+		// Compare all the samples except the stale value that needs special handling.
+		for _, category := range [][]labels.Labels{
+			staleSeries, staleHist, staleFHist,
+			staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary,
+		} {
+			for i := 0; i < numSeriesPerCategory; i++ {
+				seriesKey := fmt.Sprintf(`{name="%s"}`, category[i].Get("name"))
+				samples := expBlockQuery[seriesKey]
+				actSamples, exists := seriesSet[seriesKey]
+				require.Truef(t, exists, "series not found in result %s", seriesKey)
+				require.Len(t, actSamples, len(samples))
+
+				for i := 0; i < len(samples)-1; i++ {
+					require.Equal(t, samples[i], actSamples[i])
+				}
+
+				l := len(samples) - 1
+				require.Equal(t, samples[l].T(), actSamples[l].T())
+				switch {
+				case value.IsStaleNaN(samples[l].F()):
+					require.True(t, value.IsStaleNaN(actSamples[l].F()))
+				case samples[l].H() != nil:
+					require.True(t, value.IsStaleNaN(actSamples[l].H().Sum))
+				default:
+					require.True(t, value.IsStaleNaN(actSamples[l].FH().Sum))
+				}
+			}
+		}
+	}
+}