From 1f3e25df09a5b71a7354437b02a1d0fcdf0c70c6 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 12 Mar 2026 14:25:29 +0000 Subject: [PATCH 1/3] tmp Signed-off-by: bwplotka --- scrape/scrape.go | 85 ++++++++++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 43 deletions(-) diff --git a/scrape/scrape.go b/scrape/scrape.go index c73d26a262..0710013478 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -248,6 +248,7 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger { func (sp *scrapePool) stop() { sp.mtx.Lock() defer sp.mtx.Unlock() + sp.cancel() // ? var wg sync.WaitGroup sp.targetMtx.Lock() @@ -267,10 +268,10 @@ func (sp *scrapePool) stop() { sp.targetMtx.Unlock() wg.Wait() - // Cancel the pool's base context only after all loops have stopped. - // This ensures that loops performing a final ScrapeOnShutdown have - // a valid, uncanceled appender context to successfully write their data. - sp.cancel() + //// Cancel the pool's base context only after all loops have stopped. + //// This ensures that loops performing a final ScrapeOnShutdown have + //// a valid, uncanceled appender context to successfully write their data. + //sp.cancel() sp.client.CloseIdleConnections() if sp.config != nil { @@ -833,14 +834,13 @@ type cacheEntry struct { type scrapeLoop struct { // Parameters. 
- ctx context.Context - cancel func() - stopped chan struct{} - shutdownScrape chan struct{} - parentCtx context.Context - appenderCtx context.Context - l *slog.Logger - cache *scrapeCache + ctx context.Context + cancel func() + stopped chan struct{} + parentCtx context.Context + appenderCtx context.Context + l *slog.Logger + cache *scrapeCache interval time.Duration timeout time.Duration @@ -1182,14 +1182,13 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop { ctx, cancel := context.WithCancel(opts.sp.ctx) return &scrapeLoop{ - ctx: ctx, - cancel: cancel, - stopped: make(chan struct{}), - shutdownScrape: make(chan struct{}), - parentCtx: opts.sp.ctx, - appenderCtx: appenderCtx, - l: opts.sp.logger.With("target", opts.target), - cache: opts.cache, + ctx: ctx, + cancel: cancel, + stopped: make(chan struct{}), + parentCtx: opts.sp.ctx, + appenderCtx: appenderCtx, + l: opts.sp.logger.With("target", opts.target), + cache: opts.cache, interval: opts.interval, timeout: opts.timeout, @@ -1260,8 +1259,6 @@ func (sl *scrapeLoop) run(errc chan<- error) { select { case <-time.After(sl.getScrapeOffset()): // Continue after a scraping offset. - case <-sl.shutdownScrape: - sl.cancel() case <-sl.ctx.Done(): close(sl.stopped) return @@ -1275,6 +1272,15 @@ func (sl *scrapeLoop) run(errc chan<- error) { mainLoop: for { + select { + case <-sl.parentCtx.Done(): + close(sl.stopped) + return + case <-sl.ctx.Done(): + break mainLoop + default: + } + // Temporary workaround for a jitter in go timers that causes disk space // increase in TSDB. 
// See https://github.com/prometheus/prometheus/issues/7846 @@ -1296,21 +1302,13 @@ mainLoop: } last = sl.scrapeAndReport(last, scrapeTime, errc) - - select { - case <-sl.parentCtx.Done(): - close(sl.stopped) - return - case <-sl.ctx.Done(): - break mainLoop - case <-sl.shutdownScrape: - sl.cancel() - case <-ticker.C: - } } close(sl.stopped) + if sl.scrapeOnShutdown { + last = sl.scrapeAndReport(last, time.Now().Round(0), errc) + } if !sl.disabledEndOfRunStalenessMarkers.Load() { sl.endOfRunStaleness(last, ticker, sl.interval) } @@ -1532,16 +1530,17 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // Stop the scraping. May still write data and stale markers after it has // returned. Cancel the context to stop all writes. func (sl *scrapeLoop) stop() { - if sl.scrapeOnShutdown { - select { - case sl.shutdownScrape <- struct{}{}: - case <-sl.stopped: - // Prevents deadlock: shutdownScrape is unbuffered. If the scrape loop - // has already exited, a direct send will block forever. - } - } else { - sl.cancel() - } + //if sl.scrapeOnShutdown { + // select { + // case sl.shutdownScrape <- struct{}{}: + // case <-sl.stopped: + // // Prevents deadlock: shutdownScrape is unbuffered. If the scrape loop + // // has already exited, a direct send will block forever. + // } + //} else { + // sl.cancel() + //} + sl.cancel() <-sl.stopped } From ede7de014eb7de500b3f37c76a4a73c38fe0a91e Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 12 Mar 2026 16:17:30 +0000 Subject: [PATCH 2/3] exp2 Signed-off-by: bwplotka --- scrape/scrape.go | 66 ++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/scrape/scrape.go b/scrape/scrape.go index 0710013478..db2e7c682b 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -248,7 +248,7 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger { func (sp *scrapePool) stop() { sp.mtx.Lock() defer sp.mtx.Unlock() - sp.cancel() // ? 
+ sp.cancel() var wg sync.WaitGroup sp.targetMtx.Lock() @@ -268,10 +268,6 @@ func (sp *scrapePool) stop() { sp.targetMtx.Unlock() wg.Wait() - //// Cancel the pool's base context only after all loops have stopped. - //// This ensures that loops performing a final ScrapeOnShutdown have - //// a valid, uncanceled appender context to successfully write their data. - //sp.cancel() sp.client.CloseIdleConnections() if sp.config != nil { @@ -1256,28 +1252,41 @@ func (sl *scrapeLoop) getScrapeOffset() time.Duration { } func (sl *scrapeLoop) run(errc chan<- error) { - select { - case <-time.After(sl.getScrapeOffset()): - // Continue after a scraping offset. - case <-sl.ctx.Done(): + var ( + last time.Time + alignedScrapeTime = time.Now().Round(0) + ticker = time.NewTicker(sl.interval) + ) + defer ticker.Stop() + defer func() { close(sl.stopped) - return + if sl.parentCtx.Err() == nil { + if sl.scrapeOnShutdown { + last = sl.scrapeAndReport(last, time.Now().Round(0), errc) + } + if !sl.disabledEndOfRunStalenessMarkers.Load() { + sl.endOfRunStaleness(last, ticker, sl.interval) + } + } + }() + + // Initial offset and jitter offset, if any. + offset := sl.getScrapeOffset() + if offset > 0 { + select { + case <-time.After(offset): + // Continue after a scraping offset. 
+ case <-sl.ctx.Done(): + return + } } - var last time.Time - - alignedScrapeTime := time.Now().Round(0) - ticker := time.NewTicker(sl.interval) - defer ticker.Stop() - -mainLoop: for { select { case <-sl.parentCtx.Done(): - close(sl.stopped) return case <-sl.ctx.Done(): - break mainLoop + return default: } @@ -1303,15 +1312,6 @@ mainLoop: last = sl.scrapeAndReport(last, scrapeTime, errc) } - - close(sl.stopped) - - if sl.scrapeOnShutdown { - last = sl.scrapeAndReport(last, time.Now().Round(0), errc) - } - if !sl.disabledEndOfRunStalenessMarkers.Load() { - sl.endOfRunStaleness(last, ticker, sl.interval) - } } func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter { @@ -1530,16 +1530,6 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int // Stop the scraping. May still write data and stale markers after it has // returned. Cancel the context to stop all writes. func (sl *scrapeLoop) stop() { - //if sl.scrapeOnShutdown { - // select { - // case sl.shutdownScrape <- struct{}{}: - // case <-sl.stopped: - // // Prevents deadlock: shutdownScrape is unbuffered. If the scrape loop - // // has already exited, a direct send will block forever. 
- } - //} else { - // sl.cancel() - //} sl.cancel() <-sl.stopped } From 765e7386c821b89e52e232cc1a4adc3803b4533f Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 12 Mar 2026 16:36:48 +0000 Subject: [PATCH 3/3] fix Signed-off-by: bwplotka --- scrape/manager_test.go | 6 +++--- scrape/scrape.go | 17 +++++++++++------ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 568f567c52..b3ef0b21bc 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -1753,10 +1753,10 @@ func TestManager_InitialScrapeOffset(t *testing.T) { runDuration: 59 * time.Minute, }, { - name: "scrape happens exactly when large offset elapses", + name: "scrape happens when large offset elapses", initialScrapeOffset: 1 * time.Hour, - runDuration: 1 * time.Hour, - expectedSamples: 1, + runDuration: 1*time.Hour + 2*time.Second, + expectedSamples: 2, }, } { t.Run(tcase.name, func(t *testing.T) { diff --git a/scrape/scrape.go b/scrape/scrape.go index db2e7c682b..7fe9a4ad80 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1257,13 +1257,14 @@ func (sl *scrapeLoop) run(errc chan<- error) { alignedScrapeTime = time.Now().Round(0) ticker = time.NewTicker(sl.interval) ) - defer ticker.Stop() defer func() { + ticker.Stop() + if sl.scrapeOnShutdown { + last = sl.scrapeAndReport(last, time.Now().Round(0), errc) + } + // Let stop() know it can unblock. 
close(sl.stopped) if sl.parentCtx.Err() == nil { - if sl.scrapeOnShutdown { - last = sl.scrapeAndReport(last, time.Now().Round(0), errc) - } if !sl.disabledEndOfRunStalenessMarkers.Load() { sl.endOfRunStaleness(last, ticker, sl.interval) } @@ -1283,8 +1284,6 @@ func (sl *scrapeLoop) run(errc chan<- error) { for { select { - case <-sl.parentCtx.Done(): - return case <-sl.ctx.Done(): return default: @@ -1311,6 +1310,12 @@ func (sl *scrapeLoop) run(errc chan<- error) { } last = sl.scrapeAndReport(last, scrapeTime, errc) + + select { + case <-sl.ctx.Done(): + return + case <-ticker.C: + } } }