Merge 09a2aed488 into 64808d4f56 (contained in commit 8643428ace)
@@ -439,7 +439,17 @@ func (e errChunksIterator) Err() error { return e.err }

// ExpandSamples iterates over all samples in the iterator, buffering all in slice.
// Optionally it takes a sample constructor, useful when you want to compare sample slices with different
// sample implementations. If nil, the sample type from this package will be used.
// For float samples, NaN values are replaced with -42.
func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
    return expandSamples(iter, true, newSampleFn)
}

// ExpandSamplesWithoutReplacingNaNs is the same as ExpandSamples, but it does not replace float sample NaN values with anything.
func ExpandSamplesWithoutReplacingNaNs(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
    return expandSamples(iter, false, newSampleFn)
}

func expandSamples(iter chunkenc.Iterator, replaceNaN bool, newSampleFn func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
    if newSampleFn == nil {
        newSampleFn = func(t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
            switch {
@@ -461,7 +471,7 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64,
        case chunkenc.ValFloat:
            t, f := iter.At()
            // NaNs can't be compared normally, so substitute for another value.
            if replaceNaN && math.IsNaN(f) {
                f = -42
            }
            result = append(result, newSampleFn(t, f, nil, nil))
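Not part of the change: a minimal sketch of why the non-replacing variant exists. Stale markers are encoded as NaN payloads, so they only survive expansion when NaN replacement is disabled. The helper name and its iterator argument are illustrative, and the usual model/value, storage and tsdb/chunkenc imports are assumed.

// lastSampleIsStale is an illustrative helper, not part of this change.
// Passing nil for newSampleFn falls back to this package's sample type.
func lastSampleIsStale(it chunkenc.Iterator) (bool, error) {
    samples, err := storage.ExpandSamplesWithoutReplacingNaNs(it, nil)
    if err != nil || len(samples) == 0 {
        return false, err
    }
    // With ExpandSamples the stale marker would already have been rewritten to -42,
    // and value.IsStaleNaN would return false here.
    return value.IsStaleNaN(samples[len(samples)-1].F()), nil
}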
@@ -233,6 +233,18 @@ func (bm *BlockMetaCompaction) FromOutOfOrder() bool {
    return slices.Contains(bm.Hints, CompactionHintFromOutOfOrder)
}

func (bm *BlockMetaCompaction) SetStaleSeries() {
    if bm.FromStaleSeries() {
        return
    }
    bm.Hints = append(bm.Hints, CompactionHintFromStaleSeries)
    slices.Sort(bm.Hints)
}

func (bm *BlockMetaCompaction) FromStaleSeries() bool {
    return slices.Contains(bm.Hints, CompactionHintFromStaleSeries)
}

const (
    indexFilename = "index"
    metaFilename  = "meta.json"

@@ -241,6 +253,10 @@ const (
    // CompactionHintFromOutOfOrder is a hint noting that the block
    // was created from out-of-order chunks.
    CompactionHintFromOutOfOrder = "from-out-of-order"

    // CompactionHintFromStaleSeries is a hint noting that the block
    // was created from stale series.
    CompactionHintFromStaleSeries = "from-stale-series"
)

func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
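Not part of the diff: a small sketch of the hint round-trip these helpers provide, mirroring how the compactor and inOrderBlocksMaxTime use them later in this change.

// markAndCheckStale is illustrative only. SetStaleSeries is idempotent (the hint
// is appended once and Hints kept sorted); FromStaleSeries just checks membership.
func markAndCheckStale(meta *BlockMeta) bool {
    meta.Compaction.SetStaleSeries()
    return meta.Compaction.FromStaleSeries() // true after the call above
}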
@@ -578,6 +578,9 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b
        if base.Compaction.FromOutOfOrder() {
            meta.Compaction.SetOutOfOrder()
        }
        if base.Compaction.FromStaleSeries() {
            meta.Compaction.SetStaleSeries()
        }
    }

    err := c.write(dest, meta, DefaultBlockPopulator{}, b)
tsdb/db.go (97 changed lines)
@@ -44,6 +44,7 @@ import (
    tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
    "github.com/prometheus/prometheus/tsdb/fileutil"
    _ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
    "github.com/prometheus/prometheus/tsdb/index"
    "github.com/prometheus/prometheus/tsdb/tsdbutil"
    "github.com/prometheus/prometheus/tsdb/wlog"
    "github.com/prometheus/prometheus/util/compression"
@@ -222,6 +223,19 @@ type Options struct {

    // UseUncachedIO allows bypassing the page cache when appropriate.
    UseUncachedIO bool

    // StaleSeriesCompactionInterval is the interval at which to attempt stale series compaction
    // if the number of stale series crosses the given threshold.
    StaleSeriesCompactionInterval time.Duration

    // StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
    // the in-memory Head block. If the % of stale series crosses this threshold, stale series
    // compaction will be run in the next stale series compaction interval.
    StaleSeriesCompactionThreshold float64

    // StaleSeriesImmediateCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
    // the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
    StaleSeriesImmediateCompactionThreshold float64
}

type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
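A hedged sketch, not from the diff, of wiring the new knobs when opening a TSDB. The directory, logger, registerer and threshold values are placeholders, and the call assumes the current tsdb.Open signature plus the usual time/tsdb imports.

// Illustrative only; values are placeholders, not recommendations.
opts := tsdb.DefaultOptions()
opts.StaleSeriesCompactionInterval = 30 * time.Minute // how often the scheduled check fires
opts.StaleSeriesCompactionThreshold = 0.20            // run at the next tick once 20% of head series are stale
opts.StaleSeriesImmediateCompactionThreshold = 0.35   // run right after a head compaction once 35% are stale
db, err := tsdb.Open(dir, logger, reg, opts, nil)     // dir, logger, reg supplied by the caller
if err != nil {
    // handle error
}
defer db.Close()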
@@ -818,6 +832,12 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
        // configured maximum block duration.
        rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
    }

    // For testing purposes. TODO: remove before merging the PR.
    opts.StaleSeriesCompactionInterval = 15 * time.Minute
    opts.StaleSeriesCompactionThreshold = 0.2
    opts.StaleSeriesImmediateCompactionThreshold = 0.35

    return opts, rngs
}
@@ -1089,6 +1109,17 @@ func (db *DB) run(ctx context.Context) {

    backoff := time.Duration(0)

    nextStaleSeriesCompactionTime := time.Now().Round(db.opts.StaleSeriesCompactionInterval)
    if nextStaleSeriesCompactionTime.Before(time.Now()) {
        nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval)
    }

    staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime)
    if db.opts.StaleSeriesCompactionInterval <= 0 {
        // Long enough interval so that we don't schedule a stale series compaction.
        staleSeriesWaitDur = 365 * 24 * time.Hour
    }

    for {
        select {
        case <-db.stopc:
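For clarity, the alignment arithmetic above in isolation; the values are made up and the snippet assumes only the time package.

// With a 15m interval and "now" at 10:07, Round snaps to 10:00; since that is in
// the past, the first attempt moves to 10:15, so time.Until yields roughly 8m.
// If "now" were 10:08, Round would already give 10:15 and no adjustment is needed.
interval := 15 * time.Minute
next := time.Now().Round(interval)
if next.Before(time.Now()) {
    next = next.Add(interval)
}
wait := time.Until(next) // duration until the first scheduled stale-series check
_ = wait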
@@ -1104,6 +1135,16 @@ func (db *DB) run(ctx context.Context) {
            }
            db.cmtx.Unlock()

            // TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
            numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
            staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
            if db.autoCompact && db.opts.StaleSeriesImmediateCompactionThreshold > 0 &&
                staleSeriesRatio >= db.opts.StaleSeriesImmediateCompactionThreshold {
                if err := db.CompactStaleHead(); err != nil {
                    db.logger.Error("immediate stale series compaction failed", "err", err)
                }
            }

            select {
            case db.compactc <- struct{}{}:
            default:
@@ -1125,6 +1166,20 @@ func (db *DB) run(ctx context.Context) {
                db.metrics.compactionsSkipped.Inc()
            }
            db.autoCompactMtx.Unlock()
        case <-time.After(staleSeriesWaitDur):
            // TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
            numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
            staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
            if db.autoCompact && db.opts.StaleSeriesCompactionThreshold > 0 &&
                staleSeriesRatio >= db.opts.StaleSeriesCompactionThreshold {
                if err := db.CompactStaleHead(); err != nil {
                    db.logger.Error("scheduled stale series compaction failed", "err", err)
                }
            }

            nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval)
            staleSeriesWaitDur = time.Until(nextStaleSeriesCompactionTime)

        case <-db.stopc:
            return
        }
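Not in the diff: the two threshold checks above restated as a standalone helper, with a worked example in the comment; the helper name is illustrative.

// decideStaleCompaction is illustrative only. Example: 1000 stale series out of
// 4000 gives a ratio of 0.25, which satisfies the scheduled threshold (0.2) but
// not the immediate one (0.35), so compaction waits for the next interval tick.
func decideStaleCompaction(numStale, numSeries uint64, opts *Options) (immediate, scheduled bool) {
    ratio := float64(numStale) / float64(numSeries)
    return ratio >= opts.StaleSeriesImmediateCompactionThreshold,
        ratio >= opts.StaleSeriesCompactionThreshold
}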
@@ -1488,6 +1543,46 @@ func (db *DB) compactHead(head *RangeHead) error {
    return nil
}

func (db *DB) CompactStaleHead() error {
    // TODO: compact the OOO data as well.
    // TODO: test splitting of data in multiple blocks if stale series crosses block boundary.
    db.cmtx.Lock()
    defer db.cmtx.Unlock()

    db.logger.Info("Starting stale series compaction")

    staleSeriesRefs := db.head.SortedStaleSeriesRefs(context.Background())
    meta := &BlockMeta{}
    meta.Compaction.SetStaleSeries()
    mint, maxt := db.head.MinTime(), db.head.MaxTime()
    for ; mint < maxt; mint += db.head.chunkRange.Load() {
        staleHead := NewStaleHead(db.Head(), mint, mint+db.head.chunkRange.Load(), staleSeriesRefs)

        uids, err := db.compactor.Write(db.dir, staleHead, staleHead.MinTime(), staleHead.BlockMaxTime(), meta)
        if err != nil {
            return fmt.Errorf("persist stale head: %w", err)
        }

        db.logger.Info("Stale series block created", "ulids", fmt.Sprintf("%v", uids), "min_time", mint, "max_time", maxt)

        if err := db.reloadBlocks(); err != nil {
            multiErr := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err))
            for _, uid := range uids {
                if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
                    multiErr.Add(fmt.Errorf("delete persisted stale head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
                }
            }
            return multiErr.Err()
        }
    }

    db.head.truncateStaleSeries(index.NewListPostings(staleSeriesRefs), maxt)
    db.head.RebuildSymbolTable(db.logger)

    db.logger.Info("Ending stale series compaction")
    return nil
}

// compactBlocks compacts all the eligible on-disk blocks.
// The db.cmtx should be held before calling this method.
func (db *DB) compactBlocks() (err error) {
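Illustration only, not part of the change: how the loop in CompactStaleHead walks the head's time span in chunkRange-sized windows; timestamps are milliseconds, as in the TSDB, and the program is self-contained.

package main

import (
    "fmt"
    "time"
)

func main() {
    chunkRange := int64(2 * time.Hour / time.Millisecond)
    mint, maxt := int64(0), int64(5*time.Hour/time.Millisecond)
    for ; mint < maxt; mint += chunkRange {
        // With a 2h range and a [0h, 5h) head this prints [0h,2h), [2h,4h), [4h,6h);
        // CompactStaleHead writes one stale-series block per such window.
        fmt.Printf("stale block window: [%d, %d)\n", mint, mint+chunkRange)
    }
}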
@@ -1945,7 +2040,7 @@ func (db *DB) inOrderBlocksMaxTime() (maxt int64, ok bool) {
    maxt, ok = int64(math.MinInt64), false
    // If blocks are overlapping, last block might not have the max time. So check all blocks.
    for _, b := range db.Blocks() {
        if !b.meta.Compaction.FromOutOfOrder() && !b.meta.Compaction.FromStaleSeries() && b.meta.MaxTime > maxt {
            ok = true
            maxt = b.meta.MaxTime
        }
    }
tsdb/db_test.go (178 changed lines)
@@ -51,6 +51,7 @@ import (
    "github.com/prometheus/prometheus/model/histogram"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/model/metadata"
    "github.com/prometheus/prometheus/model/value"
    "github.com/prometheus/prometheus/prompb"
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/storage/remote"
@@ -100,8 +101,8 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
    return db
}

// queryHelper runs a matcher query against the querier and fully expands its data.
func queryHelper(t testing.TB, q storage.Querier, withNaNReplacement bool, matchers ...*labels.Matcher) map[string][]chunks.Sample {
    ss := q.Select(context.Background(), false, nil, matchers...)
    defer func() {
        require.NoError(t, q.Close())
@@ -113,7 +114,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
        series := ss.At()

        it = series.Iterator(it)
        var samples []chunks.Sample
        var err error
        if withNaNReplacement {
            samples, err = storage.ExpandSamples(it, newSample)
        } else {
            samples, err = storage.ExpandSamplesWithoutReplacingNaNs(it, newSample)
        }
        require.NoError(t, err)
        require.NoError(t, it.Err())
@@ -130,6 +137,16 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
    return result
}

// query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
    return queryHelper(t, q, true, matchers...)
}

// queryWithoutReplacingNaNs runs a matcher query against the querier and fully expands its data.
func queryWithoutReplacingNaNs(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
    return queryHelper(t, q, false, matchers...)
}

// queryAndExpandChunks runs a matcher query against the querier and fully expands its data into samples.
func queryAndExpandChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][][]chunks.Sample {
    s := queryChunks(t, q, matchers...)
|
|||||||
case <-blockClosed:
|
case <-blockClosed:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStaleSeriesCompaction(t *testing.T) {
|
||||||
|
opts := DefaultOptions()
|
||||||
|
db := openTestDB(t, opts, nil)
|
||||||
|
db.DisableCompactions()
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
var (
|
||||||
|
nonStaleSeries, staleSeries,
|
||||||
|
nonStaleHist, staleHist,
|
||||||
|
nonStaleFHist, staleFHist []labels.Labels
|
||||||
|
numSeriesPerCategory = 1
|
||||||
|
)
|
||||||
|
for i := 0; i < numSeriesPerCategory; i++ {
|
||||||
|
nonStaleSeries = append(nonStaleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 1000+i)))
|
||||||
|
nonStaleHist = append(nonStaleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 2000+i)))
|
||||||
|
nonStaleFHist = append(nonStaleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 3000+i)))
|
||||||
|
|
||||||
|
staleSeries = append(staleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 4000+i)))
|
||||||
|
staleHist = append(staleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 5000+i)))
|
||||||
|
staleFHist = append(staleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 6000+i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
v = 10.0
|
||||||
|
staleV = math.Float64frombits(value.StaleNaN)
|
||||||
|
h = tsdbutil.GenerateTestHistograms(1)[0]
|
||||||
|
fh = tsdbutil.GenerateTestFloatHistograms(1)[0]
|
||||||
|
staleH = &histogram.Histogram{Sum: staleV}
|
||||||
|
staleFH = &histogram.FloatHistogram{Sum: staleV}
|
||||||
|
)
|
||||||
|
|
||||||
|
addNormalSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
|
||||||
|
app := db.Appender(context.Background())
|
||||||
|
for i := 0; i < len(floatSeries); i++ {
|
||||||
|
_, err := app.Append(0, floatSeries[i], ts, v)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = app.AppendHistogram(0, histSeries[i], ts, h, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, fh)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
|
addStaleSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
|
||||||
|
app := db.Appender(context.Background())
|
||||||
|
for i := 0; i < len(floatSeries); i++ {
|
||||||
|
_, err := app.Append(0, floatSeries[i], ts, staleV)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = app.AppendHistogram(0, histSeries[i], ts, staleH, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, staleFH)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normal sample for all.
|
||||||
|
addNormalSamples(100, nonStaleSeries, nonStaleHist, nonStaleFHist)
|
||||||
|
addNormalSamples(100, staleSeries, staleHist, staleFHist)
|
||||||
|
|
||||||
|
// Stale sample for the stale series. Normal sample for the non-stale series.
|
||||||
|
addNormalSamples(200, nonStaleSeries, nonStaleHist, nonStaleFHist)
|
||||||
|
addStaleSamples(200, staleSeries, staleHist, staleFHist)
|
||||||
|
|
||||||
|
// Normal samples for the non-stale series later
|
||||||
|
addNormalSamples(300, nonStaleSeries, nonStaleHist, nonStaleFHist)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(6*numSeriesPerCategory), db.Head().NumSeries())
|
||||||
|
require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumStaleSeries())
|
||||||
|
|
||||||
|
require.NoError(t, db.CompactStaleHead())
|
||||||
|
|
||||||
|
require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumSeries())
|
||||||
|
require.Equal(t, uint64(0), db.Head().NumStaleSeries())
|
||||||
|
require.Len(t, db.Blocks(), 1)
|
||||||
|
|
||||||
|
nonFirstH := h.Copy()
|
||||||
|
nonFirstH.CounterResetHint = histogram.NotCounterReset
|
||||||
|
nonFirstFH := fh.Copy()
|
||||||
|
nonFirstFH.CounterResetHint = histogram.NotCounterReset
|
||||||
|
|
||||||
|
expHeadQuery := make(map[string][]chunks.Sample)
|
||||||
|
for i := 0; i < numSeriesPerCategory; i++ {
|
||||||
|
expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleSeries[i].Get("name"))] = []chunks.Sample{
|
||||||
|
sample{t: 100, f: v}, sample{t: 200, f: v}, sample{t: 300, f: v},
|
||||||
|
}
|
||||||
|
expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleHist[i].Get("name"))] = []chunks.Sample{
|
||||||
|
sample{t: 100, h: h}, sample{t: 200, h: nonFirstH}, sample{t: 300, h: nonFirstH},
|
||||||
|
}
|
||||||
|
expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleFHist[i].Get("name"))] = []chunks.Sample{
|
||||||
|
sample{t: 100, fh: fh}, sample{t: 200, fh: nonFirstFH}, sample{t: 300, fh: nonFirstFH},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
querier, err := NewBlockQuerier(NewRangeHead(db.head, 0, 300), 0, 300)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
querier.Close()
|
||||||
|
})
|
||||||
|
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
|
||||||
|
require.Equal(t, expHeadQuery, seriesSet)
|
||||||
|
|
||||||
|
expBlockQuery := make(map[string][]chunks.Sample)
|
||||||
|
for i := 0; i < numSeriesPerCategory; i++ {
|
||||||
|
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name"))] = []chunks.Sample{
|
||||||
|
sample{t: 100, f: v}, sample{t: 200, f: staleV},
|
||||||
|
}
|
||||||
|
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name"))] = []chunks.Sample{
|
||||||
|
sample{t: 100, h: h}, sample{t: 200, h: staleH},
|
||||||
|
}
|
||||||
|
expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name"))] = []chunks.Sample{
|
||||||
|
sample{t: 100, fh: fh}, sample{t: 200, fh: staleFH},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
querier, err = NewBlockQuerier(db.Blocks()[0], 0, 300)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
querier.Close()
|
||||||
|
})
|
||||||
|
seriesSet = queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
|
||||||
|
require.Len(t, seriesSet, len(expBlockQuery))
|
||||||
|
|
||||||
|
// Compare all the samples except the stale value that needs special handling.
|
||||||
|
for _, category := range [][]labels.Labels{staleSeries, staleHist, staleFHist} {
|
||||||
|
for i := 0; i < numSeriesPerCategory; i++ {
|
||||||
|
seriesKey := fmt.Sprintf(`{name="%s"}`, category[i].Get("name"))
|
||||||
|
samples := expBlockQuery[seriesKey]
|
||||||
|
actSamples, exists := seriesSet[seriesKey]
|
||||||
|
require.Truef(t, exists, "series not found in result %s", seriesKey)
|
||||||
|
require.Len(t, actSamples, len(samples))
|
||||||
|
require.Equal(t, samples[0], actSamples[0])
|
||||||
|
require.Equal(t, samples[1].T(), actSamples[1].T())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the NaN values.
|
||||||
|
for i := 0; i < numSeriesPerCategory; i++ {
|
||||||
|
seriesKey := fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name"))
|
||||||
|
require.True(t, value.IsStaleNaN(seriesSet[seriesKey][1].F()))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < numSeriesPerCategory; i++ {
|
||||||
|
seriesKey := fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name"))
|
||||||
|
require.True(t, seriesSet[seriesKey][1].H().Equals(staleH))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < numSeriesPerCategory; i++ {
|
||||||
|
seriesKey := fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name"))
|
||||||
|
require.True(t, seriesSet[seriesKey][1].FH().Equals(staleFH))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tsdb/head.go (202 changed lines)
@@ -36,6 +36,7 @@ import (
    "github.com/prometheus/prometheus/model/histogram"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/model/metadata"
    "github.com/prometheus/prometheus/model/value"
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/tsdb/chunks"
@@ -68,6 +69,7 @@ var (
type Head struct {
    chunkRange             atomic.Int64
    numSeries              atomic.Uint64
    numStaleSeries         atomic.Uint64
    minOOOTime, maxOOOTime atomic.Int64 // TODO(jesusvazquez) These should be updated after garbage collection.
    minTime, maxTime       atomic.Int64 // Current min and max of the samples included in the head. TODO(jesusvazquez) Ensure these are properly tracked.
    minValidTime           atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
@@ -360,6 +362,7 @@ func (h *Head) resetWLReplayResources() {
type headMetrics struct {
    activeAppenders prometheus.Gauge
    series          prometheus.GaugeFunc
    staleSeries     prometheus.GaugeFunc
    seriesCreated   prometheus.Counter
    seriesRemoved   prometheus.Counter
    seriesNotFound  prometheus.Counter
@@ -406,6 +409,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
        }, func() float64 {
            return float64(h.NumSeries())
        }),
        staleSeries: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
            Name: "prometheus_tsdb_head_stale_series",
            Help: "Total number of stale series in the head block.",
        }, func() float64 {
            return float64(h.NumStaleSeries())
        }),
        seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "prometheus_tsdb_head_series_created_total",
            Help: "Total number of series created in the head",
@@ -532,6 +541,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
    r.MustRegister(
        m.activeAppenders,
        m.series,
        m.staleSeries,
        m.chunks,
        m.chunksCreated,
        m.chunksRemoved,
@@ -1188,6 +1198,20 @@ func (h *Head) truncateMemory(mint int64) (err error) {
    return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
}

func (h *Head) truncateStaleSeries(p index.Postings, maxt int64) {
    h.chunkSnapshotMtx.Lock()
    defer h.chunkSnapshotMtx.Unlock()

    if h.MinTime() >= maxt {
        return
    }

    // TODO: this will block all queries. See if we can do better.
    h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)

    h.gcStaleSeries(p, maxt)
}

// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
// The query timeout limits the max wait time of this function implicitly.
// The mint is inclusive and maxt is the truncation time hence exclusive.
@@ -1538,6 +1562,53 @@ func (h *RangeHead) String() string {
    return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
}

// StaleHead allows querying the stale series in the Head via an IndexReader, ChunkReader and tombstones.Reader.
// Used only for compactions.
type StaleHead struct {
    RangeHead
    staleSeriesRefs []storage.SeriesRef
}

// NewStaleHead returns a *StaleHead.
func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.SeriesRef) *StaleHead {
    return &StaleHead{
        RangeHead: RangeHead{
            head: head,
            mint: mint,
            maxt: maxt,
        },
        staleSeriesRefs: staleSeriesRefs,
    }
}

func (h *StaleHead) Index() (_ IndexReader, err error) {
    return h.head.staleIndex(h.mint, h.maxt, h.staleSeriesRefs)
}

func (h *StaleHead) NumSeries() uint64 {
    return h.head.NumStaleSeries()
}

var staleHeadULID = ulid.MustParse("0000000000XXXXXXXSTALEHEAD")

func (h *StaleHead) Meta() BlockMeta {
    return BlockMeta{
        MinTime: h.MinTime(),
        MaxTime: h.MaxTime(),
        ULID:    staleHeadULID,
        Stats: BlockStats{
            NumSeries: h.NumSeries(),
        },
    }
}

// String returns a human-readable representation of the stale head. It's important to
// keep this function in order to avoid the struct dump when the head is stringified in
// errors or logs.
func (h *StaleHead) String() string {
    return fmt.Sprintf("stale head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
}

// Delete all samples in the range of [mint, maxt] for series that satisfy the given
// label matchers.
func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
@@ -1607,13 +1678,14 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {

    // Drop old chunks and remember series IDs and hashes if they can be
    // deleted entirely.
    deleted, affected, chunksRemoved, staleSeriesDeleted, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
    seriesRemoved := len(deleted)

    h.metrics.seriesRemoved.Add(float64(seriesRemoved))
    h.metrics.chunksRemoved.Add(float64(chunksRemoved))
    h.metrics.chunks.Sub(float64(chunksRemoved))
    h.numSeries.Sub(uint64(seriesRemoved))
    h.numStaleSeries.Sub(uint64(staleSeriesDeleted))

    // Remove deleted series IDs from the postings lists.
    h.postings.Delete(deleted, affected)
@@ -1640,16 +1712,57 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
    return actualInOrderMint, minOOOTime, minMmapFile
}

// gcStaleSeries removes all the provided stale series, given that they are still stale
// and the series' maxt is <= the given maxt.
func (h *Head) gcStaleSeries(p index.Postings, maxt int64) {
    // Drop old chunks and remember series IDs and hashes if they can be
    // deleted entirely.
    deleted, affected, chunksRemoved := h.series.gcStaleSeries(p, maxt)
    seriesRemoved := len(deleted)

    h.metrics.seriesRemoved.Add(float64(seriesRemoved))
    h.metrics.chunksRemoved.Add(float64(chunksRemoved))
    h.metrics.chunks.Sub(float64(chunksRemoved))
    h.numSeries.Sub(uint64(seriesRemoved))
    h.numStaleSeries.Sub(uint64(seriesRemoved))

    // Remove deleted series IDs from the postings lists.
    h.postings.Delete(deleted, affected)

    // Remove tombstones referring to the deleted series.
    h.tombstones.DeleteTombstones(deleted)

    if h.wal != nil {
        _, last, _ := wlog.Segments(h.wal.Dir())
        h.walExpiriesMtx.Lock()
        // Keep series records until we're past segment 'last'
        // because the WAL will still have samples records with
        // this ref ID. If we didn't keep these series records then
        // on start up when we replay the WAL, or any other code
        // that reads the WAL, wouldn't be able to use those
        // samples since we would have no labels for that ref ID.
        for ref := range deleted {
            h.walExpiries[chunks.HeadSeriesRef(ref)] = last
        }
        h.walExpiriesMtx.Unlock()
    }
}

// Tombstones returns a new reader over the head's tombstones.
func (h *Head) Tombstones() (tombstones.Reader, error) {
    return h.tombstones, nil
}

// NumSeries returns the number of series tracked in the head.
func (h *Head) NumSeries() uint64 {
    return h.numSeries.Load()
}

// NumStaleSeries returns the number of stale series in the head.
func (h *Head) NumStaleSeries() uint64 {
    return h.numStaleSeries.Load()
}

var headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")

// Meta returns meta information about the head.
@@ -1929,13 +2042,14 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
// and there's no easy way to cast maps.
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _, _ int, _, _ int64, minMmapFile int) {
    var (
        deleted                  = map[storage.SeriesRef]struct{}{}
        affected                 = map[labels.Label]struct{}{}
        rmChunks                 = 0
        staleSeriesDeleted       = 0
        actualMint         int64 = math.MaxInt64
        minOOOTime         int64 = math.MaxInt64
    )
    minMmapFile = math.MaxInt32
@@ -1987,6 +2101,12 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
            defer s.locks[refShard].Unlock()
        }

        if value.IsStaleNaN(series.lastValue) ||
            (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
            (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
            staleSeriesDeleted++
        }

        deleted[storage.SeriesRef(series.ref)] = struct{}{}
        series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
        s.hashes[hashShard].del(hash, series.ref)
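The same three-part staleness check recurs in this change (here in stripeSeries.gc, in gcStaleSeries below, and in filterStaleSeriesAndSortPostings); restated as a standalone predicate for clarity, illustrative only, using the memSeries fields exactly as above.

// isStaleSeries reports whether the series' most recent float, histogram or
// float-histogram sample is a staleness marker. The diff inlines this expression
// rather than defining a helper.
func isStaleSeries(series *memSeries) bool {
    return value.IsStaleNaN(series.lastValue) ||
        (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
        (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum))
}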
@@ -2000,7 +2120,71 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
        actualMint = mint
    }

    return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile
}

// TODO: add comments.
func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
    var (
        deleted  = map[storage.SeriesRef]struct{}{}
        affected = map[labels.Label]struct{}{}
        rmChunks = 0
    )

    staleSeriesMap := map[storage.SeriesRef]struct{}{}
    for p.Next() {
        ref := p.At()
        staleSeriesMap[ref] = struct{}{}
    }

    check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
        if _, exists := staleSeriesMap[storage.SeriesRef(series.ref)]; !exists {
            // This series was not compacted. Skip it.
            return
        }

        series.Lock()
        defer series.Unlock()

        if series.maxTime() > maxt {
            return
        }

        // Check if the series is still stale.
        isStale := value.IsStaleNaN(series.lastValue) ||
            (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
            (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum))

        if !isStale {
            return
        }

        if series.headChunks != nil {
            rmChunks += series.headChunks.len()
        }
        rmChunks += len(series.mmappedChunks)

        // The series is gone entirely. We need to keep the series lock
        // and make sure we have acquired the stripe locks for hash and ID of the
        // series alike.
        // If we don't hold them all, there's a very small chance that a series receives
        // samples again while we are half-way into deleting it.
        refShard := int(series.ref) & (s.size - 1)
        if hashShard != refShard {
            s.locks[refShard].Lock()
            defer s.locks[refShard].Unlock()
        }

        deleted[storage.SeriesRef(series.ref)] = struct{}{}
        series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
        s.hashes[hashShard].del(hash, series.ref)
        delete(s.series[refShard], series.ref)
        deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
    }

    s.iterForDeletion(check)

    return deleted, affected, rmChunks
}

// The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.
@@ -1222,6 +1222,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
                acc.floatsAppended--
            }
        default:
            newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
            staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
            ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
            if ok {
                if s.T < acc.inOrderMint {
@@ -1230,6 +1232,12 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
                if s.T > acc.inOrderMaxt {
                    acc.inOrderMaxt = s.T
                }
                if newlyStale {
                    a.head.numStaleSeries.Inc()
                }
                if staleToNonStale {
                    a.head.numStaleSeries.Dec()
                }
            } else {
                // The sample is an exact duplicate, and should be silently dropped.
                acc.floatsAppended--
@@ -1310,6 +1318,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
                acc.histogramsAppended--
            }
        default:
            newlyStale := value.IsStaleNaN(s.H.Sum)
            staleToNonStale := false
            if series.lastHistogramValue != nil {
                newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
                staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
            }
            ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
            if ok {
                if s.T < acc.inOrderMint {
@@ -1318,6 +1332,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
                if s.T > acc.inOrderMaxt {
                    acc.inOrderMaxt = s.T
                }
                if newlyStale {
                    a.head.numStaleSeries.Inc()
                }
                if staleToNonStale {
                    a.head.numStaleSeries.Dec()
                }
            } else {
                acc.histogramsAppended--
                acc.histoOOORejected++
@@ -1398,6 +1418,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
                acc.histogramsAppended--
            }
        default:
            newlyStale := value.IsStaleNaN(s.FH.Sum)
            staleToNonStale := false
            if series.lastFloatHistogramValue != nil {
                newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
                staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
            }
            ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
            if ok {
                if s.T < acc.inOrderMint {
@@ -1406,6 +1432,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
                if s.T > acc.inOrderMaxt {
                    acc.inOrderMaxt = s.T
                }
                if newlyStale {
                    a.head.numStaleSeries.Inc()
                }
                if staleToNonStale {
                    a.head.numStaleSeries.Dec()
                }
            } else {
                acc.histogramsAppended--
                acc.histoOOORejected++
@@ -22,6 +22,7 @@ import (
    "sync"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/model/value"
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/tsdb/chunks"
@@ -201,9 +202,102 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
    return nil
}

func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) (*headStaleIndexReader, error) {
    return &headStaleIndexReader{
        headIndexReader: h.indexRange(mint, maxt),
        staleSeriesRefs: staleSeriesRefs,
    }, nil
}

type headStaleIndexReader struct {
    *headIndexReader
    staleSeriesRefs []storage.SeriesRef
}

func (h *headStaleIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
    // If all postings are requested, return the precalculated list.
    k, v := index.AllPostingsKey()
    if len(h.staleSeriesRefs) > 0 && name == k && len(values) == 1 && values[0] == v {
        return index.NewListPostings(h.staleSeriesRefs), nil
    }
    seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.Postings(ctx, name, values...))
    if err != nil {
        return index.ErrPostings(err), err
    }
    return index.NewListPostings(seriesRefs), nil
}

func (h *headStaleIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
    // Unused for compaction, so we don't need to optimise.
    seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForLabelMatching(ctx, name, match))
    if err != nil {
        return index.ErrPostings(err)
    }
    return index.NewListPostings(seriesRefs)
}

func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
    // Unused for compaction, so we don't need to optimise.
    seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForAllLabelValues(ctx, name))
    if err != nil {
        return index.ErrPostings(err)
    }
    return index.NewListPostings(seriesRefs)
}

func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) {
    series := make([]*memSeries, 0, 128)

    notFoundSeriesCount := 0
    for p.Next() {
        s := h.series.getByID(chunks.HeadSeriesRef(p.At()))
        if s == nil {
            notFoundSeriesCount++
            continue
        }

        s.Lock()
        if value.IsStaleNaN(s.lastValue) ||
            (s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
            (s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) {
            series = append(series, s)
        }
        s.Unlock()
    }
    if notFoundSeriesCount > 0 {
        h.logger.Debug("Looked up stale series not found", "count", notFoundSeriesCount)
    }
    if err := p.Err(); err != nil {
        return nil, fmt.Errorf("expand postings: %w", err)
    }

    slices.SortFunc(series, func(a, b *memSeries) int {
        return labels.Compare(a.labels(), b.labels())
    })

    // Convert back to list.
    ep := make([]storage.SeriesRef, 0, len(series))
    for _, p := range series {
        ep = append(ep, storage.SeriesRef(p.ref))
    }
    return ep, nil
}

func (h *headStaleIndexReader) SortedPostings(p index.Postings) index.Postings {
    // All the postings functions above already give a sorted list of postings.
    return p
}

func (h *Head) SortedStaleSeriesRefs(ctx context.Context) []storage.SeriesRef {
    k, v := index.AllPostingsKey()
    // TODO: handle error.
    seriesRefs, _ := h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v))
    return seriesRefs
}

func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
    for i, c := range s.mmappedChunks {
        // Do not expose chunks that are outside the specified range.
        if !c.OverlapsClosedInterval(mint, maxt) {
            continue
        }
@@ -6866,3 +6866,130 @@ func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(sto

    wg.Wait()
}

func TestHead_NumStaleSeries(t *testing.T) {
    head, _ := newTestHead(t, 1000, compression.None, false)
    t.Cleanup(func() {
        require.NoError(t, head.Close())
    })
    require.NoError(t, head.Init(0))

    // Initially, no series should be stale.
    require.Equal(t, uint64(0), head.NumStaleSeries())

    appendSample := func(lbls labels.Labels, ts int64, val float64) {
        app := head.Appender(context.Background())
        _, err := app.Append(0, lbls, ts, val)
        require.NoError(t, err)
        require.NoError(t, app.Commit())
    }
    appendHistogram := func(lbls labels.Labels, ts int64, val *histogram.Histogram) {
        app := head.Appender(context.Background())
        _, err := app.AppendHistogram(0, lbls, ts, val, nil)
        require.NoError(t, err)
        require.NoError(t, app.Commit())
    }
    appendFloatHistogram := func(lbls labels.Labels, ts int64, val *histogram.FloatHistogram) {
        app := head.Appender(context.Background())
        _, err := app.AppendHistogram(0, lbls, ts, nil, val)
        require.NoError(t, err)
        require.NoError(t, app.Commit())
    }

    verifySeriesCounts := func(numStaleSeries, numSeries int) {
        require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries())
        require.Equal(t, uint64(numSeries), head.NumSeries())
    }

    // Create some series with normal samples.
    series1 := labels.FromStrings("name", "series1", "label", "value1")
    series2 := labels.FromStrings("name", "series2", "label", "value2")
    series3 := labels.FromStrings("name", "series3", "label", "value3")

    // Add normal samples to all series.
    appendSample(series1, 100, 1)
    appendSample(series2, 100, 2)
    appendSample(series3, 100, 3)
    // Still no stale series.
    verifySeriesCounts(0, 3)

    // Make series1 stale by appending a stale sample. Now we should have 1 stale series.
    appendSample(series1, 200, math.Float64frombits(value.StaleNaN))
    verifySeriesCounts(1, 3)

    // Make series2 stale as well.
    appendSample(series2, 200, math.Float64frombits(value.StaleNaN))
    verifySeriesCounts(2, 3)

    // Add a non-stale sample to series1. It should not be counted as stale now.
    appendSample(series1, 300, 10)
    verifySeriesCounts(1, 3)

    // Test that series3 doesn't become stale when we add another normal sample.
    appendSample(series3, 200, 10)
    verifySeriesCounts(1, 3)

    // Test histogram stale samples as well.
    series4 := labels.FromStrings("name", "series4", "type", "histogram")
    h := tsdbutil.GenerateTestHistograms(1)[0]
    appendHistogram(series4, 100, h)
    verifySeriesCounts(1, 4)

    // Make histogram series stale.
    staleHist := h.Copy()
    staleHist.Sum = math.Float64frombits(value.StaleNaN)
    appendHistogram(series4, 200, staleHist)
    verifySeriesCounts(2, 4)

    // Test float histogram stale samples.
    series5 := labels.FromStrings("name", "series5", "type", "float_histogram")
    fh := tsdbutil.GenerateTestFloatHistograms(1)[0]
    appendFloatHistogram(series5, 100, fh)
    verifySeriesCounts(2, 5)

    // Make float histogram series stale.
    staleFH := fh.Copy()
    staleFH.Sum = math.Float64frombits(value.StaleNaN)
    appendFloatHistogram(series5, 200, staleFH)
    verifySeriesCounts(3, 5)

    // Make histogram sample non-stale and stale back again.
    appendHistogram(series4, 210, h)
    verifySeriesCounts(2, 5)
    appendHistogram(series4, 220, staleHist)
    verifySeriesCounts(3, 5)

    // Make float histogram sample non-stale and stale back again.
    appendFloatHistogram(series5, 210, fh)
    verifySeriesCounts(2, 5)
    appendFloatHistogram(series5, 220, staleFH)
    verifySeriesCounts(3, 5)

    // Series 1 and 3 are not stale at this point. Add a new sample to series 1 and series 5,
    // so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series.
    appendSample(series1, 400, 10)
    appendFloatHistogram(series5, 400, staleFH)
    verifySeriesCounts(3, 5)

    // Test garbage collection behavior - stale series should be decremented when GC'd.
    // Force a garbage collection by truncating old data.
    require.NoError(t, head.Truncate(300))

    // After truncation, run GC to collect old chunks/series.
    head.gc()

    // series 1 and series 5 are left.
    verifySeriesCounts(1, 2)

    // Test creating a new series for each of float, histogram, float histogram that starts as stale.
    // This should be counted as stale.
    series6 := labels.FromStrings("name", "series6", "direct", "stale")
    series7 := labels.FromStrings("name", "series7", "direct", "stale")
    series8 := labels.FromStrings("name", "series8", "direct", "stale")
    appendSample(series6, 400, math.Float64frombits(value.StaleNaN))
    verifySeriesCounts(2, 3)
    appendHistogram(series7, 400, staleHist)
    verifySeriesCounts(3, 4)
    appendFloatHistogram(series8, 400, staleFH)
    verifySeriesCounts(4, 5)
}