diff --git a/scrape/scrape.go b/scrape/scrape.go
index c73d26a262..0710013478 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -248,6 +248,7 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger {
 func (sp *scrapePool) stop() {
 	sp.mtx.Lock()
 	defer sp.mtx.Unlock()
+	sp.cancel() // TODO: verify loops doing a final ScrapeOnShutdown still see a valid, uncanceled appender context.
 
 	var wg sync.WaitGroup
 	sp.targetMtx.Lock()
@@ -267,10 +268,6 @@ func (sp *scrapePool) stop() {
 	sp.targetMtx.Unlock()
 	wg.Wait()
 
-	// Cancel the pool's base context only after all loops have stopped.
-	// This ensures that loops performing a final ScrapeOnShutdown have
-	// a valid, uncanceled appender context to successfully write their data.
-	sp.cancel()
 	sp.client.CloseIdleConnections()
 
 	if sp.config != nil {
@@ -833,14 +830,13 @@ type cacheEntry struct {
 
 type scrapeLoop struct {
 	// Parameters.
-	ctx            context.Context
-	cancel         func()
-	stopped        chan struct{}
-	shutdownScrape chan struct{}
-	parentCtx      context.Context
-	appenderCtx    context.Context
-	l              *slog.Logger
-	cache          *scrapeCache
+	ctx         context.Context
+	cancel      func()
+	stopped     chan struct{}
+	parentCtx   context.Context
+	appenderCtx context.Context
+	l           *slog.Logger
+	cache       *scrapeCache
 
 	interval time.Duration
 	timeout  time.Duration
@@ -1182,14 +1178,13 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
 
 	ctx, cancel := context.WithCancel(opts.sp.ctx)
 	return &scrapeLoop{
-		ctx:            ctx,
-		cancel:         cancel,
-		stopped:        make(chan struct{}),
-		shutdownScrape: make(chan struct{}),
-		parentCtx:      opts.sp.ctx,
-		appenderCtx:    appenderCtx,
-		l:              opts.sp.logger.With("target", opts.target),
-		cache:          opts.cache,
+		ctx:         ctx,
+		cancel:      cancel,
+		stopped:     make(chan struct{}),
+		parentCtx:   opts.sp.ctx,
+		appenderCtx: appenderCtx,
+		l:           opts.sp.logger.With("target", opts.target),
+		cache:       opts.cache,
 
 		interval: opts.interval,
 		timeout:  opts.timeout,
@@ -1260,8 +1255,6 @@ func (sl *scrapeLoop) run(errc chan<- error) {
 	select {
 	case <-time.After(sl.getScrapeOffset()):
 		// Continue after a scraping offset.
-	case <-sl.shutdownScrape:
-		sl.cancel()
 	case <-sl.ctx.Done():
 		close(sl.stopped)
 		return
@@ -1275,6 +1268,15 @@
 
 mainLoop:
 	for {
+		select {
+		case <-sl.parentCtx.Done():
+			close(sl.stopped)
+			return
+		case <-sl.ctx.Done():
+			break mainLoop
+		default:
+		}
+
 		// Temporary workaround for a jitter in go timers that causes disk space
 		// increase in TSDB.
 		// See https://github.com/prometheus/prometheus/issues/7846
@@ -1296,21 +1298,21 @@
 		}
 
 		last = sl.scrapeAndReport(last, scrapeTime, errc)
-
-		select {
-		case <-sl.parentCtx.Done():
-			close(sl.stopped)
-			return
-		case <-sl.ctx.Done():
-			break mainLoop
-		case <-sl.shutdownScrape:
-			sl.cancel()
-		case <-ticker.C:
-		}
+
+		// Wait for the next tick; cancellation is handled by the select at
+		// the top of the next iteration.
+		select {
+		case <-sl.ctx.Done():
+		case <-ticker.C:
+		}
 	}
 
 	close(sl.stopped)
 
+	if sl.scrapeOnShutdown {
+		// Perform the final shutdown scrape after the main loop has exited.
+		last = sl.scrapeAndReport(last, time.Now().Round(0), errc)
+	}
 	if !sl.disabledEndOfRunStalenessMarkers.Load() {
 		sl.endOfRunStaleness(last, ticker, sl.interval)
 	}
@@ -1532,16 +1534,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
 
 // Stop the scraping. May still write data and stale markers after it has
 // returned. Cancel the context to stop all writes.
 func (sl *scrapeLoop) stop() {
-	if sl.scrapeOnShutdown {
-		select {
-		case sl.shutdownScrape <- struct{}{}:
-		case <-sl.stopped:
-			// Prevents deadlock: shutdownScrape is unbuffered. If the scrape loop
-			// has already exited, a direct send will block forever.
-		}
-	} else {
-		sl.cancel()
-	}
+	sl.cancel()
 	<-sl.stopped
 }
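
For review context, here is a minimal, self-contained Go sketch of the shutdown pattern this diff moves to. It is illustrative only: the names (loop, run, stop), the 100ms interval, and the print statements are assumptions, not the real scrape.go identifiers. stop() cancels the loop's context and waits on stopped; the loop notices cancellation, exits its main loop, closes stopped, then performs one final scrape-on-shutdown. Because cancellation is the only stop signal, the unbuffered shutdownScrape send, and the deadlock guard it required, is no longer needed.

package main

import (
	"context"
	"fmt"
	"time"
)

type loop struct {
	ctx     context.Context
	cancel  context.CancelFunc
	stopped chan struct{}
}

func newLoop() *loop {
	ctx, cancel := context.WithCancel(context.Background())
	return &loop{ctx: ctx, cancel: cancel, stopped: make(chan struct{})}
}

func (l *loop) run(scrapeOnShutdown bool) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

mainLoop:
	for {
		// Non-blocking cancellation check before each scrape, mirroring the
		// select this diff adds at the top of the main loop.
		select {
		case <-l.ctx.Done():
			break mainLoop
		default:
		}

		fmt.Println("scrape")

		// Wait for the next tick, waking early on cancellation.
		select {
		case <-l.ctx.Done():
			break mainLoop
		case <-ticker.C:
		}
	}

	// Unblock stop(); anything below may run after stop() has returned.
	close(l.stopped)

	if scrapeOnShutdown {
		// The final scrape must not depend on l.ctx, which is cancelled by
		// now; in scrape.go that is the role of the separate appenderCtx.
		fmt.Println("final scrape on shutdown")
	}
}

func (l *loop) stop() {
	l.cancel()  // the only stop signal: no unbuffered channel to deadlock on
	<-l.stopped // wait until the main loop has exited
}

func main() {
	l := newLoop()
	done := make(chan struct{})
	go func() {
		l.run(true)
		close(done)
	}()
	time.Sleep(250 * time.Millisecond)
	l.stop()
	<-done // let the final shutdown scrape finish before exiting
}

As in the real code's comment on stop() ("May still write data and stale markers after it has returned"), stop() can return before the final scrape completes, which is why that last write path has to hang off a context that outlives the cancelled loop context.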