diff --git a/retrieval/scrape.go b/retrieval/scrape.go
index 3678627ced..6d097d55c4 100644
--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go
@@ -155,7 +155,7 @@ func (sp *scrapePool) stop() {
 
 // reload the scrape pool with the given scrape configuration. The target state is preserved
 // but all scrape loops are restarted with the new scrape configuration.
-// This method returns after all scrape loops that were stopped have fully terminated.
+// This method returns after all scrape loops that were stopped have stopped scraping.
 func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 	start := time.Now()
 
@@ -428,9 +428,10 @@ type scrapeLoop struct {
 	lsetCache               map[uint64]lsetCacheEntry // Ref to labelset and string
 	samplesInPreviousScrape map[string]labels.Labels
 
-	done   chan struct{}
-	ctx    context.Context
-	cancel func()
+	ctx       context.Context
+	scrapeCtx context.Context
+	cancel    func()
+	stopped   chan struct{}
 }
 
 func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender) loop {
@@ -440,20 +441,20 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
 		reportAppender: reportApp,
 		refCache:       map[string]uint64{},
 		lsetCache:      map[uint64]lsetCacheEntry{},
-		done:           make(chan struct{}),
+		stopped:        make(chan struct{}),
+		ctx:            ctx,
 	}
-	sl.ctx, sl.cancel = context.WithCancel(ctx)
+	sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
 
 	return sl
 }
 
 func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
-	defer close(sl.done)
-
 	select {
 	case <-time.After(sl.scraper.offset(interval)):
 		// Continue after a scraping offset.
-	case <-sl.ctx.Done():
+	case <-sl.scrapeCtx.Done():
+		close(sl.stopped)
 		return
 	}
 
@@ -464,11 +465,15 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 
 	buf := bytes.NewBuffer(make([]byte, 0, 16000))
 
+mainLoop:
 	for {
 		buf.Reset()
 		select {
 		case <-sl.ctx.Done():
+			close(sl.stopped)
 			return
+		case <-sl.scrapeCtx.Done():
+			break mainLoop
 		default:
 		}
 
@@ -509,15 +514,65 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 
 		select {
 		case <-sl.ctx.Done():
+			close(sl.stopped)
 			return
+		case <-sl.scrapeCtx.Done():
+			break mainLoop
 		case <-ticker.C:
 		}
 	}
+
+	close(sl.stopped)
+
+	// Scraping has stopped. We want to write stale markers but
+	// the target may be recreated, so we wait just over 2 scrape intervals
+	// before creating them.
+	// If the context is cancelled, we presume the server is shutting down
+	// and will restart where it was. We do not attempt to write stale markers
+	// in this case.
+
+	if last.IsZero() {
+		// There never was a scrape, so there will be no stale markers.
+		return
+	}
+
+	// Wait for when the next scrape would have been, record its timestamp.
+	var staleTime time.Time
+	select {
+	case <-sl.ctx.Done():
+		return
+	case <-ticker.C:
+		staleTime = time.Now()
+	}
+
+	// Wait for when the next scrape would have been; if the target was recreated,
+	// samples should have been ingested by now.
+	select {
+	case <-sl.ctx.Done():
+		return
+	case <-ticker.C:
+	}
+
+	// Wait for an extra 10% of the interval, just to be safe.
+	select {
+	case <-sl.ctx.Done():
+		return
+	case <-time.After(interval / 10):
+	}
+
+	// Call sl.append again with an empty scrape to trigger stale markers.
+	// If the target has since been recreated and scraped, the
+	// stale markers will be out of order and ignored.
+	if _, _, err := sl.append([]byte{}, staleTime); err != nil {
+		log.With("err", err).Error("stale append failed")
+	}
 }
 
+// Stop the scraping. The loop may still write data and stale markers after
+// stop has returned. Cancel the context to stop all writes.
 func (sl *scrapeLoop) stop() {
 	sl.cancel()
-	<-sl.done
+	<-sl.stopped
 }
 
 type sample struct {
@@ -624,6 +679,12 @@ loop:
 		switch err {
 		case nil:
 		case errSeriesDropped:
+			err = nil
+			continue
+		case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
+			// Do not log here, as this is expected if a target goes away and comes back
+			// again with a new scrape loop.
+			err = nil
 			continue
 		default:
 			break
diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go
index 266e8ae7a2..2477eaf250 100644
--- a/retrieval/scrape_test.go
+++ b/retrieval/scrape_test.go
@@ -308,7 +308,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
 	}
 }
 
-func TestScrapeLoopStop(t *testing.T) {
+func TestScrapeLoopStopBeforeRun(t *testing.T) {
 	scraper := &testScraper{}
 	sl := newScrapeLoop(context.Background(), scraper, nil, nil)
 
@@ -355,6 +355,51 @@ func TestScrapeLoopStop(t *testing.T) {
 	}
 }
 
+func TestScrapeLoopStop(t *testing.T) {
+	appender := &collectResultAppender{}
+	var (
+		signal = make(chan struct{})
+
+		scraper    = &testScraper{}
+		app        = func() storage.Appender { return appender }
+		reportApp  = func() storage.Appender { return &nopAppender{} }
+		numScrapes = 0
+	)
+	defer close(signal)
+
+	sl := newScrapeLoop(context.Background(), scraper, app, reportApp)
+
+	// Succeed on every scrape; trigger a stop during the second one.
+	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+		numScrapes++
+		if numScrapes == 2 {
+			go func() {
+				sl.stop()
+			}()
+		}
+		w.Write([]byte("metric_a 42\n"))
+		return nil
+	}
+
+	go func() {
+		sl.run(10*time.Millisecond, time.Hour, nil)
+		signal <- struct{}{}
+	}()
+
+	select {
+	case <-signal:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Scrape wasn't stopped.")
+	}
+
+	if len(appender.result) < 2 {
+		t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result))
+	}
+	if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) {
+		t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v))
+	}
+}
+
 func TestScrapeLoopRun(t *testing.T) {
 	var (
 		signal = make(chan struct{})
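For context, here is a minimal, self-contained sketch of the two-context shutdown pattern this change introduces. All names (loop, newLoop, run, stop) are illustrative stand-ins, not the actual Prometheus types: stop() cancels only the derived scrapeCtx and returns as soon as the main loop has exited, while the delayed write-out (the stale markers above) remains governed by the parent ctx, so cancelling the parent skips it entirely.

package main

import (
	"context"
	"fmt"
	"time"
)

type loop struct {
	ctx       context.Context // parent: cancelled on server shutdown
	scrapeCtx context.Context // derived: cancelled by stop()
	cancel    context.CancelFunc
	stopped   chan struct{} // closed once the main loop has exited
}

func newLoop(ctx context.Context) *loop {
	l := &loop{ctx: ctx, stopped: make(chan struct{})}
	l.scrapeCtx, l.cancel = context.WithCancel(ctx)
	return l
}

func (l *loop) run(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

mainLoop:
	for {
		select {
		case <-l.ctx.Done(): // hard shutdown: exit without the write-out
			close(l.stopped)
			return
		case <-l.scrapeCtx.Done(): // graceful stop: fall through to the write-out
			break mainLoop
		case <-ticker.C:
			fmt.Println("scrape")
		}
	}
	close(l.stopped) // stop() may return now; the write-out below continues

	// Stand-in for the delayed stale-marker write, still abortable via ctx.
	select {
	case <-l.ctx.Done():
	case <-ticker.C:
		fmt.Println("write stale markers")
	}
}

// stop halts scraping and waits for the main loop to exit. Writes may still
// happen afterwards unless the parent context is cancelled.
func (l *loop) stop() {
	l.cancel()
	<-l.stopped
}

func main() {
	l := newLoop(context.Background())
	go l.run(10 * time.Millisecond)
	time.Sleep(35 * time.Millisecond)
	l.stop()                          // returns promptly, before the stale write
	time.Sleep(20 * time.Millisecond) // give the write-out time to fire
}

Splitting the two contexts is what lets reload() return once loops have stopped scraping, without waiting the extra ~2 intervals the stale-marker write needs.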