diff --git a/README.md b/README.md index be44109f7e..2bb49f32d4 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ intervals, evaluate rule expressions, display the results, and trigger an action if some condition is observed to be true. TODO: The above description is somewhat esoteric. Rephrase it into -somethith that tells normal people how they will usually benefit from +something that tells normal people how they will usually benefit from using Prometheus. ## Install diff --git a/retrieval/target.go b/retrieval/target.go index 11ec36d816..1a134b9cd4 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -19,6 +19,7 @@ import ( "net/http" "os" "strings" + "sync" "time" "github.com/golang/glog" @@ -136,14 +137,9 @@ type Target interface { RunScraper(extraction.Ingester, time.Duration) // Stop scraping, synchronous. StopScraper() - // Do a single scrape. - scrape(ingester extraction.Ingester) error } // target is a Target that refers to a singular HTTP or HTTPS endpoint. -// -// TODO: The implementation is not yet goroutine safe, but for the web status, -// methods are called concurrently. type target struct { // The current health state of the target. state TargetState @@ -151,8 +147,10 @@ type target struct { lastError error // The last time a scrape was attempted. lastScrape time.Time - // Closing stopScraper signals that scraping should stop. - stopScraper chan struct{} + // Closing scraperStopping signals that scraping should stop. + scraperStopping chan struct{} + // Closing scraperStopped signals that scraping has been stopped. + scraperStopped chan struct{} // Channel to queue base labels to be replaced. newBaseLabels chan clientmodel.LabelSet @@ -163,17 +161,25 @@ type target struct { baseLabels clientmodel.LabelSet // The HTTP client used to scrape the target's endpoint. httpClient *http.Client + + // Mutex protects lastError, lastScrape, state, and baseLabels. Writing + // the above must only happen in the goroutine running the RunScraper + // loop, and it must happen under the lock. In that way, no mutex lock + // is required for reading the above in the goroutine running the + // RunScraper loop, but only for reading in other goroutines. + sync.Mutex } // Furnish a reasonably configured target for querying. func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target { target := &target{ - address: address, - Deadline: deadline, - baseLabels: baseLabels, - httpClient: utility.NewDeadlineClient(deadline), - stopScraper: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), + address: address, + Deadline: deadline, + baseLabels: baseLabels, + httpClient: utility.NewDeadlineClient(deadline), + scraperStopping: make(chan struct{}), + scraperStopped: make(chan struct{}), + newBaseLabels: make(chan clientmodel.LabelSet, 1), } return target @@ -213,6 +219,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration case <-t.newBaseLabels: // Do nothing. default: + close(t.scraperStopped) return } } @@ -221,7 +228,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64())) select { case <-jitterTimer.C: - case <-t.stopScraper: + case <-t.scraperStopping: jitterTimer.Stop() return } @@ -230,32 +237,40 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration ticker := time.NewTicker(interval) defer ticker.Stop() + t.Lock() // Writing t.lastScrape requires the lock. t.lastScrape = time.Now() + t.Unlock() t.scrape(ingester) // Explanation of the contraption below: // - // In case t.newBaseLabels or t.stopScraper have something to receive, + // In case t.newBaseLabels or t.scraperStopping have something to receive, // we want to read from those channels rather than starting a new scrape // (which might take very long). That's why the outer select has no - // ticker.C. Should neither t.newBaseLabels nor t.stopScraper have + // ticker.C. Should neither t.newBaseLabels nor t.scraperStopping have // anything to receive, we go into the inner select, where ticker.C is // in the mix. for { select { case newBaseLabels := <-t.newBaseLabels: + t.Lock() // Writing t.baseLabels requires the lock. t.baseLabels = newBaseLabels - case <-t.stopScraper: + t.Unlock() + case <-t.scraperStopping: return default: select { case newBaseLabels := <-t.newBaseLabels: + t.Lock() // Writing t.baseLabels requires the lock. t.baseLabels = newBaseLabels - case <-t.stopScraper: + t.Unlock() + case <-t.scraperStopping: return case <-ticker.C: targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second)) + t.Lock() // Write t.lastScrape requires locking. t.lastScrape = time.Now() + t.Unlock() t.scrape(ingester) } } @@ -264,7 +279,8 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration // StopScraper implements Target. func (t *target) StopScraper() { - close(t.stopScraper) + close(t.scraperStopping) + <-t.scraperStopped } const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` @@ -278,6 +294,7 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) { instance: t.Address(), outcome: success, } + t.Lock() // Writing t.state and t.lastError requires the lock. if err == nil { t.state = ALIVE t.recordScrapeHealth(ingester, timestamp, true) @@ -288,6 +305,7 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) { } targetOperationLatencies.With(labels).Observe(ms) t.lastError = err + t.Unlock() }(time.Now()) req, err := http.NewRequest("GET", t.Address(), nil) @@ -310,8 +328,6 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) { return err } - // TODO: This is a wart; we need to handle this more gracefully down the - // road, especially once we have service discovery support. baseLabels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.Address())} for baseLabel, baseValue := range t.baseLabels { baseLabels[baseLabel] = baseValue @@ -331,16 +347,22 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) { // LastError implements Target. func (t *target) LastError() error { + t.Lock() + defer t.Unlock() return t.lastError } // State implements Target. func (t *target) State() TargetState { + t.Lock() + defer t.Unlock() return t.state } // LastScrape implements Target. func (t *target) LastScrape() time.Time { + t.Lock() + defer t.Unlock() return t.lastScrape } @@ -365,6 +387,8 @@ func (t *target) GlobalAddress() string { // BaseLabels implements Target. func (t *target) BaseLabels() clientmodel.LabelSet { + t.Lock() + defer t.Unlock() return t.baseLabels } @@ -375,5 +399,3 @@ func (t *target) SetBaseLabelsFrom(newTarget Target) { } t.newBaseLabels <- newTarget.BaseLabels() } - -type targets []Target diff --git a/retrieval/target_test.go b/retrieval/target_test.go index de43dc9635..22797dcc21 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -100,7 +100,7 @@ func TestTargetScrapeTimeout(t *testing.T) { // scrape once without timeout signal <- true - if err := testTarget.scrape(ingester); err != nil { + if err := testTarget.(*target).scrape(ingester); err != nil { t.Fatal(err) } @@ -109,12 +109,12 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again signal <- true - if err := testTarget.scrape(ingester); err != nil { + if err := testTarget.(*target).scrape(ingester); err != nil { t.Fatal(err) } // now timeout - if err := testTarget.scrape(ingester); err == nil { + if err := testTarget.(*target).scrape(ingester); err == nil { t.Fatal("expected scrape to timeout") } else { signal <- true // let handler continue @@ -122,7 +122,7 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again without timeout signal <- true - if err := testTarget.scrape(ingester); err != nil { + if err := testTarget.(*target).scrape(ingester); err != nil { t.Fatal(err) } } @@ -138,7 +138,7 @@ func TestTargetScrape404(t *testing.T) { ingester := nopIngester{} want := errors.New("server returned HTTP status 404 Not Found") - got := testTarget.scrape(ingester) + got := testTarget.(*target).scrape(ingester) if got == nil || want.Error() != got.Error() { t.Fatalf("want err %q, got %q", want, got) } @@ -146,10 +146,11 @@ func TestTargetScrape404(t *testing.T) { func TestTargetRunScraperScrapes(t *testing.T) { testTarget := target{ - state: UNKNOWN, - address: "bad schema", - httpClient: utility.NewDeadlineClient(0), - stopScraper: make(chan struct{}), + state: UNKNOWN, + address: "bad schema", + httpClient: utility.NewDeadlineClient(0), + scraperStopping: make(chan struct{}), + scraperStopped: make(chan struct{}), } go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond)) diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 882b5364b1..4976510b82 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -23,16 +23,18 @@ import ( "github.com/prometheus/prometheus/config" ) +// TargetManager manages all scrape targets. All methods are goroutine-safe. type TargetManager interface { AddTarget(job config.JobConfig, t Target) ReplaceTargets(job config.JobConfig, newTargets []Target) Remove(t Target) AddTargetsFromConfig(config config.Config) Stop() - Pools() map[string]*TargetPool + Pools() map[string]*TargetPool // Returns a copy of the name -> TargetPool mapping. } type targetManager struct { + sync.Mutex // Protects poolByJob. poolsByJob map[string]*TargetPool ingester extraction.Ingester } @@ -44,7 +46,7 @@ func NewTargetManager(ingester extraction.Ingester) TargetManager { } } -func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool { +func (m *targetManager) targetPoolForJob(job config.JobConfig) *TargetPool { targetPool, ok := m.poolsByJob[job.GetName()] if !ok { @@ -58,7 +60,6 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool { glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName()) m.poolsByJob[job.GetName()] = targetPool - // TODO: Investigate whether this auto-goroutine creation is desired. go targetPool.Run() } @@ -66,24 +67,32 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool { } func (m *targetManager) AddTarget(job config.JobConfig, t Target) { - targetPool := m.TargetPoolForJob(job) + m.Lock() + defer m.Unlock() + + targetPool := m.targetPoolForJob(job) targetPool.AddTarget(t) m.poolsByJob[job.GetName()] = targetPool } func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target) { - targetPool := m.TargetPoolForJob(job) + m.Lock() + defer m.Unlock() + + targetPool := m.targetPoolForJob(job) targetPool.ReplaceTargets(newTargets) } -func (m targetManager) Remove(t Target) { +func (m *targetManager) Remove(t Target) { panic("not implemented") } func (m *targetManager) AddTargetsFromConfig(config config.Config) { for _, job := range config.Jobs() { if job.SdName != nil { - m.TargetPoolForJob(job) + m.Lock() + m.targetPoolForJob(job) + m.Unlock() continue } @@ -106,6 +115,9 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) { } func (m *targetManager) Stop() { + m.Lock() + defer m.Unlock() + glog.Info("Stopping target manager...") var wg sync.WaitGroup for j, p := range m.poolsByJob { @@ -121,7 +133,13 @@ func (m *targetManager) Stop() { glog.Info("Target manager stopped.") } -// TODO: Not goroutine-safe. Only used in /status page for now. func (m *targetManager) Pools() map[string]*TargetPool { - return m.poolsByJob + m.Lock() + defer m.Unlock() + + result := make(map[string]*TargetPool, len(m.poolsByJob)) + for k, v := range m.poolsByJob { + result[k] = v + } + return result } diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 83bf13a335..a0c27b71d5 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -122,9 +122,9 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) { defer wg.Done() glog.V(1).Infof("Stopping scraper for target %s...", k) oldTarget.StopScraper() - delete(p.targetsByAddress, k) glog.V(1).Infof("Scraper for target %s stopped.", k) }(k, oldTarget) + delete(p.targetsByAddress, k) } } wg.Wait() diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 636600add2..4d8fb21c91 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -114,32 +114,36 @@ func TestTargetPool(t *testing.T) { func TestTargetPoolReplaceTargets(t *testing.T) { pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1)) oldTarget1 := &target{ - address: "example1", - state: UNREACHABLE, - stopScraper: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), - httpClient: &http.Client{}, + address: "example1", + state: UNREACHABLE, + scraperStopping: make(chan struct{}), + scraperStopped: make(chan struct{}), + newBaseLabels: make(chan clientmodel.LabelSet, 1), + httpClient: &http.Client{}, } oldTarget2 := &target{ - address: "example2", - state: UNREACHABLE, - stopScraper: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), - httpClient: &http.Client{}, + address: "example2", + state: UNREACHABLE, + scraperStopping: make(chan struct{}), + scraperStopped: make(chan struct{}), + newBaseLabels: make(chan clientmodel.LabelSet, 1), + httpClient: &http.Client{}, } newTarget1 := &target{ - address: "example1", - state: ALIVE, - stopScraper: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), - httpClient: &http.Client{}, + address: "example1", + state: ALIVE, + scraperStopping: make(chan struct{}), + scraperStopped: make(chan struct{}), + newBaseLabels: make(chan clientmodel.LabelSet, 1), + httpClient: &http.Client{}, } newTarget2 := &target{ - address: "example3", - state: ALIVE, - stopScraper: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), - httpClient: &http.Client{}, + address: "example3", + state: ALIVE, + scraperStopping: make(chan struct{}), + scraperStopped: make(chan struct{}), + newBaseLabels: make(chan clientmodel.LabelSet, 1), + httpClient: &http.Client{}, } pool.addTarget(oldTarget1) diff --git a/rules/ast/functions.go b/rules/ast/functions.go index 9671be49bd..328d6b83d9 100644 --- a/rules/ast/functions.go +++ b/rules/ast/functions.go @@ -255,8 +255,7 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interfac } common := clientmodel.LabelSet{} for k, v := range vector[0].Metric { - // TODO(julius): Revisit this when https://github.com/prometheus/prometheus/issues/380 - // is implemented. + // TODO(julius): Should we also drop common metric names? if k == clientmodel.MetricNameLabel { continue } diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 0145e2e15f..bd871498cb 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -125,6 +125,13 @@ func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool { return !t.Before(cd.firstTime()) && !t.After(cd.lastTime()) } +func (cd *chunkDesc) getChunk() chunk { + cd.Lock() + defer cd.Unlock() + + return cd.chunk +} + func (cd *chunkDesc) setChunk(c chunk) { cd.Lock() defer cd.Unlock() diff --git a/storage/local/series.go b/storage/local/series.go index 9d2286067a..154488d7af 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -381,11 +381,11 @@ func (s *memorySeries) preloadChunksForRange( func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator { chunks := make([]chunk, 0, len(s.chunkDescs)) for i, cd := range s.chunkDescs { - if !cd.isEvicted() { + if chunk := cd.getChunk(); chunk != nil { if i == len(s.chunkDescs)-1 && !s.headChunkPersisted { s.headChunkUsedByIterator = true } - chunks = append(chunks, cd.chunk) + chunks = append(chunks, chunk) } }