diff --git a/scrape/scrape.go b/scrape/scrape.go index f8e01f0afb..a5c2183dea 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -932,12 +932,12 @@ mainLoop: // A failed scrape is the same as an empty scrape, // we still call sl.append to trigger stale markers. - total, added, appErr := sl.append(b, contentType, start) + total, added, seriesAdded, appErr := sl.append(b, contentType, start) if appErr != nil { level.Warn(sl.l).Log("msg", "append failed", "err", appErr) // The append failed, probably due to a parse error or sample limit. // Call sl.append again with an empty scrape to trigger stale markers. - if _, _, err := sl.append([]byte{}, "", start); err != nil { + if _, _, _, err := sl.append([]byte{}, "", start); err != nil { level.Warn(sl.l).Log("msg", "append failed", "err", err) } } @@ -948,7 +948,7 @@ mainLoop: scrapeErr = appErr } - if err := sl.report(start, time.Since(start), total, added, scrapeErr); err != nil { + if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil { level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err) } last = start @@ -1008,7 +1008,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // Call sl.append again with an empty scrape to trigger stale markers. // If the target has since been recreated and scraped, the // stale markers will be out of order and ignored. - if _, _, err := sl.append([]byte{}, "", staleTime); err != nil { + if _, _, _, err := sl.append([]byte{}, "", staleTime); err != nil { level.Error(sl.l).Log("msg", "stale append failed", "err", err) } if err := sl.reportStale(staleTime); err != nil { @@ -1045,7 +1045,7 @@ func (s samples) Less(i, j int) bool { return s[i].t < s[j].t } -func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added int, err error) { +func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { var ( app = sl.appender() p = textparse.New(b, contentType) @@ -1178,6 +1178,7 @@ loop: sl.cache.trackStaleness(hash, lset) } sl.cache.addRef(mets, ref, lset, hash) + seriesAdded++ } added++ } @@ -1212,17 +1213,17 @@ loop: } if err != nil { app.Rollback() - return total, added, err + return total, added, seriesAdded, err } if err := app.Commit(); err != nil { - return total, added, err + return total, added, seriesAdded, err } // Only perform cache cleaning if the scrape was not empty. // An empty scrape (usually) is used to indicate a failed scrape. sl.cache.iterDone(len(b) > 0) - return total, added, nil + return total, added, seriesAdded, nil } func yoloString(b []byte) string { @@ -1236,9 +1237,10 @@ const ( scrapeDurationMetricName = "scrape_duration_seconds" + "\xff" scrapeSamplesMetricName = "scrape_samples_scraped" + "\xff" samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff" + scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff" ) -func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error { +func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, err error) error { sl.scraper.report(start, duration, err) ts := timestamp.FromTime(start) @@ -1265,6 +1267,10 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a app.Rollback() return err } + if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { + app.Rollback() + return err + } return app.Commit() } @@ -1290,6 +1296,10 @@ func (sl *scrapeLoop) reportStale(start time.Time) error { app.Rollback() return err } + if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { + app.Rollback() + return err + } return app.Commit() } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 3b69e91acd..1b4ff0c32e 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -490,15 +490,15 @@ func TestScrapeLoopStop(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - // We expected 1 actual sample for each scrape plus 4 for report samples. + // We expected 1 actual sample for each scrape plus 5 for report samples. // At least 2 scrapes were made, plus the final stale markers. - if len(appender.result) < 5*3 || len(appender.result)%5 != 0 { - t.Fatalf("Expected at least 3 scrapes with 4 samples each, got %d samples", len(appender.result)) + if len(appender.result) < 6*3 || len(appender.result)%6 != 0 { + t.Fatalf("Expected at least 3 scrapes with 6 samples each, got %d samples", len(appender.result)) } // All samples in a scrape must have the same timestamp. var ts int64 for i, s := range appender.result { - if i%5 == 0 { + if i%6 == 0 { ts = s.t } else if s.t != ts { t.Fatalf("Unexpected multiple timestamps within single scrape") @@ -632,7 +632,7 @@ func TestScrapeLoopMetadata(t *testing.T) { ) defer cancel() - total, _, err := sl.append([]byte(`# TYPE test_metric counter + total, _, _, err := sl.append([]byte(`# TYPE test_metric counter # HELP test_metric some help text # UNIT test_metric metric test_metric 1 @@ -661,6 +661,41 @@ test_metric 1 testutil.Equals(t, "", md.Unit) } +func TestScrapeLoopSeriesAdded(t *testing.T) { + // Need a full storage for correct Add/AddFast semantics. + s := testutil.NewStorage(t) + defer s.Close() + + app, err := s.Appender() + if err != nil { + t.Error(err) + } + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + &testScraper{}, + nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return app }, + nil, + 0, + true, + ) + defer cancel() + + total, added, seriesAdded, err := sl.append([]byte("test_metric 1\n"), "", time.Time{}) + testutil.Ok(t, err) + testutil.Equals(t, 1, total) + testutil.Equals(t, 1, added) + testutil.Equals(t, 1, seriesAdded) + + total, added, seriesAdded, err = sl.append([]byte("test_metric 1\n"), "", time.Time{}) + testutil.Ok(t, err) + testutil.Equals(t, 1, total) + testutil.Equals(t, 1, added) + testutil.Equals(t, 0, seriesAdded) +} + func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { appender := &collectResultAppender{} var ( @@ -707,15 +742,15 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - // 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for + // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for // each scrape successful or not. - if len(appender.result) != 22 { - t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result)) + if len(appender.result) != 27 { + t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 27, len(appender.result)) } if appender.result[0].v != 42.0 { t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0) } - if !value.IsStaleNaN(appender.result[5].v) { + if !value.IsStaleNaN(appender.result[6].v) { t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v)) } } @@ -769,16 +804,16 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - // 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for + // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for // each scrape successful or not. - if len(appender.result) != 14 { - t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 14, len(appender.result)) + if len(appender.result) != 17 { + t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 17, len(appender.result)) } if appender.result[0].v != 42.0 { t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0) } - if !value.IsStaleNaN(appender.result[5].v) { - t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v)) + if !value.IsStaleNaN(appender.result[6].v) { + t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v)) } } @@ -854,10 +889,10 @@ func TestScrapeLoopCache(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - // 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for + // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for // each scrape successful or not. - if len(appender.result) != 22 { - t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result)) + if len(appender.result) != 26 { + t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 26, len(appender.result)) } } @@ -989,7 +1024,7 @@ func TestScrapeLoopAppend(t *testing.T) { now := time.Now() - _, _, err := sl.append([]byte(test.scrapeLabels), "", now) + _, _, _, err := sl.append([]byte(test.scrapeLabels), "", now) if err != nil { t.Fatalf("Unexpected append error: %s", err) } @@ -1037,7 +1072,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { beforeMetricValue := beforeMetric.GetCounter().GetValue() now := time.Now() - _, _, err = sl.append([]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) + _, _, _, err = sl.append([]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) if err != errSampleLimit { t.Fatalf("Did not see expected sample limit error: %s", err) } @@ -1091,11 +1126,11 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { ) now := time.Now() - _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), "", now) + _, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), "", now) if err != nil { t.Fatalf("Unexpected append error: %s", err) } - _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) + _, _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute)) if err != nil { t.Fatalf("Unexpected append error: %s", err) } @@ -1132,11 +1167,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { ) now := time.Now() - _, _, err := sl.append([]byte("metric_a 1\n"), "", now) + _, _, _, err := sl.append([]byte("metric_a 1\n"), "", now) if err != nil { t.Fatalf("Unexpected append error: %s", err) } - _, _, err = sl.append([]byte(""), "", now.Add(time.Second)) + _, _, _, err = sl.append([]byte(""), "", now.Add(time.Second)) if err != nil { t.Fatalf("Unexpected append error: %s", err) } @@ -1179,11 +1214,11 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { ) now := time.Now() - _, _, err := sl.append([]byte("metric_a 1 1000\n"), "", now) + _, _, _, err := sl.append([]byte("metric_a 1 1000\n"), "", now) if err != nil { t.Fatalf("Unexpected append error: %s", err) } - _, _, err = sl.append([]byte(""), "", now.Add(time.Second)) + _, _, _, err = sl.append([]byte(""), "", now.Add(time.Second)) if err != nil { t.Fatalf("Unexpected append error: %s", err) } @@ -1299,7 +1334,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T ) now := time.Unix(1, 0) - _, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) + total, added, seriesAdded, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now) if err != nil { t.Fatalf("Unexpected append error: %s", err) } @@ -1313,6 +1348,9 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T if !reflect.DeepEqual(want, app.result) { t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result) } + testutil.Equals(t, 4, total) + testutil.Equals(t, 1, added) + testutil.Equals(t, 1, seriesAdded) } func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { @@ -1334,15 +1372,10 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { ) now := time.Now().Add(20 * time.Minute) - total, added, err := sl.append([]byte("normal 1\n"), "", now) - if total != 1 { - t.Error("expected 1 metric") - return - } - - if added != 0 { - t.Error("no metric should be added") - } + total, added, seriesAdded, err := sl.append([]byte("normal 1\n"), "", now) + testutil.Equals(t, 1, total) + testutil.Equals(t, 0, added) + testutil.Equals(t, 0, seriesAdded) if err != nil { t.Errorf("expect no error, got %s", err.Error()) @@ -1532,7 +1565,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { ) now := time.Now() - _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now) + _, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now) if err != nil { t.Fatalf("Unexpected append error: %s", err) } @@ -1569,7 +1602,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { ) now := time.Now() - _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now) + _, _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now) if err != nil { t.Fatalf("Unexpected append error: %s", err) }