From ede7de014eb7de500b3f37c76a4a73c38fe0a91e Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 12 Mar 2026 16:17:30 +0000 Subject: [PATCH] exp2 Signed-off-by: bwplotka --- scrape/scrape.go | 66 ++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/scrape/scrape.go b/scrape/scrape.go index 0710013478..db2e7c682b 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -248,7 +248,7 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger { func (sp *scrapePool) stop() { sp.mtx.Lock() defer sp.mtx.Unlock() - sp.cancel() // ? + sp.cancel() var wg sync.WaitGroup sp.targetMtx.Lock() @@ -268,10 +268,6 @@ func (sp *scrapePool) stop() { sp.targetMtx.Unlock() wg.Wait() - //// Cancel the pool's base context only after all loops have stopped. - //// This ensures that loops performing a final ScrapeOnShutdown have - //// a valid, uncanceled appender context to successfully write their data. - //sp.cancel() sp.client.CloseIdleConnections() if sp.config != nil { @@ -1256,28 +1252,41 @@ func (sl *scrapeLoop) getScrapeOffset() time.Duration { } func (sl *scrapeLoop) run(errc chan<- error) { - select { - case <-time.After(sl.getScrapeOffset()): - // Continue after a scraping offset. - case <-sl.ctx.Done(): + var ( + last time.Time + alignedScrapeTime = time.Now().Round(0) + ticker = time.NewTicker(sl.interval) + ) + defer ticker.Stop() + defer func() { close(sl.stopped) - return + if sl.parentCtx.Err() == nil { + if sl.scrapeOnShutdown { + last = sl.scrapeAndReport(last, time.Now().Round(0), errc) + } + if !sl.disabledEndOfRunStalenessMarkers.Load() { + sl.endOfRunStaleness(last, ticker, sl.interval) + } + } + }() + + // Initial offset and jitter offset, if any. + offset := sl.getScrapeOffset() + if offset > 0 { + select { + case <-time.After(offset): + // Continue after a scraping offset. + case <-sl.ctx.Done(): + return + } } - var last time.Time - - alignedScrapeTime := time.Now().Round(0) - ticker := time.NewTicker(sl.interval) - defer ticker.Stop() - -mainLoop: for { select { case <-sl.parentCtx.Done(): - close(sl.stopped) return case <-sl.ctx.Done(): - break mainLoop + return default: } @@ -1303,15 +1312,6 @@ mainLoop: last = sl.scrapeAndReport(last, scrapeTime, errc) } - - close(sl.stopped) - - if sl.scrapeOnShutdown { - last = sl.scrapeAndReport(last, time.Now().Round(0), errc) - } - if !sl.disabledEndOfRunStalenessMarkers.Load() { - sl.endOfRunStaleness(last, ticker, sl.interval) - } } func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter { @@ -1530,16 +1530,6 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // Stop the scraping. May still write data and stale markers after it has // returned. Cancel the context to stop all writes. func (sl *scrapeLoop) stop() { - //if sl.scrapeOnShutdown { - // select { - // case sl.shutdownScrape <- struct{}{}: - // case <-sl.stopped: - // // Prevents deadlock: shutdownScrape is unbuffered. If the scrape loop - // // has already exited, a direct send will block forever. - // } - //} else { - // sl.cancel() - //} sl.cancel() <-sl.stopped }