mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-11-04 02:11:01 +01:00 
			
		
		
		
	* tsdb/agent: synchronize appender code with grafana/agent main This commit synchronize the appender code with grafana/agent main. This includes adding support for appending exemplars. Closes #9610 Signed-off-by: Robert Fratto <robertfratto@gmail.com> * tsdb/agent: fix build error Signed-off-by: Robert Fratto <robertfratto@gmail.com> * tsdb/agent: introduce some exemplar tests, refactor tests a little Signed-off-by: Robert Fratto <robertfratto@gmail.com> * tsdb/agent: address review feedback - Re-use hash when creating a new series - Fix typo in exemplar append error Signed-off-by: Robert Fratto <robertfratto@gmail.com> * tsdb/agent: remove unused AddFast method Signed-off-by: Robert Fratto <robertfratto@gmail.com> * tsdb/agent: close wal reader after test Signed-off-by: Robert Fratto <robertfratto@gmail.com> * tsdb/agent: add out-of-order tracking, change series TS in commit Signed-off-by: Robert Fratto <robertfratto@gmail.com> * address review feedback Signed-off-by: Robert Fratto <robertfratto@gmail.com>
		
			
				
	
	
		
			448 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			448 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2021 The Prometheus Authors
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
// http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package agent
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"path/filepath"
 | 
						|
	"strconv"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/go-kit/log"
 | 
						|
 | 
						|
	"github.com/prometheus/client_golang/prometheus"
 | 
						|
	dto "github.com/prometheus/client_model/go"
 | 
						|
	"github.com/stretchr/testify/require"
 | 
						|
 | 
						|
	"github.com/prometheus/prometheus/model/exemplar"
 | 
						|
	"github.com/prometheus/prometheus/model/labels"
 | 
						|
	"github.com/prometheus/prometheus/storage"
 | 
						|
	"github.com/prometheus/prometheus/storage/remote"
 | 
						|
	"github.com/prometheus/prometheus/tsdb"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/record"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/wal"
 | 
						|
	"github.com/prometheus/prometheus/util/testutil"
 | 
						|
)
 | 
						|
 | 
						|
func TestDB_InvalidSeries(t *testing.T) {
 | 
						|
	s := createTestAgentDB(t, nil, DefaultOptions())
 | 
						|
	defer s.Close()
 | 
						|
 | 
						|
	app := s.Appender(context.Background())
 | 
						|
 | 
						|
	t.Run("Samples", func(t *testing.T) {
 | 
						|
		_, err := app.Append(0, labels.Labels{}, 0, 0)
 | 
						|
		require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels")
 | 
						|
 | 
						|
		_, err = app.Append(0, labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}, 0, 0)
 | 
						|
		require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels")
 | 
						|
	})
 | 
						|
 | 
						|
	t.Run("Exemplars", func(t *testing.T) {
 | 
						|
		sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
 | 
						|
		require.NoError(t, err, "should not reject valid series")
 | 
						|
 | 
						|
		_, err = app.AppendExemplar(0, nil, exemplar.Exemplar{})
 | 
						|
		require.EqualError(t, err, "unknown series ref when trying to add exemplar: 0")
 | 
						|
 | 
						|
		e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}}
 | 
						|
		_, err = app.AppendExemplar(sRef, nil, e)
 | 
						|
		require.ErrorIs(t, err, tsdb.ErrInvalidExemplar, "should reject duplicate labels")
 | 
						|
 | 
						|
		e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a_somewhat_long_trace_id", Value: "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa"}}}
 | 
						|
		_, err = app.AppendExemplar(sRef, nil, e)
 | 
						|
		require.ErrorIs(t, err, storage.ErrExemplarLabelLength, "should reject too long label length")
 | 
						|
 | 
						|
		// Inverse check
 | 
						|
		e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
 | 
						|
		_, err = app.AppendExemplar(sRef, nil, e)
 | 
						|
		require.NoError(t, err, "should not reject valid exemplars")
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func createTestAgentDB(t *testing.T, reg prometheus.Registerer, opts *Options) *DB {
 | 
						|
	t.Helper()
 | 
						|
 | 
						|
	dbDir := t.TempDir()
 | 
						|
	rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil)
 | 
						|
	t.Cleanup(func() {
 | 
						|
		require.NoError(t, rs.Close())
 | 
						|
	})
 | 
						|
 | 
						|
	db, err := Open(log.NewNopLogger(), reg, rs, dbDir, opts)
 | 
						|
	require.NoError(t, err)
 | 
						|
	return db
 | 
						|
}
 | 
						|
 | 
						|
