mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-11-04 10:21:02 +01:00 
			
		
		
		
	And a few cases of `EmptyLabels()`. Replacing code which assumes the internal structure of `Labels`. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
		
			
				
	
	
		
			221 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			221 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2017 The Prometheus Authors
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
// http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package remote
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"crypto/md5"
 | 
						|
	"encoding/hex"
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/go-kit/log"
 | 
						|
	"github.com/prometheus/client_golang/prometheus"
 | 
						|
	"github.com/prometheus/common/model"
 | 
						|
	"gopkg.in/yaml.v2"
 | 
						|
 | 
						|
	"github.com/prometheus/prometheus/config"
 | 
						|
	"github.com/prometheus/prometheus/model/labels"
 | 
						|
	"github.com/prometheus/prometheus/scrape"
 | 
						|
	"github.com/prometheus/prometheus/storage"
 | 
						|
	"github.com/prometheus/prometheus/util/logging"
 | 
						|
)
 | 
						|
 | 
						|
// String constants for instrumentation.
 | 
						|
const (
 | 
						|
	namespace  = "prometheus"
 | 
						|
	subsystem  = "remote_storage"
 | 
						|
	remoteName = "remote_name"
 | 
						|
	endpoint   = "url"
 | 
						|
)
 | 
						|
 | 
						|
type ReadyScrapeManager interface {
 | 
						|
	Get() (*scrape.Manager, error)
 | 
						|
}
 | 
						|
 | 
						|
// startTimeCallback is a callback func that return the oldest timestamp stored in a storage.
 | 
						|
type startTimeCallback func() (int64, error)
 | 
						|
 | 
						|
// Storage represents all the remote read and write endpoints.  It implements
 | 
						|
// storage.Storage.
 | 
						|
type Storage struct {
 | 
						|
	logger *logging.Deduper
 | 
						|
	mtx    sync.Mutex
 | 
						|
 | 
						|
	rws *WriteStorage
 | 
						|
 | 
						|
	// For reads.
 | 
						|
	queryables             []storage.SampleAndChunkQueryable
 | 
						|
	localStartTimeCallback startTimeCallback
 | 
						|
}
 | 
						|
 | 
						|
// NewStorage returns a remote.Storage.
 | 
						|
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
 | 
						|
	if l == nil {
 | 
						|
		l = log.NewNopLogger()
 | 
						|
	}
 | 
						|
	logger := logging.Dedupe(l, 1*time.Minute)
 | 
						|
 | 
						|
	s := &Storage{
 | 
						|
		logger:                 logger,
 | 
						|
		localStartTimeCallback: stCallback,
 | 
						|
	}
 | 
						|
	s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
 | 
						|
	return s
 | 
						|
}
 | 
						|
 | 
						|
// ApplyConfig updates the state as the new config requires.
 | 
						|
