diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dcfcadfc6..2e467e5e9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 2.17.0-rc.3 / 2020-03-18 +## 2.17.0-rc.4 / 2020-03-21 This release implements isolation in TSDB. API queries and recording rules are guaranteed to only see full scrapes and full recording rules. This comes with a @@ -20,8 +20,11 @@ some increase in memory usage, CPU usage, or query latency. * [BUGFIX] PromQL: Do not escape HTML-like chars in query log #6834 #6795 * [BUGFIX] React UI: Fix data table matrix values #6896 * [BUGFIX] React UI: Fix new targets page not loading when using non-ASCII characters #6892 +* [BUGFIX] Remote read: Fix duplication of metrics read from remote storage with external labels #6967 #7018 +* [BUGFIX] Remote write: Register WAL watcher and live reader metrics for all remotes, not just the first one #6998 * [BUGFIX] Scrape: Prevent removal of metric names upon relabeling #6891 * [BUGFIX] Scrape: Fix 'superfluous response.WriteHeader call' errors when scrape fails under some circonstances #6986 +* [BUGFIX] Scrape: Fix crash when reloads are separated by two scrape intervals #7011 ## 2.16.0 / 2020-02-13 diff --git a/VERSION b/VERSION index 9dba67016b..f14ff579c2 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.17.0-rc.3 +2.17.0-rc.4 diff --git a/scrape/scrape.go b/scrape/scrape.go index fbabba2472..98cea32923 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -314,6 +314,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { for fp, oldLoop := range sp.loops { var cache *scrapeCache if oc := oldLoop.getCache(); reuseCache && oc != nil { + oldLoop.disableEndOfRunStalenessMarkers() cache = oc } else { cache = newScrapeCache() @@ -593,6 +594,7 @@ type loop interface { run(interval, timeout time.Duration, errc chan<- error) stop() getCache() *scrapeCache + disableEndOfRunStalenessMarkers() } type cacheEntry struct { @@ -619,6 +621,8 @@ type scrapeLoop struct { ctx context.Context cancel func() stopped chan struct{} + + disabledEndOfRunStalenessMarkers bool } // scrapeCache tracks mappings of exposed metric strings to label sets and @@ -996,7 +1000,9 @@ mainLoop: close(sl.stopped) - sl.endOfRunStaleness(last, ticker, interval) + if !sl.disabledEndOfRunStalenessMarkers { + sl.endOfRunStaleness(last, ticker, interval) + } } func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) { @@ -1054,6 +1060,10 @@ func (sl *scrapeLoop) stop() { <-sl.stopped } +func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() { + sl.disabledEndOfRunStalenessMarkers = true +} + func (sl *scrapeLoop) getCache() *scrapeCache { return sl.cache } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 4271f27490..1d3864bb92 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -141,6 +141,9 @@ func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) { l.startFunc(interval, timeout, errc) } +func (l *testLoop) disableEndOfRunStalenessMarkers() { +} + func (l *testLoop) stop() { l.stopFunc() } @@ -1839,3 +1842,39 @@ func TestScrapeAddFast(t *testing.T) { _, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second)) testutil.Ok(t, err) } + +func TestReuseCacheRace(t *testing.T) { + var ( + app = &nopAppendable{} + cfg = &config.ScrapeConfig{ + JobName: "Prometheus", + ScrapeTimeout: model.Duration(5 * time.Second), + ScrapeInterval: model.Duration(5 * time.Second), + MetricsPath: "/metrics", + } + sp, _ = newScrapePool(cfg, app, 0, nil) + t1 = &Target{ + 
discoveredLabels: labels.Labels{ + labels.Label{ + Name: "labelNew", + Value: "nameNew", + }, + }, + } + ) + sp.sync([]*Target{t1}) + + start := time.Now() + for i := uint(1); i > 0; i++ { + if time.Since(start) > 5*time.Second { + break + } + sp.reload(&config.ScrapeConfig{ + JobName: "Prometheus", + ScrapeTimeout: model.Duration(1 * time.Millisecond), + ScrapeInterval: model.Duration(1 * time.Millisecond), + MetricsPath: "/metrics", + SampleLimit: i, + }) + } +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a4384e03ee..9324062172 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -271,7 +271,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { +func NewQueueManager(metrics *queueManagerMetrics, watcherMetrics *wal.WatcherMetrics, readerMetrics *wal.LiveReaderMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { if logger == nil { logger = log.NewNopLogger() } @@ -301,7 +301,7 @@ func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, lo metrics: metrics, } - t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, client.Name(), t, walDir) + t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir) t.shards = t.newShards() return t diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e4917441fc..74a7fddbce 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -61,7 +61,7 @@ func TestSampleDelivery(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) // These should be received by the client. 
@@ -90,7 +90,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -131,7 +131,7 @@ func TestSampleDeliveryOrder(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() @@ -150,7 +150,8 @@ func TestShutdown(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -188,7 +189,7 @@ func TestSeriesReset(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -218,7 +219,7 @@ func TestReshard(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() @@ -251,7 +252,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil) - m = NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() h.Unlock() h.Lock() @@ -269,7 +270,7 @@ func TestReshardRaceWithStop(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) { metrics := newQueueManagerMetrics(nil) c := NewTestStorageClient() - m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() for i := 1; i < 1000; i++ { @@ -316,7 +317,7 @@ func TestCalculateDesiredsShards(t *testing.T) { for _, c := range cases { metrics := newQueueManagerMetrics(nil) client := NewTestStorageClient() - 
m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) m.numShards = c.startingShards m.samplesIn.incr(c.samplesIn) m.samplesOut.incr(c.samplesOut) @@ -527,7 +528,7 @@ func BenchmarkSampleDelivery(b *testing.B) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) // These should be received by the client. @@ -569,7 +570,7 @@ func BenchmarkStartup(b *testing.B) { for n := 0; n < b.N; n++ { metrics := newQueueManagerMetrics(nil) c := NewTestBlockedStorageClient() - m := NewQueueManager(nil, metrics, logger, dir, + m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) @@ -620,7 +621,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(nil, metrics, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual diff --git a/storage/remote/write.go b/storage/remote/write.go index 665eb2b071..5d77ec751d 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/wal" ) var ( @@ -46,11 +47,12 @@ var ( // WriteStorage represents all the remote write storage. 
type WriteStorage struct { - reg prometheus.Registerer logger log.Logger mtx sync.Mutex queueMetrics *queueManagerMetrics + watcherMetrics *wal.WatcherMetrics + liveReaderMetrics *wal.LiveReaderMetrics configHash string externalLabelHash string walDir string @@ -65,13 +67,14 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string logger = log.NewNopLogger() } rws := &WriteStorage{ - queues: make(map[string]*QueueManager), - reg: reg, - queueMetrics: newQueueManagerMetrics(reg), - logger: logger, - flushDeadline: flushDeadline, - samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), - walDir: walDir, + queues: make(map[string]*QueueManager), + queueMetrics: newQueueManagerMetrics(reg), + watcherMetrics: wal.NewWatcherMetrics(reg), + liveReaderMetrics: wal.NewLiveReaderMetrics(reg), + logger: logger, + flushDeadline: flushDeadline, + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + walDir: walDir, } go rws.run() return rws @@ -152,8 +155,9 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { return err } newQueues[hash] = NewQueueManager( - rws.reg, rws.queueMetrics, + rws.watcherMetrics, + rws.liveReaderMetrics, rws.logger, rws.walDir, rws.samplesIn, diff --git a/tsdb/wal/live_reader.go b/tsdb/wal/live_reader.go index 446e859940..7124f6408e 100644 --- a/tsdb/wal/live_reader.go +++ b/tsdb/wal/live_reader.go @@ -28,14 +28,14 @@ import ( ) // liveReaderMetrics holds all metrics exposed by the LiveReader. -type liveReaderMetrics struct { +type LiveReaderMetrics struct { readerCorruptionErrors *prometheus.CounterVec } // NewLiveReaderMetrics instantiates, registers and returns metrics to be injected // at LiveReader instantiation. -func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { - m := &liveReaderMetrics{ +func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics { + m := &LiveReaderMetrics{ readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "prometheus_tsdb_wal_reader_corruption_errors_total", Help: "Errors encountered when reading the WAL.", @@ -43,15 +43,14 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { } if reg != nil { - // TODO(codesome): log error. - _ = reg.Register(m.readerCorruptionErrors) + reg.MustRegister(m.readerCorruptionErrors) } return m } // NewLiveReader returns a new live reader. -func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader { +func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader { lr := &LiveReader{ logger: logger, rdr: r, @@ -89,7 +88,7 @@ type LiveReader struct { // NB the non-ive Reader implementation allows for this. permissive bool - metrics *liveReaderMetrics + metrics *LiveReaderMetrics } // Err returns any errors encountered reading the WAL. 
io.EOFs are not terminal diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index f92386f0de..4a9e7f455d 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -66,7 +66,7 @@ type Watcher struct { walDir string lastCheckpoint string metrics *WatcherMetrics - readerMetrics *liveReaderMetrics + readerMetrics *LiveReaderMetrics startTime time.Time startTimestamp int64 // the start time as a Prometheus timestamp @@ -125,17 +125,17 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { } if reg != nil { - _ = reg.Register(m.recordsRead) - _ = reg.Register(m.recordDecodeFails) - _ = reg.Register(m.samplesSentPreTailing) - _ = reg.Register(m.currentSegment) + reg.MustRegister(m.recordsRead) + reg.MustRegister(m.recordDecodeFails) + reg.MustRegister(m.samplesSentPreTailing) + reg.MustRegister(m.currentSegment) } return m } // NewWatcher creates a new WAL watcher for a given WriteTo. -func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { +func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { if logger == nil { logger = log.NewNopLogger() } @@ -143,7 +143,7 @@ func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.L logger: logger, writer: writer, metrics: metrics, - readerMetrics: NewLiveReaderMetrics(reg), + readerMetrics: readerMetrics, walDir: path.Join(walDir, "wal"), name: name, quit: make(chan struct{}), @@ -179,11 +179,13 @@ func (w *Watcher) Stop() { <-w.done // Records read metric has series and samples. - w.metrics.recordsRead.DeleteLabelValues(w.name, "series") - w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") - w.metrics.recordDecodeFails.DeleteLabelValues(w.name) - w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) - w.metrics.currentSegment.DeleteLabelValues(w.name) + if w.metrics != nil { + w.metrics.recordsRead.DeleteLabelValues(w.name, "series") + w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") + w.metrics.recordDecodeFails.DeleteLabelValues(w.name) + w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) + w.metrics.currentSegment.DeleteLabelValues(w.name) + } level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) } diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go index db8e3e89ff..482d96551f 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -138,7 +138,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.SetStartTime(now) // Set the Watcher's metrics so they're not nil pointers. @@ -148,7 +148,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) defer segment.Close() - reader := NewLiveReader(nil, NewLiveReaderMetrics(prometheus.DefaultRegisterer), segment) + reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment) // Use tail true so we can ensure we got the right number of samples. 
watcher.readSegment(reader, i, true) } @@ -217,7 +217,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expected := seriesCount @@ -303,7 +303,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expected := seriesCount * 2 @@ -368,7 +368,7 @@ func TestReadCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expectedSeries := seriesCount @@ -439,7 +439,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { } wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. @@ -510,7 +510,7 @@ func TestCheckpointSeriesReset(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.MaxSegment = -1 go watcher.Start()
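The core of the remote-write change above is a move from "each WAL watcher registers its own metrics" to "WriteStorage registers the watcher and live-reader metrics once and injects the same instances into every QueueManager/Watcher", which is what makes the metrics show up for all remotes rather than only the first. Below is a minimal, self-contained sketch of that injection pattern. It does not use Prometheus's internal types (WatcherMetrics, Watcher, and the metric name here are stand-ins invented for illustration); it only shows the shape of the fix: register a shared CounterVec once, then hand it to every consumer and distinguish remotes by label value.

```go
// Sketch of the "register once, inject everywhere" pattern from this patch.
// Names are illustrative stand-ins, not the real wal.WatcherMetrics/wal.Watcher.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// WatcherMetrics holds one CounterVec shared by all watchers; remotes are
// distinguished by the "queue" label rather than by separate registrations.
type WatcherMetrics struct {
	recordsRead *prometheus.CounterVec
}

func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
	m := &WatcherMetrics{
		recordsRead: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "example_wal_watcher_records_read_total",
			Help: "Records read by the WAL watcher.",
		}, []string{"queue", "type"}),
	}
	if reg != nil {
		// MustRegister is safe here because this constructor runs exactly once,
		// mirroring the reg.Register -> reg.MustRegister change in the patch.
		reg.MustRegister(m.recordsRead)
	}
	return m
}

// Watcher receives the shared metrics instead of creating (and re-registering)
// its own, mirroring the new NewWatcher(metrics, readerMetrics, ...) signature.
type Watcher struct {
	name    string
	metrics *WatcherMetrics
}

func NewWatcher(metrics *WatcherMetrics, name string) *Watcher {
	return &Watcher{name: name, metrics: metrics}
}

func (w *Watcher) observe() {
	w.metrics.recordsRead.WithLabelValues(w.name, "samples").Inc()
}

func main() {
	reg := prometheus.NewRegistry()

	// Register once, inject everywhere: the equivalent of WriteStorage holding
	// watcherMetrics/liveReaderMetrics and passing them to each NewQueueManager.
	wm := NewWatcherMetrics(reg)

	for _, remote := range []string{"remote-a", "remote-b"} {
		w := NewWatcher(wm, remote)
		w.observe() // both remotes report through the same registered collector
	}

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), len(mf.GetMetric()), "series")
	}
}
```

With the old pattern, the second remote's constructor would attempt to register an identically named collector and the registration would fail silently, so only the first remote's series were exported; with the shared instance, Gather reports one series per remote from a single registered collector.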