From 65eba080a076ff12a0aa6706a5a467d984d8421b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 12 Feb 2016 15:43:27 +0100 Subject: [PATCH] Cleanup internal target data --- retrieval/target.go | 265 +++++++++++++++++++++------------------ retrieval/target_test.go | 46 +++---- 2 files changed, 167 insertions(+), 144 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index d509c012d0..a1f875ff80 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -119,6 +119,7 @@ type TargetStatus struct { func (ts *TargetStatus) LastError() error { ts.mu.RLock() defer ts.mu.RUnlock() + return ts.lastError } @@ -126,6 +127,7 @@ func (ts *TargetStatus) LastError() error { func (ts *TargetStatus) LastScrape() time.Time { ts.mu.RLock() defer ts.mu.RUnlock() + return ts.lastScrape } @@ -133,18 +135,21 @@ func (ts *TargetStatus) LastScrape() time.Time { func (ts *TargetStatus) Health() TargetHealth { ts.mu.RLock() defer ts.mu.RUnlock() + return ts.health } func (ts *TargetStatus) setLastScrape(t time.Time) { ts.mu.Lock() defer ts.mu.Unlock() + ts.lastScrape = t } func (ts *TargetStatus) setLastError(err error) { ts.mu.Lock() defer ts.mu.Unlock() + if err == nil { ts.health = HealthGood } else { @@ -164,37 +169,26 @@ type Target struct { // Mutex protects the members below. sync.RWMutex - // The HTTP client used to scrape the target's endpoint. - httpClient *http.Client - // url is the URL to be scraped. Its host is immutable. - url *url.URL + + scrapeConfig *config.ScrapeConfig + // Labels before any processing. metaLabels model.LabelSet // Any base labels that are added to this target and its metrics. - baseLabels model.LabelSet - // Internal labels, such as scheme. - internalLabels model.LabelSet - // The time between two scrapes. - scrapeInterval time.Duration - // Whether the target's labels have precedence over the base labels - // assigned by the scraping instance. - honorLabels bool - // Metric relabel configuration. 
- metricRelabelConfigs []*config.RelabelConfig + labels model.LabelSet + + // The HTTP client used to scrape the target's endpoint. + httpClient *http.Client } // NewTarget creates a reasonably configured target for querying. -func NewTarget(cfg *config.ScrapeConfig, baseLabels, metaLabels model.LabelSet) (*Target, error) { +func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Target, error) { t := &Target{ - url: &url.URL{ - Scheme: string(baseLabels[model.SchemeLabel]), - Host: string(baseLabels[model.AddressLabel]), - }, status: &TargetStatus{}, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), } - err := t.Update(cfg, baseLabels, metaLabels) + err := t.Update(cfg, labels, metaLabels) return t, err } @@ -207,55 +201,21 @@ func (t *Target) Status() *TargetStatus { // it belongs to. func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.LabelSet) error { t.Lock() - defer t.Unlock() - httpClient, err := newHTTPClient(cfg) - if err != nil { - return fmt.Errorf("cannot create HTTP client: %v", err) - } - t.httpClient = httpClient - - t.url.Scheme = string(baseLabels[model.SchemeLabel]) - t.url.Path = string(baseLabels[model.MetricsPathLabel]) - - t.internalLabels = model.LabelSet{} - t.internalLabels[model.SchemeLabel] = baseLabels[model.SchemeLabel] - t.internalLabels[model.MetricsPathLabel] = baseLabels[model.MetricsPathLabel] - t.internalLabels[model.AddressLabel] = model.LabelValue(t.url.Host) - - params := url.Values{} - - for k, v := range cfg.Params { - params[k] = make([]string, len(v)) - copy(params[k], v) - } - for k, v := range baseLabels { - if strings.HasPrefix(string(k), model.ParamLabelPrefix) { - if len(params[string(k[len(model.ParamLabelPrefix):])]) > 0 { - params[string(k[len(model.ParamLabelPrefix):])][0] = string(v) - } else { - params[string(k[len(model.ParamLabelPrefix):])] = []string{string(v)} - } - 
t.internalLabels[model.ParamLabelPrefix+k[len(model.ParamLabelPrefix):]] = v - } - } - t.url.RawQuery = params.Encode() - - t.scrapeInterval = time.Duration(cfg.ScrapeInterval) - - t.honorLabels = cfg.HonorLabels + t.scrapeConfig = cfg + t.labels = baseLabels t.metaLabels = metaLabels - t.baseLabels = model.LabelSet{} - // All remaining internal labels will not be part of the label set. - for name, val := range baseLabels { - if !strings.HasPrefix(string(name), model.ReservedLabelPrefix) { - t.baseLabels[name] = val - } + + t.Unlock() + + httpClient, err := t.client() + if err != nil { + return fmt.Errorf("cannot create HTTP client: %s", err) } - if _, ok := t.baseLabels[model.InstanceLabel]; !ok { - t.baseLabels[model.InstanceLabel] = model.LabelValue(t.InstanceIdentifier()) - } - t.metricRelabelConfigs = cfg.MetricRelabelConfigs + t.Lock() + t.httpClient = httpClient + t.Unlock() + return nil } @@ -304,16 +264,105 @@ func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) { } func (t *Target) String() string { - return t.url.Host + return t.host() +} + +func (t *Target) client() (*http.Client, error) { + t.RLock() + defer t.RUnlock() + + return newHTTPClient(t.scrapeConfig) +} + +func (t *Target) interval() time.Duration { + t.RLock() + defer t.RUnlock() + + return time.Duration(t.scrapeConfig.ScrapeInterval) +} + +func (t *Target) timeout() time.Duration { + t.RLock() + defer t.RUnlock() + + return time.Duration(t.scrapeConfig.ScrapeTimeout) +} + +func (t *Target) scheme() string { + t.RLock() + defer t.RUnlock() + + return string(t.labels[model.SchemeLabel]) +} + +func (t *Target) host() string { + t.RLock() + defer t.RUnlock() + + return string(t.labels[model.AddressLabel]) +} + +func (t *Target) path() string { + t.RLock() + defer t.RUnlock() + + return string(t.labels[model.MetricsPathLabel]) +} + +// URL returns a copy of the target's URL. 
+func (t *Target) URL() *url.URL { + t.RLock() + defer t.RUnlock() + + params := url.Values{} + + for k, v := range t.scrapeConfig.Params { + params[k] = make([]string, len(v)) + copy(params[k], v) + } + for k, v := range t.labels { + if !strings.HasPrefix(string(k), model.ParamLabelPrefix) { + continue + } + ks := string(k[len(model.ParamLabelPrefix):]) + + if len(params[ks]) > 0 { + params[ks][0] = string(v) + } else { + params[ks] = []string{string(v)} + } + } + + return &url.URL{ + Scheme: string(t.labels[model.SchemeLabel]), + Host: string(t.labels[model.AddressLabel]), + Path: string(t.labels[model.MetricsPathLabel]), + RawQuery: params.Encode(), + } +} + +// InstanceIdentifier returns the identifier for the target. +func (t *Target) InstanceIdentifier() string { + return t.host() +} + +func (t *Target) fullLabels() model.LabelSet { + t.RLock() + defer t.RUnlock() + + lset := t.labels.Clone() + + if _, ok := lset[model.InstanceLabel]; !ok { + lset[model.InstanceLabel] = t.labels[model.AddressLabel] + } + return lset } // RunScraper implements Target. func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { defer close(t.scraperStopped) - t.RLock() - lastScrapeInterval := t.scrapeInterval - t.RUnlock() + lastScrapeInterval := t.interval() log.Debugf("Starting scraper for target %v...", t) @@ -353,15 +402,13 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { intervalStr := lastScrapeInterval.String() - t.RLock() // On changed scrape interval the new interval becomes effective // after the next scrape. - if lastScrapeInterval != t.scrapeInterval { + if iv := t.interval(); iv != lastScrapeInterval { ticker.Stop() - ticker = time.NewTicker(t.scrapeInterval) - lastScrapeInterval = t.scrapeInterval + ticker = time.NewTicker(iv) + lastScrapeInterval = iv } - t.RUnlock() targetIntervalLength.WithLabelValues(intervalStr).Observe( float64(took) / float64(time.Second), // Sub-second precision. 
@@ -389,27 +436,28 @@ func (t *Target) StopScraper() { const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` -func (t *Target) scrape(appender storage.SampleAppender) (err error) { - start := time.Now() - baseLabels := t.BaseLabels() - +func (t *Target) scrape(appender storage.SampleAppender) error { + var ( + err error + start = time.Now() + baseLabels = t.BaseLabels() + ) defer func(appender storage.SampleAppender) { t.status.setLastError(err) recordScrapeHealth(appender, start, baseLabels, t.status.Health(), time.Since(start)) }(appender) t.RLock() - // The relabelAppender has to be inside the label-modifying appenders // so the relabeling rules are applied to the correct label set. - if len(t.metricRelabelConfigs) > 0 { + if len(t.scrapeConfig.MetricRelabelConfigs) > 0 { appender = relabelAppender{ SampleAppender: appender, - relabelings: t.metricRelabelConfigs, + relabelings: t.scrapeConfig.MetricRelabelConfigs, } } - if t.honorLabels { + if t.scrapeConfig.HonorLabels { appender = honorLabelsAppender{ SampleAppender: appender, labels: baseLabels, @@ -422,7 +470,6 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { } httpClient := t.httpClient - t.RUnlock() req, err := http.NewRequest("GET", t.URL().String(), nil) @@ -538,43 +585,22 @@ func (app relabelAppender) Append(s *model.Sample) error { return app.SampleAppender.Append(s) } -// URL returns a copy of the target's URL. -func (t *Target) URL() *url.URL { - t.RLock() - defer t.RUnlock() - - u := &url.URL{} - *u = *t.url - return u -} - -// InstanceIdentifier returns the identifier for the target. -func (t *Target) InstanceIdentifier() string { - return t.url.Host -} - -// fullLabels returns the base labels plus internal labels defining the target. 
-func (t *Target) fullLabels() model.LabelSet { - t.RLock() - defer t.RUnlock() - lset := make(model.LabelSet, len(t.baseLabels)+len(t.internalLabels)) - for ln, lv := range t.baseLabels { - lset[ln] = lv - } - for k, v := range t.internalLabels { - lset[k] = v - } - return lset -} - // BaseLabels returns a copy of the target's base labels. func (t *Target) BaseLabels() model.LabelSet { t.RLock() defer t.RUnlock() - lset := make(model.LabelSet, len(t.baseLabels)) - for ln, lv := range t.baseLabels { - lset[ln] = lv + + lset := make(model.LabelSet, len(t.labels)) + for ln, lv := range t.labels { + if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) { + lset[ln] = lv + } } + + if _, ok := lset[model.InstanceLabel]; !ok { + lset[model.InstanceLabel] = t.labels[model.AddressLabel] + } + return lset } @@ -582,11 +608,8 @@ func (t *Target) BaseLabels() model.LabelSet { func (t *Target) MetaLabels() model.LabelSet { t.RLock() defer t.RUnlock() - lset := make(model.LabelSet, len(t.metaLabels)) - for ln, lv := range t.metaLabels { - lset[ln] = lv - } - return lset + + return t.metaLabels.Clone() } func recordScrapeHealth( diff --git a/retrieval/target_test.go b/retrieval/target_test.go index bc88ede61a..214f98c83f 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -105,7 +105,7 @@ func TestOverwriteLabels(t *testing.T) { target := newTestTarget(server.URL, time.Second, nil) - target.honorLabels = false + target.scrapeConfig.HonorLabels = false app := &collectResultAppender{} if err := target.scrape(app); err != nil { t.Fatal(err) @@ -117,7 +117,7 @@ func TestOverwriteLabels(t *testing.T) { } } - target.honorLabels = true + target.scrapeConfig.HonorLabels = true app = &collectResultAppender{} if err := target.scrape(app); err != nil { t.Fatal(err) @@ -185,7 +185,7 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) { ) defer server.Close() testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{}) - 
testTarget.metricRelabelConfigs = []*config.RelabelConfig{ + testTarget.scrapeConfig.MetricRelabelConfigs = []*config.RelabelConfig{ { SourceLabels: model.LabelNames{"__name__"}, Regex: config.MustNewRegexp(".*drop.*"), Action: config.RelabelDrop, }, } @@ -216,7 +216,7 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) { Metric: model.Metric{ model.MetricNameLabel: "test_metric_relabel", "foo": "bar", - model.InstanceLabel: model.LabelValue(testTarget.url.Host), + model.InstanceLabel: model.LabelValue(testTarget.host()), }, Timestamp: 0, Value: 0, @@ -225,7 +225,7 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) { { Metric: model.Metric{ model.MetricNameLabel: scrapeHealthMetricName, - model.InstanceLabel: model.LabelValue(testTarget.url.Host), + model.InstanceLabel: model.LabelValue(testTarget.host()), }, Timestamp: 0, Value: 0, @@ -233,7 +233,7 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) { { Metric: model.Metric{ model.MetricNameLabel: scrapeDurationMetricName, - model.InstanceLabel: model.LabelValue(testTarget.url.Host), + model.InstanceLabel: model.LabelValue(testTarget.host()), }, Timestamp: 0, Value: 0, @@ -438,7 +438,8 @@ func TestURLParams(t *testing.T) { model.AddressLabel: model.LabelValue(serverURL.Host), "__param_foo": "bar", }, - nil) + nil, + ) if err != nil { t.Fatal(err) } } @@ -448,29 +449,28 @@ } } -func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.LabelSet) *Target { - cfg := &config.ScrapeConfig{ - ScrapeTimeout: model.Duration(deadline), - } - c, _ := newHTTPClient(cfg) +func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelSet) *Target { + labels = labels.Clone() + labels[model.SchemeLabel] = "http" + labels[model.AddressLabel] = model.LabelValue(strings.TrimPrefix(targetURL, "http://")) + labels[model.MetricsPathLabel] = "/metrics" + t := &Target{ - url: &url.URL{ - Scheme: "http", - Host: strings.TrimLeft(targetURL, "http://"), - Path: "/metrics", + 
scrapeConfig: &config.ScrapeConfig{ + ScrapeInterval: model.Duration(time.Millisecond), + ScrapeTimeout: model.Duration(deadline), }, + labels: labels, status: &TargetStatus{}, - scrapeInterval: 1 * time.Millisecond, - httpClient: c, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), } - t.baseLabels = model.LabelSet{ - model.InstanceLabel: model.LabelValue(t.InstanceIdentifier()), - } - for baseLabel, baseValue := range baseLabels { - t.baseLabels[baseLabel] = baseValue + + var err error + if t.httpClient, err = t.client(); err != nil { + panic(err) } + return t }