mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-25 14:31:01 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			423 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			423 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package tsdb
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"sort"
 | |
| 	"sync"
 | |
| 	"unicode/utf8"
 | |
| 
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"github.com/prometheus/prometheus/config"
 | |
| 	"github.com/prometheus/prometheus/pkg/exemplar"
 | |
| 	"github.com/prometheus/prometheus/pkg/labels"
 | |
| 	"github.com/prometheus/prometheus/storage"
 | |
| )
 | |
| 
 | |
| // Indicates that there is no index entry for an exmplar.
 | |
| const noExemplar = -1
 | |
| 
 | |
| type CircularExemplarStorage struct {
 | |
| 	lock      sync.RWMutex
 | |
| 	exemplars []*circularBufferEntry
 | |
| 	nextIndex int
 | |
| 	metrics   *ExemplarMetrics
 | |
| 
 | |
| 	// Map of series labels as a string to index entry, which points to the first
 | |
| 	// and last exemplar for the series in the exemplars circular buffer.
 | |
| 	index map[string]*indexEntry
 | |
| }
 | |
| 
 | |
| type indexEntry struct {
 | |
| 	oldest       int
 | |
| 	newest       int
 | |
| 	seriesLabels labels.Labels
 | |
| }
 | |
| 
 | |
| type circularBufferEntry struct {
 | |
| 	exemplar exemplar.Exemplar
 | |
| 	next     int
 | |
| 	ref      *indexEntry
 | |
| }
 | |
| 
 | |
| type ExemplarMetrics struct {
 | |
| 	exemplarsAppended            prometheus.Counter
 | |
| 	exemplarsInStorage           prometheus.Gauge
 | |
| 	seriesWithExemplarsInStorage prometheus.Gauge
 | |
| 	lastExemplarsTs              prometheus.Gauge
 | |
| 	maxExemplars                 prometheus.Gauge
 | |
| 	outOfOrderExemplars          prometheus.Counter
 | |
| }
 | |
| 
 | |
| func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics {
 | |
| 	m := ExemplarMetrics{
 | |
| 		exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{
 | |
| 			Name: "prometheus_tsdb_exemplar_exemplars_appended_total",
 | |
| 			Help: "Total number of appended exemplars.",
 | |
| 		}),
 | |
| 		exemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{
 | |
| 			Name: "prometheus_tsdb_exemplar_exemplars_in_storage",
 | |
| 			Help: "Number of exemplars currently in circular storage.",
 | |
| 		}),
 | |
| 		seriesWithExemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{
 | |
| 			Name: "prometheus_tsdb_exemplar_series_with_exemplars_in_storage",
 | |
| 			Help: "Number of series with exemplars currently in circular storage.",
 | |
| 		}),
 | |
| 		lastExemplarsTs: prometheus.NewGauge(prometheus.GaugeOpts{
 | |
| 			Name: "prometheus_tsdb_exemplar_last_exemplars_timestamp_seconds",
 | |
| 			Help: "The timestamp of the oldest exemplar stored in circular storage. Useful to check for what time" +
 | |
| 				"range the current exemplar buffer limit allows. This usually means the last timestamp" +
 | |
| 				"for all exemplars for a typical setup. This is not true though if one of the series timestamp is in future compared to rest series.",
 | |
| 		}),
 | |
| 		outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{
 | |
| 			Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total",
 | |
| 			Help: "Total number of out of order exemplar ingestion failed attempts.",
 | |
| 		}),
 | |
| 		maxExemplars: prometheus.NewGauge(prometheus.GaugeOpts{
 | |
| 			Name: "prometheus_tsdb_exemplar_max_exemplars",
 | |
| 			Help: "Total number of exemplars the exemplar storage can store, resizeable.",
 | |
| 		}),
 | |
| 	}
 | |
| 
 | |
| 	if reg != nil {
 | |
| 		reg.MustRegister(
 | |
| 			m.exemplarsAppended,
 | |
| 			m.exemplarsInStorage,
 | |
| 			m.seriesWithExemplarsInStorage,
 | |
| 			m.lastExemplarsTs,
 | |
| 			m.outOfOrderExemplars,
 | |
| 			m.maxExemplars,
 | |
| 		)
 | |
| 	}
 | |
