mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-25 14:31:01 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			553 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			553 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2013 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package retrieval
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"net"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/prometheus/common/log"
 | |
| 	"github.com/prometheus/common/model"
 | |
| 	"golang.org/x/net/context"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/config"
 | |
| 	"github.com/prometheus/prometheus/relabel"
 | |
| 	"github.com/prometheus/prometheus/retrieval/discovery"
 | |
| 	"github.com/prometheus/prometheus/storage"
 | |
| )
 | |
| 
 | |
| // A TargetProvider provides information about target groups. It maintains a set
 | |
| // of sources from which TargetGroups can originate. Whenever a target provider
 | |
| // detects a potential change, it sends the TargetGroup through its provided channel.
 | |
| //
 | |
| // The TargetProvider does not have to guarantee that an actual change happened.
 | |
| // It does guarantee that it sends the new TargetGroup whenever a change happens.
 | |
| //
 | |
| // Providers must initially send all known target groups as soon as it can.
 | |
| type TargetProvider interface {
 | |
| 	// Run hands a channel to the target provider through which it can send
 | |
| 	// updated target groups. The channel must be closed by the target provider
 | |
| 	// if no more updates will be sent.
 | |
| 	// On receiving from done Run must return.
 | |
| 	Run(ctx context.Context, up chan<- []*config.TargetGroup)
 | |
| }
 | |
| 
 | |
| // TargetManager maintains a set of targets, starts and stops their scraping and
 | |
| // creates the new targets based on the target groups it receives from various
 | |
| // target providers.
 | |
| type TargetManager struct {
 | |
| 	appender      storage.SampleAppender
 | |
| 	scrapeConfigs []*config.ScrapeConfig
 | |
| 
 | |
| 	mtx    sync.RWMutex
 | |
| 	ctx    context.Context
 | |
| 	cancel func()
 | |
| 	wg     sync.WaitGroup
 | |
| 
 | |
| 	// Set of unqiue targets by scrape configuration.
 | |
| 	targetSets map[string]*targetSet
 | |
| }
 | |
| 
 | |
| // NewTargetManager creates a new TargetManager.
 | |
