diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index f81f55248b..7d42d14feb 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -16,9 +16,11 @@ package agent import ( "context" "fmt" + "math" "path/filepath" "sync" "time" + "unicode/utf8" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -93,6 +95,8 @@ type dbMetrics struct { numActiveSeries prometheus.Gauge numWALSeriesPendingDeletion prometheus.Gauge totalAppendedSamples prometheus.Counter + totalAppendedExemplars prometheus.Counter + totalOutOfOrderSamples prometheus.Counter walTruncateDuration prometheus.Summary walCorruptionsTotal prometheus.Counter walTotalReplayDuration prometheus.Gauge @@ -119,6 +123,16 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { Help: "Total number of samples appended to the storage", }) + m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_exemplars_appended_total", + Help: "Total number of exemplars appended to the storage", + }) + + m.totalOutOfOrderSamples = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_out_of_order_samples_total", + Help: "Total number of out of order samples ingestion failed attempts.", + }) + m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "prometheus_agent_truncate_duration_seconds", Help: "Duration of WAL truncation.", @@ -159,6 +173,8 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { m.numActiveSeries, m.numWALSeriesPendingDeletion, m.totalAppendedSamples, + m.totalAppendedExemplars, + m.totalOutOfOrderSamples, m.walTruncateDuration, m.walCorruptionsTotal, m.walTotalReplayDuration, @@ -180,6 +196,15 @@ func (m *dbMetrics) Unregister() { m.numActiveSeries, m.numWALSeriesPendingDeletion, m.totalAppendedSamples, + m.totalAppendedExemplars, + m.totalOutOfOrderSamples, + m.walTruncateDuration, + m.walCorruptionsTotal, + m.walTotalReplayDuration, + m.checkpointDeleteFail, + m.checkpointDeleteTotal, + m.checkpointCreationFail, + 
m.checkpointCreationTotal, } for _, c := range cs { m.r.Unregister(c) @@ -257,9 +282,10 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin db.appenderPool.New = func() interface{} { return &appender{ - DB: db, - pendingSeries: make([]record.RefSeries, 0, 100), - pendingSamples: make([]record.RefSample, 0, 100), + DB: db, + pendingSeries: make([]record.RefSeries, 0, 100), + pendingSamples: make([]record.RefSample, 0, 100), + pendingExamplars: make([]record.RefExemplar, 0, 10), } } @@ -411,11 +437,8 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He return } decoded <- samples - case record.Tombstones: - // We don't care about tombstones - continue - case record.Exemplars: - // We don't care about exemplars + case record.Tombstones, record.Exemplars: + // We don't care about tombstones or exemplars during replay. continue default: errCh <- &wal.CorruptionErr{ @@ -665,82 +688,114 @@ func (db *DB) Close() error { type appender struct { *DB - pendingSeries []record.RefSeries - pendingSamples []record.RefSample + pendingSeries []record.RefSeries + pendingSamples []record.RefSample + pendingExamplars []record.RefExemplar + + // Pointers to the series referenced by each element of pendingSamples. + // Series lock is not held on elements. + sampleSeries []*memSeries } func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - if ref == 0 { - r, err := a.Add(l, t, v) - return storage.SeriesRef(r), err - } - return ref, a.AddFast(chunks.HeadSeriesRef(ref), t, v) -} + // series references and chunk references are identical for agent mode. 
+ headRef := chunks.HeadSeriesRef(ref) -func (a *appender) Add(l labels.Labels, t int64, v float64) (chunks.HeadSeriesRef, error) { - hash := l.Hash() - series := a.series.GetByHash(hash, l) - if series != nil { - return series.ref, a.AddFast(series.ref, t, v) - } - - // Ensure no empty or duplicate labels have gotten through. This mirrors the - // equivalent validation code in the TSDB's headAppender. - l = l.WithoutEmpty() - if len(l) == 0 { - return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") - } - - if lbl, dup := l.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) - } - - ref := chunks.HeadSeriesRef(a.nextRef.Inc()) - series = &memSeries{ref: ref, lset: l, lastTs: t} - - a.pendingSeries = append(a.pendingSeries, record.RefSeries{ - Ref: ref, - Labels: l, - }) - a.pendingSamples = append(a.pendingSamples, record.RefSample{ - Ref: ref, - T: t, - V: v, - }) - - a.series.Set(hash, series) - - a.metrics.numActiveSeries.Inc() - a.metrics.totalAppendedSamples.Inc() - - return series.ref, nil -} - -func (a *appender) AddFast(ref chunks.HeadSeriesRef, t int64, v float64) error { - series := a.series.GetByID(ref) + series := a.series.GetByID(headRef) if series == nil { - return storage.ErrNotFound + // Ensure no empty or duplicate labels have gotten through. This mirrors the + // equivalent validation code in the TSDB's headAppender. + l = l.WithoutEmpty() + if len(l) == 0 { + return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + } + + var created bool + series, created = a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: series.ref, + Labels: l, + }) + + a.metrics.numActiveSeries.Inc() + } } + series.Lock() defer series.Unlock() - // Update last recorded timestamp. 
Used by Storage.gc to determine if a - // series is dead. - series.lastTs = t + if t < series.lastTs { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + // NOTE: always modify pendingSamples and sampleSeries together a.pendingSamples = append(a.pendingSamples, record.RefSample{ - Ref: ref, + Ref: series.ref, T: t, V: v, }) + a.sampleSeries = append(a.sampleSeries, series) a.metrics.totalAppendedSamples.Inc() - return nil + return storage.SeriesRef(series.ref), nil +} + +func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) { + hash := l.Hash() + + series = a.series.GetByHash(hash, l) + if series != nil { + return series, false + } + + ref := chunks.HeadSeriesRef(a.nextRef.Inc()) + series = &memSeries{ref: ref, lset: l, lastTs: math.MinInt64} + a.series.Set(hash, series) + return series, true } func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - // remote_write doesn't support exemplars yet, so do nothing here. - return 0, nil + // series references and chunk references are identical for agent mode. + headRef := chunks.HeadSeriesRef(ref) + + s := a.series.GetByID(headRef) + if s == nil { + return 0, fmt.Errorf("unknown series ref when trying to add exemplar: %d", ref) + } + + // Ensure no empty labels have gotten through. + e.Labels = e.Labels.WithoutEmpty() + + if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(tsdb.ErrInvalidExemplar, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + } + + // Exemplar label length does not include chars involved in text rendering such as quotes, + // equals sign, or commas. See definition of const ExemplarMaxLabelSetLength.
+ labelSetLen := 0 + for _, l := range e.Labels { + labelSetLen += utf8.RuneCountInString(l.Name) + labelSetLen += utf8.RuneCountInString(l.Value) + + if labelSetLen > exemplar.ExemplarMaxLabelSetLength { + return 0, storage.ErrExemplarLabelLength + } + } + + a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{ + Ref: s.ref, + T: e.Ts, + V: e.Value, + Labels: e.Labels, + }) + + return storage.SeriesRef(s.ref), nil } // Commit submits the collected samples and purges the batch. @@ -767,6 +822,22 @@ func (a *appender) Commit() error { buf = buf[:0] } + if len(a.pendingExamplars) > 0 { + buf = encoder.Exemplars(a.pendingExamplars, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + var series *memSeries + for i, s := range a.pendingSamples { + series = a.sampleSeries[i] + if !series.updateTimestamp(s.T) { + a.metrics.totalOutOfOrderSamples.Inc() + } + } + //nolint:staticcheck a.bufPool.Put(buf) return a.Rollback() @@ -775,6 +846,8 @@ func (a *appender) Commit() error { func (a *appender) Rollback() error { a.pendingSeries = a.pendingSeries[:0] a.pendingSamples = a.pendingSamples[:0] + a.pendingExamplars = a.pendingExamplars[:0] + a.sampleSeries = a.sampleSeries[:0] a.appenderPool.Put(a) return nil } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 4a196180db..1e5ea11180 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -15,8 +15,8 @@ package agent import ( "context" + "path/filepath" "strconv" - "sync" "testing" "time" @@ -26,25 +26,70 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tsdbutil" 
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/util/testutil" ) -func TestUnsupported(t *testing.T) { - promAgentDir := t.TempDir() +func TestDB_InvalidSeries(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + defer s.Close() - opts := DefaultOptions() - logger := log.NewNopLogger() + app := s.Appender(context.Background()) - s, err := Open(logger, prometheus.NewRegistry(), nil, promAgentDir, opts) + t.Run("Samples", func(t *testing.T) { + _, err := app.Append(0, labels.Labels{}, 0, 0) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels") + + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}, 0, 0) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels") + }) + + t.Run("Exemplars", func(t *testing.T) { + sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0) + require.NoError(t, err, "should not reject valid series") + + _, err = app.AppendExemplar(0, nil, exemplar.Exemplar{}) + require.EqualError(t, err, "unknown series ref when trying to add exemplar: 0") + + e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}} + _, err = app.AppendExemplar(sRef, nil, e) + require.ErrorIs(t, err, tsdb.ErrInvalidExemplar, "should reject duplicate labels") + + e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a_somewhat_long_trace_id", Value: "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa"}}} + _, err = app.AppendExemplar(sRef, nil, e) + require.ErrorIs(t, err, storage.ErrExemplarLabelLength, "should reject too long label length") + + // Inverse check + e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true} + _, err = app.AppendExemplar(sRef, nil, e) + require.NoError(t, err, "should not reject valid exemplars") + }) +} + +func createTestAgentDB(t *testing.T, reg 
prometheus.Registerer, opts *Options) *DB { + t.Helper() + + dbDir := t.TempDir() + rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil) + t.Cleanup(func() { + require.NoError(t, rs.Close()) + }) + + db, err := Open(log.NewNopLogger(), reg, rs, dbDir, opts) require.NoError(t, err) - defer func() { - require.NoError(t, s.Close()) - }() + return db +} + +func TestUnsupportedFunctions(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + defer s.Close() t.Run("Querier", func(t *testing.T) { _, err := s.Querier(context.TODO(), 0, 0) @@ -68,93 +113,74 @@ func TestCommit(t *testing.T) { numSeries = 8 ) - promAgentDir := t.TempDir() + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) - opts := DefaultOptions() - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func(rs *remote.Storage) { - require.NoError(t, rs.Close()) - }(remoteStorage) - - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) - - a := s.Appender(context.TODO()) - for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { sample := tsdbutil.GenerateSamples(0, 1) - _, err := a.Append(0, lset, sample[0].T(), sample[0].V()) + ref, err := app.Append(0, lset, sample[0].T(), sample[0].V()) + require.NoError(t, err) + + e := exemplar.Exemplar{ + Labels: lset, + Ts: sample[0].T(), + Value: sample[0].V(), + HasTs: true, + } + _, err = app.AppendExemplar(ref, lset, e) require.NoError(t, err) } } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) - // Read records from WAL and check for expected count of series and samples. 
- walSeriesCount := 0 - walSamplesCount := 0 - - reg = prometheus.NewRegistry() - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - - s1, err := Open(logger, nil, remoteStorage, promAgentDir, opts) + sr, err := wal.NewSegmentsReader(s.wal.Dir()) require.NoError(t, err) defer func() { - require.NoError(t, s1.Close()) + require.NoError(t, sr.Close()) }() - var dec record.Decoder + // Read records from WAL and check for expected count of series, samples, and exemplars. + var ( + r = wal.NewReader(sr) + dec record.Decoder - if err == nil { - sr, err := wal.NewSegmentsReader(s1.wal.Dir()) - require.NoError(t, err) - defer func() { - require.NoError(t, sr.Close()) - }() + walSeriesCount, walSamplesCount, walExemplarsCount int + ) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + var series []record.RefSeries + series, err = dec.Series(rec, series) + require.NoError(t, err) + walSeriesCount += len(series) - r := wal.NewReader(sr) - seriesPool := sync.Pool{ - New: func() interface{} { - return []record.RefSeries{} - }, - } - samplesPool := sync.Pool{ - New: func() interface{} { - return []record.RefSample{} - }, - } + case record.Samples: + var samples []record.RefSample + samples, err = dec.Samples(rec, samples) + require.NoError(t, err) + walSamplesCount += len(samples) - for r.Next() { - rec := r.Record() - switch dec.Type(rec) { - case record.Series: - series := seriesPool.Get().([]record.RefSeries)[:0] - series, _ = dec.Series(rec, series) - walSeriesCount += len(series) - case record.Samples: - samples := samplesPool.Get().([]record.RefSample)[:0] - samples, _ = dec.Samples(rec, samples) - walSamplesCount += len(samples) - default: - } + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount 
+= len(exemplars) + + default: } } - // Retrieved series count from WAL should match the count of series been added to the WAL. - require.Equal(t, walSeriesCount, numSeries) - - // Retrieved samples count from WAL should match the count of samples been added to the WAL. - require.Equal(t, walSamplesCount, numSeries*numDatapoints) + // Check that the WAL contained the same number of committed series/samples/exemplars. + require.Equal(t, numSeries, walSeriesCount, "unexpected number of series") + require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples") + require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars") } func TestRollback(t *testing.T) { @@ -163,93 +189,68 @@ func TestRollback(t *testing.T) { numSeries = 8 ) - promAgentDir := t.TempDir() + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) - opts := DefaultOptions() - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func(rs *remote.Storage) { - require.NoError(t, rs.Close()) - }(remoteStorage) - - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) - - a := s.Appender(context.TODO()) - for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { sample := tsdbutil.GenerateSamples(0, 1) - _, err := a.Append(0, lset, sample[0].T(), sample[0].V()) + _, err := app.Append(0, lset, sample[0].T(), sample[0].V()) require.NoError(t, err) } } - require.NoError(t, a.Rollback()) + // Do a rollback, which should clear uncommitted data. A followup call to + // commit should persist nothing to the WAL.
+ require.NoError(t, app.Rollback()) + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) - // Read records from WAL and check for expected count of series and samples. - walSeriesCount := 0 - walSamplesCount := 0 - - reg = prometheus.NewRegistry() - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - - s1, err := Open(logger, nil, remoteStorage, promAgentDir, opts) + sr, err := wal.NewSegmentsReader(s.wal.Dir()) require.NoError(t, err) defer func() { - require.NoError(t, s1.Close()) + require.NoError(t, sr.Close()) }() - var dec record.Decoder + // Read records from WAL and check for expected count of series and samples. + var ( + r = wal.NewReader(sr) + dec record.Decoder - if err == nil { - sr, err := wal.NewSegmentsReader(s1.wal.Dir()) - require.NoError(t, err) - defer func() { - require.NoError(t, sr.Close()) - }() + walSeriesCount, walSamplesCount, walExemplarsCount int + ) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + var series []record.RefSeries + series, err = dec.Series(rec, series) + require.NoError(t, err) + walSeriesCount += len(series) - r := wal.NewReader(sr) - seriesPool := sync.Pool{ - New: func() interface{} { - return []record.RefSeries{} - }, - } - samplesPool := sync.Pool{ - New: func() interface{} { - return []record.RefSample{} - }, - } + case record.Samples: + var samples []record.RefSample + samples, err = dec.Samples(rec, samples) + require.NoError(t, err) + walSamplesCount += len(samples) - for r.Next() { - rec := r.Record() - switch dec.Type(rec) { - case record.Series: - series := seriesPool.Get().([]record.RefSeries)[:0] - series, _ = dec.Series(rec, series) - walSeriesCount += len(series) - case record.Samples: - samples := samplesPool.Get().([]record.RefSample)[:0] - samples, _ = dec.Samples(rec, samples) - walSamplesCount += len(samples) - 
default: - } + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + + default: } } - // Retrieved series count from WAL should be zero. - require.Equal(t, walSeriesCount, 0) - - // Retrieved samples count from WAL should be zero. - require.Equal(t, walSamplesCount, 0) + // Check that the rollback ensured nothing got stored. + require.Equal(t, 0, walSeriesCount, "series should not have been written to WAL") + require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL") + require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL") } func TestFullTruncateWAL(t *testing.T) { @@ -259,34 +260,25 @@ func TestFullTruncateWAL(t *testing.T) { lastTs = 500 ) - promAgentDir := t.TempDir() - - lbls := labelsForTest(t.Name(), numSeries) + reg := prometheus.NewRegistry() opts := DefaultOptions() opts.TruncateFrequency = time.Minute * 2 - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) + s := createTestAgentDB(t, reg, opts) defer func() { require.NoError(t, s.Close()) }() + app := s.Appender(context.TODO()) - a := s.Appender(context.TODO()) - + lbls := labelsForTest(t.Name(), numSeries) for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { - _, err := a.Append(0, lset, int64(lastTs), 0) + _, err := app.Append(0, lset, int64(lastTs), 0) require.NoError(t, err) } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) } // Truncate WAL with mint to GC all the samples. 
@@ -302,52 +294,40 @@ func TestPartialTruncateWAL(t *testing.T) { numSeries = 800 ) - promAgentDir := t.TempDir() - opts := DefaultOptions() opts.TruncateFrequency = time.Minute * 2 - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) + reg := prometheus.NewRegistry() + s := createTestAgentDB(t, reg, opts) defer func() { require.NoError(t, s.Close()) }() - - a := s.Appender(context.TODO()) - - var lastTs int64 + app := s.Appender(context.TODO()) // Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500. - lastTs = 500 + var lastTs int64 = 500 lbls := labelsForTest(t.Name()+"batch-1", numSeries) for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { - _, err := a.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, lastTs, 0) require.NoError(t, err) } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) } // Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600. lastTs = 600 - lbls = labelsForTest(t.Name()+"batch-2", numSeries) for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { - _, err := a.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, lastTs, 0) require.NoError(t, err) } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) } // Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series. 
@@ -364,53 +344,41 @@ func TestWALReplay(t *testing.T) { lastTs = 500 ) - promAgentDir := t.TempDir() + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) - opts := DefaultOptions() - - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) - defer func() { - require.NoError(t, remoteStorage.Close()) - }() - - s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) - require.NoError(t, err) - - a := s.Appender(context.TODO()) - for _, l := range lbls { lset := labels.New(l...) for i := 0; i < numDatapoints; i++ { - _, err := a.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, lastTs, 0) require.NoError(t, err) } } - require.NoError(t, a.Commit()) + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) - restartOpts := DefaultOptions() - restartLogger := log.NewNopLogger() - restartReg := prometheus.NewRegistry() + // Hack: s.wal.Dir() is the /wal subdirectory of the original storage path. + // We need the original directory so we can recreate the storage for replay. + storageDir := filepath.Dir(s.wal.Dir()) - // Open a new DB with the same WAL to check that series from the previous DB - // get replayed. - replayDB, err := Open(restartLogger, restartReg, nil, promAgentDir, restartOpts) - require.NoError(t, err) + reg := prometheus.NewRegistry() + replayStorage, err := Open(s.logger, reg, nil, storageDir, s.opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } defer func() { - require.NoError(t, replayDB.Close()) + require.NoError(t, replayStorage.Close()) }() // Check if all the series are retrieved back from the WAL. 
- m := gatherFamily(t, restartReg, "prometheus_agent_active_series") + m := gatherFamily(t, reg, "prometheus_agent_active_series") require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count") // Check if lastTs of the samples retrieved from the WAL is retained. - metrics := replayDB.series.series + metrics := replayStorage.series.series for i := 0; i < len(metrics); i++ { mp := metrics[i] for _, v := range mp { diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index 73fcb60099..f30ff96200 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -24,11 +24,26 @@ import ( type memSeries struct { sync.Mutex - ref chunks.HeadSeriesRef - lset labels.Labels + ref chunks.HeadSeriesRef + lset labels.Labels + + // Last recorded timestamp. Used by Storage.gc to determine if a series is + // stale. lastTs int64 } +// updateTimestamp obtains the lock on m and attempts to update lastTs. +// It returns false, leaving lastTs unchanged, if newTs < lastTs. +func (m *memSeries) updateTimestamp(newTs int64) bool { + m.Lock() + defer m.Unlock() + if newTs >= m.lastTs { + m.lastTs = newTs + return true + } + return false +} + // seriesHashmap is a simple hashmap for memSeries by their label set. // It is built on top of a regular hashmap and holds a slice of series to // resolve hash collisions. Its methods require the hash to be submitted