| 
 | |
| 	return &m
 | |
| }
 | |
| 
 | |
| // NewCircularExemplarStorage creates an circular in memory exemplar storage.
 | |
| // If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
 | |
| // 1GB of extra memory, accounting for the fact that this is heap allocated space.
 | |
| // If len <= 0, then the exemplar storage is essentially a noop storage but can later be
 | |
| // resized to store exemplars.
 | |
| func NewCircularExemplarStorage(len int64, m *ExemplarMetrics) (ExemplarStorage, error) {
 | |
| 	if len < 0 {
 | |
| 		len = 0
 | |
| 	}
 | |
| 	c := &CircularExemplarStorage{
 | |
| 		exemplars: make([]*circularBufferEntry, len),
 | |
| 		index:     make(map[string]*indexEntry),
 | |
| 		metrics:   m,
 | |
| 	}
 | |
| 
 | |
| 	c.metrics.maxExemplars.Set(float64(len))
 | |
| 
 | |
| 	return c, nil
 | |
| }
 | |
| 
 | |
| func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error {
 | |
| 	ce.Resize(cfg.StorageConfig.ExemplarsConfig.MaxExemplars)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage {
 | |
| 	return ce
 | |
| }
 | |
| 
 | |
| func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error) {
 | |
| 	return ce, nil
 | |
| }
 | |
| 
 | |
| func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error) {
 | |
| 	return ce, nil
 | |
| }
 | |
| 
 | |
| // Select returns exemplars for a given set of label matchers.
 | |
| func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
 | |
| 	ret := make([]exemplar.QueryResult, 0)
 | |
| 
 | |
| 	if len(ce.exemplars) <= 0 {
 | |
| 		return ret, nil
 | |
| 	}
 | |
| 
 | |
| 	ce.lock.RLock()
 | |
| 	defer ce.lock.RUnlock()
 | |
| 
 | |
| 	// Loop through each index entry, which will point us to first/last exemplar for each series.
 | |