func (s *Storage) ApplyConfig(conf *config.Config) error {
 | 
						|
	s.mtx.Lock()
 | 
						|
	defer s.mtx.Unlock()
 | 
						|
 | 
						|
	if err := s.rws.ApplyConfig(conf); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// Update read clients
 | 
						|
	readHashes := make(map[string]struct{})
 | 
						|
	queryables := make([]storage.SampleAndChunkQueryable, 0, len(conf.RemoteReadConfigs))
 | 
						|
	for _, rrConf := range conf.RemoteReadConfigs {
 | 
						|
		hash, err := toHash(rrConf)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		// Don't allow duplicate remote read configs.
 | 
						|
		if _, ok := readHashes[hash]; ok {
 | 
						|
			return fmt.Errorf("duplicate remote read configs are not allowed, found duplicate for URL: %s", rrConf.URL)
 | 
						|
		}
 | 
						|
		readHashes[hash] = struct{}{}
 | 
						|
 | 
						|
		// Set the queue name to the config hash if the user has not set
 | 
						|
		// a name in their remote write config so we can still differentiate
 | 
						|
		// between queues that have the same remote write endpoint.
 | 
						|
		name := hash[:6]
 | 
						|
		if rrConf.Name != "" {
 | 
						|
			name = rrConf.Name
 | 
						|
		}
 | 
						|
 | 
						|
		c, err := NewReadClient(name, &ClientConfig{
 | 
						|
			URL:              rrConf.URL,
 | 
						|
			Timeout:          rrConf.RemoteTimeout,
 | 
						|
			HTTPClientConfig: rrConf.HTTPClientConfig,
 | 
						|
			Headers:          rrConf.Headers,
 | 
						|
		})
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		externalLabels := conf.GlobalConfig.ExternalLabels
 | 
						|
		if !rrConf.FilterExternalLabels {
 | 
						|
			externalLabels = labels.EmptyLabels()
 | 
						|
		}
 | 
						|
		queryables = append(queryables, NewSampleAndChunkQueryableClient(
 | 
						|
			c,
 | 
						|
			externalLabels,
 | 
						|
			labelsToEqualityMatchers(rrConf.RequiredMatchers),
 | 
						|
			rrConf.ReadRecent,
 | 
						|
			s.localStartTimeCallback,
 | 
						|
		))
 | 
						|
	}
 | 
						|
	s.queryables = queryables
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// StartTime implements the Storage interface.
 | 
						|
func (s *Storage) StartTime() (int64, error) {
 | 
						|
	return int64(model.Latest), nil
 | 
						|
}
 | 
						|
 | 
						|
// Querier returns a storage.MergeQuerier combining the remote client queriers
 | 
						|
// of each configured remote read endpoint.
 | 
						|
// Returned querier will never return error as all queryables are assumed best effort.
 | 
						|
// Additionally all returned queriers ensure that its Select's SeriesSets have ready data after first `Next` invoke.
 | 
						|
// This is because Prometheus (fanout and secondary queries) can't handle the stream failing half way through by design.
 | 
						|
func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
 | 
						|
	s.mtx.Lock()
 | 
						|
	queryables := s.queryables
 | 
						|
	s.mtx.Unlock()
 | 
						|
 | 
						|
	queriers := make([]storage.Querier, 0, len(queryables))
 | 
						|
	for _, queryable := range queryables {
 | 
						|
		q, err := queryable.Querier(ctx, mint, maxt)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		queriers = append(queriers, q)
 | 
						|
	}
 | 
						|
	return storage.NewMergeQuerier(nil, queriers, storage.ChainedSeriesMerge), nil
 | 
						|
}
 | 
						|
 | 
						|
// ChunkQuerier returns a storage.MergeQuerier combining the remote client queriers
 | 
						|
// of each configured remote read endpoint.
 | 
						|
func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
 | 
						|
	s.mtx.Lock()
 | 
						|
	queryables := s.queryables
 | 
						|
	s.mtx.Unlock()
 | 
						|
 | 
						|
	queriers := make([]storage.ChunkQuerier, 0, len(queryables))
 | 
						|
	for _, queryable := range queryables {
 | 
						|
		q, err := queryable.ChunkQuerier(ctx, mint, maxt)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		queriers = append(queriers, q)
 | 
						|
	}
 | 
						|
	return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
 | 
						|
}
 | 
						|
 | 
						|
// Appender implements storage.Storage.
 | 
						|
func (s *Storage) Appender(ctx context.Context) storage.Appender {
 | 
						|
	return s.rws.Appender(ctx)
 | 
						|
}
 | 
						|
 | 
						|
// LowestSentTimestamp returns the lowest sent timestamp across all queues.
 | 
						|
func (s *Storage) LowestSentTimestamp() int64 {
 | 
						|
	return s.rws.LowestSentTimestamp()
 | 
						|
}
 | 
						|
 | 
						|
// Close the background processing of the storage queues.
 | 
						|
func (s *Storage) Close() error {
 | 
						|
	s.logger.Stop()
 | 
						|
	s.mtx.Lock()
 | 
						|
	defer s.mtx.Unlock()
 | 
						|
	return s.rws.Close()
 | 
						|
}
 | 
						|
 | 
						|
func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher {
 | 
						|
	ms := make([]*labels.Matcher, 0, len(ls))
 | 
						|
	for k, v := range ls {
 | 
						|
		ms = append(ms, &labels.Matcher{
 | 
						|
			Type:  labels.MatchEqual,
 | 
						|
			Name:  string(k),
 | 
						|
			Value: string(v),
 | 
						|
		})
 | 
						|
	}
 | 
						|
	return ms
 | 
						|
}
 | 
						|
 | 
						|
// Used for hashing configs and diff'ing hashes in ApplyConfig.
 | 
						|
func toHash(data interface{}) (string, error) {
 | 
						|
	bytes, err := yaml.Marshal(data)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
	hash := md5.Sum(bytes)
 | 
						|
	return hex.EncodeToString(hash[:]), nil
 | 
						|
}
 |