From 8907ba6235cac3a533ef7dc93c909a4eb52f939c Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Mon, 16 Mar 2020 22:52:02 +0100 Subject: [PATCH] Make TSDB use storage errors This fixes #6992, which was introduced by #6777. There was an intermediate component which translated TSDB errors into storage errors, but that component was deleted and this bug went unnoticed until we looked at the Prombench results. Without this, scrape will fail instead of dropping samples or using "Add" when the series have been garbage collected. Signed-off-by: Julien Pivotto --- rules/manager.go | 6 +++--- scrape/scrape.go | 10 +++++----- scrape/scrape_test.go | 32 ++++++++++++++++++++++++++++++++ tsdb/block.go | 2 +- tsdb/cmd/tsdb/main.go | 3 ++- tsdb/db_test.go | 8 ++++---- tsdb/head.go | 33 +++++++++------------------------ tsdb/head_test.go | 6 +++--- tsdb/querier.go | 4 ++-- tsdb/querier_test.go | 2 +- 10 files changed, 62 insertions(+), 44 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index 33b10d9cb4..425fb01a8b 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -599,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { }() for _, s := range vector { if _, err := app.Add(s.Metric, s.T, s.V); err != nil { - switch err { + switch errors.Cause(err) { case storage.ErrOutOfOrderSample: numOutOfOrder++ level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) @@ -624,7 +624,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { if _, ok := seriesReturned[metric]; !ok { // Series no longer exposed, mark it stale. 
_, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if series @@ -647,7 +647,7 @@ func (g *Group) cleanupStaleSeries(ts time.Time) { for _, s := range g.staleSeries { // Rule that produced series no longer configured, mark it stale. _, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if series diff --git a/scrape/scrape.go b/scrape/scrape.go index 601fca5538..fbabba2472 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1121,7 +1121,7 @@ loop: } ce, ok := sl.cache.get(yoloString(met)) if ok { - switch err = app.AddFast(ce.ref, t, v); err { + switch err = app.AddFast(ce.ref, t, v); errors.Cause(err) { case nil: if tp == nil { sl.cache.trackStaleness(ce.hash, ce.lset) @@ -1176,7 +1176,7 @@ loop: var ref uint64 ref, err = app.Add(lset, t, v) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample: err = nil @@ -1233,7 +1233,7 @@ loop: sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if a target // goes away and comes back again with a new scrape loop. 
@@ -1330,7 +1330,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ce, ok := sl.cache.get(s) if ok { err := app.AddFast(ce.ref, t, v) - switch err { + switch errors.Cause(err) { case nil: return nil case storage.ErrNotFound: @@ -1354,7 +1354,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v lset = sl.reportSampleMutator(lset) ref, err := app.Add(lset, t, v) - switch err { + switch errors.Cause(err) { case nil: sl.cache.addRef(s, ref, lset, hash) return nil diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index d05b3c4004..608c956b8e 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1807,3 +1807,35 @@ func TestReuseScrapeCache(t *testing.T) { } } } + +func TestScrapeAddFast(t *testing.T) { + s := teststorage.New(t) + defer s.Close() + + app := s.Appender() + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + &testScraper{}, + nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return app }, + nil, + 0, + true, + ) + defer cancel() + + _, _, _, err := sl.append([]byte("up 1\n"), "", time.Time{}) + testutil.Ok(t, err) + + // Poison the cache. There is just one entry, and one series in the + // storage. Changing the ref will create a 'not found' error. + for _, v := range sl.getCache().series { + v.ref++ + } + + _, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second)) + testutil.Ok(t, err) +} diff --git a/tsdb/block.go b/tsdb/block.go index 536940ec9b..bcdec5c88f 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -77,7 +77,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. - // Returns ErrNotFound if the ref does not resolve to a known series. + // Returns storage.ErrNotFound if the ref does not resolve to a known series. 
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error // LabelNames returns all the unique label names present in the index in sorted order. diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 1febb4004f..458ed5351a 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -34,6 +34,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" @@ -309,7 +310,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in s.ref = &ref } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { - if errors.Cause(err) != tsdb.ErrNotFound { + if errors.Cause(err) != storage.ErrNotFound { panic(err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 2493ffea6e..e824b0c40a 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -215,7 +215,7 @@ func TestDBAppenderAddRef(t *testing.T) { testutil.Ok(t, err) err = app2.AddFast(9999999, 1, 1) - testutil.Equals(t, ErrNotFound, errors.Cause(err)) + testutil.Equals(t, storage.ErrNotFound, errors.Cause(err)) testutil.Ok(t, app2.Commit()) @@ -363,7 +363,7 @@ func TestAmendDatapointCausesError(t *testing.T) { app = db.Appender() _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1) - testutil.Equals(t, ErrAmendSample, err) + testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) testutil.Ok(t, app.Rollback()) } @@ -398,7 +398,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { app = db.Appender() _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002)) - testutil.Equals(t, ErrAmendSample, err) + testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) } func TestEmptyLabelsetCausesError(t *testing.T) { @@ -1660,7 +1660,7 @@ func 
TestNoEmptyBlocks(t *testing.T) { app = db.Appender() _, err = app.Add(defaultLabel, 1, 0) - testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") // Adding new blocks. currentTime := db.Head().MaxTime() diff --git a/tsdb/head.go b/tsdb/head.go index d4b8a79976..bbdfead9d9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -39,21 +39,6 @@ import ( ) var ( - // ErrNotFound is returned if a looked up resource was not found. - ErrNotFound = errors.Errorf("not found") - - // ErrOutOfOrderSample is returned if an appended sample has a - // timestamp smaller than the most recent sample. - ErrOutOfOrderSample = errors.New("out of order sample") - - // ErrAmendSample is returned if an appended sample has the same timestamp - // as the most recent sample but a different value. - ErrAmendSample = errors.New("amending sample") - - // ErrOutOfBounds is returned if an appended sample is out of the - // writable time range. - ErrOutOfBounds = errors.New("out of bounds") - // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") @@ -841,7 +826,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *initAppender) AddFast(ref uint64, t int64, v float64) error { if a.app == nil { - return ErrNotFound + return storage.ErrNotFound } return a.app.AddFast(ref, t, v) } @@ -954,7 +939,7 @@ type headAppender struct { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if t < a.minValidTime { - return 0, ErrOutOfBounds + return 0, storage.ErrOutOfBounds } // Ensure no empty labels have gotten through. 
@@ -980,12 +965,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if t < a.minValidTime { - return ErrOutOfBounds + return storage.ErrOutOfBounds } s := a.head.series.getByID(ref) if s == nil { - return errors.Wrap(ErrNotFound, "unknown series") + return errors.Wrap(storage.ErrNotFound, "unknown series") } s.Lock() if err := s.appendable(t, v); err != nil { @@ -1318,7 +1303,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s := h.head.series.getByID(sid) // This means that the series has been garbage collected. if s == nil { - return nil, ErrNotFound + return nil, storage.ErrNotFound } s.Lock() @@ -1328,7 +1313,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { // the specified range. if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() - return nil, ErrNotFound + return nil, storage.ErrNotFound } s.Unlock() @@ -1474,7 +1459,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks if s == nil { h.head.metrics.seriesNotFound.Inc() - return ErrNotFound + return storage.ErrNotFound } *lbls = append((*lbls)[:0], s.lset...) @@ -1818,12 +1803,12 @@ func (s *memSeries) appendable(t int64, v float64) error { return nil } if t < c.maxTime { - return ErrOutOfOrderSample + return storage.ErrOutOfOrderSample } // We are allowing exact duplicates as we can encounter them in valid cases // like federation and erroring out at that time would be extremely noisy. if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { - return ErrAmendSample + return storage.ErrDuplicateSampleForTimestamp } return nil } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index def09fe2c8..f7aa9975c0 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1012,7 +1012,7 @@ func TestGCChunkAccess(t *testing.T) { testutil.Ok(t, h.Truncate(1500)) // Remove a chunk. 
_, err = cr.Chunk(chunks[0].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) } @@ -1066,9 +1066,9 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1)) _, err = cr.Chunk(chunks[0].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { diff --git a/tsdb/querier.go b/tsdb/querier.go index a5181c3f69..83f7b6ad6b 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -739,7 +739,7 @@ func (s *baseChunkSeries) Next() bool { ref := s.p.At() if err := s.index.Series(ref, &lset, &chkMetas); err != nil { // Postings may be stale. Skip if no underlying series exists. - if errors.Cause(err) == ErrNotFound { + if errors.Cause(err) == storage.ErrNotFound { continue } s.err = err @@ -819,7 +819,7 @@ func (s *populatedChunkSeries) Next() bool { c.Chunk, s.err = s.chunks.Chunk(c.Ref) if s.err != nil { // This means that the chunk has be garbage collected. Remove it from the list. - if s.err == ErrNotFound { + if s.err == storage.ErrNotFound { s.err = nil // Delete in-place. s.chks = append(chks[:j], chks[j+1:]...) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 7780817375..52e3160512 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1491,7 +1491,7 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings { func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok { - return ErrNotFound + return storage.ErrNotFound } *lset = append((*lset)[:0], s.l...) *chks = append((*chks)[:0], s.chunks...)