| 	for _, idx := range ce.index {
 | |
| 		var se exemplar.QueryResult
 | |
| 		e := ce.exemplars[idx.oldest]
 | |
| 		if e.exemplar.Ts > end || ce.exemplars[idx.newest].exemplar.Ts < start {
 | |
| 			continue
 | |
| 		}
 | |
| 		if !matchesSomeMatcherSet(idx.seriesLabels, matchers) {
 | |
| 			continue
 | |
| 		}
 | |
| 		se.SeriesLabels = idx.seriesLabels
 | |
| 
 | |
| 		// Loop through all exemplars in the circular buffer for the current series.
 | |
| 		for e.exemplar.Ts <= end {
 | |
| 			if e.exemplar.Ts >= start {
 | |
| 				se.Exemplars = append(se.Exemplars, e.exemplar)
 | |
| 			}
 | |
| 			if e.next == noExemplar {
 | |
| 				break
 | |
| 			}
 | |
| 			e = ce.exemplars[e.next]
 | |
| 		}
 | |
| 		if len(se.Exemplars) > 0 {
 | |
| 			ret = append(ret, se)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	sort.Slice(ret, func(i, j int) bool {
 | |
| 		return labels.Compare(ret[i].SeriesLabels, ret[j].SeriesLabels) < 0
 | |
| 	})
 | |
| 
 | |
| 	return ret, nil
 | |
| }
 | |
| 
 | |
| func matchesSomeMatcherSet(lbls labels.Labels, matchers [][]*labels.Matcher) bool {
 | |
| Outer:
 | |
| 	for _, ms := range matchers {
 | |
| 		for _, m := range ms {
 | |
| 			if !m.Matches(lbls.Get(m.Name)) {
 | |
| 				continue Outer
 | |
| 			}
 | |
| 		}
 | |
| 		return true
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error {
 | |
| 	seriesLabels := l.String()
 | |
| 
 | |
| 	// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
 | |
| 	// Optimize by moving the lock to be per series (& benchmark it).
 | |
| 	ce.lock.RLock()
 | |
| 	defer ce.lock.RUnlock()
 | |
| 	return ce.validateExemplar(seriesLabels, e, false)
 | |
| }
 | |
| 
 | |
| // Not thread safe. The append parameters tells us whether this is an external validation, or internal
 | |
| // as a result of an AddExemplar call, in which case we should update any relevant metrics.
 | |
| func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exemplar, append bool) error {
 | |
| 	if len(ce.exemplars) <= 0 {
 | |
| 		return storage.ErrExemplarsDisabled
 | |
| 	}
 | |
| 
 | |
| 	// Exemplar label length does not include chars involved in text rendering such as quotes
 | |
| 	// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
 | |
| 	labelSetLen := 0
 | |
| 	for _, l := range e.Labels {
 | |
| 		labelSetLen += utf8.RuneCountInString(l.Name)
 | |
| 		labelSetLen += utf8.RuneCountInString(l.Value)
 | |
| 
 | |
| 		if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
 | |
| 			return storage.ErrExemplarLabelLength
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	idx, ok := ce.index[l]
 | |
| 	if !ok {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Check for duplicate vs last stored exemplar for this series.
 | |
| 	// NB these are expected, and appending them is a no-op.
 | |
| 	if ce.exemplars[idx.newest].exemplar.Equals(e) {
 | |
| 		return storage.ErrDuplicateExemplar
 | |
| 	}
 | |
| 
 | |
| 	if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts {
 | |
| 		if append {
 | |
| 			ce.metrics.outOfOrderExemplars.Inc()
 | |
| 		}
 | |
| 		return storage.ErrOutOfOrderExemplar
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it.
 | |
| // Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
 | |
| func (ce *CircularExemplarStorage) Resize(l int64) int {
 | |
| 	// Accept negative values as just 0 size.
 | |
| 	if l <= 0 {
 | |
| 		l = 0
 | |
| 	}
 | |
| 
 | |
| 	if l == int64(len(ce.exemplars)) {
 | |
| 		return 0
 | |
| 	}
 | |
| 
 | |
| 	ce.lock.Lock()
 | |
| 	defer ce.lock.Unlock()
 | |
| 
 | |
| 	oldBuffer := ce.exemplars
 | |
| 	oldNextIndex := int64(ce.nextIndex)
 | |
| 
 | |
| 	ce.exemplars = make([]*circularBufferEntry, l)
 | |
| 	ce.index = make(map[string]*indexEntry)
 | |
| 	ce.nextIndex = 0
 | |
| 
 | |
| 	// Replay as many entries as needed, starting with oldest first.
 | |
| 	count := int64(len(oldBuffer))
 | |
| 	if l < count {
 | |
| 		count = l
 | |
| 	}
 | |
| 
 | |
| 	migrated := 0
 | |
| 
 | |
| 	if l > 0 && len(oldBuffer) > 0 {
 | |
| 		// Rewind previous next index by count with wrap-around.
 | |
| 		// This math is essentially looking at nextIndex, where we would write the next exemplar to,
 | |
| 		// and find the index in the old exemplar buffer that we should start migrating exemplars from.
 | |
| 		// This way we don't migrate exemplars that would just be overwritten when migrating later exemplars.
 | |
| 		var startIndex int64 = (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer))
 | |
| 
 | |
| 		for i := int64(0); i < count; i++ {
 | |
| 			idx := (startIndex + i) % int64(len(oldBuffer))
 | |
| 			if entry := oldBuffer[idx]; entry != nil {
 | |
| 				ce.migrate(entry)
 | |
| 				migrated++
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	ce.computeMetrics()
 | |
| 	ce.metrics.maxExemplars.Set(float64(l))
 | |
| 
 | |
| 	return migrated
 | |
| }
 | |
| 
 | |
| // migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
 | |
| // external lock and does not compute metrics.
 | |
| func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) {
 | |
| 	seriesLabels := entry.ref.seriesLabels.String()
 | |
| 
 | |
| 	idx, ok := ce.index[seriesLabels]
 | |
| 	if !ok {
 | |
| 		idx = entry.ref
 | |
| 		idx.oldest = ce.nextIndex
 | |
| 		ce.index[seriesLabels] = idx
 | |
| 	} else {
 | |
| 		entry.ref = idx
 | |
| 		ce.exemplars[idx.newest].next = ce.nextIndex
 | |
| 	}
 | |
| 	idx.newest = ce.nextIndex
 | |
| 
 | |
| 	entry.next = noExemplar
 | |
| 	ce.exemplars[ce.nextIndex] = entry
 | |
| 
 | |
| 	ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
 | |
| }
 | |
| 
 | |
| func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
 | |
| 	if len(ce.exemplars) <= 0 {
 | |
| 		return storage.ErrExemplarsDisabled
 | |
| 	}
 | |
| 
 | |
| 	seriesLabels := l.String()
 | |
| 
 | |
| 	// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
 | |
| 	// Optimize by moving the lock to be per series (& benchmark it).
 | |
| 	ce.lock.Lock()
 | |
| 	defer ce.lock.Unlock()
 | |
| 
 | |
| 	err := ce.validateExemplar(seriesLabels, e, true)
 | |
| 	if err != nil {
 | |
| 		if err == storage.ErrDuplicateExemplar {
 | |
| 			// Duplicate exemplar, noop.
 | |
| 			return nil
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	_, ok := ce.index[seriesLabels]
 | |
| 	if !ok {
 | |
| 		ce.index[seriesLabels] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
 | |
| 	} else {
 | |
| 		ce.exemplars[ce.index[seriesLabels].newest].next = ce.nextIndex
 | |
| 	}
 | |
| 
 | |
| 	if prev := ce.exemplars[ce.nextIndex]; prev == nil {
 | |
| 		ce.exemplars[ce.nextIndex] = &circularBufferEntry{}
 | |
| 	} else {
 | |
| 		// There exists exemplar already on this ce.nextIndex entry, drop it, to make place
 | |
| 		// for others.
 | |
| 		prevLabels := prev.ref.seriesLabels.String()
 | |
| 		if prev.next == noExemplar {
 | |
| 			// Last item for this series, remove index entry.
 | |
| 			delete(ce.index, prevLabels)
 | |
| 		} else {
 | |
| 			ce.index[prevLabels].oldest = prev.next
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
 | |
| 	// since this is the first exemplar stored for this series.
 | |
| 	ce.exemplars[ce.nextIndex].next = noExemplar
 | |
| 	ce.exemplars[ce.nextIndex].exemplar = e
 | |
| 	ce.exemplars[ce.nextIndex].ref = ce.index[seriesLabels]
 | |
| 	ce.index[seriesLabels].newest = ce.nextIndex
 | |
| 
 | |
| 	ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
 | |
| 
 | |
| 	ce.metrics.exemplarsAppended.Inc()
 | |
| 	ce.computeMetrics()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ce *CircularExemplarStorage) computeMetrics() {
 | |
| 	ce.metrics.seriesWithExemplarsInStorage.Set(float64(len(ce.index)))
 | |
| 
 | |
| 	if len(ce.exemplars) == 0 {
 | |
| 		ce.metrics.exemplarsInStorage.Set(float64(0))
 | |
| 		ce.metrics.lastExemplarsTs.Set(float64(0))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if next := ce.exemplars[ce.nextIndex]; next != nil {
 | |
| 		ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars)))
 | |
| 		ce.metrics.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// We did not yet fill the buffer.
 | |
| 	ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex))
 | |
| 	if ce.exemplars[0] != nil {
 | |
| 		ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // IterateExemplars iterates through all the exemplars from oldest to newest appended and calls
 | |
| // the given function on all of them till the end (or) till the first function call that returns an error.
 | |
| func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error {
 | |
| 	ce.lock.RLock()
 | |
| 	defer ce.lock.RUnlock()
 | |
| 
 | |
| 	idx := ce.nextIndex
 | |
| 	l := len(ce.exemplars)
 | |
| 	for i := 0; i < l; i, idx = i+1, (idx+1)%l {
 | |
| 		if ce.exemplars[idx] == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |