diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f8745c228..5dcfcadfc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,28 @@ +## 2.17.0-rc.3 / 2020-03-18 + +This release implements isolation in TSDB. API queries and recording rules are +guaranteed to only see full scrapes and full recording rules. This comes with a +certain overhead in resource usage. Depending on the situation, there might be +some increase in memory usage, CPU usage, or query latency. + +* [FEATURE] TSDB: Support isolation #6841 +* [ENHANCEMENT] PromQL: Allow more keywords as metric names #6933 +* [ENHANCEMENT] React UI: Add normalization of localhost URLs in targets page #6794 +* [ENHANCEMENT] Remote read: Read from remote storage concurrently #6770 +* [ENHANCEMENT] Rules: Mark deleted rule series as stale after a reload #6745 +* [ENHANCEMENT] Scrape: Log scrape append failures as debug rather than warn #6852 +* [ENHANCEMENT] TSDB: Improve query performance for queries that partially hit the head #6676 +* [ENHANCEMENT] Consul SD: Expose service health as meta label #5313 +* [ENHANCEMENT] EC2 SD: Expose EC2 instance lifecycle as meta label #6914 +* [ENHANCEMENT] Kubernetes SD: Expose service type as meta label for K8s service role #6684 +* [ENHANCEMENT] Kubernetes SD: Expose label_selector and field_selector #6807 +* [ENHANCEMENT] Openstack SD: Expose hypervisor id as meta label #6962 +* [BUGFIX] PromQL: Do not escape HTML-like chars in query log #6834 #6795 +* [BUGFIX] React UI: Fix data table matrix values #6896 +* [BUGFIX] React UI: Fix new targets page not loading when using non-ASCII characters #6892 +* [BUGFIX] Scrape: Prevent removal of metric names upon relabeling #6891 +* [BUGFIX] Scrape: Fix 'superfluous response.WriteHeader call' errors when scrape fails under some circonstances #6986 + ## 2.16.0 / 2020-02-13 * [FEATURE] React UI: Support local timezone on /graph #6692 diff --git a/VERSION b/VERSION index 7524906967..9dba67016b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.16.0 +2.17.0-rc.3 diff --git a/go.mod b/go.mod index 1d02853c3e..437c7c5af1 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/alertmanager v0.20.0 - github.com/prometheus/client_golang v1.5.0 + github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da diff --git a/go.sum b/go.sum index 3412ca7966..efd0139740 100644 --- a/go.sum +++ b/go.sum @@ -459,8 +459,8 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.2.1/go.mod h1:XMU6Z2MjaRKVu/dC1qupJI9SiNkDYzz3xecMgSW/F+U= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= -github.com/prometheus/client_golang v1.5.0 h1:Ctq0iGpCmr3jeP77kbF2UxgvRwzWWz+4Bh9/vJTyg1A= -github.com/prometheus/client_golang v1.5.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= +github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA= +github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/promql/engine.go b/promql/engine.go index 2e8c568fc7..bf55e60326 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -502,15 +502,16 @@ func durationMilliseconds(d time.Duration) int64 { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, storage.Warnings, error) { prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime) - querier, warnings, err := ng.populateSeries(ctxPrepare, query.queryable, s) - prepareSpanTimer.Finish() - - // XXX(fabxc): the querier returned by populateSeries might be instantiated - // we must not return without closing irrespective of the error. - // TODO: make this semantically saner. - if querier != nil { - defer querier.Close() + mint := ng.findMinTime(s) + querier, err := query.queryable.Querier(ctxPrepare, timestamp.FromTime(mint), timestamp.FromTime(s.End)) + if err != nil { + prepareSpanTimer.Finish() + return nil, nil, err } + defer querier.Close() + + warnings, err := ng.populateSeries(ctxPrepare, querier, s) + prepareSpanTimer.Finish() if err != nil { return nil, warnings, err @@ -616,7 +617,7 @@ func (ng *Engine) cumulativeSubqueryOffset(path []parser.Node) time.Duration { return subqOffset } -func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *parser.EvalStmt) (storage.Querier, storage.Warnings, error) { +func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time { var maxOffset time.Duration parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { subqOffset := ng.cumulativeSubqueryOffset(path) @@ -638,20 +639,18 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa } return nil }) + return s.Start.Add(-maxOffset) +} - mint := s.Start.Add(-maxOffset) - - querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End)) - if err != nil { - return nil, nil, err - } - - var warnings storage.Warnings - - // Whenever a MatrixSelector is evaluated this variable is set to the corresponding range. - // The evaluation of the VectorSelector inside then evaluates the given range and unsets - // the variable. - var evalRange time.Duration +func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s *parser.EvalStmt) (storage.Warnings, error) { + var ( + // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. + // The evaluation of the VectorSelector inside then evaluates the given range and unsets + // the variable. + evalRange time.Duration + warnings storage.Warnings + err error + ) parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { var set storage.SeriesSet @@ -703,7 +702,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa } return nil }) - return querier, warnings, err + return warnings, err } // extractFuncFromPath walks up the path and searches for the first instance of diff --git a/rules/manager.go b/rules/manager.go index b777c9a01a..383d383271 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -590,9 +590,16 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { app := g.opts.Appendable.Appender() seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) + defer func() { + if err := app.Commit(); err != nil { + level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err) + return + } + g.seriesInPreviousEval[i] = seriesReturned + }() for _, s := range vector { if _, err := app.Add(s.Metric, s.T, s.V); err != nil { - switch err { + switch errors.Cause(err) { case storage.ErrOutOfOrderSample: numOutOfOrder++ level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) @@ -617,7 +624,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { if _, ok := seriesReturned[metric]; !ok { // Series no longer exposed, mark it stale. _, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if series @@ -627,11 +634,6 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } } - if err := app.Commit(); err != nil { - level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err) - } else { - g.seriesInPreviousEval[i] = seriesReturned - } }(i, rule) } g.cleanupStaleSeries(ts) @@ -645,7 +647,7 @@ func (g *Group) cleanupStaleSeries(ts time.Time) { for _, s := range g.staleSeries { // Rule that produced series no longer configured, mark it stale. _, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if series diff --git a/scrape/scrape.go b/scrape/scrape.go index e0077e9fa7..fbabba2472 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1069,6 +1069,19 @@ func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, ) var sampleLimitErr error + defer func() { + if err != nil { + app.Rollback() + return + } + if err = app.Commit(); err != nil { + return + } + // Only perform cache cleaning if the scrape was not empty. + // An empty scrape (usually) is used to indicate a failed scrape. + sl.cache.iterDone(len(b) > 0) + }() + loop: for { var et textparse.Entry @@ -1108,7 +1121,7 @@ loop: } ce, ok := sl.cache.get(yoloString(met)) if ok { - switch err = app.AddFast(ce.ref, t, v); err { + switch err = app.AddFast(ce.ref, t, v); errors.Cause(err) { case nil: if tp == nil { sl.cache.trackStaleness(ce.hash, ce.lset) @@ -1163,7 +1176,7 @@ loop: var ref uint64 ref, err = app.Add(lset, t, v) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample: err = nil @@ -1220,7 +1233,7 @@ loop: sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if a target // goes away and comes back again with a new scrape loop. @@ -1229,19 +1242,7 @@ loop: return err == nil }) } - if err != nil { - app.Rollback() - return total, added, seriesAdded, err - } - if err := app.Commit(); err != nil { - return total, added, seriesAdded, err - } - - // Only perform cache cleaning if the scrape was not empty. - // An empty scrape (usually) is used to indicate a failed scrape. - sl.cache.iterDone(len(b) > 0) - - return total, added, seriesAdded, nil + return } func yoloString(b []byte) string { @@ -1258,74 +1259,78 @@ const ( scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff" ) -func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, err error) error { - sl.scraper.Report(start, duration, err) +func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, scrapeErr error) (err error) { + sl.scraper.Report(start, duration, scrapeErr) ts := timestamp.FromTime(start) var health float64 - if err == nil { + if scrapeErr == nil { health = 1 } app := sl.appender() + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + }() - if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { + return } - if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil { + return } - if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil { + return } - if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil { + return } - if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { + return } - return app.Commit() + return } -func (sl *scrapeLoop) reportStale(start time.Time) error { +func (sl *scrapeLoop) reportStale(start time.Time) (err error) { ts := timestamp.FromTime(start) app := sl.appender() + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + }() stale := math.Float64frombits(value.StaleNaN) - if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { + return } - return app.Commit() + return } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { ce, ok := sl.cache.get(s) if ok { err := app.AddFast(ce.ref, t, v) - switch err { + switch errors.Cause(err) { case nil: return nil case storage.ErrNotFound: @@ -1349,7 +1354,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v lset = sl.reportSampleMutator(lset) ref, err := app.Add(lset, t, v) - switch err { + switch errors.Cause(err) { case nil: sl.cache.addRef(s, ref, lset, hash) return nil diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 8bc45a72e7..4271f27490 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1807,3 +1807,35 @@ func TestReuseScrapeCache(t *testing.T) { } } } + +func TestScrapeAddFast(t *testing.T) { + s := teststorage.New(t) + defer s.Close() + + app := s.Appender() + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + &testScraper{}, + nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return app }, + nil, + 0, + true, + ) + defer cancel() + + _, _, _, err := sl.append([]byte("up 1\n"), "", time.Time{}) + testutil.Ok(t, err) + + // Poison the cache. There is just one entry, and one series in the + // storage. Changing the ref will create a 'not found' error. + for _, v := range sl.getCache().series { + v.ref++ + } + + _, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second)) + testutil.Ok(t, err) +} diff --git a/storage/fanout.go b/storage/fanout.go index 4646aae2d6..2366fb272b 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -260,6 +260,7 @@ func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...* } else { priErr = qryResult.selectError } + continue } seriesSets = append(seriesSets, qryResult.set) } diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go index e26106ccad..0554444fd1 100644 --- a/storage/fanout/fanout_test.go +++ b/storage/fanout/fanout_test.go @@ -15,6 +15,7 @@ package storage import ( "context" + "errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -26,7 +27,6 @@ import ( ) func TestSelectSorted(t *testing.T) { - inputLabel := labels.FromStrings(model.MetricNameLabel, "a") outputLabel := labels.FromStrings(model.MetricNameLabel, "a") @@ -97,5 +97,94 @@ func TestSelectSorted(t *testing.T) { testutil.Equals(t, labelsResult, outputLabel) testutil.Equals(t, inputTotalSize, len(result)) - +} + +func TestFanoutErrors(t *testing.T) { + workingStorage := teststorage.New(t) + defer workingStorage.Close() + + cases := []struct { + primary storage.Storage + secondary storage.Storage + warnings storage.Warnings + err error + }{ + { + primary: workingStorage, + secondary: errStorage{}, + warnings: storage.Warnings{errSelect}, + err: nil, + }, + { + primary: errStorage{}, + secondary: workingStorage, + warnings: nil, + err: errSelect, + }, + } + + for _, tc := range cases { + fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary) + + querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) + testutil.Ok(t, err) + defer querier.Close() + + matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") + ss, warnings, err := querier.SelectSorted(nil, matcher) + testutil.Equals(t, tc.err, err) + testutil.Equals(t, tc.warnings, warnings) + + // Only test series iteration if there are no errors. + if err != nil { + continue + } + + for ss.Next() { + ss.At() + } + testutil.Ok(t, ss.Err()) + } +} + +var errSelect = errors.New("select error") + +type errStorage struct{} + +func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) { + return errQuerier{}, nil +} + +func (errStorage) Appender() storage.Appender { + return nil +} + +func (errStorage) StartTime() (int64, error) { + return 0, nil +} + +func (errStorage) Close() error { + return nil +} + +type errQuerier struct{} + +func (errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return nil, nil, errSelect +} + +func (errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return nil, nil, errSelect +} + +func (errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + return nil, nil, errors.New("label values error") +} + +func (errQuerier) LabelNames() ([]string, storage.Warnings, error) { + return nil, nil, errors.New("label names error") +} + +func (errQuerier) Close() error { + return nil } diff --git a/tsdb/block.go b/tsdb/block.go index 536940ec9b..bcdec5c88f 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -77,7 +77,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. - // Returns ErrNotFound if the ref does not resolve to a known series. + // Returns storage.ErrNotFound if the ref does not resolve to a known series. Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error // LabelNames returns all the unique label names present in the index in sorted order. diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 1d9229c2fc..3b2ebf2963 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -34,6 +34,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" @@ -309,7 +310,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in s.ref = &ref } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { - if errors.Cause(err) != tsdb.ErrNotFound { + if errors.Cause(err) != storage.ErrNotFound { panic(err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index dfd7f304e4..545473f820 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -215,7 +215,7 @@ func TestDBAppenderAddRef(t *testing.T) { testutil.Ok(t, err) err = app2.AddFast(9999999, 1, 1) - testutil.Equals(t, ErrNotFound, errors.Cause(err)) + testutil.Equals(t, storage.ErrNotFound, errors.Cause(err)) testutil.Ok(t, app2.Commit()) @@ -363,7 +363,7 @@ func TestAmendDatapointCausesError(t *testing.T) { app = db.Appender() _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1) - testutil.Equals(t, ErrAmendSample, err) + testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) testutil.Ok(t, app.Rollback()) } @@ -398,7 +398,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { app = db.Appender() _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002)) - testutil.Equals(t, ErrAmendSample, err) + testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) } func TestEmptyLabelsetCausesError(t *testing.T) { @@ -1660,7 +1660,7 @@ func TestNoEmptyBlocks(t *testing.T) { app = db.Appender() _, err = app.Add(defaultLabel, 1, 0) - testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") // Adding new blocks. currentTime := db.Head().MaxTime() diff --git a/tsdb/head.go b/tsdb/head.go index 0de0430000..bbdfead9d9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -39,21 +39,6 @@ import ( ) var ( - // ErrNotFound is returned if a looked up resource was not found. - ErrNotFound = errors.Errorf("not found") - - // ErrOutOfOrderSample is returned if an appended sample has a - // timestamp smaller than the most recent sample. - ErrOutOfOrderSample = errors.New("out of order sample") - - // ErrAmendSample is returned if an appended sample has the same timestamp - // as the most recent sample but a different value. - ErrAmendSample = errors.New("amending sample") - - // ErrOutOfBounds is returned if an appended sample is out of the - // writable time range. - ErrOutOfBounds = errors.New("out of bounds") - // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") @@ -841,7 +826,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *initAppender) AddFast(ref uint64, t int64, v float64) error { if a.app == nil { - return ErrNotFound + return storage.ErrNotFound } return a.app.AddFast(ref, t, v) } @@ -954,7 +939,7 @@ type headAppender struct { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if t < a.minValidTime { - return 0, ErrOutOfBounds + return 0, storage.ErrOutOfBounds } // Ensure no empty labels have gotten through. @@ -980,12 +965,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if t < a.minValidTime { - return ErrOutOfBounds + return storage.ErrOutOfBounds } s := a.head.series.getByID(ref) if s == nil { - return errors.Wrap(ErrNotFound, "unknown series") + return errors.Wrap(storage.ErrNotFound, "unknown series") } s.Lock() if err := s.appendable(t, v); err != nil { @@ -1079,7 +1064,10 @@ func (a *headAppender) Commit() error { } func (a *headAppender) Rollback() error { - a.head.metrics.activeAppenders.Dec() + defer a.head.metrics.activeAppenders.Dec() + defer a.head.iso.closeAppend(a.appendID) + defer a.head.putSeriesBuffer(a.sampleSeries) + var series *memSeries for i := range a.samples { series = a.sampleSeries[i] @@ -1090,8 +1078,6 @@ func (a *headAppender) Rollback() error { } a.head.putAppendBuffer(a.samples) a.samples = nil - a.head.putSeriesBuffer(a.sampleSeries) - a.head.iso.closeAppend(a.appendID) // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. @@ -1115,7 +1101,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { for p.Next() { series := h.series.getByID(p.At()) + series.RLock() t0, t1 := series.minTime(), series.maxTime() + series.RUnlock() if t0 == math.MinInt64 || t1 == math.MinInt64 { continue } @@ -1315,7 +1303,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s := h.head.series.getByID(sid) // This means that the series has been garbage collected. if s == nil { - return nil, ErrNotFound + return nil, storage.ErrNotFound } s.Lock() @@ -1325,7 +1313,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { // the specified range. if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() - return nil, ErrNotFound + return nil, storage.ErrNotFound } s.Unlock() @@ -1423,9 +1411,11 @@ func (h *headIndexReader) Postings(name string, values ...string) (index.Posting level.Debug(h.head.logger).Log("msg", "looked up series not found") continue } + s.RLock() if s.minTime() <= h.maxt && s.maxTime() >= h.mint { filtered = append(filtered, p.At()) } + s.RUnlock() } if p.Err() != nil { return nil, p.Err() @@ -1469,7 +1459,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks if s == nil { h.head.metrics.seriesNotFound.Inc() - return ErrNotFound + return storage.ErrNotFound } *lbls = append((*lbls)[:0], s.lset...) @@ -1732,7 +1722,7 @@ func (s sample) V() float64 { // memSeries is the in-memory representation of a series. None of its methods // are goroutine safe and it is the caller's responsibility to lock it. type memSeries struct { - sync.Mutex + sync.RWMutex ref uint64 lset labels.Labels @@ -1813,12 +1803,12 @@ func (s *memSeries) appendable(t int64, v float64) error { return nil } if t < c.maxTime { - return ErrOutOfOrderSample + return storage.ErrOutOfOrderSample } // We are allowing exact duplicates as we can encounter them in valid cases // like federation and erroring out at that time would be extremely noisy. if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { - return ErrAmendSample + return storage.ErrDuplicateSampleForTimestamp } return nil } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index a8b55f91b3..bea95271a7 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -23,6 +23,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "testing" "github.com/pkg/errors" @@ -1011,7 +1012,7 @@ func TestGCChunkAccess(t *testing.T) { testutil.Ok(t, h.Truncate(1500)) // Remove a chunk. _, err = cr.Chunk(chunks[0].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) } @@ -1065,9 +1066,9 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1)) _, err = cr.Chunk(chunks[0].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { @@ -1336,6 +1337,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) { h, err := NewHead(nil, nil, nil, 15, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() + testutil.Ok(t, h.Init(0)) app := h.Appender() s1, err := app.Add(labels.FromStrings("foo1", "bar"), 2, 0) @@ -1594,3 +1596,42 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { testutil.Assert(t, ok, "Series append failed.") testutil.Equals(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") } + +func TestHeadSeriesChunkRace(t *testing.T) { + for i := 0; i < 1000; i++ { + testHeadSeriesChunkRace(t) + } +} + +func testHeadSeriesChunkRace(t *testing.T) { + h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize) + testutil.Ok(t, err) + defer h.Close() + testutil.Ok(t, h.Init(0)) + app := h.Appender() + + s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0) + testutil.Ok(t, err) + for ts := int64(6); ts < 11; ts++ { + err = app.AddFast(s2, ts, 0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + var wg sync.WaitGroup + matcher := labels.MustNewMatcher(labels.MatchEqual, "", "") + q, err := NewBlockQuerier(h, 18, 22) + testutil.Ok(t, err) + defer q.Close() + + wg.Add(1) + go func() { + h.updateMinMaxTime(20, 25) + h.gc() + wg.Done() + }() + ss, _, err := q.Select(nil, matcher) + testutil.Ok(t, err) + testutil.Ok(t, ss.Err()) + wg.Wait() +} diff --git a/tsdb/querier.go b/tsdb/querier.go index 2a64d57ead..ea1100a528 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -715,7 +715,7 @@ func (s *baseChunkSeries) Next() bool { ref := s.p.At() if err := s.index.Series(ref, &lset, &chkMetas); err != nil { // Postings may be stale. Skip if no underlying series exists. - if errors.Cause(err) == ErrNotFound { + if errors.Cause(err) == storage.ErrNotFound { continue } s.err = err @@ -795,7 +795,7 @@ func (s *populatedChunkSeries) Next() bool { c.Chunk, s.err = s.chunks.Chunk(c.Ref) if s.err != nil { // This means that the chunk has be garbage collected. Remove it from the list. - if s.err == ErrNotFound { + if s.err == storage.ErrNotFound { s.err = nil // Delete in-place. s.chks = append(chks[:j], chks[j+1:]...) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 5c31897f82..9668a21fe7 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1491,7 +1491,7 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings { func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok { - return ErrNotFound + return storage.ErrNotFound } *lset = append((*lset)[:0], s.l...) *chks = append((*chks)[:0], s.chunks...) diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go index d1354b1016..5070e72e28 100644 --- a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go +++ b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go @@ -53,12 +53,16 @@ func (r *responseWriterDelegator) Written() int64 { } func (r *responseWriterDelegator) WriteHeader(code int) { + if r.observeWriteHeader != nil && !r.wroteHeader { + // Only call observeWriteHeader for the 1st time. It's a bug if + // WriteHeader is called more than once, but we want to protect + // against it here. Note that we still delegate the WriteHeader + // to the original ResponseWriter to not mask the bug from it. + r.observeWriteHeader(code) + } r.status = code r.wroteHeader = true r.ResponseWriter.WriteHeader(code) - if r.observeWriteHeader != nil { - r.observeWriteHeader(code) - } } func (r *responseWriterDelegator) Write(b []byte) (int, error) { diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go index b0ee4678e5..5e1c4546ce 100644 --- a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go +++ b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go @@ -167,15 +167,12 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { enc := expfmt.NewEncoder(w, contentType) - var lastErr error - // handleError handles the error according to opts.ErrorHandling // and returns true if we have to abort after the handling. handleError := func(err error) bool { if err == nil { return false } - lastErr = err if opts.ErrorLog != nil { opts.ErrorLog.Println("error encoding and sending metric family:", err) } @@ -184,7 +181,10 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { case PanicOnError: panic(err) case HTTPErrorOnError: - httpError(rsp, err) + // We cannot really send an HTTP error at this + // point because we most likely have written + // something to rsp already. But at least we can + // stop sending. return true } // Do nothing in all other cases, including ContinueOnError. @@ -202,10 +202,6 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { return } } - - if lastErr != nil { - httpError(rsp, lastErr) - } }) if opts.Timeout <= 0 { @@ -276,7 +272,12 @@ type HandlerErrorHandling int // errors are encountered. const ( // Serve an HTTP status code 500 upon the first error - // encountered. Report the error message in the body. + // encountered. Report the error message in the body. Note that HTTP + // errors cannot be served anymore once the beginning of a regular + // payload has been sent. Thus, in the (unlikely) case that encoding the + // payload into the negotiated wire format fails, serving the response + // will simply be aborted. Set an ErrorLog in HandlerOpts to detect + // those errors. HTTPErrorOnError HandlerErrorHandling = iota // Ignore errors and try to serve as many metrics as possible. However, // if no metrics can be served, serve an HTTP status code 500 and the @@ -365,11 +366,9 @@ func gzipAccepted(header http.Header) bool { } // httpError removes any content-encoding header and then calls http.Error with -// the provided error and http.StatusInternalServerErrer. Error contents is -// supposed to be uncompressed plain text. However, same as with a plain -// http.Error, any header settings will be void if the header has already been -// sent. The error message will still be written to the writer, but it will -// probably be of limited use. +// the provided error and http.StatusInternalServerError. Error contents is +// supposed to be uncompressed plain text. Same as with a plain http.Error, this +// must not be called if the header or any payload has already been sent. func httpError(rsp http.ResponseWriter, err error) { rsp.Header().Del(contentEncodingHeader) http.Error( diff --git a/vendor/modules.txt b/vendor/modules.txt index 4872d85221..e4ff4a4c9f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -272,7 +272,7 @@ github.com/opentracing/opentracing-go/log github.com/pkg/errors # github.com/prometheus/alertmanager v0.20.0 github.com/prometheus/alertmanager/api/v2/models -# github.com/prometheus/client_golang v1.5.0 +# github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_golang/api github.com/prometheus/client_golang/api/prometheus/v1 github.com/prometheus/client_golang/prometheus