mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-14 09:06:37 +02:00
Merge pull request #1 from prometheus/scrapeshutdownpair
Playing with alternatives
This commit is contained in:
commit
dc0d919f4b
@ -1753,10 +1753,10 @@ func TestManager_InitialScrapeOffset(t *testing.T) {
|
||||
runDuration: 59 * time.Minute,
|
||||
},
|
||||
{
|
||||
name: "scrape happens exactly when large offset elapses",
|
||||
name: "scrape happens when large offset elapses",
|
||||
initialScrapeOffset: 1 * time.Hour,
|
||||
runDuration: 1 * time.Hour,
|
||||
expectedSamples: 1,
|
||||
runDuration: 1*time.Hour + 2*time.Second,
|
||||
expectedSamples: 2,
|
||||
},
|
||||
} {
|
||||
t.Run(tcase.name, func(t *testing.T) {
|
||||
|
||||
106
scrape/scrape.go
106
scrape/scrape.go
@ -248,6 +248,7 @@ func (sp *scrapePool) getScrapeFailureLogger() FailureLogger {
|
||||
func (sp *scrapePool) stop() {
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
sp.cancel()
|
||||
var wg sync.WaitGroup
|
||||
|
||||
sp.targetMtx.Lock()
|
||||
@ -267,10 +268,6 @@ func (sp *scrapePool) stop() {
|
||||
sp.targetMtx.Unlock()
|
||||
|
||||
wg.Wait()
|
||||
// Cancel the pool's base context only after all loops have stopped.
|
||||
// This ensures that loops performing a final ScrapeOnShutdown have
|
||||
// a valid, uncanceled appender context to successfully write their data.
|
||||
sp.cancel()
|
||||
sp.client.CloseIdleConnections()
|
||||
|
||||
if sp.config != nil {
|
||||
@ -833,14 +830,13 @@ type cacheEntry struct {
|
||||
|
||||
type scrapeLoop struct {
|
||||
// Parameters.
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
stopped chan struct{}
|
||||
shutdownScrape chan struct{}
|
||||
parentCtx context.Context
|
||||
appenderCtx context.Context
|
||||
l *slog.Logger
|
||||
cache *scrapeCache
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
stopped chan struct{}
|
||||
parentCtx context.Context
|
||||
appenderCtx context.Context
|
||||
l *slog.Logger
|
||||
cache *scrapeCache
|
||||
|
||||
interval time.Duration
|
||||
timeout time.Duration
|
||||
@ -1182,14 +1178,13 @@ func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
|
||||
|
||||
ctx, cancel := context.WithCancel(opts.sp.ctx)
|
||||
return &scrapeLoop{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
stopped: make(chan struct{}),
|
||||
shutdownScrape: make(chan struct{}),
|
||||
parentCtx: opts.sp.ctx,
|
||||
appenderCtx: appenderCtx,
|
||||
l: opts.sp.logger.With("target", opts.target),
|
||||
cache: opts.cache,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
stopped: make(chan struct{}),
|
||||
parentCtx: opts.sp.ctx,
|
||||
appenderCtx: appenderCtx,
|
||||
l: opts.sp.logger.With("target", opts.target),
|
||||
cache: opts.cache,
|
||||
|
||||
interval: opts.interval,
|
||||
timeout: opts.timeout,
|
||||
@ -1257,24 +1252,43 @@ func (sl *scrapeLoop) getScrapeOffset() time.Duration {
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) run(errc chan<- error) {
|
||||
select {
|
||||
case <-time.After(sl.getScrapeOffset()):
|
||||
// Continue after a scraping offset.
|
||||
case <-sl.shutdownScrape:
|
||||
sl.cancel()
|
||||
case <-sl.ctx.Done():
|
||||
var (
|
||||
last time.Time
|
||||
alignedScrapeTime = time.Now().Round(0)
|
||||
ticker = time.NewTicker(sl.interval)
|
||||
)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
if sl.scrapeOnShutdown {
|
||||
last = sl.scrapeAndReport(last, time.Now().Round(0), errc)
|
||||
}
|
||||
// Signal stop() that it can unblock.
|
||||
close(sl.stopped)
|
||||
return
|
||||
if sl.parentCtx.Err() == nil {
|
||||
if !sl.disabledEndOfRunStalenessMarkers.Load() {
|
||||
sl.endOfRunStaleness(last, ticker, sl.interval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Initial offset and jitter offset, if any.
|
||||
offset := sl.getScrapeOffset()
|
||||
if offset > 0 {
|
||||
select {
|
||||
case <-time.After(offset):
|
||||
// Continue after a scraping offset.
|
||||
case <-sl.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var last time.Time
|
||||
|
||||
alignedScrapeTime := time.Now().Round(0)
|
||||
ticker := time.NewTicker(sl.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
mainLoop:
|
||||
for {
|
||||
select {
|
||||
case <-sl.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Temporary workaround for a jitter in go timers that causes disk space
|
||||
// increase in TSDB.
|
||||
// See https://github.com/prometheus/prometheus/issues/7846
|
||||
@ -1298,22 +1312,11 @@ mainLoop:
|
||||
last = sl.scrapeAndReport(last, scrapeTime, errc)
|
||||
|
||||
select {
|
||||
case <-sl.parentCtx.Done():
|
||||
close(sl.stopped)
|
||||
return
|
||||
case <-sl.ctx.Done():
|
||||
break mainLoop
|
||||
case <-sl.shutdownScrape:
|
||||
sl.cancel()
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
|
||||
close(sl.stopped)
|
||||
|
||||
if !sl.disabledEndOfRunStalenessMarkers.Load() {
|
||||
sl.endOfRunStaleness(last, ticker, sl.interval)
|
||||
}
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter {
|
||||
@ -1532,16 +1535,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
|
||||
// Stop the scraping. May still write data and stale markers after it has
|
||||
// returned. Cancel the context to stop all writes.
|
||||
func (sl *scrapeLoop) stop() {
|
||||
if sl.scrapeOnShutdown {
|
||||
select {
|
||||
case sl.shutdownScrape <- struct{}{}:
|
||||
case <-sl.stopped:
|
||||
// Prevents deadlock: shutdownScrape is unbuffered. If the scrape loop
|
||||
// has already exited, a direct send will block forever.
|
||||
}
|
||||
} else {
|
||||
sl.cancel()
|
||||
}
|
||||
sl.cancel()
|
||||
<-sl.stopped
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user