| func NewTargetManager(app storage.SampleAppender) *TargetManager {
 | |
| 	return &TargetManager{
 | |
| 		appender:   app,
 | |
| 		targetSets: map[string]*targetSet{},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Run starts background processing to handle target updates.
 | |
| func (tm *TargetManager) Run() {
 | |
| 	log.Info("Starting target manager...")
 | |
| 
 | |
| 	tm.mtx.Lock()
 | |
| 
 | |
| 	tm.ctx, tm.cancel = context.WithCancel(context.Background())
 | |
| 	tm.reload()
 | |
| 
 | |
| 	tm.mtx.Unlock()
 | |
| 
 | |
| 	tm.wg.Wait()
 | |
| }
 | |
| 
 | |
| // Stop all background processing.
 | |
| func (tm *TargetManager) Stop() {
 | |
| 	log.Infoln("Stopping target manager...")
 | |
| 
 | |
| 	tm.mtx.Lock()
 | |
| 	// Cancel the base context, this will cause all target providers to shut down
 | |
| 	// and all in-flight scrapes to abort immmediately.
 | |
| 	// Started inserts will be finished before terminating.
 | |
| 	tm.cancel()
 | |
| 	tm.mtx.Unlock()
 | |
| 
 | |
| 	// Wait for all scrape inserts to complete.
 | |
| 	tm.wg.Wait()
 | |
| 
 | |
| 	log.Debugln("Target manager stopped")
 | |
| }
 | |
| 
 | |
| func (tm *TargetManager) reload() {
 | |
| 	jobs := map[string]struct{}{}
 | |
| 
 | |
| 	// Start new target sets and update existing ones.
 | |
| 	for _, scfg := range tm.scrapeConfigs {
 | |
| 		jobs[scfg.JobName] = struct{}{}
 | |
| 
 | |
| 		ts, ok := tm.targetSets[scfg.JobName]
 | |
| 		if !ok {
 | |
| 			ts = newTargetSet(scfg, tm.appender)
 | |
| 			tm.targetSets[scfg.JobName] = ts
 | |
| 
 | |
| 			tm.wg.Add(1)
 | |
| 
 | |
| 			go func(ts *targetSet) {
 | |
| 				ts.runScraping(tm.ctx)
 | |
| 				tm.wg.Done()
 | |
| 			}(ts)
 | |
| 		} else {
 | |
| 			ts.reload(scfg)
 | |
| 		}
 | |
| 		ts.runProviders(tm.ctx, providersFromConfig(scfg))
 | |
| 	}
 | |
| 
 | |
| 	// Remove old target sets. Waiting for stopping is already guaranteed
 | |
| 	// by the goroutine that started the target set.
 | |
| 	for name, ts := range tm.targetSets {
 | |
| 		if _, ok := jobs[name]; !ok {
 | |
| 			ts.cancel()
 | |
| 			delete(tm.targetSets, name)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Pools returns the targets currently being scraped bucketed by their job name.
 | |
| func (tm *TargetManager) Pools() map[string]Targets {
 | |
| 	tm.mtx.RLock()
 | |
| 	defer tm.mtx.RUnlock()
 | |
| 
 | |
| 	pools := map[string]Targets{}
 | |
| 
 | |
| 	// TODO(fabxc): this is just a hack to maintain compatibility for now.
 | |
| 	for _, ps := range tm.targetSets {
 | |
| 		ps.scrapePool.mtx.RLock()
 | |
| 
 | |
| 		for _, t := range ps.scrapePool.targets {
 | |
| 			job := string(t.Labels()[model.JobLabel])
 | |
| 			pools[job] = append(pools[job], t)
 | |
| 		}
 | |
| 
 | |
| 		ps.scrapePool.mtx.RUnlock()
 | |
| 	}
 | |
| 	for _, targets := range pools {
 | |
| 		sort.Sort(targets)
 | |
| 	}
 | |
| 	return pools
 | |
| }
 | |
| 
 | |
| // ApplyConfig resets the manager's target providers and job configurations as defined
 | |
| // by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
 | |
| func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
 | |
| 	tm.mtx.Lock()
 | |
| 	defer tm.mtx.Unlock()
 | |
| 
 | |
| 	tm.scrapeConfigs = cfg.ScrapeConfigs
 | |
| 
 | |
| 	if tm.ctx != nil {
 | |
| 		tm.reload()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // targetSet holds several TargetProviders for which the same scrape configuration
 | |
| // is used. It maintains target groups from all given providers and sync them
 | |
| // to a scrape pool.
 | |
| type targetSet struct {
 | |
| 	mtx sync.RWMutex
 | |
| 
 | |
| 	// Sets of targets by a source string that is unique across target providers.
 | |
| 	tgroups map[string][]*Target
 | |
| 
 | |
| 	scrapePool *scrapePool
 | |
| 	config     *config.ScrapeConfig
 | |
| 
 | |
| 	syncCh          chan struct{}
 | |
| 	cancelScraping  func()
 | |
| 	cancelProviders func()
 | |
| }
 | |
| 
 | |
| func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
 | |
| 	ts := &targetSet{
 | |
| 		scrapePool: newScrapePool(cfg, app),
 | |
| 		syncCh:     make(chan struct{}, 1),
 | |
| 		config:     cfg,
 | |
| 	}
 | |
| 	return ts
 | |
| }
 | |
| 
 | |
| func (ts *targetSet) cancel() {
 | |
| 	ts.mtx.RLock()
 | |
| 	defer ts.mtx.RUnlock()
 | |
| 
 | |
| 	if ts.cancelScraping != nil {
 | |
| 		ts.cancelScraping()
 | |
| 	}
 | |
| 	if ts.cancelProviders != nil {
 | |
| 		ts.cancelProviders()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (ts *targetSet) reload(cfg *config.ScrapeConfig) {
 | |
| 	ts.mtx.Lock()
 | |
| 	ts.config = cfg
 | |
| 	ts.mtx.Unlock()
 | |
| 
 | |
| 	ts.scrapePool.reload(cfg)
 | |
| }
 | |
| 
 | |
| func (ts *targetSet) runScraping(ctx context.Context) {
 | |
| 	ctx, ts.cancelScraping = context.WithCancel(ctx)
 | |
| 
 | |
| 	ts.scrapePool.init(ctx)
 | |
| 
 | |
| Loop:
 | |
| 	for {
 | |
| 		// Throttle syncing to once per five seconds.
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			break Loop
 | |
| 		case <-time.After(5 * time.Second):
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			break Loop
 | |
| 		case <-ts.syncCh:
 | |
| 			ts.mtx.RLock()
 | |
| 			ts.sync()
 | |
| 			ts.mtx.RUnlock()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// We want to wait for all pending target scrapes to complete though to ensure there'll
 | |
| 	// be no more storage writes after this point.
 | |
| 	ts.scrapePool.stop()
 | |
| }
 | |
| 
 | |
| func (ts *targetSet) sync() {
 | |
| 	var all []*Target
 | |
| 	for _, targets := range ts.tgroups {
 | |
| 		all = append(all, targets...)
 | |
| 	}
 | |
| 	ts.scrapePool.sync(all)
 | |
| }
 | |
| 
 | |
| func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
 | |
| 	// Lock for the entire time. This may mean up to 5 seconds until the full initial set
 | |
| 	// is retrieved and applied.
 | |
| 	// We could release earlier with some tweaks, but this is easier to reason about.
 | |
| 	ts.mtx.Lock()
 | |
| 	defer ts.mtx.Unlock()
 | |
| 
 | |
| 	var wg sync.WaitGroup
 | |
| 
 | |
| 	if ts.cancelProviders != nil {
 | |
| 		ts.cancelProviders()
 | |
| 	}
 | |
| 	ctx, ts.cancelProviders = context.WithCancel(ctx)
 | |
| 
 | |
| 	// (Re-)create a fresh tgroups map to not keep stale targets around. We
 | |
| 	// will retrieve all targets below anyway, so cleaning up everything is
 | |
| 	// safe and doesn't inflict any additional cost.
 | |
| 	ts.tgroups = map[string][]*Target{}
 | |
| 
 | |
| 	for name, prov := range providers {
 | |
| 		wg.Add(1)
 | |
| 
 | |
| 		updates := make(chan []*config.TargetGroup)
 | |
| 
 | |
| 		go func(name string, prov TargetProvider) {
 | |
| 			select {
 | |
| 			case <-ctx.Done():
 | |
| 			case initial, ok := <-updates:
 | |
| 				// Handle the case that a target provider exits and closes the channel
 | |
| 				// before the context is done.
 | |
| 				if !ok {
 | |
| 					break
 | |
| 				}
 | |
| 				// First set of all targets the provider knows.
 | |
| 				for _, tgroup := range initial {
 | |
| 					if tgroup == nil {
 | |
| 						continue
 | |
| 					}
 | |
| 					targets, err := targetsFromGroup(tgroup, ts.config)
 | |
| 					if err != nil {
 | |
| 						log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
 | |
| 						continue
 | |
| 					}
 | |
| 					ts.tgroups[name+"/"+tgroup.Source] = targets
 | |
| 				}
 | |
| 			case <-time.After(5 * time.Second):
 | |
| 				// Initial set didn't arrive. Act as if it was empty
 | |
| 				// and wait for updates later on.
 | |
| 			}
 | |
| 
 | |
| 			wg.Done()
 | |
| 
 | |
| 			// Start listening for further updates.
 | |
| 			for {
 | |
| 				select {
 | |
| 				case <-ctx.Done():
 | |
| 					return
 | |
| 				case tgs, ok := <-updates:
 | |
| 					// Handle the case that a target provider exits and closes the channel
 | |
| 					// before the context is done.
 | |
| 					if !ok {
 | |
| 						return
 | |
| 					}
 | |
| 					for _, tg := range tgs {
 | |
| 						if err := ts.update(name, tg); err != nil {
 | |
| 							log.With("target_group", tg).Errorf("Target update failed: %s", err)
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}(name, prov)
 | |
| 
 | |
| 		go prov.Run(ctx, updates)
 | |
| 	}
 | |
| 
 | |
| 	// We wait for a full initial set of target groups before releasing the mutex
 | |
| 	// to ensure the initial sync is complete and there are no races with subsequent updates.
 | |
| 	wg.Wait()
 | |
| 	// Just signal that there are initial sets to sync now. Actual syncing must only
 | |
| 	// happen in the runScraping loop.
 | |
| 	select {
 | |
| 	case ts.syncCh <- struct{}{}:
 | |
| 	default:
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // update handles a target group update from a target provider identified by the name.
 | |
| func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error {
 | |
| 	if tgroup == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	targets, err := targetsFromGroup(tgroup, ts.config)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	ts.mtx.Lock()
 | |
| 	defer ts.mtx.Unlock()
 | |
| 
 | |
| 	ts.tgroups[name+"/"+tgroup.Source] = targets
 | |
| 
 | |
| 	select {
 | |
| 	case ts.syncCh <- struct{}{}:
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // providersFromConfig returns all TargetProviders configured in cfg.
 | |
| func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
 | |
| 	providers := map[string]TargetProvider{}
 | |
| 
 | |
| 	app := func(mech string, i int, tp TargetProvider) {
 | |
| 		providers[fmt.Sprintf("%s/%d", mech, i)] = tp
 | |
| 	}
 | |
| 
 | |
| 	for i, c := range cfg.DNSSDConfigs {
 | |
| 		app("dns", i, discovery.NewDNS(c))
 | |
| 	}
 | |
| 	for i, c := range cfg.FileSDConfigs {
 | |
| 		app("file", i, discovery.NewFileDiscovery(c))
 | |
| 	}
 | |
| 	for i, c := range cfg.ConsulSDConfigs {
 | |
| 		k, err := discovery.NewConsul(c)
 | |
| 		if err != nil {
 | |
| 			log.Errorf("Cannot create Consul discovery: %s", err)
 | |
| 			continue
 | |
| 		}
 | |
| 		app("consul", i, k)
 | |
| 	}
 | |
| 	for i, c := range cfg.MarathonSDConfigs {
 | |
| 		app("marathon", i, discovery.NewMarathon(c))
 | |
| 	}
 | |
| 	for i, c := range cfg.KubernetesSDConfigs {
 | |
| 		k, err := discovery.NewKubernetesDiscovery(c)
 | |
| 		if err != nil {
 | |
| 			log.Errorf("Cannot create Kubernetes discovery: %s", err)
 | |
| 			continue
 | |
| 		}
 | |
| 		app("kubernetes", i, k)
 | |
| 	}
 | |
| 	for i, c := range cfg.ServersetSDConfigs {
 | |
| 		app("serverset", i, discovery.NewServersetDiscovery(c))
 | |
| 	}
 | |
| 	for i, c := range cfg.NerveSDConfigs {
 | |
| 		app("nerve", i, discovery.NewNerveDiscovery(c))
 | |
| 	}
 | |
| 	for i, c := range cfg.EC2SDConfigs {
 | |
| 		app("ec2", i, discovery.NewEC2Discovery(c))
 | |
| 	}
 | |
| 	for i, c := range cfg.GCESDConfigs {
 | |
| 		gced, err := discovery.NewGCEDiscovery(c)
 | |
| 		if err != nil {
 | |
| 			log.Errorf("Cannot initialize GCE discovery: %s", err)
 | |
| 			continue
 | |
| 		}
 | |
| 		app("gce", i, gced)
 | |
| 	}
 | |
| 	for i, c := range cfg.AzureSDConfigs {
 | |
| 		app("azure", i, discovery.NewAzureDiscovery(c))
 | |
| 	}
 | |
| 	if len(cfg.StaticConfigs) > 0 {
 | |
| 		app("static", 0, NewStaticProvider(cfg.StaticConfigs))
 | |
| 	}
 | |
| 
 | |
| 	return providers
 | |
| }
 | |
| 
 | |
| // populateLabels builds a label set from the given label set and scrape configuration.
 | |
| // It returns a label set before relabeling was applied as the second return value.
 | |
| // Returns a nil label set if the target is dropped during relabeling.
 | |
| func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) {
 | |
| 	if _, ok := lset[model.AddressLabel]; !ok {
 | |
| 		return nil, nil, fmt.Errorf("no address")
 | |
| 	}
 | |
| 	// Copy labels into the labelset for the target if they are not
 | |
| 	// set already. Apply the labelsets in order of decreasing precedence.
 | |
| 	scrapeLabels := model.LabelSet{
 | |
| 		model.SchemeLabel:      model.LabelValue(cfg.Scheme),
 | |
| 		model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath),
 | |
| 		model.JobLabel:         model.LabelValue(cfg.JobName),
 | |
| 	}
 | |
| 	for ln, lv := range scrapeLabels {
 | |
| 		if _, ok := lset[ln]; !ok {
 | |
| 			lset[ln] = lv
 | |
| 		}
 | |
| 	}
 | |
| 	// Encode scrape query parameters as labels.
 | |
| 	for k, v := range cfg.Params {
 | |
| 		if len(v) > 0 {
 | |
| 			lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0])
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	preRelabelLabels := lset
 | |
| 	lset = relabel.Process(lset, cfg.RelabelConfigs...)
 | |
| 
 | |
| 	// Check if the target was dropped.
 | |
| 	if lset == nil {
 | |
| 		return nil, nil, nil
 | |
| 	}
 | |
| 
 | |
| 	// addPort checks whether we should add a default port to the address.
 | |
| 	// If the address is not valid, we don't append a port either.
 | |
| 	addPort := func(s string) bool {
 | |
| 		// If we can split, a port exists and we don't have to add one.
 | |
| 		if _, _, err := net.SplitHostPort(s); err == nil {
 | |
| 			return false
 | |
| 		}
 | |
| 		// If adding a port makes it valid, the previous error
 | |
| 		// was not due to an invalid address and we can append a port.
 | |
| 		_, _, err := net.SplitHostPort(s + ":1234")
 | |
| 		return err == nil
 | |
| 	}
 | |
| 	// If it's an address with no trailing port, infer it based on the used scheme.
 | |
| 	if addr := string(lset[model.AddressLabel]); addPort(addr) {
 | |
| 		// Addresses reaching this point are already wrapped in [] if necessary.
 | |
| 		switch lset[model.SchemeLabel] {
 | |
| 		case "http", "":
 | |
| 			addr = addr + ":80"
 | |
| 		case "https":
 | |
| 			addr = addr + ":443"
 | |
| 		default:
 | |
| 			return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
 | |
| 		}
 | |
| 		lset[model.AddressLabel] = model.LabelValue(addr)
 | |
| 	}
 | |
| 	if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Meta labels are deleted after relabelling. Other internal labels propagate to
 | |
| 	// the target which decides whether they will be part of their label set.
 | |
| 	for ln := range lset {
 | |
| 		if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
 | |
| 			delete(lset, ln)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Default the instance label to the target address.
 | |
| 	if _, ok := lset[model.InstanceLabel]; !ok {
 | |
| 		lset[model.InstanceLabel] = lset[model.AddressLabel]
 | |
| 	}
 | |
| 	return lset, preRelabelLabels, nil
 | |
| }
 | |
| 
 | |
| // targetsFromGroup builds targets based on the given TargetGroup and config.
 | |
| func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
 | |
| 	targets := make([]*Target, 0, len(tg.Targets))
 | |
| 
 | |
| 	for i, lset := range tg.Targets {
 | |
| 		// Combine target labels with target group labels.
 | |
| 		for ln, lv := range tg.Labels {
 | |
| 			if _, ok := lset[ln]; !ok {
 | |
| 				lset[ln] = lv
 | |
| 			}
 | |
| 		}
 | |
| 		labels, origLabels, err := populateLabels(lset, cfg)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
 | |
| 		}
 | |
| 		if labels != nil {
 | |
| 			targets = append(targets, NewTarget(labels, origLabels, cfg.Params))
 | |
| 		}
 | |
| 	}
 | |
| 	return targets, nil
 | |
| }
 | |
| 
 | |
| // StaticProvider holds a list of target groups that never change.
 | |
| type StaticProvider struct {
 | |
| 	TargetGroups []*config.TargetGroup
 | |
| }
 | |
| 
 | |
| // NewStaticProvider returns a StaticProvider configured with the given
 | |
| // target groups.
 | |
| func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
 | |
| 	for i, tg := range groups {
 | |
| 		tg.Source = fmt.Sprintf("%d", i)
 | |
| 	}
 | |
| 	return &StaticProvider{groups}
 | |
| }
 | |
| 
 | |
| // Run implements the TargetProvider interface.
 | |
| func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 | |
| 	// We still have to consider that the consumer exits right away in which case
 | |
| 	// the context will be canceled.
 | |
| 	select {
 | |
| 	case ch <- sd.TargetGroups:
 | |
| 	case <-ctx.Done():
 | |
| 	}
 | |
| 	close(ch)
 | |
| }
 |