mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-31 16:31:03 +01:00 
			
		
		
		
	* file sd: create and increment an inotify error counter when file-SD is unable to watch files. Additionally, order metrics alphabetically. Signed-off-by: Michael Fuller <mfuller@digitalocean.com> * file.go: consistent naming and help for prometheus_sd_file_watcher_errors_total Signed-off-by: Michael Fuller <mfuller@digitalocean.com> Signed-off-by: Michael Fuller <mfuller@digitalocean.com> Co-authored-by: Michael Fuller <mfuller@digitalocean.com>
		
			
				
	
	
		
			432 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			432 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2015 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package file
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/fsnotify/fsnotify"
 | |
| 	"github.com/go-kit/log"
 | |
| 	"github.com/go-kit/log/level"
 | |
| 	"github.com/grafana/regexp"
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"github.com/prometheus/common/config"
 | |
| 	"github.com/prometheus/common/model"
 | |
| 	"gopkg.in/yaml.v2"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/discovery"
 | |
| 	"github.com/prometheus/prometheus/discovery/targetgroup"
 | |
| )
 | |
| 
 | |
var (
	// fileSDReadErrorsCount counts files that could not be read or parsed
	// during a refresh pass.
	fileSDReadErrorsCount = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_sd_file_read_errors_total",
			Help: "The number of File-SD read errors.",
		})
	// fileSDScanDuration tracks how long a full refresh scan takes.
	fileSDScanDuration = prometheus.NewSummary(
		prometheus.SummaryOpts{
			Name:       "prometheus_sd_file_scan_duration_seconds",
			Help:       "The duration of the File-SD scan in seconds.",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		})
	// fileSDTimeStamp exports the mtime of every file read by file SD.
	fileSDTimeStamp = NewTimestampCollector()
	// fileWatcherErrorsCount counts failures to set up filesystem watches.
	fileWatcherErrorsCount = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_sd_file_watcher_errors_total",
			Help: "The number of File-SD errors caused by filesystem watch failures.",
		})

	// patFileSDName validates configured path names: an optional glob in the
	// final path segment plus a JSON or YAML file extension.
	patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml|JSON|YML|YAML)$`)

	// DefaultSDConfig is the default file SD configuration.
	DefaultSDConfig = SDConfig{
		RefreshInterval: model.Duration(5 * time.Minute),
	}
)
 | |
| 
 | |
// init registers the file SD config type with the discovery registry and all
// file SD metrics with the default Prometheus registerer.
func init() {
	discovery.RegisterConfig(&SDConfig{})
	prometheus.MustRegister(fileSDReadErrorsCount, fileSDScanDuration, fileSDTimeStamp, fileWatcherErrorsCount)
}
 | |
| 
 | |
// SDConfig is the configuration for file based discovery.
type SDConfig struct {
	// Files is the list of file path patterns to read target groups from.
	// Each entry must match patFileSDName (JSON/YAML extension, optional glob).
	Files []string `yaml:"files"`
	// RefreshInterval is how often all files are re-read regardless of
	// filesystem watch events. See DefaultSDConfig for the default.
	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
}
 | |
| 
 | |
| // Name returns the name of the Config.
 | |
| func (*SDConfig) Name() string { return "file" }
 | |
| 
 | |
// NewDiscoverer returns a Discoverer for the Config.
// opts.Logger may be nil; NewDiscovery substitutes a no-op logger.
func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
	// Construction of a file discoverer cannot fail, so the error is always nil.
	return NewDiscovery(c, opts.Logger), nil
}
 | |
| 
 | |
| // SetDirectory joins any relative file paths with dir.
 | |
| func (c *SDConfig) SetDirectory(dir string) {
 | |
| 	for i, file := range c.Files {
 | |
| 		c.Files[i] = config.JoinDir(dir, file)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // UnmarshalYAML implements the yaml.Unmarshaler interface.
 | |
| func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
 | |
| 	*c = DefaultSDConfig
 | |
| 	type plain SDConfig
 | |
| 	err := unmarshal((*plain)(c))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if len(c.Files) == 0 {
 | |
| 		return errors.New("file service discovery config must contain at least one path name")
 | |
| 	}
 | |
| 	for _, name := range c.Files {
 | |
| 		if !patFileSDName.MatchString(name) {
 | |
| 			return fmt.Errorf("path name %q is not valid for file discovery", name)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// fileSDFilepathLabel is the meta label attached to every discovered target,
// holding the path of the file the target was read from.
const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath"
 | |
| 
 | |
// TimestampCollector is a Custom Collector for Timestamps of the files.
type TimestampCollector struct {
	// Description is the metric description emitted for every file.
	Description *prometheus.Desc
	// discoverers is the set of active Discovery instances whose file
	// timestamps are exported.
	discoverers map[*Discovery]struct{}
	// lock guards discoverers.
	lock sync.RWMutex
}
 | |
| 
 | |
// Describe method sends the description to the channel.
// It implements prometheus.Collector.
func (t *TimestampCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- t.Description
}
 | |
| 
 | |
| // Collect creates constant metrics for each file with last modified time of the file.
 | |
| func (t *TimestampCollector) Collect(ch chan<- prometheus.Metric) {
 | |
| 	// New map to dedup filenames.
 | |
| 	uniqueFiles := make(map[string]float64)
 | |
| 	t.lock.RLock()
 | |
| 	for fileSD := range t.discoverers {
 | |
| 		fileSD.lock.RLock()
 | |
| 		for filename, timestamp := range fileSD.timestamps {
 | |
| 			uniqueFiles[filename] = timestamp
 | |
| 		}
 | |
| 		fileSD.lock.RUnlock()
 | |
| 	}
 | |
| 	t.lock.RUnlock()
 | |
| 	for filename, timestamp := range uniqueFiles {
 | |
| 		ch <- prometheus.MustNewConstMetric(
 | |
| 			t.Description,
 | |
| 			prometheus.GaugeValue,
 | |
| 			timestamp,
 | |
| 			filename,
 | |
| 		)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (t *TimestampCollector) addDiscoverer(disc *Discovery) {
 | |
| 	t.lock.Lock()
 | |
| 	t.discoverers[disc] = struct{}{}
 | |
| 	t.lock.Unlock()
 | |
| }
 | |
| 
 | |
| func (t *TimestampCollector) removeDiscoverer(disc *Discovery) {
 | |
| 	t.lock.Lock()
 | |
| 	delete(t.discoverers, disc)
 | |
| 	t.lock.Unlock()
 | |
| }
 | |
| 
 | |
| // NewTimestampCollector creates a TimestampCollector.
 | |
| func NewTimestampCollector() *TimestampCollector {
 | |
| 	return &TimestampCollector{
 | |
| 		Description: prometheus.NewDesc(
 | |
| 			"prometheus_sd_file_mtime_seconds",
 | |
| 			"Timestamp (mtime) of files read by FileSD. Timestamp is set at read time.",
 | |
| 			[]string{"filename"},
 | |
| 			nil,
 | |
| 		),
 | |
| 		discoverers: make(map[*Discovery]struct{}),
 | |
| 	}
 | |
| }
 | |
| 
 | |
// Discovery provides service discovery functionality based
// on files that contain target groups in JSON or YAML format. Refreshing
// happens using file watches and periodic refreshes.
type Discovery struct {
	// paths are the configured path patterns; they may contain globs.
	paths []string
	// watcher is the filesystem watcher, set in Run.
	watcher *fsnotify.Watcher
	// interval is the period of the full rescan in Run.
	interval time.Duration
	// timestamps records the mtime (unix seconds) of each file at read
	// time; exported via the TimestampCollector.
	timestamps map[string]float64
	// lock guards timestamps.
	lock sync.RWMutex

	// lastRefresh stores which files were found during the last refresh
	// and how many target groups they contained.
	// This is used to detect deleted target groups.
	lastRefresh map[string]int
	logger      log.Logger
}
 | |
| 
 | |
| // NewDiscovery returns a new file discovery for the given paths.
 | |
| func NewDiscovery(conf *SDConfig, logger log.Logger) *Discovery {
 | |
| 	if logger == nil {
 | |
| 		logger = log.NewNopLogger()
 | |
| 	}
 | |
| 
 | |
| 	disc := &Discovery{
 | |
| 		paths:      conf.Files,
 | |
| 		interval:   time.Duration(conf.RefreshInterval),
 | |
| 		timestamps: make(map[string]float64),
 | |
| 		logger:     logger,
 | |
| 	}
 | |
| 	fileSDTimeStamp.addDiscoverer(disc)
 | |
| 	return disc
 | |
| }
 | |
| 
 | |
| // listFiles returns a list of all files that match the configured patterns.
 | |
| func (d *Discovery) listFiles() []string {
 | |
| 	var paths []string
 | |
| 	for _, p := range d.paths {
 | |
| 		files, err := filepath.Glob(p)
 | |
| 		if err != nil {
 | |
| 			level.Error(d.logger).Log("msg", "Error expanding glob", "glob", p, "err", err)
 | |
| 			continue
 | |
| 		}
 | |
| 		paths = append(paths, files...)
 | |
| 	}
 | |
| 	return paths
 | |
| }
 | |
| 
 | |
| // watchFiles sets watches on all full paths or directories that were configured for
 | |
| // this file discovery.
 | |
| func (d *Discovery) watchFiles() {
 | |
| 	if d.watcher == nil {
 | |
| 		panic("no watcher configured")
 | |
| 	}
 | |
| 	for _, p := range d.paths {
 | |
| 		if idx := strings.LastIndex(p, "/"); idx > -1 {
 | |
| 			p = p[:idx]
 | |
| 		} else {
 | |
| 			p = "./"
 | |
| 		}
 | |
| 		if err := d.watcher.Add(p); err != nil {
 | |
| 			level.Error(d.logger).Log("msg", "Error adding file watch", "path", p, "err", err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
// Run implements the Discoverer interface.
// It performs an initial refresh, then reloads targets on filesystem events
// and additionally on every interval tick, sending updated target groups on
// ch. It blocks until ctx is canceled.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		level.Error(d.logger).Log("msg", "Error adding file watcher", "err", err)
		fileWatcherErrorsCount.Inc()
		return
	}
	d.watcher = watcher
	defer d.stop()

	// Initial full scan before entering the event loop.
	d.refresh(ctx, ch)

	ticker := time.NewTicker(d.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case event := <-d.watcher.Events:
			// fsnotify sometimes sends a bunch of events without name or operation.
			// It's unclear what they are and why they are sent - filter them out.
			if len(event.Name) == 0 {
				break
			}
			// Everything but a chmod requires rereading.
			// (Op^Chmod == 0 holds exactly when the event is only a chmod.)
			if event.Op^fsnotify.Chmod == 0 {
				break
			}
			// Changes to a file can spawn various sequences of events with
			// different combinations of operations. For all practical purposes
			// this is inaccurate.
			// The most reliable solution is to reload everything if anything happens.
			d.refresh(ctx, ch)

		case <-ticker.C:
			// Setting a new watch after an update might fail. Make sure we don't lose
			// those files forever.
			d.refresh(ctx, ch)

		case err := <-d.watcher.Errors:
			if err != nil {
				level.Error(d.logger).Log("msg", "Error watching file", "err", err)
			}
		}
	}
}
 | |
| 
 | |
| func (d *Discovery) writeTimestamp(filename string, timestamp float64) {
 | |
| 	d.lock.Lock()
 | |
| 	d.timestamps[filename] = timestamp
 | |
| 	d.lock.Unlock()
 | |
| }
 | |
| 
 | |
| func (d *Discovery) deleteTimestamp(filename string) {
 | |
| 	d.lock.Lock()
 | |
| 	delete(d.timestamps, filename)
 | |
| 	d.lock.Unlock()
 | |
| }
 | |
| 
 | |
// stop shuts down the file watcher.
// It unregisters this discoverer from the timestamp collector and drains all
// pending watcher events and errors while Close runs, to avoid a deadlock.
func (d *Discovery) stop() {
	level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", fmt.Sprintf("%v", d.paths))

	done := make(chan struct{})
	defer close(done)

	fileSDTimeStamp.removeDiscoverer(d)

	// Closing the watcher will deadlock unless all events and errors are drained.
	go func() {
		for {
			select {
			case <-d.watcher.Errors:
			case <-d.watcher.Events:
				// Drain all events and errors.
			case <-done:
				// Close has returned; the drain goroutine can exit.
				return
			}
		}
	}()
	if err := d.watcher.Close(); err != nil {
		level.Error(d.logger).Log("msg", "Error closing file watcher", "paths", fmt.Sprintf("%v", d.paths), "err", err)
	}

	level.Debug(d.logger).Log("msg", "File discovery stopped")
}
 | |
| 
 | |
// refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel.
// For files (or trailing group indices) present in the previous refresh but
// missing now, it sends empty groups so consumers drop the stale targets.
func (d *Discovery) refresh(ctx context.Context, ch chan<- []*targetgroup.Group) {
	t0 := time.Now()
	defer func() {
		fileSDScanDuration.Observe(time.Since(t0).Seconds())
	}()
	// ref maps each file seen this pass to its target-group count; it becomes
	// the new lastRefresh snapshot at the end.
	ref := map[string]int{}
	for _, p := range d.listFiles() {
		tgroups, err := d.readFile(p)
		if err != nil {
			fileSDReadErrorsCount.Inc()

			level.Error(d.logger).Log("msg", "Error reading file", "path", p, "err", err)
			// Prevent deletion down below.
			ref[p] = d.lastRefresh[p]
			continue
		}
		select {
		case ch <- tgroups:
		case <-ctx.Done():
			return
		}

		ref[p] = len(tgroups)
	}
	// Send empty updates for sources that disappeared.
	for f, n := range d.lastRefresh {
		m, ok := ref[f]
		if !ok || n > m {
			level.Debug(d.logger).Log("msg", "file_sd refresh found file that should be removed", "file", f)
			d.deleteTimestamp(f)
			// Group indices m..n-1 existed previously but not anymore; send
			// one empty group per stale source to signal removal.
			for i := m; i < n; i++ {
				select {
				case ch <- []*targetgroup.Group{{Source: fileSource(f, i)}}:
				case <-ctx.Done():
					return
				}
			}
		}
	}
	d.lastRefresh = ref

	// Re-add watches: earlier adds may have failed, and new files may match.
	d.watchFiles()
}
 | |
| 
 | |
| // readFile reads a JSON or YAML list of targets groups from the file, depending on its
 | |
| // file extension. It returns full configuration target groups.
 | |
| func (d *Discovery) readFile(filename string) ([]*targetgroup.Group, error) {
 | |
| 	fd, err := os.Open(filename)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer fd.Close()
 | |
| 
 | |
| 	content, err := io.ReadAll(fd)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	info, err := fd.Stat()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var targetGroups []*targetgroup.Group
 | |
| 
 | |
| 	switch ext := filepath.Ext(filename); strings.ToLower(ext) {
 | |
| 	case ".json":
 | |
| 		if err := json.Unmarshal(content, &targetGroups); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	case ".yml", ".yaml":
 | |
| 		if err := yaml.UnmarshalStrict(content, &targetGroups); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	default:
 | |
| 		panic(fmt.Errorf("discovery.File.readFile: unhandled file extension %q", ext))
 | |
| 	}
 | |
| 
 | |
| 	for i, tg := range targetGroups {
 | |
| 		if tg == nil {
 | |
| 			err = errors.New("nil target group item found")
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		tg.Source = fileSource(filename, i)
 | |
| 		if tg.Labels == nil {
 | |
| 			tg.Labels = model.LabelSet{}
 | |
| 		}
 | |
| 		tg.Labels[fileSDFilepathLabel] = model.LabelValue(filename)
 | |
| 	}
 | |
| 
 | |
| 	d.writeTimestamp(filename, float64(info.ModTime().Unix()))
 | |
| 
 | |
| 	return targetGroups, nil
 | |
| }
 | |
| 
 | |
// fileSource returns a source ID for the i-th target group in the file,
// in the form "<filename>:<i>".
func fileSource(filename string, i int) string {
	// fmt.Sprint inserts no space between operands when either neighbor is
	// a string, so this is equivalent to Sprintf("%s:%d", filename, i).
	return fmt.Sprint(filename, ":", i)
}
 |