Test stale series compaction in TSDB

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
Ganesh Vernekar 2025-07-25 15:51:28 -07:00
parent 902b585b8c
commit fe9adb0a98

View File

@@ -84,7 +84,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
tmpdir := t.TempDir()
var err error
if opts = nil {
if opts == nil {
opts = DefaultOptions()
}
opts.EnableNativeHistograms = true
@@ -101,8 +101,8 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
return db
}
// query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
// queryHelper runs a matcher query against the querier and fully expands its data.
func queryHelper(t testing.TB, q storage.Querier, withNaNReplacement bool, matchers ...*labels.Matcher) map[string][]chunks.Sample {
ss := q.Select(context.Background(), false, nil, matchers...)
defer func() {
require.NoError(t, q.Close())
@@ -114,7 +114,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
series := ss.At()
it = series.Iterator(it)
samples, err := storage.ExpandSamples(it, newSample)
var samples []chunks.Sample
var err error
if withNaNReplacement {
samples, err = storage.ExpandSamples(it, newSample)
} else {
samples, err = storage.ExpandSamplesWithoutReplacingNaNs(it, newSample)
}
require.NoError(t, err)
require.NoError(t, it.Err())
@@ -131,6 +137,16 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
return result
}
// query runs a matcher query against the querier and fully expands its data,
// with NaN replacement enabled in the expanded samples.
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
	const replaceNaNs = true
	return queryHelper(t, q, replaceNaNs, matchers...)
}
// queryWithoutReplacingNaNs runs a matcher query against the querier and fully
// expands its data, keeping NaN sample values as they are.
func queryWithoutReplacingNaNs(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
	const replaceNaNs = false
	return queryHelper(t, q, replaceNaNs, matchers...)
}
// queryAndExpandChunks runs a matcher query against the querier and fully expands its data into samples.
func queryAndExpandChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][][]chunks.Sample {
s := queryChunks(t, q, matchers...)
@ -9547,3 +9563,210 @@ func TestBlockClosingBlockedDuringRemoteRead(t *testing.T) {
case <-blockClosed:
}
}
// TestStaleSeriesCompaction verifies that CompactStaleHead moves series whose
// latest sample is a staleness marker (floats, histograms and float histograms
// alike) out of the Head into persisted blocks flagged as coming from stale
// series, while non-stale series stay in the Head.
func TestStaleSeriesCompaction(t *testing.T) {
	opts := DefaultOptions()
	// Fixed 1000ms block range so block boundaries land at multiples of 1000.
	opts.MinBlockDuration = 1000
	opts.MaxBlockDuration = 1000
	db := openTestDB(t, opts, nil)
	db.DisableCompactions()
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})

	// One slice of label sets per category of series exercised below.
	var (
		nonStaleSeries, staleSeries,
		nonStaleHist, staleHist,
		nonStaleFHist, staleFHist,
		staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary []labels.Labels
		numSeriesPerCategory = 1
	)
	// Distinct numeric prefixes (1000, 2000, ...) keep the categories disjoint.
	for i := 0; i < numSeriesPerCategory; i++ {
		nonStaleSeries = append(nonStaleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 1000+i)))
		nonStaleHist = append(nonStaleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 2000+i)))
		nonStaleFHist = append(nonStaleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 3000+i)))
		staleSeries = append(staleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 4000+i)))
		staleHist = append(staleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 5000+i)))
		staleFHist = append(staleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 6000+i)))
		staleSeriesCrossingBoundary = append(staleSeriesCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 7000+i)))
		staleHistCrossingBoundary = append(staleHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 8000+i)))
		staleFHistCrossingBoundary = append(staleFHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 9000+i)))
	}

	var (
		v       = 10.0
		staleV  = math.Float64frombits(value.StaleNaN)
		h       = tsdbutil.GenerateTestHistograms(1)[0]
		fh      = tsdbutil.GenerateTestFloatHistograms(1)[0]
		staleH  = &histogram.Histogram{Sum: staleV}
		staleFH = &histogram.FloatHistogram{Sum: staleV}
	)
	// addNormalSamples appends an ordinary float/histogram/float-histogram
	// sample at ts for each series of the three given categories.
	addNormalSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
		app := db.Appender(context.Background())
		for i := 0; i < len(floatSeries); i++ {
			_, err := app.Append(0, floatSeries[i], ts, v)
			require.NoError(t, err)
			_, err = app.AppendHistogram(0, histSeries[i], ts, h, nil)
			require.NoError(t, err)
			_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, fh)
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}
	// addStaleSamples appends a staleness marker at ts for each series of the
	// three given categories.
	addStaleSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
		app := db.Appender(context.Background())
		for i := 0; i < len(floatSeries); i++ {
			_, err := app.Append(0, floatSeries[i], ts, staleV)
			require.NoError(t, err)
			_, err = app.AppendHistogram(0, histSeries[i], ts, staleH, nil)
			require.NoError(t, err)
			_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, staleFH)
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}

	// Normal sample for all.
	addNormalSamples(100, nonStaleSeries, nonStaleHist, nonStaleFHist)
	addNormalSamples(100, staleSeries, staleHist, staleFHist)
	// Stale sample for the stale series. Normal sample for the non-stale series.
	addNormalSamples(200, nonStaleSeries, nonStaleHist, nonStaleFHist)
	addStaleSamples(200, staleSeries, staleHist, staleFHist)
	// Normal samples for the non-stale series later.
	addNormalSamples(300, nonStaleSeries, nonStaleHist, nonStaleFHist)

	require.Equal(t, uint64(6*numSeriesPerCategory), db.Head().NumSeries())
	require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumStaleSeries())

	// Series crossing block boundary and gets stale.
	addNormalSamples(300, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
	addNormalSamples(700, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
	addNormalSamples(1100, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
	addStaleSamples(1200, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)

	require.NoError(t, db.CompactStaleHead())

	// Only the three non-stale categories remain in the Head.
	require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumSeries())
	require.Equal(t, uint64(0), db.Head().NumStaleSeries())

	// The stale series were compacted into two blocks split at the 1000 boundary.
	require.Len(t, db.Blocks(), 2)
	m := db.Blocks()[0].Meta()
	require.Equal(t, int64(0), m.MinTime)
	require.Equal(t, int64(1000), m.MaxTime)
	require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")
	m = db.Blocks()[1].Meta()
	require.Equal(t, int64(1000), m.MinTime)
	require.Equal(t, int64(2000), m.MaxTime)
	require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")

	// To make sure that Head is not truncated based on stale series block.
	require.NoError(t, db.reload())

	// Samples after the first in a chunk carry the NotCounterReset hint.
	nonFirstH := h.Copy()
	nonFirstH.CounterResetHint = histogram.NotCounterReset
	nonFirstFH := fh.Copy()
	nonFirstFH.CounterResetHint = histogram.NotCounterReset

	// Verify head block.
	{
		expHeadQuery := make(map[string][]chunks.Sample)
		for i := 0; i < numSeriesPerCategory; i++ {
			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleSeries[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, f: v}, sample{t: 200, f: v}, sample{t: 300, f: v},
			}
			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleHist[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, h: h}, sample{t: 200, h: nonFirstH}, sample{t: 300, h: nonFirstH},
			}
			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleFHist[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, fh: fh}, sample{t: 200, fh: nonFirstFH}, sample{t: 300, fh: nonFirstFH},
			}
		}
		querier, err := NewBlockQuerier(NewRangeHead(db.head, 0, 300), 0, 300)
		require.NoError(t, err)
		t.Cleanup(func() {
			querier.Close()
		})
		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
		require.Equal(t, expHeadQuery, seriesSet)
	}

	// Verify blocks from stale series.
	{
		expBlockQuery := make(map[string][]chunks.Sample)
		for i := 0; i < numSeriesPerCategory; i++ {
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, f: v}, sample{t: 200, f: staleV},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, h: h}, sample{t: 200, h: staleH},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, fh: fh}, sample{t: 200, fh: staleFH},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeriesCrossingBoundary[i].Get("name"))] = []chunks.Sample{
				sample{t: 300, f: v}, sample{t: 700, f: v}, sample{t: 1100, f: v}, sample{t: 1200, f: staleV},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
				sample{t: 300, h: h}, sample{t: 700, h: nonFirstH}, sample{t: 1100, h: h}, sample{t: 1200, h: staleH},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
				sample{t: 300, fh: fh}, sample{t: 700, fh: nonFirstFH}, sample{t: 1100, fh: fh}, sample{t: 1200, fh: staleFH},
			}
		}
		// Use a distinct variable per querier: reusing one variable would make
		// both t.Cleanup closures capture it and close only the last querier.
		querier, err := NewBlockQuerier(db.Blocks()[0], 0, 1000)
		require.NoError(t, err)
		t.Cleanup(func() {
			querier.Close()
		})
		seriesSet := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
		querier2, err := NewBlockQuerier(db.Blocks()[1], 1000, 2000)
		require.NoError(t, err)
		t.Cleanup(func() {
			querier2.Close()
		})
		seriesSet2 := queryWithoutReplacingNaNs(t, querier2, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
		// Merge both block results; boundary-crossing series have samples in both.
		for k, smpls := range seriesSet2 {
			seriesSet[k] = append(seriesSet[k], smpls...)
		}
		require.Len(t, seriesSet, len(expBlockQuery))
		// Compare all the samples except the stale value that needs special handling.
		for _, category := range [][]labels.Labels{
			staleSeries, staleHist, staleFHist,
			staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary,
		} {
			for i := 0; i < numSeriesPerCategory; i++ {
				seriesKey := fmt.Sprintf(`{name="%s"}`, category[i].Get("name"))
				samples := expBlockQuery[seriesKey]
				actSamples, exists := seriesSet[seriesKey]
				require.Truef(t, exists, "series not found in result %s", seriesKey)
				require.Len(t, actSamples, len(samples))
				// All but the last sample must match exactly (j avoids
				// shadowing the per-category index i).
				for j := 0; j < len(samples)-1; j++ {
					require.Equal(t, samples[j], actSamples[j])
				}
				// The trailing sample is a stale marker; NaN != NaN, so check
				// it via IsStaleNaN rather than equality.
				l := len(samples) - 1
				require.Equal(t, samples[l].T(), actSamples[l].T())
				switch {
				case value.IsStaleNaN(samples[l].F()):
					require.True(t, value.IsStaleNaN(actSamples[l].F()))
				case samples[l].H() != nil:
					require.True(t, value.IsStaleNaN(actSamples[l].H().Sum))
				default:
					require.True(t, value.IsStaleNaN(actSamples[l].FH().Sum))
				}
			}
		}
	}
}