func TestUnsupportedFunctions(t *testing.T) {
 | 
						|
	s := createTestAgentDB(t, nil, DefaultOptions())
 | 
						|
	defer s.Close()
 | 
						|
 | 
						|
	t.Run("Querier", func(t *testing.T) {
 | 
						|
		_, err := s.Querier(context.TODO(), 0, 0)
 | 
						|
		require.Equal(t, err, ErrUnsupported)
 | 
						|
	})
 | 
						|
 | 
						|
	t.Run("ChunkQuerier", func(t *testing.T) {
 | 
						|
		_, err := s.ChunkQuerier(context.TODO(), 0, 0)
 | 
						|
		require.Equal(t, err, ErrUnsupported)
 | 
						|
	})
 | 
						|
 | 
						|
	t.Run("ExemplarQuerier", func(t *testing.T) {
 | 
						|
		_, err := s.ExemplarQuerier(context.TODO())
 | 
						|
		require.Equal(t, err, ErrUnsupported)
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func TestCommit(t *testing.T) {
 | 
						|
	const (
 | 
						|
		numDatapoints = 1000
 | 
						|
		numSeries     = 8
 | 
						|
	)
 | 
						|
 | 
						|
	s := createTestAgentDB(t, nil, DefaultOptions())
 | 
						|
	app := s.Appender(context.TODO())
 | 
						|
 | 
						|
	lbls := labelsForTest(t.Name(), numSeries)
 | 
						|
	for _, l := range lbls {
 | 
						|
		lset := labels.New(l...)
 | 
						|
 | 
						|
		for i := 0; i < numDatapoints; i++ {
 | 
						|
			sample := tsdbutil.GenerateSamples(0, 1)
 | 
						|
			ref, err := app.Append(0, lset, sample[0].T(), sample[0].V())
 | 
						|
			require.NoError(t, err)
 | 
						|
 | 
						|
			e := exemplar.Exemplar{
 | 
						|
				Labels: lset,
 | 
						|
				Ts:     sample[0].T(),
 | 
						|
				Value:  sample[0].V(),
 | 
						|
				HasTs:  true,
 | 
						|
			}
 | 
						|
			_, err = app.AppendExemplar(ref, lset, e)
 | 
						|
			require.NoError(t, err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	require.NoError(t, app.Commit())
 | 
						|
	require.NoError(t, s.Close())
 | 
						|
 | 
						|
	sr, err := wal.NewSegmentsReader(s.wal.Dir())
 | 
						|
	require.NoError(t, err)
 | 
						|
	defer func() {
 | 
						|
		require.NoError(t, sr.Close())
 | 
						|
	}()
 | 
						|
 | 
						|
	// Read records from WAL and check for expected count of series, samples, and exemplars.
 | 
						|
	var (
 | 
						|
		r   = wal.NewReader(sr)
 | 
						|
		dec record.Decoder
 | 
						|
 | 
						|
		walSeriesCount, walSamplesCount, walExemplarsCount int
 | 
						|
	)
 | 
						|
	for r.Next() {
 | 
						|
		rec := r.Record()
 | 
						|
		switch dec.Type(rec) {
 | 
						|
		case record.Series:
 | 
						|
			var series []record.RefSeries
 | 
						|
			series, err = dec.Series(rec, series)
 | 
						|
			require.NoError(t, err)
 | 
						|
			walSeriesCount += len(series)
 | 
						|
 | 
						|
		case record.Samples:
 | 
						|
			var samples []record.RefSample
 | 
						|
			samples, err = dec.Samples(rec, samples)
 | 
						|
			require.NoError(t, err)
 | 
						|
			walSamplesCount += len(samples)
 | 
						|
 | 
						|
		case record.Exemplars:
 | 
						|
			var exemplars []record.RefExemplar
 | 
						|
			exemplars, err = dec.Exemplars(rec, exemplars)
 | 
						|
			require.NoError(t, err)
 | 
						|
			walExemplarsCount += len(exemplars)
 | 
						|
 | 
						|
		default:
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Check that the WAL contained the same number of commited series/samples/exemplars.
 | 
						|
	require.Equal(t, numSeries, walSeriesCount, "unexpected number of series")
 | 
						|
	require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples")
 | 
						|
	require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars")
 | 
						|
}
 | 
						|
 | 
						|
func TestRollback(t *testing.T) {
 | 
						|
	const (
 | 
						|
		numDatapoints = 1000
 | 
						|
		numSeries     = 8
 | 
						|
	)
 | 
						|
 | 
						|
	s := createTestAgentDB(t, nil, DefaultOptions())
 | 
						|
	app := s.Appender(context.TODO())
 | 
						|
 | 
						|
	lbls := labelsForTest(t.Name(), numSeries)
 | 
						|
	for _, l := range lbls {
 | 
						|
		lset := labels.New(l...)
 | 
						|
 | 
						|
		for i := 0; i < numDatapoints; i++ {
 | 
						|
			sample := tsdbutil.GenerateSamples(0, 1)
 | 
						|
			_, err := app.Append(0, lset, sample[0].T(), sample[0].V())
 | 
						|
			require.NoError(t, err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Do a rollback, which should clear uncommitted data. A followup call to
 | 
						|
	// commit should persist nothing to the WAL.
 | 
						|
	require.NoError(t, app.Rollback())
 | 
						|
	require.NoError(t, app.Commit())
 | 
						|
	require.NoError(t, s.Close())
 | 
						|
 | 
						|
	sr, err := wal.NewSegmentsReader(s.wal.Dir())
 | 
						|
	require.NoError(t, err)
 | 
						|
	defer func() {
 | 
						|
		require.NoError(t, sr.Close())
 | 
						|
	}()
 | 
						|
 | 
						|
	// Read records from WAL and check for expected count of series and samples.
 | 
						|
	var (
 | 
						|
		r   = wal.NewReader(sr)
 | 
						|
		dec record.Decoder
 | 
						|
 | 
						|
		walSeriesCount, walSamplesCount, walExemplarsCount int
 | 
						|
	)
 | 
						|
	for r.Next() {
 | 
						|
		rec := r.Record()
 | 
						|
		switch dec.Type(rec) {
 | 
						|
		case record.Series:
 | 
						|
			var series []record.RefSeries
 | 
						|
			series, err = dec.Series(rec, series)
 | 
						|
			require.NoError(t, err)
 | 
						|
			walSeriesCount += len(series)
 | 
						|
 | 
						|
		case record.Samples:
 | 
						|
			var samples []record.RefSample
 | 
						|
			samples, err = dec.Samples(rec, samples)
 | 
						|
			require.NoError(t, err)
 | 
						|
			walSamplesCount += len(samples)
 | 
						|
 | 
						|
		case record.Exemplars:
 | 
						|
			var exemplars []record.RefExemplar
 | 
						|
			exemplars, err = dec.Exemplars(rec, exemplars)
 | 
						|
			require.NoError(t, err)
 | 
						|
			walExemplarsCount += len(exemplars)
 | 
						|
 | 
						|
		default:
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Check that the rollback ensured nothing got stored.
 | 
						|
	require.Equal(t, 0, walSeriesCount, "series should not have been written to WAL")
 | 
						|
	require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL")
 | 
						|
	require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL")
 | 
						|
}
 | 
						|
 | 
						|
func TestFullTruncateWAL(t *testing.T) {
 | 
						|
	const (
 | 
						|
		numDatapoints = 1000
 | 
						|
		numSeries     = 800
 | 
						|
		lastTs        = 500
 | 
						|
	)
 | 
						|
 | 
						|
	reg := prometheus.NewRegistry()
 | 
						|
	opts := DefaultOptions()
 | 
						|
	opts.TruncateFrequency = time.Minute * 2
 | 
						|
 | 
						|
	s := createTestAgentDB(t, reg, opts)
 | 
						|
	defer func() {
 | 
						|
		require.NoError(t, s.Close())
 | 
						|
	}()
 | 
						|
	app := s.Appender(context.TODO())
 | 
						|
 | 
						|
	lbls := labelsForTest(t.Name(), numSeries)
 | 
						|
	for _, l := range lbls {
 | 
						|
		lset := labels.New(l...)
 | 
						|
 | 
						|
		for i := 0; i < numDatapoints; i++ {
 | 
						|
			_, err := app.Append(0, lset, int64(lastTs), 0)
 | 
						|
			require.NoError(t, err)
 | 
						|
		}
 | 
						|
		require.NoError(t, app.Commit())
 | 
						|
	}
 | 
						|
 | 
						|
	// Truncate WAL with mint to GC all the samples.
 | 
						|
	s.truncate(lastTs + 1)
 | 
						|
 | 
						|
	m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
 | 
						|
	require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
 | 
						|
}
 | 
						|
 | 
						|
func TestPartialTruncateWAL(t *testing.T) {
 | 
						|
	const (
 | 
						|
		numDatapoints = 1000
 | 
						|
		numSeries     = 800
 | 
						|
	)
 | 
						|
 | 
						|
	opts := DefaultOptions()
 | 
						|
	opts.TruncateFrequency = time.Minute * 2
 | 
						|
 | 
						|
	reg := prometheus.NewRegistry()
 | 
						|
	s := createTestAgentDB(t, reg, opts)
 | 
						|
	defer func() {
 | 
						|
		require.NoError(t, s.Close())
 | 
						|
	}()
 | 
						|
	app := s.Appender(context.TODO())
 | 
						|
 | 
						|
	// Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500.
 | 
						|
	var lastTs int64 = 500
 | 
						|
	lbls := labelsForTest(t.Name()+"batch-1", numSeries)
 | 
						|
	for _, l := range lbls {
 | 
						|
		lset := labels.New(l...)
 | 
						|
 | 
						|
		for i := 0; i < numDatapoints; i++ {
 | 
						|
			_, err := app.Append(0, lset, lastTs, 0)
 | 
						|
			require.NoError(t, err)
 | 
						|
		}
 | 
						|
		require.NoError(t, app.Commit())
 | 
						|
	}
 | 
						|
 | 
						|
	// Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600.
 | 
						|
	lastTs = 600
 | 
						|
	lbls = labelsForTest(t.Name()+"batch-2", numSeries)
 | 
						|
	for _, l := range lbls {
 | 
						|
		lset := labels.New(l...)
 | 
						|
 | 
						|
		for i := 0; i < numDatapoints; i++ {
 | 
						|
			_, err := app.Append(0, lset, lastTs, 0)
 | 
						|
			require.NoError(t, err)
 | 
						|
		}
 | 
						|
		require.NoError(t, app.Commit())
 | 
						|
	}
 | 
						|
 | 
						|
	// Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series.
 | 
						|
	s.truncate(lastTs - 1)
 | 
						|
 | 
						|
	m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
 | 
						|
	require.Equal(t, m.Metric[0].Gauge.GetValue(), float64(numSeries), "agent wal truncate mismatch of deleted series count")
 | 
						|
}
 | 
						|
 | 
						|
func TestWALReplay(t *testing.T) {
 | 
						|
	const (
 | 
						|
		numDatapoints = 1000
 | 
						|
		numSeries     = 8
 | 
						|
		lastTs        = 500
 | 
						|
	)
 | 
						|
 | 
						|
	s := createTestAgentDB(t, nil, DefaultOptions())
 | 
						|
	app := s.Appender(context.TODO())
 | 
						|
 | 
						|
	lbls := labelsForTest(t.Name(), numSeries)
 | 
						|
	for _, l := range lbls {
 | 
						|
		lset := labels.New(l...)
 | 
						|
 | 
						|
		for i := 0; i < numDatapoints; i++ {
 | 
						|
			_, err := app.Append(0, lset, lastTs, 0)
 | 
						|
			require.NoError(t, err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	require.NoError(t, app.Commit())
 | 
						|
	require.NoError(t, s.Close())
 | 
						|
 | 
						|
	// Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
 | 
						|
	// We need the original directory so we can recreate the storage for replay.
 | 
						|
	storageDir := filepath.Dir(s.wal.Dir())
 | 
						|
 | 
						|
	reg := prometheus.NewRegistry()
 | 
						|
	replayStorage, err := Open(s.logger, reg, nil, storageDir, s.opts)
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("unable to create storage for the agent: %v", err)
 | 
						|
	}
 | 
						|
	defer func() {
 | 
						|
		require.NoError(t, replayStorage.Close())
 | 
						|
	}()
 | 
						|
 | 
						|
	// Check if all the series are retrieved back from the WAL.
 | 
						|
	m := gatherFamily(t, reg, "prometheus_agent_active_series")
 | 
						|
	require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count")
 | 
						|
 | 
						|
	// Check if lastTs of the samples retrieved from the WAL is retained.
 | 
						|
	metrics := replayStorage.series.series
 | 
						|
	for i := 0; i < len(metrics); i++ {
 | 
						|
		mp := metrics[i]
 | 
						|
		for _, v := range mp {
 | 
						|
			require.Equal(t, v.lastTs, int64(lastTs))
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestLockfile(t *testing.T) {
 | 
						|
	tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
 | 
						|
		logger := log.NewNopLogger()
 | 
						|
		reg := prometheus.NewRegistry()
 | 
						|
		rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil)
 | 
						|
		t.Cleanup(func() {
 | 
						|
			require.NoError(t, rs.Close())
 | 
						|
		})
 | 
						|
 | 
						|
		opts := DefaultOptions()
 | 
						|
		opts.NoLockfile = !createLock
 | 
						|
 | 
						|
		// Create the DB. This should create lockfile and its metrics.
 | 
						|
		db, err := Open(logger, nil, rs, data, opts)
 | 
						|
		require.NoError(t, err)
 | 
						|
 | 
						|
		return db.locker, testutil.NewCallbackCloser(func() {
 | 
						|
			require.NoError(t, db.Close())
 | 
						|
		})
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func startTime() (int64, error) {
 | 
						|
	return time.Now().Unix() * 1000, nil
 | 
						|
}
 | 
						|
 | 
						|
// Create series for tests.
 | 
						|
func labelsForTest(lName string, seriesCount int) []labels.Labels {
 | 
						|
	var series []labels.Labels
 | 
						|
 | 
						|
	for i := 0; i < seriesCount; i++ {
 | 
						|
		lset := labels.Labels{
 | 
						|
			{Name: "a", Value: lName},
 | 
						|
			{Name: "job", Value: "prometheus"},
 | 
						|
			{Name: "instance", Value: "localhost" + strconv.Itoa(i)},
 | 
						|
		}
 | 
						|
		series = append(series, lset)
 | 
						|
	}
 | 
						|
 | 
						|
	return series
 | 
						|
}
 | 
						|
 | 
						|
func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto.MetricFamily {
 | 
						|
	t.Helper()
 | 
						|
 | 
						|
	families, err := reg.Gather()
 | 
						|
	require.NoError(t, err, "failed to gather metrics")
 | 
						|
 | 
						|
	for _, f := range families {
 | 
						|
		if f.GetName() == familyName {
 | 
						|
			return f
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	t.Fatalf("could not find family %s", familyName)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |