diff --git a/config/config.go b/config/config.go index 01dfee3b26..383803b592 100644 --- a/config/config.go +++ b/config/config.go @@ -58,7 +58,7 @@ func (config *Config) AddJob(options map[string]string, targets []Targets) error if len(targets) == 0 { return fmt.Errorf("No targets configured for job '%v'", name) } - job := &JobConfig{ + job := JobConfig{ Targets: tmpJobTargets, } for option, value := range options { @@ -66,10 +66,20 @@ func (config *Config) AddJob(options map[string]string, targets []Targets) error return err } } - config.Jobs = append(config.Jobs, *job) + config.Jobs = append(config.Jobs, job) return nil } +func (config *Config) GetJobByName(name string) (jobConfig *JobConfig) { + for _, job := range config.Jobs { + if job.Name == name { + jobConfig = &job + break + } + } + return +} + func (config *GlobalConfig) SetOption(option string, value string) (err error) { switch option { case "scrape_interval": diff --git a/retrieval/target.go b/retrieval/target.go index eef5fe4c11..695e7f59dd 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -79,8 +79,12 @@ type Target interface { // points in this interface, this one is the best candidate to change given // the ways to express the endpoint. Address() string - // How frequently queries occur. - Interval() time.Duration + // Return the target's base labels. + BaseLabels() model.LabelSet + // Merge a new externally supplied target definition (e.g. with changed base + // labels) into an old target definition for the same endpoint. Preserve + // remaining information - like health state - from the old target. + Merge(newTarget Target) } // target is a Target that refers to a singular HTTP or HTTPS endpoint. @@ -94,19 +98,15 @@ type target struct { // What is the deadline for the HTTP or HTTPS against this endpoint. Deadline time.Duration // Any base labels that are added to this target and its metrics. - BaseLabels model.LabelSet - - // XXX: Move this to a field with the target manager initialization instead of here. - interval time.Duration + baseLabels model.LabelSet } // Furnish a reasonably configured target for querying. -func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) Target { +func NewTarget(address string, deadline time.Duration, baseLabels model.LabelSet) Target { target := &target{ address: address, Deadline: deadline, - interval: interval, - BaseLabels: baseLabels, + baseLabels: baseLabels, } scheduler := &healthScheduler{ @@ -155,7 +155,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err // XXX: This is a wart; we need to handle this more gracefully down the // road, especially once we have service discovery support. baseLabels := model.LabelSet{instance: model.LabelValue(t.Address())} - for baseLabel, baseValue := range t.BaseLabels { + for baseLabel, baseValue := range t.baseLabels { baseLabels[baseLabel] = baseValue } @@ -200,6 +200,16 @@ func (t target) Address() string { return t.address } -func (t target) Interval() time.Duration { - return t.interval +func (t target) BaseLabels() model.LabelSet { + return t.baseLabels +} + +// Merge a new externally supplied target definition (e.g. with changed base +// labels) into an old target definition for the same endpoint. Preserve +// remaining information - like health state - from the old target. +func (t *target) Merge(newTarget Target) { + if t.Address() != newTarget.Address() { + panic("targets don't refer to the same endpoint") + } + t.baseLabels = newTarget.BaseLabels() } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 1c1f520067..f4a24093a7 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -14,7 +14,6 @@ package retrieval import ( - "container/heap" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/retrieval/format" @@ -25,14 +24,16 @@ import ( type TargetManager interface { acquire() release() - Add(t Target) + AddTarget(job *config.JobConfig, t Target, defaultScrapeInterval time.Duration) + ReplaceTargets(job *config.JobConfig, newTargets []Target, defaultScrapeInterval time.Duration) Remove(t Target) AddTargetsFromConfig(config *config.Config) + Pools() map[string]*TargetPool } type targetManager struct { requestAllowance chan bool - pools map[time.Duration]*TargetPool + poolsByJob map[string]*TargetPool results chan format.Result } @@ -40,7 +41,7 @@ func NewTargetManager(results chan format.Result, requestAllowance int) TargetMa return &targetManager{ requestAllowance: make(chan bool, requestAllowance), results: results, - pools: make(map[time.Duration]*TargetPool), + poolsByJob: make(map[string]*TargetPool), } } @@ -52,17 +53,32 @@ func (m *targetManager) release() { <-m.requestAllowance } -func (m *targetManager) Add(t Target) { - targetPool, ok := m.pools[t.Interval()] +func (m *targetManager) TargetPoolForJob(job *config.JobConfig, defaultScrapeInterval time.Duration) (targetPool *TargetPool) { + targetPool, ok := m.poolsByJob[job.Name] if !ok { targetPool = NewTargetPool(m) - log.Printf("Pool %s does not exist; creating and starting...", t.Interval()) - go targetPool.Run(m.results, t.Interval()) - } + log.Printf("Pool for job %s does not exist; creating and starting...", job.Name) - heap.Push(targetPool, t) - m.pools[t.Interval()] = targetPool + interval := job.ScrapeInterval + if interval == 0 { + interval = defaultScrapeInterval + } + m.poolsByJob[job.Name] = targetPool + go targetPool.Run(m.results, interval) + } + return +} + +func (m *targetManager) AddTarget(job *config.JobConfig, t Target, defaultScrapeInterval time.Duration) { + targetPool := m.TargetPoolForJob(job, defaultScrapeInterval) + targetPool.AddTarget(t) + m.poolsByJob[job.Name] = targetPool +} + +func (m *targetManager) ReplaceTargets(job *config.JobConfig, newTargets []Target, defaultScrapeInterval time.Duration) { + targetPool := m.TargetPoolForJob(job, defaultScrapeInterval) + targetPool.replaceTargets(newTargets) } func (m targetManager) Remove(t Target) { @@ -79,15 +95,15 @@ func (m *targetManager) AddTargetsFromConfig(config *config.Config) { baseLabels[label] = value } - interval := job.ScrapeInterval - if interval == 0 { - interval = config.Global.ScrapeInterval - } - for _, endpoint := range configTargets.Endpoints { - target := NewTarget(endpoint, interval, time.Second*5, baseLabels) - m.Add(target) + target := NewTarget(endpoint, time.Second*5, baseLabels) + m.AddTarget(&job, target, config.Global.ScrapeInterval) } } } } + +// XXX: Not really thread-safe. Only used in /status page for now. +func (m *targetManager) Pools() map[string]*TargetPool { + return m.poolsByJob +} diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 7eb6909c82..a793b6bb27 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -14,6 +14,8 @@ package retrieval import ( + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/retrieval/format" "github.com/prometheus/prometheus/utility/test" "testing" @@ -31,6 +33,10 @@ func (t fakeTarget) Address() string { return "fake" } +func (t fakeTarget) BaseLabels() model.LabelSet { + return model.LabelSet{} +} + func (t fakeTarget) Interval() time.Duration { return t.interval } @@ -52,9 +58,17 @@ func (t *fakeTarget) scheduledFor() (time time.Time) { return } +func (t *fakeTarget) Merge(newTarget Target) {} + func testTargetManager(t test.Tester) { results := make(chan format.Result, 5) targetManager := NewTargetManager(results, 3) + testJob1 := &config.JobConfig{ + Name: "test_job1", + } + testJob2 := &config.JobConfig{ + Name: "test_job2", + } target1GroupA := &fakeTarget{ schedules: []time.Time{time.Now()}, @@ -65,16 +79,15 @@ func testTargetManager(t test.Tester) { interval: time.Minute, } - targetManager.Add(target1GroupA) - targetManager.Add(target2GroupA) + targetManager.AddTarget(testJob1, target1GroupA, 0) + targetManager.AddTarget(testJob1, target2GroupA, 0) target1GroupB := &fakeTarget{ schedules: []time.Time{time.Now()}, interval: time.Minute * 2, } - targetManager.Add(target1GroupB) - + targetManager.AddTarget(testJob2, target1GroupB, 0) } func TestTargetManager(t *testing.T) { diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 8fb5f81773..a6b43b9de5 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -1,9 +1,22 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package retrieval import ( - "container/heap" "github.com/prometheus/prometheus/retrieval/format" "log" + "sort" "time" ) @@ -12,14 +25,18 @@ const ( ) type TargetPool struct { - done chan bool - manager TargetManager - targets []Target + done chan bool + manager TargetManager + targets []Target + addTargetQueue chan Target + replaceTargetsQueue chan []Target } func NewTargetPool(m TargetManager) (p *TargetPool) { return &TargetPool{ - manager: m, + manager: m, + addTargetQueue: make(chan Target), + replaceTargetsQueue: make(chan []Target), } } @@ -31,20 +48,6 @@ func (p TargetPool) Less(i, j int) bool { return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor()) } -func (p *TargetPool) Pop() interface{} { - oldPool := p.targets - futureLength := p.Len() - 1 - element := oldPool[futureLength] - futurePool := oldPool[0:futureLength] - p.targets = futurePool - - return element -} - -func (p *TargetPool) Push(element interface{}) { - p.targets = append(p.targets, element.(Target)) -} - func (p TargetPool) Swap(i, j int) { p.targets[i], p.targets[j] = p.targets[j], p.targets[i] } @@ -56,6 +59,10 @@ func (p *TargetPool) Run(results chan format.Result, interval time.Duration) { select { case <-ticker: p.runIteration(results, interval) + case newTarget := <-p.addTargetQueue: + p.addTarget(newTarget) + case newTargets := <-p.replaceTargetsQueue: + p.replaceTargets(newTargets) case <-p.done: log.Printf("TargetPool exiting...") break @@ -67,6 +74,33 @@ func (p TargetPool) Stop() { p.done <- true } +func (p *TargetPool) AddTarget(target Target) { + p.addTargetQueue <- target +} + +func (p *TargetPool) addTarget(target Target) { + p.targets = append(p.targets, target) +} + +func (p *TargetPool) ReplaceTargets(newTargets []Target) { + p.replaceTargetsQueue <- newTargets +} + +func (p *TargetPool) replaceTargets(newTargets []Target) { + // Replace old target list by new one, but reuse those targets from the old + // list of targets which are also in the new list (to preserve scheduling and + // health state). + for j, newTarget := range newTargets { + for _, oldTarget := range p.targets { + if oldTarget.Address() == newTarget.Address() { + oldTarget.Merge(newTargets[j]) + newTargets[j] = oldTarget + } + } + } + p.targets = newTargets +} + func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) { p.manager.acquire() defer p.manager.release() @@ -80,33 +114,35 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura targetCount := p.Len() finished := make(chan bool, targetCount) - for i := 0; i < targetCount; i++ { - target := heap.Pop(p).(Target) - if target == nil { - break - } + // Sort p.targets by next scheduling time so we can process the earliest + // targets first. + sort.Sort(p) + for _, target := range p.targets { now := time.Now() if target.scheduledFor().After(now) { - heap.Push(p, target) // None of the remaining targets are ready to be scheduled. Signal that // we're done processing them in this scrape iteration. - for j := i; j < targetCount; j++ { - finished <- true - } - break + finished <- true + continue } - go func() { - p.runSingle(now, results, target) - heap.Push(p, target) + go func(t Target) { + p.runSingle(now, results, t) finished <- true - }() + }(target) } - for i := 0; i < targetCount; i++ { - <-finished + for i := 0; i < targetCount; { + select { + case <-finished: + i++ + case newTarget := <-p.addTargetQueue: + p.addTarget(newTarget) + case newTargets := <-p.replaceTargetsQueue: + p.replaceTargets(newTargets) + } } close(finished) @@ -114,3 +150,8 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura duration := float64(time.Since(begin) / time.Millisecond) retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration) } + +// XXX: Not really thread-safe. Only used in /status page for now. +func (p *TargetPool) Targets() []Target { + return p.targets +} diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index a6ff13d0e8..871421799b 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -14,9 +14,9 @@ package retrieval import ( - "container/heap" "github.com/prometheus/prometheus/retrieval/format" "github.com/prometheus/prometheus/utility/test" + "sort" "testing" "time" ) @@ -111,34 +111,20 @@ func testTargetPool(t test.Tester) { scheduler: literalScheduler(input.scheduledFor), } - heap.Push(&pool, &target) + pool.addTarget(&target) } - - targets := []Target{} + sort.Sort(pool) if pool.Len() != len(scenario.outputs) { t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len()) } else { for j, output := range scenario.outputs { - target := heap.Pop(&pool).(Target) + target := pool.targets[j] if target.Address() != output.address { t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address()) } - targets = append(targets, target) - } - - if pool.Len() != 0 { - t.Errorf("%s %d. expected pool to be empty, had %d", scenario.name, i, pool.Len()) - } - - if len(targets) != len(scenario.outputs) { - t.Errorf("%s %d. expected to receive %d elements, got %d", scenario.name, i, len(scenario.outputs), len(targets)) - } - - for _, target := range targets { - heap.Push(&pool, target) } if pool.Len() != len(scenario.outputs) { @@ -158,7 +144,7 @@ func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) { address: "http://example.com/metrics.json", scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)), } - pool.Push(target) + pool.addTarget(target) done := make(chan bool) go func() { @@ -174,6 +160,49 @@ func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) { } } +func TestTargetPoolReplaceTargets(t *testing.T) { + pool := TargetPool{} + oldTarget1 := &target{ + address: "http://example1.com/metrics.json", + scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)), + state: UNREACHABLE, + } + oldTarget2 := &target{ + address: "http://example2.com/metrics.json", + scheduler: literalScheduler(time.Date(7500, 1, 1, 0, 0, 0, 0, time.UTC)), + state: UNREACHABLE, + } + newTarget1 := &target{ + address: "http://example1.com/metrics.json", + scheduler: literalScheduler(time.Date(5000, 1, 1, 0, 0, 0, 0, time.UTC)), + state: ALIVE, + } + newTarget2 := &target{ + address: "http://example3.com/metrics.json", + scheduler: literalScheduler(time.Date(2500, 1, 1, 0, 0, 0, 0, time.UTC)), + state: ALIVE, + } + + pool.addTarget(oldTarget1) + pool.addTarget(oldTarget2) + + pool.replaceTargets([]Target{newTarget1, newTarget2}) + sort.Sort(pool) + + if pool.Len() != 2 { + t.Errorf("Expected 2 elements in pool, had %d", pool.Len()) + } + + target1 := pool.targets[0].(*target) + if target1.state != newTarget1.state { + t.Errorf("Wrong first target returned from pool, expected %v, got %v", newTarget2, target1) + } + target2 := pool.targets[1].(*target) + if target2.state != oldTarget1.state { + t.Errorf("Wrong second target returned from pool, expected %v, got %v", oldTarget1, target2) + } +} + func BenchmarkTargetPool(b *testing.B) { for i := 0; i < b.N; i++ { testTargetPool(b) diff --git a/web/api/api.go b/web/api/api.go index 7c7b6155af..b0da2de194 100644 --- a/web/api/api.go +++ b/web/api/api.go @@ -1,8 +1,21 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package api import ( "code.google.com/p/gorest" - "github.com/prometheus/prometheus/storage/metric" + "github.com/prometheus/prometheus/appstate" "github.com/prometheus/prometheus/utility" ) @@ -13,12 +26,14 @@ type MetricsService struct { queryRange gorest.EndPoint `method:"GET" path:"/query_range?{expr:string}&{end:int64}&{range:int64}&{step:int64}" output:"string"` metrics gorest.EndPoint `method:"GET" path:"/metrics" output:"string"` - persistence metric.MetricPersistence - time utility.Time + setTargets gorest.EndPoint `method:"PUT" path:"/jobs/{jobName:string}/targets" postdata:"[]TargetGroup"` + + appState *appstate.ApplicationState + time utility.Time } -func NewMetricsService(persistence metric.MetricPersistence) *MetricsService { +func NewMetricsService(appState *appstate.ApplicationState) *MetricsService { return &MetricsService{ - persistence: persistence, + appState: appState, } } diff --git a/web/api/query.go b/web/api/query.go index 3041a0bfb8..f007cd97a2 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -1,3 +1,16 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package api import ( @@ -70,7 +83,7 @@ func (serv MetricsService) QueryRange(Expr string, End int64, Range int64, Step } func (serv MetricsService) Metrics() string { - metricNames, err := serv.persistence.GetAllMetricNames() + metricNames, err := serv.appState.Persistence.GetAllMetricNames() rb := serv.ResponseBuilder() rb.SetContentType(gorest.Application_Json) if err != nil { diff --git a/web/api/targets.go b/web/api/targets.go new file mode 100644 index 0000000000..8140c41204 --- /dev/null +++ b/web/api/targets.go @@ -0,0 +1,48 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/retrieval" + "net/http" + "time" +) + +type TargetGroup struct { + Endpoints []string `json:"endpoints"` + BaseLabels map[string]string `json:"baseLabels"` +} + +func (serv MetricsService) SetTargets(targetGroups []TargetGroup, jobName string) { + if job := serv.appState.Config.GetJobByName(jobName); job == nil { + rb := serv.ResponseBuilder() + rb.SetResponseCode(http.StatusNotFound) + } else { + newTargets := []retrieval.Target{} + for _, targetGroup := range targetGroups { + // Do mandatory map type conversion due to Go shortcomings. + baseLabels := model.LabelSet{} + for label, value := range targetGroup.BaseLabels { + baseLabels[model.LabelName(label)] = model.LabelValue(value) + } + + for _, endpoint := range targetGroup.Endpoints { + newTarget := retrieval.NewTarget(endpoint, time.Second*5, baseLabels) + newTargets = append(newTargets, newTarget) + } + } + serv.appState.TargetManager.ReplaceTargets(job, newTargets, serv.appState.Config.Global.ScrapeInterval) + } +} diff --git a/web/status.go b/web/status.go index 9e01161603..f8a7ed0ae4 100644 --- a/web/status.go +++ b/web/status.go @@ -15,15 +15,16 @@ package web import ( "github.com/prometheus/prometheus/appstate" + "github.com/prometheus/prometheus/retrieval" "html/template" "net/http" ) type PrometheusStatus struct { - Config string - Rules string - Status string - Targets string + Config string + Rules string + Status string + TargetPools map[string]*retrieval.TargetPool } type StatusHandler struct { @@ -32,10 +33,10 @@ type StatusHandler struct { func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { status := &PrometheusStatus{ - Config: h.appState.Config.ToString(0), - Rules: "TODO: list rules here", - Status: "TODO: add status information here", - Targets: "TODO: list targets here", + Config: h.appState.Config.ToString(0), + Rules: "TODO: list rules here", + Status: "TODO: add status information here", + TargetPools: h.appState.TargetManager.Pools(), } t, _ := template.ParseFiles("web/templates/status.html") t.Execute(w, status) diff --git a/web/templates/status.html b/web/templates/status.html index 8820846d71..7d3bf90720 100644 --- a/web/templates/status.html +++ b/web/templates/status.html @@ -26,8 +26,20 @@