prometheus/tsdb/agent/db_append_v2_test.go
bwplotka 6ab5d8f9be feat(agent): add support for appending ST
Signed-off-by: bwplotka <bwplotka@gmail.com>
2026-03-10 12:27:48 +00:00

1194 lines
38 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"context"
"fmt"
"math"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/testutil"
)
func TestDB_InvalidSeries_AppendV2(t *testing.T) {
s := createTestAgentDB(t, nil, DefaultOptions())
defer s.Close()
app := s.AppenderV2(context.Background())
t.Run("Samples", func(t *testing.T) {
_, err := app.Append(0, labels.Labels{}, 0, 0, 0, nil, nil, storage.AOptions{})
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels")
_, err = app.Append(0, labels.FromStrings("a", "1", "a", "2"), 0, 0, 0, nil, nil, storage.AOptions{})
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels")
})
t.Run("Histograms", func(t *testing.T) {
_, err := app.Append(0, labels.Labels{}, 0, 0, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{})
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels")
_, err = app.Append(0, labels.FromStrings("a", "1", "a", "2"), 0, 0, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{})
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels")
})
t.Run("Exemplars", func(t *testing.T) {
e := exemplar.Exemplar{Labels: labels.FromStrings("a", "1", "a", "2")}
_, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0, 0, nil, nil, storage.AOptions{
Exemplars: []exemplar.Exemplar{e},
})
partErr := &storage.AppendPartialError{}
require.ErrorAs(t, err, &partErr)
require.Len(t, partErr.ExemplarErrors, 1)
require.ErrorIs(t, partErr.ExemplarErrors[0], tsdb.ErrInvalidExemplar, "should reject duplicate labels")
e = exemplar.Exemplar{Labels: labels.FromStrings("a_somewhat_long_trace_id", "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa")}
_, err = app.Append(0, labels.FromStrings("a", "2"), 0, 0, 0, nil, nil, storage.AOptions{
Exemplars: []exemplar.Exemplar{e},
})
partErr = &storage.AppendPartialError{}
require.ErrorAs(t, err, &partErr)
require.Len(t, partErr.ExemplarErrors, 1)
require.ErrorIs(t, partErr.ExemplarErrors[0], storage.ErrExemplarLabelLength, "should reject too long label length")
// Inverse check.
e = exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true}
_, err = app.Append(0, labels.FromStrings("a", "1"), 0, 0, 0, nil, nil, storage.AOptions{
Exemplars: []exemplar.Exemplar{e},
})
require.NoError(t, err, "should not reject valid exemplars")
})
}
// TestCommit_AppendV2 tests Appender commit.
// TODO(bwplotka): Rewrite this so Refs are generated, then appended, then expected so we test the
// exact data durability.
func TestCommit_AppendV2(t *testing.T) {
const (
numDatapoints = 1000
numHistograms = 100
numSeries = 8
)
for _, enableSTStorage := range []bool{false, true} {
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
opts := DefaultOptions()
opts.EnableSTStorage = enableSTStorage
s := createTestAgentDB(t, nil, opts)
var (
expectedSampleSTs []int64
gotSampleSTs []int64
)
if enableSTStorage {
expectedSampleSTs = make([]int64, 0, numSeries*numDatapoints)
gotSampleSTs = make([]int64, 0, numSeries*numDatapoints)
}
app := s.AppenderV2(t.Context())
lbls := labelsForTest(t.Name(), numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for i := range numDatapoints {
sample := chunks.GenerateSamples(0, 1)
st := int64(i + 1234)
_, err := app.Append(0, lset, st, sample[0].T()+2000, sample[0].F(), nil, nil, storage.AOptions{
Exemplars: []exemplar.Exemplar{{
Labels: lset,
Ts: sample[0].T() + int64(i) + 2000,
Value: sample[0].F(),
HasTs: true,
}},
})
require.NoError(t, err)
if enableSTStorage {
expectedSampleSTs = append(expectedSampleSTs, st)
}
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, int64(i+2234), int64(i+2000), 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
customBucketHistograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, int64(i+3234), int64(i+2000), 0, customBucketHistograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, int64(i+4234), int64(i+2000), 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
customBucketFloatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, int64(i+5234), int64(i+2000), 0, nil, customBucketFloatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
require.NoError(t, s.Close())
sr, err := wlog.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err)
defer func() {
require.NoError(t, sr.Close())
}()
// Read records from WAL and check for expected count of series, samples, and exemplars.
var (
r = wlog.NewReader(sr)
dec = record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
walSeriesCount, walSamplesCount, walExemplarsCount, walHistogramCount, walFloatHistogramCount int
)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
var series []record.RefSeries
series, err = dec.Series(rec, series)
require.NoError(t, err)
walSeriesCount += len(series)
case record.Samples:
if enableSTStorage {
t.Errorf("Got V1 Samples when ST enabled")
}
var samples []record.RefSample
samples, err = dec.Samples(rec, samples)
require.NoError(t, err)
walSamplesCount += len(samples)
case record.SamplesV2:
if !enableSTStorage {
t.Errorf("Got V2 Samples when ST disabled")
}
var samples []record.RefSample
samples, err = dec.Samples(rec, samples)
require.NoError(t, err)
for _, s := range samples {
gotSampleSTs = append(gotSampleSTs, s.ST)
}
walSamplesCount += len(samples)
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
var histograms []record.RefHistogramSample
histograms, err = dec.HistogramSamples(rec, histograms)
require.NoError(t, err)
walHistogramCount += len(histograms)
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
var floatHistograms []record.RefFloatHistogramSample
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
require.NoError(t, err)
walFloatHistogramCount += len(floatHistograms)
case record.Exemplars:
var exemplars []record.RefExemplar
exemplars, err = dec.Exemplars(rec, exemplars)
require.NoError(t, err)
walExemplarsCount += len(exemplars)
default:
}
}
// Check that the WAL contained the same number of committed series/samples/exemplars.
require.Equal(t, numSeries*5, walSeriesCount, "unexpected number of series")
require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples")
require.Equal(t, expectedSampleSTs, gotSampleSTs, "unexpected STs received")
require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars")
require.Equal(t, numSeries*numHistograms*2, walHistogramCount, "unexpected number of histograms")
require.Equal(t, numSeries*numHistograms*2, walFloatHistogramCount, "unexpected number of float histograms")
// Check that we can still create both kinds of Appender.
// Regression test against https://github.com/prometheus/prometheus/issues/17800.
_ = s.Appender(t.Context())
_ = s.AppenderV2(t.Context())
})
}
}
func TestRollbackAppendV2(t *testing.T) {
const (
numDatapoints = 1000
numHistograms = 100
numSeries = 8
)
for _, enableSTStorage := range []bool{false, true} {
opts := DefaultOptions()
opts.EnableSTStorage = enableSTStorage
s := createTestAgentDB(t, nil, opts)
app := s.AppenderV2(context.TODO())
lbls := labelsForTest(t.Name(), numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for i := range numDatapoints {
sample := chunks.GenerateSamples(0, 1)
_, err := app.Append(0, lset, int64(i), sample[0].T()+2000, sample[0].F(), nil, nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, int64(i), int64(i+2000), 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, int64(i), int64(i+2000), 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, int64(i), int64(i+2000), 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, int64(i), int64(i+2000), 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
}
// Do a rollback, which should clear uncommitted data. A followup call to
// commit should persist nothing to the WAL.
require.NoError(t, app.Rollback())
require.NoError(t, app.Commit())
require.NoError(t, s.Close())
sr, err := wlog.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err)
defer func() {
require.NoError(t, sr.Close())
}()
// Read records from WAL and check for expected count of series and samples.
var (
r = wlog.NewReader(sr)
dec = record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
walSeriesCount int
)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
var series []record.RefSeries
series, err = dec.Series(rec, series)
require.NoError(t, err)
walSeriesCount += len(series)
case record.Samples, record.SamplesV2:
t.Errorf("should not have found samples")
case record.Exemplars:
t.Errorf("should not have found exemplars")
case record.HistogramSamples, record.CustomBucketsHistogramSamples, record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
t.Errorf("should not have found histograms")
default:
}
}
// Check that only series get stored after calling Rollback.
require.Equal(t, numSeries*5, walSeriesCount, "series should have been written to WAL")
}
}
func TestFullTruncateWAL_AppendV2(t *testing.T) {
const (
numDatapoints = 1000
numHistograms = 100
numSeries = 800
lastTs = 500
)
reg := prometheus.NewRegistry()
opts := DefaultOptions()
opts.TruncateFrequency = time.Minute * 2
s := createTestAgentDB(t, reg, opts)
defer func() {
require.NoError(t, s.Close())
}()
app := s.AppenderV2(context.TODO())
lbls := labelsForTest(t.Name(), numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for range numDatapoints {
_, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, 0, int64(lastTs), 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, 0, int64(lastTs), 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
// Truncate WAL with mint to GC all the samples.
s.truncate(lastTs + 1)
m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
}
func TestPartialTruncateWAL_AppendV2(t *testing.T) {
const (
numDatapoints = 1000
numSeries = 800
)
opts := DefaultOptions()
reg := prometheus.NewRegistry()
s := createTestAgentDB(t, reg, opts)
defer func() {
require.NoError(t, s.Close())
}()
app := s.AppenderV2(context.TODO())
// Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500.
var lastTs int64 = 500
lbls := labelsForTest(t.Name()+"batch-1", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_histogram_batch-1", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numDatapoints)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram_batch-1", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numDatapoints)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_float_histogram_batch-1", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numDatapoints)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram_batch-1", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numDatapoints)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
// Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600.
lastTs = 600
lbls = labelsForTest(t.Name()+"batch-2", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_histogram_batch-2", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numDatapoints)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram_batch-2", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numDatapoints)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_float_histogram_batch-2", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numDatapoints)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram_batch-2", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numDatapoints)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
// Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series.
s.truncate(lastTs - 1)
m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
require.Len(t, m.Metric, 1)
require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
}
func TestWALReplay_AppendV2(t *testing.T) {
const (
numDatapoints = 1000
numHistograms = 100
numSeries = 8
lastTs = 500
)
s := createTestAgentDB(t, nil, DefaultOptions())
app := s.AppenderV2(context.TODO())
lbls := labelsForTest(t.Name(), numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for range numDatapoints {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
for i := range numHistograms {
_, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
require.NoError(t, s.Close())
// Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
// We need the original directory so we can recreate the storage for replay.
storageDir := filepath.Dir(s.wal.Dir())
reg := prometheus.NewRegistry()
replayStorage, err := Open(s.logger, reg, nil, storageDir, s.opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
defer func() {
require.NoError(t, replayStorage.Close())
}()
// Check if all the series are retrieved back from the WAL.
m := gatherFamily(t, reg, "prometheus_agent_active_series")
require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count")
// Check if lastTs of the samples retrieved from the WAL is retained.
metrics := replayStorage.series.series
for i := range metrics {
mp := metrics[i]
for _, v := range mp {
require.Equal(t, v.lastTs, int64(lastTs))
}
}
}
func Test_ExistingWAL_NextRef_AppendV2(t *testing.T) {
dbDir := t.TempDir()
rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
defer func() {
require.NoError(t, rs.Close())
}()
db, err := Open(promslog.NewNopLogger(), nil, rs, dbDir, DefaultOptions())
require.NoError(t, err)
seriesCount := 10
// Append <seriesCount> series
app := db.AppenderV2(context.Background())
for i := range seriesCount {
lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("series_%d", i))
_, err := app.Append(0, lset, 0, 0, 100, nil, nil, storage.AOptions{})
require.NoError(t, err)
}
histogramCount := 10
histograms := tsdbutil.GenerateTestHistograms(histogramCount)
// Append <histogramCount> series
for i := range histogramCount {
lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("histogram_%d", i))
_, err := app.Append(0, lset, 0, 0, 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Truncate the WAL to force creation of a new segment.
require.NoError(t, db.truncate(0))
require.NoError(t, db.Close())
// Create a new storage and see what nextRef is initialized to.
db, err = Open(promslog.NewNopLogger(), nil, rs, dbDir, DefaultOptions())
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
require.Equal(t, uint64(seriesCount+histogramCount), db.nextRef.Load(), "nextRef should be equal to the number of series written across the entire WAL")
}
func TestStorage_DuplicateExemplarsIgnored_AppendV2(t *testing.T) {
s := createTestAgentDB(t, nil, DefaultOptions())
app := s.AppenderV2(context.Background())
defer s.Close()
// Write a few exemplars to our appender and call Commit().
// If the Labels, Value or Timestamp are different than the last exemplar,
// then a new one should be appended; Otherwise, it should be skipped.
e1 := exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true}
e2 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 20, Ts: 10, HasTs: true}
e3 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 42, Ts: 10, HasTs: true}
e4 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 42, Ts: 25, HasTs: true}
_, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0, 0, nil, nil, storage.AOptions{
Exemplars: []exemplar.Exemplar{e1, e1, e2, e2, e2, e3, e3, e4, e4},
})
require.NoError(t, err, "should not reject valid series")
require.NoError(t, app.Commit())
// Read back what was written to the WAL.
var walExemplarsCount int
sr, err := wlog.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err)
defer sr.Close()
r := wlog.NewReader(sr)
dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
for r.Next() {
rec := r.Record()
if dec.Type(rec) == record.Exemplars {
var exemplars []record.RefExemplar
exemplars, err = dec.Exemplars(rec, exemplars)
require.NoError(t, err)
walExemplarsCount += len(exemplars)
}
}
// We had 9 calls to AppendExemplar but only 4 of those should have gotten through.
require.Equal(t, 4, walExemplarsCount)
}
func TestDBAllowOOOSamples_AppendV2(t *testing.T) {
const (
numDatapoints = 5
numHistograms = 5
numSeries = 4
offset = 100
)
reg := prometheus.NewRegistry()
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = math.MaxInt64
s := createTestAgentDB(t, reg, opts)
app := s.AppenderV2(context.TODO())
// Let's add some samples in the [offset, offset+numDatapoints) range.
lbls := labelsForTest(t.Name(), numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for i := offset; i < numDatapoints+offset; i++ {
_, err := app.Append(0, lset, 0, int64(i), float64(i), nil, nil, storage.AOptions{
Exemplars: []exemplar.Exemplar{{
Labels: lset,
Ts: int64(i) * 2,
Value: float64(i),
HasTs: true,
}},
})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
for i := offset; i < numDatapoints+offset; i++ {
_, err := app.Append(0, lset, 0, int64(i), 0, histograms[i-offset], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
for i := offset; i < numDatapoints+offset; i++ {
_, err := app.Append(0, lset, 0, int64(i), 0, histograms[i-offset], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
for i := offset; i < numDatapoints+offset; i++ {
_, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i-offset], storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
for i := offset; i < numDatapoints+offset; i++ {
_, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i-offset], storage.AOptions{})
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, float64(20), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, s.Close())
// Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
// We need the original directory so we can recreate the storage for replay.
storageDir := filepath.Dir(s.wal.Dir())
// Replay the storage so that the lastTs for each series is recorded.
reg2 := prometheus.NewRegistry()
db, err := Open(s.logger, reg2, nil, storageDir, s.opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
app = db.AppenderV2(context.Background())
// Now the lastTs will have been recorded successfully.
// Let's try appending twice as many OOO samples in the [0, numDatapoints) range.
lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, int64(i), float64(i), nil, nil, storage.AOptions{
Exemplars: []exemplar.Exemplar{{
Labels: lset,
Ts: int64(i) * 2,
Value: float64(i),
HasTs: true,
}},
})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_custom_buckets_float_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms)
for i := range numDatapoints {
_, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{})
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
m = gatherFamily(t, reg2, "prometheus_agent_samples_appended_total")
require.Equal(t, float64(40), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, float64(160), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, db.Close())
}
func TestDBOutOfOrderTimeWindow_AppendV2(t *testing.T) {
tc := []struct {
outOfOrderTimeWindow, firstTs, secondTs int64
expectedError error
}{
{0, 100, 101, nil},
{0, 100, 100, storage.ErrOutOfOrderSample},
{0, 100, 99, storage.ErrOutOfOrderSample},
{100, 100, 1, nil},
{100, 100, 0, storage.ErrOutOfOrderSample},
}
for _, c := range tc {
t.Run(fmt.Sprintf("outOfOrderTimeWindow=%d, firstTs=%d, secondTs=%d, expectedError=%s", c.outOfOrderTimeWindow, c.firstTs, c.secondTs, c.expectedError), func(t *testing.T) {
reg := prometheus.NewRegistry()
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = c.outOfOrderTimeWindow
s := createTestAgentDB(t, reg, opts)
app := s.AppenderV2(context.TODO())
lbls := labelsForTest(t.Name()+"_histogram", 1)
lset := labels.New(lbls[0]...)
_, err := app.Append(0, lset, 0, c.firstTs, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{})
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
_, err = app.Append(0, lset, 0, c.secondTs, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{})
require.ErrorIs(t, err, c.expectedError)
lbls = labelsForTest(t.Name(), 1)
lset = labels.New(lbls[0]...)
_, err = app.Append(0, lset, 0, c.firstTs, 0, nil, nil, storage.AOptions{})
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
_, err = app.Append(0, lset, 0, c.secondTs, 0, nil, nil, storage.AOptions{})
require.ErrorIs(t, err, c.expectedError)
expectedAppendedSamples := float64(2)
if c.expectedError != nil {
expectedAppendedSamples = 1
}
m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, expectedAppendedSamples, m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, expectedAppendedSamples, m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, s.Close())
})
}
}
// TestDB_EnableSTZeroInjection_AppendV2 replaces TestDBStartTimestampSamplesIngestion.
func TestDB_EnableSTZeroInjection_AppendV2(t *testing.T) {
t.Parallel()
// NOTE: Eventually wal sample and appendable sample should be the same.
type appendableSample struct {
st, t int64
v float64
lbls labels.Labels
h *histogram.Histogram
}
testHistograms := tsdbutil.GenerateTestHistograms(2)
zeroHistogram := &histogram.Histogram{
// The STZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: testHistograms[0].Schema,
ZeroThreshold: testHistograms[0].ZeroThreshold,
CustomValues: testHistograms[0].CustomValues,
}
lbls := labelsForTest(t.Name(), 1)
defLbls := labels.New(lbls[0]...)
testCases := []struct {
name string
inputSamples []appendableSample
expectedSamples []walSample
expectedSeriesCount int
}{
{
name: "in order ct+normal sample/floatSamples",
inputSamples: []appendableSample{
{t: 100, st: 1, v: 10, lbls: defLbls},
{t: 101, st: 1, v: 10, lbls: defLbls},
},
expectedSamples: []walSample{
{t: 1, f: 0, lbls: defLbls, ref: 1},
{t: 100, f: 10, lbls: defLbls, ref: 1},
{t: 101, f: 10, lbls: defLbls, ref: 1},
},
},
{
name: "ST+float && ST+histogram samples",
inputSamples: []appendableSample{
{
t: 100,
st: 30,
v: 20,
lbls: defLbls,
},
{
t: 300,
st: 230,
h: testHistograms[0],
lbls: defLbls,
},
},
expectedSamples: []walSample{
{t: 30, f: 0, lbls: defLbls, ref: 1},
{t: 100, f: 20, lbls: defLbls, ref: 1},
{t: 230, h: zeroHistogram, lbls: defLbls, ref: 1},
{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
},
expectedSeriesCount: 1,
},
{
name: "ST+float && ST+histogram samples with error; should be ignored",
inputSamples: []appendableSample{
{
// invalid ST
t: 100,
st: 100,
v: 10,
lbls: defLbls,
},
{
// invalid ST histogram
t: 300,
st: 300,
h: testHistograms[0],
lbls: defLbls,
},
},
expectedSamples: []walSample{
{t: 100, f: 10, lbls: defLbls, ref: 1},
{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
},
expectedSeriesCount: 0,
},
{
name: "In order ct+normal sample/histogram",
inputSamples: []appendableSample{
{t: 100, h: testHistograms[0], st: 1, lbls: defLbls},
{t: 101, h: testHistograms[1], st: 1, lbls: defLbls},
},
expectedSamples: []walSample{
{t: 1, h: zeroHistogram, lbls: defLbls, ref: 1},
{t: 100, h: testHistograms[0], lbls: defLbls, ref: 1},
{t: 101, h: testHistograms[1], lbls: defLbls, ref: 1},
},
},
{
name: "ct+normal then OOO sample/float",
inputSamples: []appendableSample{
{t: 60_000, st: 40_000, v: 10, lbls: defLbls},
{t: 120_000, st: 40_000, v: 10, lbls: defLbls},
{t: 180_000, st: 40_000, v: 10, lbls: defLbls},
{t: 50_000, st: 40_000, v: 10, lbls: defLbls},
},
expectedSamples: []walSample{
{t: 40_000, f: 0, lbls: defLbls, ref: 1},
{t: 60_000, f: 10, lbls: defLbls, ref: 1},
{t: 120_000, f: 10, lbls: defLbls, ref: 1},
{t: 180_000, f: 10, lbls: defLbls, ref: 1},
{t: 50_000, f: 10, lbls: defLbls, ref: 1}, // OOO sample.
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 360_000
opts.EnableSTAsZeroSample = true
s := createTestAgentDB(t, reg, opts)
for _, sample := range tc.inputSamples {
// Simulate one sample per series logic we have in all our ingestion paths in Prometheus.
app := s.AppenderV2(t.Context())
_, err := app.Append(0, sample.lbls, sample.st, sample.t, sample.v, sample.h, nil, storage.AOptions{})
require.NoError(t, err)
require.NoError(t, app.Commit())
}
// Close the DB to ensure all data is flushed to the WAL
require.NoError(t, s.Close())
// Check that we don't have any OOO samples in the WAL by checking metrics
families, err := reg.Gather()
require.NoError(t, err, "failed to gather metrics")
for _, f := range families {
if f.GetName() == "prometheus_agent_out_of_order_samples_total" {
t.Fatalf("unexpected metric %s", f.GetName())
}
}
got := readWALSamples(t, s.wal.Dir())
testutil.RequireEqualWithOptions(t, tc.expectedSamples, got, cmp.Options{cmp.AllowUnexported(walSample{})})
})
}
}