prometheus/tsdb/agent/db_append_v2_test.go
Bryan Boreham f1719fa1d4
[BUGFIX] Agent: fix crash from invalid type in pool (#17802)
We have separate pools for Appender and AppenderV2 objects, and must not
put another kind of object into them.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
2026-01-07 14:01:02 +00:00

1170 lines
37 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"context"
"fmt"
"math"
"path/filepath"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/testutil"
)
// TestDB_InvalidSeries_AppendV2 checks that AppenderV2.Append rejects series
// with empty or duplicate labels for both float and histogram samples, and
// that invalid exemplars are reported through storage.AppendPartialError
// while a valid exemplar is accepted.
func TestDB_InvalidSeries_AppendV2(t *testing.T) {
	s := createTestAgentDB(t, nil, DefaultOptions())
	// Check the Close error rather than discarding it, as other tests in this
	// file do.
	defer func() {
		require.NoError(t, s.Close())
	}()

	// One appender is shared by all subtests; it is never committed.
	app := s.AppenderV2(context.Background())

	t.Run("Samples", func(t *testing.T) {
		_, err := app.Append(0, labels.Labels{}, 0, 0, 0, nil, nil, storage.AOptions{})
		require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels")

		_, err = app.Append(0, labels.FromStrings("a", "1", "a", "2"), 0, 0, 0, nil, nil, storage.AOptions{})
		require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels")
	})

	t.Run("Histograms", func(t *testing.T) {
		_, err := app.Append(0, labels.Labels{}, 0, 0, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{})
		require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels")

		_, err = app.Append(0, labels.FromStrings("a", "1", "a", "2"), 0, 0, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{})
		require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels")
	})

	t.Run("Exemplars", func(t *testing.T) {
		// Exemplar with a duplicated label name.
		e := exemplar.Exemplar{Labels: labels.FromStrings("a", "1", "a", "2")}
		_, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0, 0, nil, nil, storage.AOptions{
			Exemplars: []exemplar.Exemplar{e},
		})
		partErr := &storage.AppendPartialError{}
		require.ErrorAs(t, err, &partErr)
		require.Len(t, partErr.ExemplarErrors, 1)
		require.ErrorIs(t, partErr.ExemplarErrors[0], tsdb.ErrInvalidExemplar, "should reject duplicate labels")

		// Exemplar whose label set is too long.
		e = exemplar.Exemplar{Labels: labels.FromStrings("a_somewhat_long_trace_id", "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa")}
		_, err = app.Append(0, labels.FromStrings("a", "2"), 0, 0, 0, nil, nil, storage.AOptions{
			Exemplars: []exemplar.Exemplar{e},
		})
		// Start from a fresh target so the previous partial error cannot satisfy ErrorAs.
		partErr = &storage.AppendPartialError{}
		require.ErrorAs(t, err, &partErr)
		require.Len(t, partErr.ExemplarErrors, 1)
		require.ErrorIs(t, partErr.ExemplarErrors[0], storage.ErrExemplarLabelLength, "should reject too long label length")

		// Inverse check.
		e = exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true}
		_, err = app.Append(0, labels.FromStrings("a", "1"), 0, 0, 0, nil, nil, storage.AOptions{
			Exemplars: []exemplar.Exemplar{e},
		})
		require.NoError(t, err, "should not reject valid exemplars")
	})
}
// TestCommit_AppendV2 appends float samples (each with one exemplar) and four
// kinds of histogram series through a single AppenderV2, commits, and then
// reads the WAL back to compare the number of records of each type with what
// was appended.
func TestCommit_AppendV2(t *testing.T) {
	const (
		numDatapoints = 1000
		numHistograms = 100
		numSeries     = 8
	)

	s := createTestAgentDB(t, nil, DefaultOptions())
	app := s.AppenderV2(context.TODO())

	// Float samples: the sample timestamp is constant, only the exemplar
	// timestamp advances with i.
	for _, l := range labelsForTest(t.Name(), numSeries) {
		lset := labels.New(l...)
		for i := range numDatapoints {
			first := chunks.GenerateSamples(0, 1)[0]
			withExemplar := storage.AOptions{
				Exemplars: []exemplar.Exemplar{{
					Labels: lset,
					Ts:     first.T() + int64(i),
					Value:  first.F(),
					HasTs:  true,
				}},
			}
			_, err := app.Append(0, lset, 0, first.T(), first.F(), nil, nil, withExemplar)
			require.NoError(t, err)
		}
	}

	// appendHistograms writes numHistograms integer histograms, freshly
	// generated per series, to numSeries series named after suffix.
	appendHistograms := func(suffix string, generate func(int) []*histogram.Histogram) {
		for _, l := range labelsForTest(t.Name()+suffix, numSeries) {
			lset := labels.New(l...)
			hs := generate(numHistograms)
			for i := range numHistograms {
				_, err := app.Append(0, lset, 0, int64(i), 0, hs[i], nil, storage.AOptions{})
				require.NoError(t, err)
			}
		}
	}
	// appendFloatHistograms is the float-histogram counterpart of appendHistograms.
	appendFloatHistograms := func(suffix string, generate func(int) []*histogram.FloatHistogram) {
		for _, l := range labelsForTest(t.Name()+suffix, numSeries) {
			lset := labels.New(l...)
			fhs := generate(numHistograms)
			for i := range numHistograms {
				_, err := app.Append(0, lset, 0, int64(i), 0, nil, fhs[i], storage.AOptions{})
				require.NoError(t, err)
			}
		}
	}

	appendHistograms("_histogram", tsdbutil.GenerateTestHistograms)
	appendHistograms("_custom_buckets_histogram", tsdbutil.GenerateTestCustomBucketsHistograms)
	appendFloatHistograms("_float_histogram", tsdbutil.GenerateTestFloatHistograms)
	appendFloatHistograms("_custom_buckets_float_histogram", tsdbutil.GenerateTestCustomBucketsFloatHistograms)

	require.NoError(t, app.Commit())
	require.NoError(t, s.Close())

	sr, err := wlog.NewSegmentsReader(s.wal.Dir())
	require.NoError(t, err)
	defer func() {
		require.NoError(t, sr.Close())
	}()

	// Read records from WAL and check for expected count of series, samples, and exemplars.
	var seriesN, samplesN, exemplarsN, histogramsN, floatHistogramsN int
	reader := wlog.NewReader(sr)
	decoder := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
	for reader.Next() {
		rec := reader.Record()
		switch decoder.Type(rec) {
		case record.Series:
			decoded, err := decoder.Series(rec, nil)
			require.NoError(t, err)
			seriesN += len(decoded)
		case record.Samples:
			decoded, err := decoder.Samples(rec, nil)
			require.NoError(t, err)
			samplesN += len(decoded)
		case record.HistogramSamples, record.CustomBucketsHistogramSamples:
			decoded, err := decoder.HistogramSamples(rec, nil)
			require.NoError(t, err)
			histogramsN += len(decoded)
		case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
			decoded, err := decoder.FloatHistogramSamples(rec, nil)
			require.NoError(t, err)
			floatHistogramsN += len(decoded)
		case record.Exemplars:
			decoded, err := decoder.Exemplars(rec, nil)
			require.NoError(t, err)
			exemplarsN += len(decoded)
		default:
			// Other record types are not counted.
		}
	}

	// Check that the WAL contained the same number of committed series/samples/exemplars.
	require.Equal(t, numSeries*5, seriesN, "unexpected number of series")
	require.Equal(t, numSeries*numDatapoints, samplesN, "unexpected number of samples")
	require.Equal(t, numSeries*numDatapoints, exemplarsN, "unexpected number of exemplars")
	require.Equal(t, numSeries*numHistograms*2, histogramsN, "unexpected number of histograms")
	require.Equal(t, numSeries*numHistograms*2, floatHistogramsN, "unexpected number of float histograms")

	// Check that we can still create both kinds of Appender - see https://github.com/prometheus/prometheus/issues/17800.
	_ = s.Appender(context.TODO())
	_ = s.AppenderV2(context.TODO())
}
// TestRollback_AppendV2 appends float and histogram samples through an
// AppenderV2, rolls the appender back, then commits and reads the WAL to
// assert that series records are present but no samples, exemplars or
// histograms were written.
func TestRollback_AppendV2(t *testing.T) {
	const (
		numDatapoints = 1000
		numHistograms = 100
		numSeries     = 8
	)

	s := createTestAgentDB(t, nil, DefaultOptions())
	app := s.AppenderV2(context.TODO())

	for _, l := range labelsForTest(t.Name(), numSeries) {
		lset := labels.New(l...)
		for range numDatapoints {
			first := chunks.GenerateSamples(0, 1)[0]
			_, err := app.Append(0, lset, 0, first.T(), first.F(), nil, nil, storage.AOptions{})
			require.NoError(t, err)
		}
	}

	// appendHistograms writes numHistograms integer histograms, freshly
	// generated per series, to numSeries series named after suffix.
	appendHistograms := func(suffix string, generate func(int) []*histogram.Histogram) {
		for _, l := range labelsForTest(t.Name()+suffix, numSeries) {
			lset := labels.New(l...)
			hs := generate(numHistograms)
			for i := range numHistograms {
				_, err := app.Append(0, lset, 0, int64(i), 0, hs[i], nil, storage.AOptions{})
				require.NoError(t, err)
			}
		}
	}
	// appendFloatHistograms is the float-histogram counterpart of appendHistograms.
	appendFloatHistograms := func(suffix string, generate func(int) []*histogram.FloatHistogram) {
		for _, l := range labelsForTest(t.Name()+suffix, numSeries) {
			lset := labels.New(l...)
			fhs := generate(numHistograms)
			for i := range numHistograms {
				_, err := app.Append(0, lset, 0, int64(i), 0, nil, fhs[i], storage.AOptions{})
				require.NoError(t, err)
			}
		}
	}

	appendHistograms("_histogram", tsdbutil.GenerateTestHistograms)
	appendHistograms("_custom_buckets_histogram", tsdbutil.GenerateTestCustomBucketsHistograms)
	appendFloatHistograms("_float_histogram", tsdbutil.GenerateTestFloatHistograms)
	appendFloatHistograms("_custom_buckets_float_histogram", tsdbutil.GenerateTestCustomBucketsFloatHistograms)

	// Do a rollback, which should clear uncommitted data. A followup call to
	// commit should persist nothing to the WAL.
	require.NoError(t, app.Rollback())
	require.NoError(t, app.Commit())
	require.NoError(t, s.Close())

	sr, err := wlog.NewSegmentsReader(s.wal.Dir())
	require.NoError(t, err)
	defer func() {
		require.NoError(t, sr.Close())
	}()

	// Read records from WAL and check for expected count of series and samples.
	var seriesN, samplesN, exemplarsN, histogramsN, floatHistogramsN int
	reader := wlog.NewReader(sr)
	decoder := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
	for reader.Next() {
		rec := reader.Record()
		switch decoder.Type(rec) {
		case record.Series:
			decoded, err := decoder.Series(rec, nil)
			require.NoError(t, err)
			seriesN += len(decoded)
		case record.Samples:
			decoded, err := decoder.Samples(rec, nil)
			require.NoError(t, err)
			samplesN += len(decoded)
		case record.Exemplars:
			decoded, err := decoder.Exemplars(rec, nil)
			require.NoError(t, err)
			exemplarsN += len(decoded)
		case record.HistogramSamples, record.CustomBucketsHistogramSamples:
			decoded, err := decoder.HistogramSamples(rec, nil)
			require.NoError(t, err)
			histogramsN += len(decoded)
		case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
			decoded, err := decoder.FloatHistogramSamples(rec, nil)
			require.NoError(t, err)
			floatHistogramsN += len(decoded)
		default:
			// Other record types are not counted.
		}
	}

	// Check that only series get stored after calling Rollback.
	require.Equal(t, numSeries*5, seriesN, "series should have been written to WAL")
	require.Equal(t, 0, samplesN, "samples should not have been written to WAL")
	require.Equal(t, 0, exemplarsN, "exemplars should not have been written to WAL")
	require.Equal(t, 0, histogramsN, "histograms should not have been written to WAL")
	require.Equal(t, 0, floatHistogramsN, "float histograms should not have been written to WAL")
}
// TestFullTruncateWAL_AppendV2 writes float and histogram series whose samples
// all carry timestamp lastTs, truncates with a mint just past lastTs, and
// asserts that the prometheus_agent_deleted_series gauge equals the number of
// series written.
func TestFullTruncateWAL_AppendV2(t *testing.T) {
	const (
		numDatapoints = 1000
		numHistograms = 100
		numSeries     = 800
		lastTs        = 500
	)

	reg := prometheus.NewRegistry()
	opts := DefaultOptions()
	opts.TruncateFrequency = time.Minute * 2
	s := createTestAgentDB(t, reg, opts)
	defer func() {
		require.NoError(t, s.Close())
	}()
	app := s.AppenderV2(context.TODO())

	// Each series is committed on its own, right after its samples are appended.
	lbls := labelsForTest(t.Name(), numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		for range numDatapoints {
			_, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, nil, storage.AOptions{})
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}

	lbls = labelsForTest(t.Name()+"_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		histograms := tsdbutil.GenerateTestHistograms(numHistograms)
		for i := range numHistograms {
			_, err := app.Append(0, lset, 0, int64(lastTs), 0, histograms[i], nil, storage.AOptions{})
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}

	lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
		for i := range numHistograms {
			_, err := app.Append(0, lset, 0, int64(lastTs), 0, histograms[i], nil, storage.AOptions{})
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}

	lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
		for i := range numHistograms {
			_, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, floatHistograms[i], storage.AOptions{})
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}

	lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
		for i := range numHistograms {
			_, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, floatHistograms[i], storage.AOptions{})
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}

	// Truncate WAL with mint to GC all the samples. A truncation failure must
	// fail the test rather than surface later as a confusing gauge mismatch.
	require.NoError(t, s.truncate(lastTs+1))

	m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
	// Guard the index below so a missing metric fails cleanly instead of panicking.
	require.Len(t, m.Metric, 1)
	require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
}
// TestPartialTruncateWAL_AppendV2 writes two batches of series, the first with
// all samples at timestamp 500 and the second at 600, truncates with a mint
// between the two, and asserts that the prometheus_agent_deleted_series gauge
// equals the number of series in one batch.
func TestPartialTruncateWAL_AppendV2(t *testing.T) {
	const (
		numDatapoints = 1000
		numSeries     = 800
	)

	opts := DefaultOptions()
	reg := prometheus.NewRegistry()
	s := createTestAgentDB(t, reg, opts)
	defer func() {
		require.NoError(t, s.Close())
	}()
	app := s.AppenderV2(context.TODO())

	// writeBatch appends numDatapoints points at timestamp ts to numSeries
	// series of each sample type (float, histogram, custom-buckets histogram,
	// float histogram, custom-buckets float histogram), committing after every
	// series. batch distinguishes the label sets of the two batches.
	writeBatch := func(batch string, ts int64) {
		// The float series name has no separator before the batch suffix;
		// the histogram series names below do.
		for _, l := range labelsForTest(t.Name()+batch, numSeries) {
			lset := labels.New(l...)
			for range numDatapoints {
				_, err := app.Append(0, lset, 0, ts, 0, nil, nil, storage.AOptions{})
				require.NoError(t, err)
			}
			require.NoError(t, app.Commit())
		}

		histograms := func(kind string, generate func(int) []*histogram.Histogram) {
			for _, l := range labelsForTest(t.Name()+kind+batch, numSeries) {
				lset := labels.New(l...)
				hs := generate(numDatapoints)
				for i := range numDatapoints {
					_, err := app.Append(0, lset, 0, ts, 0, hs[i], nil, storage.AOptions{})
					require.NoError(t, err)
				}
				require.NoError(t, app.Commit())
			}
		}
		floatHistograms := func(kind string, generate func(int) []*histogram.FloatHistogram) {
			for _, l := range labelsForTest(t.Name()+kind+batch, numSeries) {
				lset := labels.New(l...)
				fhs := generate(numDatapoints)
				for i := range numDatapoints {
					_, err := app.Append(0, lset, 0, ts, 0, nil, fhs[i], storage.AOptions{})
					require.NoError(t, err)
				}
				require.NoError(t, app.Commit())
			}
		}

		histograms("_histogram_", tsdbutil.GenerateTestHistograms)
		histograms("_custom_buckets_histogram_", tsdbutil.GenerateTestCustomBucketsHistograms)
		floatHistograms("_float_histogram_", tsdbutil.GenerateTestFloatHistograms)
		floatHistograms("_custom_buckets_float_histogram_", tsdbutil.GenerateTestCustomBucketsFloatHistograms)
	}

	// Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500.
	var lastTs int64 = 500
	writeBatch("batch-1", lastTs)

	// Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600.
	lastTs = 600
	writeBatch("batch-2", lastTs)

	// Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series.
	// A truncation failure must fail the test rather than be silently dropped.
	require.NoError(t, s.truncate(lastTs-1))

	m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
	require.Len(t, m.Metric, 1)
	require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
}
// TestWALReplay_AppendV2 writes float and histogram series with all samples at
// timestamp lastTs, closes the DB, reopens it from the same directory, and
// asserts that the active-series gauge and each replayed series' lastTs match
// what was written.
func TestWALReplay_AppendV2(t *testing.T) {
	const (
		numDatapoints = 1000
		numHistograms = 100
		numSeries     = 8
		lastTs        = 500
	)

	s := createTestAgentDB(t, nil, DefaultOptions())
	app := s.AppenderV2(context.TODO())

	lbls := labelsForTest(t.Name(), numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		for range numDatapoints {
			_, err := app.Append(0, lset, 0, lastTs, 0, nil, nil, storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		histograms := tsdbutil.GenerateTestHistograms(numHistograms)
		for i := range numHistograms {
			_, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
		for i := range numHistograms {
			_, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
		for i := range numHistograms {
			_, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
		for i := range numHistograms {
			_, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{})
			require.NoError(t, err)
		}
	}

	require.NoError(t, app.Commit())
	require.NoError(t, s.Close())

	// Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
	// We need the original directory so we can recreate the storage for replay.
	storageDir := filepath.Dir(s.wal.Dir())

	reg := prometheus.NewRegistry()
	replayStorage, err := Open(s.logger, reg, nil, storageDir, s.opts)
	require.NoError(t, err, "unable to create storage for the agent")
	defer func() {
		require.NoError(t, replayStorage.Close())
	}()

	// Check if all the series are retrieved back from the WAL.
	m := gatherFamily(t, reg, "prometheus_agent_active_series")
	require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count")

	// Check if lastTs of the samples retrieved from the WAL is retained.
	// The expected value goes first so that failure output labels the two
	// values correctly.
	for _, stripe := range replayStorage.series.series {
		for _, v := range stripe {
			require.Equal(t, int64(lastTs), v.lastTs)
		}
	}
}
func Test_ExistingWAL_NextRef_AppendV2(t *testing.T) {
dbDir := t.TempDir()
rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
defer func() {
require.NoError(t, rs.Close())
}()
db, err := Open(promslog.NewNopLogger(), nil, rs, dbDir, DefaultOptions())
require.NoError(t, err)
seriesCount := 10
// Append <seriesCount> series
app := db.AppenderV2(context.Background())
for i := range seriesCount {
lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("series_%d", i))
_, err := app.Append(0, lset, 0, 0, 100, nil, nil, storage.AOptions{})
require.NoError(t, err)
}
histogramCount := 10
histograms := tsdbutil.GenerateTestHistograms(histogramCount)
// Append <histogramCount> series
for i := range histogramCount {
lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("histogram_%d", i))
_, err := app.Append(0, lset, 0, 0, 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Truncate the WAL to force creation of a new segment.
require.NoError(t, db.truncate(0))
require.NoError(t, db.Close())
// Create a new storage and see what nextRef is initialized to.
db, err = Open(promslog.NewNopLogger(), nil, rs, dbDir, DefaultOptions())
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
require.Equal(t, uint64(seriesCount+histogramCount), db.nextRef.Load(), "nextRef should be equal to the number of series written across the entire WAL")
}
// TestStorage_DuplicateExemplarsIgnored_AppendV2 passes a list of exemplars
// containing consecutive duplicates to a single Append call and asserts that
// only the distinct ones are found in the WAL after Commit.
func TestStorage_DuplicateExemplarsIgnored_AppendV2(t *testing.T) {
	s := createTestAgentDB(t, nil, DefaultOptions())
	// Check the Close error rather than discarding it.
	defer func() {
		require.NoError(t, s.Close())
	}()
	app := s.AppenderV2(context.Background())

	// Write a few exemplars to our appender and call Commit().
	// If the Labels, Value or Timestamp are different than the last exemplar,
	// then a new one should be appended; Otherwise, it should be skipped.
	e1 := exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true}
	e2 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 20, Ts: 10, HasTs: true}
	e3 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 42, Ts: 10, HasTs: true}
	e4 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 42, Ts: 25, HasTs: true}

	_, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0, 0, nil, nil, storage.AOptions{
		Exemplars: []exemplar.Exemplar{e1, e1, e2, e2, e2, e3, e3, e4, e4},
	})
	require.NoError(t, err, "should not reject valid series")
	require.NoError(t, app.Commit())

	// Read back what was written to the WAL.
	var walExemplarsCount int
	sr, err := wlog.NewSegmentsReader(s.wal.Dir())
	require.NoError(t, err)
	// Deferred after the DB close above, so the reader is closed first.
	defer func() {
		require.NoError(t, sr.Close())
	}()
	r := wlog.NewReader(sr)

	dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
	for r.Next() {
		rec := r.Record()
		if dec.Type(rec) == record.Exemplars {
			var exemplars []record.RefExemplar
			exemplars, err = dec.Exemplars(rec, exemplars)
			require.NoError(t, err)
			walExemplarsCount += len(exemplars)
		}
	}

	// We passed 9 exemplars to Append but only 4 of those should have gotten through.
	require.Equal(t, 4, walExemplarsCount)
}
// TestDBAllowOOOSamples_AppendV2 sets OutOfOrderTimeWindow to math.MaxInt64,
// writes samples in [offset, offset+numDatapoints), reopens the DB on the same
// directory, and then appends samples in [0, numDatapoints) to twice as many
// series, asserting the prometheus_agent_samples_appended_total counters
// after each phase.
func TestDBAllowOOOSamples_AppendV2(t *testing.T) {
	const (
		numDatapoints = 5
		numHistograms = 5
		numSeries     = 4
		offset        = 100
	)

	reg := prometheus.NewRegistry()
	opts := DefaultOptions()
	opts.OutOfOrderTimeWindow = math.MaxInt64
	s := createTestAgentDB(t, reg, opts)
	app := s.AppenderV2(context.TODO())

	// Let's add some samples in the [offset, offset+numDatapoints) range.
	lbls := labelsForTest(t.Name(), numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		for i := offset; i < numDatapoints+offset; i++ {
			_, err := app.Append(0, lset, 0, int64(i), float64(i), nil, nil, storage.AOptions{
				Exemplars: []exemplar.Exemplar{{
					Labels: lset,
					Ts:     int64(i) * 2,
					Value:  float64(i),
					HasTs:  true,
				}},
			})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		histograms := tsdbutil.GenerateTestHistograms(numHistograms)
		for i := offset; i < numDatapoints+offset; i++ {
			_, err := app.Append(0, lset, 0, int64(i), 0, histograms[i-offset], nil, storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
		for i := offset; i < numDatapoints+offset; i++ {
			_, err := app.Append(0, lset, 0, int64(i), 0, histograms[i-offset], nil, storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
		for i := offset; i < numDatapoints+offset; i++ {
			_, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i-offset], storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
	for _, l := range lbls {
		lset := labels.New(l...)
		floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
		for i := offset; i < numDatapoints+offset; i++ {
			_, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i-offset], storage.AOptions{})
			require.NoError(t, err)
		}
	}

	require.NoError(t, app.Commit())
	// Metric[0] is compared with the float total (4 series x 5 points) and
	// Metric[1] with the histogram total (4 kinds x 4 series x 5 points).
	m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
	require.Equal(t, float64(20), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
	require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
	require.NoError(t, s.Close())

	// Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
	// We need the original directory so we can recreate the storage for replay.
	storageDir := filepath.Dir(s.wal.Dir())

	// Replay the storage so that the lastTs for each series is recorded.
	reg2 := prometheus.NewRegistry()
	db, err := Open(s.logger, reg2, nil, storageDir, s.opts)
	require.NoError(t, err, "unable to create storage for the agent")

	app = db.AppenderV2(context.Background())

	// Now the lastTs will have been recorded successfully.
	// Let's try appending twice as many OOO samples in the [0, numDatapoints) range.
	// Float samples go to the float series (t.Name()), so that the series
	// written in the first phase actually receive out-of-order floats.
	lbls = labelsForTest(t.Name(), numSeries*2)
	for _, l := range lbls {
		lset := labels.New(l...)
		for i := range numDatapoints {
			_, err := app.Append(0, lset, 0, int64(i), float64(i), nil, nil, storage.AOptions{
				Exemplars: []exemplar.Exemplar{{
					Labels: lset,
					Ts:     int64(i) * 2,
					Value:  float64(i),
					HasTs:  true,
				}},
			})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
	for _, l := range lbls {
		lset := labels.New(l...)
		histograms := tsdbutil.GenerateTestHistograms(numHistograms)
		for i := range numDatapoints {
			_, err := app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries*2)
	for _, l := range lbls {
		lset := labels.New(l...)
		histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
		for i := range numDatapoints {
			_, err := app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_float_histogram", numSeries*2)
	for _, l := range lbls {
		lset := labels.New(l...)
		floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
		for i := range numDatapoints {
			_, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{})
			require.NoError(t, err)
		}
	}

	lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries*2)
	for _, l := range lbls {
		lset := labels.New(l...)
		floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
		for i := range numDatapoints {
			_, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{})
			require.NoError(t, err)
		}
	}

	require.NoError(t, app.Commit())
	// reg2 belongs to the reopened DB, so these totals cover the second phase
	// only: 8 series x 5 floats, and 4 kinds x 8 series x 5 histograms.
	m = gatherFamily(t, reg2, "prometheus_agent_samples_appended_total")
	require.Equal(t, float64(40), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
	require.Equal(t, float64(160), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
	require.NoError(t, db.Close())
}
// TestDBOutOfOrderTimeWindow_AppendV2 commits one sample at firstTs for a
// histogram series and for a float series, then appends a second sample at
// secondTs and checks that the append error and the appended-samples counters
// match the expectation for the configured OutOfOrderTimeWindow.
func TestDBOutOfOrderTimeWindow_AppendV2(t *testing.T) {
	tc := []struct {
		outOfOrderTimeWindow, firstTs, secondTs int64
		expectedError                           error
	}{
		{0, 100, 101, nil},
		{0, 100, 100, storage.ErrOutOfOrderSample},
		{0, 100, 99, storage.ErrOutOfOrderSample},
		{100, 100, 1, nil},
		{100, 100, 0, storage.ErrOutOfOrderSample},
	}
	for _, c := range tc {
		// %v rather than %s: expectedError is nil in some cases, and %s would
		// render that as "%!s(<nil>)" in the subtest name.
		t.Run(fmt.Sprintf("outOfOrderTimeWindow=%d, firstTs=%d, secondTs=%d, expectedError=%v", c.outOfOrderTimeWindow, c.firstTs, c.secondTs, c.expectedError), func(t *testing.T) {
			reg := prometheus.NewRegistry()
			opts := DefaultOptions()
			opts.OutOfOrderTimeWindow = c.outOfOrderTimeWindow
			s := createTestAgentDB(t, reg, opts)
			app := s.AppenderV2(context.TODO())

			// Histogram series: commit firstTs, then try secondTs.
			lbls := labelsForTest(t.Name()+"_histogram", 1)
			lset := labels.New(lbls[0]...)
			_, err := app.Append(0, lset, 0, c.firstTs, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{})
			require.NoError(t, err)
			err = app.Commit()
			require.NoError(t, err)
			_, err = app.Append(0, lset, 0, c.secondTs, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{})
			// With a nil expectedError this passes only when err is nil too.
			require.ErrorIs(t, err, c.expectedError)

			// Float series: same sequence.
			lbls = labelsForTest(t.Name(), 1)
			lset = labels.New(lbls[0]...)
			_, err = app.Append(0, lset, 0, c.firstTs, 0, nil, nil, storage.AOptions{})
			require.NoError(t, err)
			err = app.Commit()
			require.NoError(t, err)
			_, err = app.Append(0, lset, 0, c.secondTs, 0, nil, nil, storage.AOptions{})
			require.ErrorIs(t, err, c.expectedError)

			// Two samples per type are expected when the second append is
			// accepted, one when it is rejected.
			expectedAppendedSamples := float64(2)
			if c.expectedError != nil {
				expectedAppendedSamples = 1
			}
			m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
			require.Equal(t, expectedAppendedSamples, m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
			require.Equal(t, expectedAppendedSamples, m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
			require.NoError(t, s.Close())
		})
	}
}
// TestDB_EnableSTZeroInjection_AppendV2 replaces TestDBStartTimestampSamplesIngestion.
//
// With EnableSTAsZeroSample set, every input sample is appended and committed
// through its own AppenderV2; the samples read back from the WAL are then
// compared with the expected sequence for the case.
func TestDB_EnableSTZeroInjection_AppendV2(t *testing.T) {
	t.Parallel()

	// NOTE: Eventually wal sample and appendable sample should be the same.
	type appendableSample struct {
		st, t int64
		v     float64
		lbls  labels.Labels
		h     *histogram.Histogram
	}

	defLbls := labels.New(labelsForTest(t.Name(), 1)[0]...)
	testHistograms := tsdbutil.GenerateTestHistograms(2)
	zeroHistogram := &histogram.Histogram{
		// The STZeroSample represents a counter reset by definition.
		CounterResetHint: histogram.CounterReset,
		// Replicate other fields to avoid needless chunk creation.
		Schema:        testHistograms[0].Schema,
		ZeroThreshold: testHistograms[0].ZeroThreshold,
		CustomValues:  testHistograms[0].CustomValues,
	}

	// floatIn and histIn build input samples on the default label set.
	floatIn := func(st, ts int64, v float64) appendableSample {
		return appendableSample{st: st, t: ts, v: v, lbls: defLbls}
	}
	histIn := func(st, ts int64, h *histogram.Histogram) appendableSample {
		return appendableSample{st: st, t: ts, h: h, lbls: defLbls}
	}

	cases := []struct {
		name            string
		inputSamples    []appendableSample
		expectedSamples []walSample
		// NOTE(review): expectedSeriesCount is set by some cases but is not
		// read anywhere in this function.
		expectedSeriesCount int
	}{
		{
			name: "in order ct+normal sample/floatSamples",
			inputSamples: []appendableSample{
				floatIn(1, 100, 10),
				floatIn(1, 101, 10),
			},
			expectedSamples: []walSample{
				{t: 1, f: 0, lbls: defLbls, ref: 1},
				{t: 100, f: 10, lbls: defLbls, ref: 1},
				{t: 101, f: 10, lbls: defLbls, ref: 1},
			},
		},
		{
			name: "ST+float && ST+histogram samples",
			inputSamples: []appendableSample{
				floatIn(30, 100, 20),
				histIn(230, 300, testHistograms[0]),
			},
			expectedSamples: []walSample{
				{t: 30, f: 0, lbls: defLbls, ref: 1},
				{t: 100, f: 20, lbls: defLbls, ref: 1},
				{t: 230, h: zeroHistogram, lbls: defLbls, ref: 1},
				{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
			},
			expectedSeriesCount: 1,
		},
		{
			name: "ST+float && ST+histogram samples with error; should be ignored",
			inputSamples: []appendableSample{
				// invalid ST
				floatIn(100, 100, 10),
				// invalid ST histogram
				histIn(300, 300, testHistograms[0]),
			},
			expectedSamples: []walSample{
				{t: 100, f: 10, lbls: defLbls, ref: 1},
				{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
			},
			expectedSeriesCount: 0,
		},
		{
			name: "In order ct+normal sample/histogram",
			inputSamples: []appendableSample{
				histIn(1, 100, testHistograms[0]),
				histIn(1, 101, testHistograms[1]),
			},
			expectedSamples: []walSample{
				{t: 1, h: zeroHistogram, lbls: defLbls, ref: 1},
				{t: 100, h: testHistograms[0], lbls: defLbls, ref: 1},
				{t: 101, h: testHistograms[1], lbls: defLbls, ref: 1},
			},
		},
		{
			name: "ct+normal then OOO sample/float",
			inputSamples: []appendableSample{
				floatIn(40_000, 60_000, 10),
				floatIn(40_000, 120_000, 10),
				floatIn(40_000, 180_000, 10),
				floatIn(40_000, 50_000, 10),
			},
			expectedSamples: []walSample{
				{t: 40_000, f: 0, lbls: defLbls, ref: 1},
				{t: 60_000, f: 10, lbls: defLbls, ref: 1},
				{t: 120_000, f: 10, lbls: defLbls, ref: 1},
				{t: 180_000, f: 10, lbls: defLbls, ref: 1},
				{t: 50_000, f: 10, lbls: defLbls, ref: 1}, // OOO sample.
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			opts := DefaultOptions()
			opts.OutOfOrderTimeWindow = 360_000
			opts.EnableSTAsZeroSample = true
			reg := prometheus.NewRegistry()
			db := createTestAgentDB(t, reg, opts)

			for _, in := range tc.inputSamples {
				// Simulate one sample per series logic we have in all our ingestion paths in Prometheus.
				app := db.AppenderV2(t.Context())
				_, err := app.Append(0, in.lbls, in.st, in.t, in.v, in.h, nil, storage.AOptions{})
				require.NoError(t, err)
				require.NoError(t, app.Commit())
			}

			// Close the DB to ensure all data is flushed to the WAL
			require.NoError(t, db.Close())

			// Fail if a metric family with the out-of-order name is gathered at all.
			// NOTE(review): this checks only for the family's presence, not its
			// value — confirm the name matches a metric the agent registers.
			families, err := reg.Gather()
			require.NoError(t, err, "failed to gather metrics")
			const oooFamily = "prometheus_agent_out_of_order_samples_total"
			for _, fam := range families {
				if name := fam.GetName(); name == oooFamily {
					t.Fatalf("unexpected metric %s", name)
				}
			}

			got := readWALSamples(t, db.wal.Dir())
			testutil.RequireEqualWithOptions(t, tc.expectedSamples, got, cmp.Options{cmp.AllowUnexported(walSample{})})
		})
	}
}