diff --git a/rules/manager.go b/rules/manager.go index 50c4c8b7f5..33b10d9cb4 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -590,6 +590,13 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { app := g.opts.Appendable.Appender() seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) + defer func() { + if err := app.Commit(); err != nil { + level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err) + return + } + g.seriesInPreviousEval[i] = seriesReturned + }() for _, s := range vector { if _, err := app.Add(s.Metric, s.T, s.V); err != nil { switch err { @@ -627,11 +634,6 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } } - if err := app.Commit(); err != nil { - level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err) - } else { - g.seriesInPreviousEval[i] = seriesReturned - } }(i, rule) } g.cleanupStaleSeries(ts) diff --git a/scrape/scrape.go b/scrape/scrape.go index e0077e9fa7..601fca5538 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1069,6 +1069,19 @@ func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, ) var sampleLimitErr error + defer func() { + if err != nil { + app.Rollback() + return + } + if err = app.Commit(); err != nil { + return + } + // Only perform cache cleaning if the scrape was not empty. + // An empty scrape (usually) is used to indicate a failed scrape. + sl.cache.iterDone(len(b) > 0) + }() + loop: for { var et textparse.Entry @@ -1229,19 +1242,7 @@ loop: return err == nil }) } - if err != nil { - app.Rollback() - return total, added, seriesAdded, err - } - if err := app.Commit(); err != nil { - return total, added, seriesAdded, err - } - - // Only perform cache cleaning if the scrape was not empty. - // An empty scrape (usually) is used to indicate a failed scrape. - sl.cache.iterDone(len(b) > 0) - - return total, added, seriesAdded, nil + return } func yoloString(b []byte) string { @@ -1258,67 +1259,71 @@ const ( scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff" ) -func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, err error) error { - sl.scraper.Report(start, duration, err) +func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, scrapeErr error) (err error) { + sl.scraper.Report(start, duration, scrapeErr) ts := timestamp.FromTime(start) var health float64 - if err == nil { + if scrapeErr == nil { health = 1 } app := sl.appender() + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + }() - if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { + return } - if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil { + return } - if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil { + return } - if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil { + return } - if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { + return } - return app.Commit() + return } -func (sl *scrapeLoop) reportStale(start time.Time) error { +func (sl *scrapeLoop) reportStale(start time.Time) (err error) { ts := timestamp.FromTime(start) app := sl.appender() + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + }() stale := math.Float64frombits(value.StaleNaN) - if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { + return } - return app.Commit() + return } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { diff --git a/tsdb/head.go b/tsdb/head.go index 0de0430000..197fbe2877 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1079,7 +1079,10 @@ func (a *headAppender) Commit() error { } func (a *headAppender) Rollback() error { - a.head.metrics.activeAppenders.Dec() + defer a.head.metrics.activeAppenders.Dec() + defer a.head.iso.closeAppend(a.appendID) + defer a.head.putSeriesBuffer(a.sampleSeries) + var series *memSeries for i := range a.samples { series = a.sampleSeries[i] @@ -1090,8 +1093,6 @@ func (a *headAppender) Rollback() error { } a.head.putAppendBuffer(a.samples) a.samples = nil - a.head.putSeriesBuffer(a.sampleSeries) - a.head.iso.closeAppend(a.appendID) // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case.