mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-11-04 02:11:01 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			765 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			765 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2021 The Prometheus Authors
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
// http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package tsdb
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"math"
 | 
						|
 | 
						|
	"github.com/go-kit/log/level"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
 | 
						|
	"github.com/prometheus/prometheus/pkg/exemplar"
 | 
						|
	"github.com/prometheus/prometheus/pkg/histogram"
 | 
						|
	"github.com/prometheus/prometheus/pkg/labels"
 | 
						|
	"github.com/prometheus/prometheus/storage"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/chunks"
 | 
						|
	"github.com/prometheus/prometheus/tsdb/record"
 | 
						|
)
 | 
						|
 | 
						|
// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
//
// It starts without an underlying appender; the first Append,
// AppendExemplar or AppendHistogram call seeds the head's time range via
// Head.initTime and then creates the real head appender, to which all
// later calls are forwarded.
type initAppender struct {
	// app is the lazily created underlying appender; nil until the first write.
	app  storage.Appender
	// head is the Head whose time bounds are initialized on first write.
	head *Head
}
 | 
						|
 | 
						|
// Compile-time check that *initAppender implements storage.GetRef.
var _ storage.GetRef = (*initAppender)(nil)
 | 
						|
 | 
						|
func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
 | 
						|
	if a.app != nil {
 | 
						|
		return a.app.Append(ref, lset, t, v)
 | 
						|
	}
 | 
						|
 | 
						|
	a.head.initTime(t)
 | 
						|
	a.app = a.head.appender()
 | 
						|
	return a.app.Append(ref, lset, t, v)
 | 
						|
}
 | 
						|
 | 
						|
func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
 | 
						|
	// Check if exemplar storage is enabled.
 | 
						|
	if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
 | 
						|
		return 0, nil
 | 
						|
	}
 | 
						|
 | 
						|
	if a.app != nil {
 | 
						|
		return a.app.AppendExemplar(ref, l, e)
 | 
						|
	}
 | 
						|
	// We should never reach here given we would call Append before AppendExemplar
 | 
						|
	// and we probably want to always base head/WAL min time on sample times.
 | 
						|
	a.head.initTime(e.Ts)
 | 
						|
	a.app = a.head.appender()
 | 
						|
 | 
						|
	return a.app.AppendExemplar(ref, l, e)
 | 
						|
}
 | 
						|
 | 
						|
func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) {
 | 
						|
	if a.app != nil {
 | 
						|
		return a.app.AppendHistogram(ref, l, t, sh)
 | 
						|
	}
 | 
						|
	a.head.initTime(t)
 | 
						|
	a.app = a.head.appender()
 | 
						|
 | 
						|
	return a.app.AppendHistogram(ref, l, t, sh)
 | 
						|
}
 | 
						|
 | 
						|
// initTime initializes a head with the first timestamp. This only needs to be called
 | 
						|
// for a completely fresh head with an empty WAL.
 | 
						|
func (h *Head) initTime(t int64) {
 | 
						|
	if !h.minTime.CAS(math.MaxInt64, t) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	// Ensure that max time is initialized to at least the min time we just set.
 | 
						|
	// Concurrent appenders may already have set it to a higher value.
 | 
						|
	h.maxTime.CAS(math.MinInt64, t)
 | 
						|
}
 | 
						|
 | 
						|
func (a *initAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) {
 | 
						|
	if g, ok := a.app.(storage.GetRef); ok {
 | 
						|
		return g.GetRef(lset)
 | 
						|
	}
 | 
						|
	return 0, nil
 | 
						|
}
 | 
						|
 | 
						|
func (a *initAppender) Commit() error {
 | 
						|
	if a.app == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	return a.app.Commit()
 | 
						|
}
 | 
						|
 | 
						|
func (a *initAppender) Rollback() error {
 | 
						|
	if a.app == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	return a.app.Rollback()
 | 
						|
}
 | 
						|
 | 
						|
// Appender returns a new Appender on the database.
 | 
						|
func (h *Head) Appender(_ context.Context) storage.Appender {
 | 
						|
	h.metrics.activeAppenders.Inc()
 | 
						|
 | 
						|
	// The head cache might not have a starting point yet. The init appender
 | 
						|
	// picks up the first appended timestamp as the base.
 | 
						|
	if h.MinTime() == math.MaxInt64 {
 | 
						|
		return &initAppender{
 | 
						|
			head: h,
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return h.appender()
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) appender() *headAppender {
 | 
						|
	appendID, cleanupAppendIDsBelow := h.iso.newAppendID()
 | 
						|
 | 
						|
	// Allocate the exemplars buffer only if exemplars are enabled.
 | 
						|
	var exemplarsBuf []exemplarWithSeriesRef
 | 
						|
	if h.opts.EnableExemplarStorage {
 | 
						|
		exemplarsBuf = h.getExemplarBuffer()
 | 
						|
	}
 | 
						|
 | 
						|
	return &headAppender{
 | 
						|
		head:                  h,
 | 
						|
		minValidTime:          h.appendableMinValidTime(),
 | 
						|
		mint:                  math.MaxInt64,
 | 
						|
		maxt:                  math.MinInt64,
 | 
						|
		samples:               h.getAppendBuffer(),
 | 
						|
		sampleSeries:          h.getSeriesBuffer(),
 | 
						|
		exemplars:             exemplarsBuf,
 | 
						|
		appendID:              appendID,
 | 
						|
		cleanupAppendIDsBelow: cleanupAppendIDsBelow,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) appendableMinValidTime() int64 {
 | 
						|
	// Setting the minimum valid time to whichever is greater, the head min valid time or the compaction window,
 | 
						|
	// ensures that no samples will be added within the compaction window to avoid races.
 | 
						|
	return max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2)
 | 
						|
}
 | 
						|
 | 
						|
// max returns the larger of the two int64 values.
func max(a, b int64) int64 {
	if b >= a {
		return b
	}
	return a
}
 | 
						|
 | 
						|
func (h *Head) getAppendBuffer() []record.RefSample {
 | 
						|
	b := h.appendPool.Get()
 | 
						|
	if b == nil {
 | 
						|
		return make([]record.RefSample, 0, 512)
 | 
						|
	}
 | 
						|
	return b.([]record.RefSample)
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) putAppendBuffer(b []record.RefSample) {
 | 
						|
	//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 | 
						|
	h.appendPool.Put(b[:0])
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef {
 | 
						|
	b := h.exemplarsPool.Get()
 | 
						|
	if b == nil {
 | 
						|
		return make([]exemplarWithSeriesRef, 0, 512)
 | 
						|
	}
 | 
						|
	return b.([]exemplarWithSeriesRef)
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
 | 
						|
	if b == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 | 
						|
	h.exemplarsPool.Put(b[:0])
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) getSeriesBuffer() []*memSeries {
 | 
						|
	b := h.seriesPool.Get()
 | 
						|
	if b == nil {
 | 
						|
		return make([]*memSeries, 0, 512)
 | 
						|
	}
 | 
						|
	return b.([]*memSeries)
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) putSeriesBuffer(b []*memSeries) {
 | 
						|
	//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 | 
						|
	h.seriesPool.Put(b[:0])
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) getBytesBuffer() []byte {
 | 
						|
	b := h.bytesPool.Get()
 | 
						|
	if b == nil {
 | 
						|
		return make([]byte, 0, 1024)
 | 
						|
	}
 | 
						|
	return b.([]byte)
 | 
						|
}
 | 
						|
 | 
						|
func (h *Head) putBytesBuffer(b []byte) {
 | 
						|
	//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 | 
						|
	h.bytesPool.Put(b[:0])
 | 
						|
}
 | 
						|
 | 
						|
// exemplarWithSeriesRef pairs an exemplar with the reference of the series
// it was appended for, so it can be buffered until commit.
//
// Field order matters: the type is constructed with a positional literal.
type exemplarWithSeriesRef struct {
	// ref is the series reference passed to AppendExemplar.
	ref      uint64
	// exemplar is the buffered exemplar itself.
	exemplar exemplar.Exemplar
}
 | 
						|
 | 
						|
// headAppender buffers series, samples, exemplars and histograms appended
// against a Head until Commit or Rollback is called.
type headAppender struct {
	head         *Head
	minValidTime int64 // No samples below this timestamp are allowed.
	// mint and maxt track the smallest and largest timestamp accepted by
	// this appender so far.
	mint, maxt   int64

	// series holds the series newly created through this appender.
	series          []record.RefSeries
	// samples and sampleSeries are parallel: sampleSeries[i] is the series
	// that samples[i] belongs to.
	samples         []record.RefSample
	exemplars       []exemplarWithSeriesRef
	sampleSeries    []*memSeries
	// histograms and histogramSeries are parallel in the same way.
	histograms      []record.RefHistogram
	histogramSeries []*memSeries

	// appendID and cleanupAppendIDsBelow are obtained from the head's
	// isolation state when the appender is created.
	appendID, cleanupAppendIDsBelow uint64
	// closed is set once Commit or Rollback has run; further calls return
	// ErrAppenderClosed.
	closed                          bool
}
 | 
						|
 | 
						|
func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
 | 
						|
	if t < a.minValidTime {
 | 
						|
		a.head.metrics.outOfBoundSamples.Inc()
 | 
						|
		return 0, storage.ErrOutOfBounds
 | 
						|
	}
 | 
						|
 | 
						|
	s := a.head.series.getByID(ref)
 | 
						|
	if s == nil {
 | 
						|
		// Ensure no empty labels have gotten through.
 | 
						|
		lset = lset.WithoutEmpty()
 | 
						|
		if len(lset) == 0 {
 | 
						|
			return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
 | 
						|
		}
 | 
						|
 | 
						|
		if l, dup := lset.HasDuplicateLabelNames(); dup {
 | 
						|
			return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
 | 
						|
		}
 | 
						|
 | 
						|
		var created bool
 | 
						|
		var err error
 | 
						|
		s, created, err = a.head.getOrCreate(lset.Hash(), lset)
 | 
						|
		if err != nil {
 | 
						|
			return 0, err
 | 
						|
		}
 | 
						|
		if created {
 | 
						|
			a.series = append(a.series, record.RefSeries{
 | 
						|
				Ref:    s.ref,
 | 
						|
				Labels: lset,
 | 
						|
			})
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	s.Lock()
 | 
						|
	if err := s.appendable(t, v); err != nil {
 | 
						|
		s.Unlock()
 | 
						|
		if err == storage.ErrOutOfOrderSample {
 | 
						|
			a.head.metrics.outOfOrderSamples.Inc()
 | 
						|
		}
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	s.pendingCommit = true
 | 
						|
	s.Unlock()
 | 
						|
 | 
						|
	if t < a.mint {
 | 
						|
		a.mint = t
 | 
						|
	}
 | 
						|
	if t > a.maxt {
 | 
						|
		a.maxt = t
 | 
						|
	}
 | 
						|
 | 
						|
	a.samples = append(a.samples, record.RefSample{
 | 
						|
		Ref: s.ref,
 | 
						|
		T:   t,
 | 
						|
		V:   v,
 | 
						|
	})
 | 
						|
	a.sampleSeries = append(a.sampleSeries, s)
 | 
						|
	return s.ref, nil
 | 
						|
}
 | 
						|
 | 
						|
// appendable checks whether the given sample is valid for appending to the series.
 | 
						|
func (s *memSeries) appendable(t int64, v float64) error {
 | 
						|
	c := s.head()
 | 
						|
	if c == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	if t > c.maxTime {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if t < c.maxTime {
 | 
						|
		return storage.ErrOutOfOrderSample
 | 
						|
	}
 | 
						|
	// We are allowing exact duplicates as we can encounter them in valid cases
 | 
						|
	// like federation and erroring out at that time would be extremely noisy.
 | 
						|
	if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
 | 
						|
		return storage.ErrDuplicateSampleForTimestamp
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// appendableHistogram checks whether the given sample is valid for appending to the series.
 | 
						|
func (s *memSeries) appendableHistogram(t int64, sh histogram.SparseHistogram) error {
 | 
						|
	c := s.head()
 | 
						|
	if c == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	if t > c.maxTime {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if t < c.maxTime {
 | 
						|
		return storage.ErrOutOfOrderSample
 | 
						|
	}
 | 
						|
	// TODO: do it for histogram.
 | 
						|
	// We are allowing exact duplicates as we can encounter them in valid cases
 | 
						|
	// like federation and erroring out at that time would be extremely noisy.
 | 
						|
	//if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
 | 
						|
	//	return storage.ErrDuplicateSampleForTimestamp
 | 
						|
	//}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
 | 
						|
// use getOrCreate or make any of the lset sanity checks that Append does.
 | 
						|
func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exemplar) (uint64, error) {
 | 
						|
	// Check if exemplar storage is enabled.
 | 
						|
	if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
 | 
						|
		return 0, nil
 | 
						|
	}
 | 
						|
	s := a.head.series.getByID(ref)
 | 
						|
	if s == nil {
 | 
						|
		return 0, fmt.Errorf("unknown series ref. when trying to add exemplar: %d", ref)
 | 
						|
	}
 | 
						|
 | 
						|
	// Ensure no empty labels have gotten through.
 | 
						|
	e.Labels = e.Labels.WithoutEmpty()
 | 
						|
 | 
						|
	err := a.head.exemplars.ValidateExemplar(s.lset, e)
 | 
						|
	if err != nil {
 | 
						|
		if err == storage.ErrDuplicateExemplar || err == storage.ErrExemplarsDisabled {
 | 
						|
			// Duplicate, don't return an error but don't accept the exemplar.
 | 
						|
			return 0, nil
 | 
						|
		}
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
 | 
						|
	a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e})
 | 
						|
 | 
						|
	return s.ref, nil
 | 
						|
}
 | 
						|
 | 
						|
func (a *headAppender) AppendHistogram(ref uint64, lset labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) {
 | 
						|
	if t < a.minValidTime {
 | 
						|
		a.head.metrics.outOfBoundSamples.Inc()
 | 
						|
		return 0, storage.ErrOutOfBounds
 | 
						|
	}
 | 
						|
 | 
						|
	s := a.head.series.getByID(ref)
 | 
						|
	if s == nil {
 | 
						|
		// Ensure no empty labels have gotten through.
 | 
						|
		lset = lset.WithoutEmpty()
 | 
						|
		if len(lset) == 0 {
 | 
						|
			return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
 | 
						|
		}
 | 
						|
 | 
						|
		if l, dup := lset.HasDuplicateLabelNames(); dup {
 | 
						|
			return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
 | 
						|
		}
 | 
						|
 | 
						|
		var created bool
 | 
						|
		var err error
 | 
						|
		s, created, err = a.head.getOrCreate(lset.Hash(), lset)
 | 
						|
		if err != nil {
 | 
						|
			return 0, err
 | 
						|
		}
 | 
						|
		if created {
 | 
						|
			a.series = append(a.series, record.RefSeries{
 | 
						|
				Ref:    s.ref,
 | 
						|
				Labels: lset,
 | 
						|
			})
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	s.Lock()
 | 
						|
	if err := s.appendableHistogram(t, sh); err != nil {
 | 
						|
		s.Unlock()
 | 
						|
		if err == storage.ErrOutOfOrderSample {
 | 
						|
			a.head.metrics.outOfOrderSamples.Inc()
 | 
						|
		}
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	s.pendingCommit = true
 | 
						|
	s.Unlock()
 | 
						|
 | 
						|
	if t < a.mint {
 | 
						|
		a.mint = t
 | 
						|
	}
 | 
						|
	if t > a.maxt {
 | 
						|
		a.maxt = t
 | 
						|
	}
 | 
						|
 | 
						|
	a.histograms = append(a.histograms, record.RefHistogram{
 | 
						|
		Ref: s.ref,
 | 
						|
		T:   t,
 | 
						|
		H:   sh,
 | 
						|
	})
 | 
						|
	a.histogramSeries = append(a.histogramSeries, s)
 | 
						|
	return s.ref, nil
 | 
						|
}
 | 
						|
 | 
						|
// Compile-time check that *headAppender implements storage.GetRef.
var _ storage.GetRef = (*headAppender)(nil)
 | 
						|
 | 
						|
func (a *headAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) {
 | 
						|
	s := a.head.series.getByHash(lset.Hash(), lset)
 | 
						|
	if s == nil {
 | 
						|
		return 0, nil
 | 
						|
	}
 | 
						|
	// returned labels must be suitable to pass to Append()
 | 
						|
	return s.ref, s.lset
 | 
						|
}
 | 
						|
 | 
						|
// log writes the appender's buffered series, samples and exemplars to the
// head's WAL, in that order, as separate records. It is a no-op when the
// head has no WAL. The first failing write aborts and is returned wrapped
// with the record kind.
//
// NOTE(review): buffered histograms (a.histograms) are not written here —
// confirm whether that is intentional.
func (a *headAppender) log() error {
	if a.head.wal == nil {
		return nil
	}

	buf := a.head.getBytesBuffer()
	// The closure reads buf at return time, so the buffer that goes back
	// to the pool is whatever buf was last reassigned to below.
	defer func() { a.head.putBytesBuffer(buf) }()

	var rec []byte
	var enc record.Encoder

	if len(a.series) > 0 {
		rec = enc.Series(a.series, buf)
		// Reuse the encoded record's backing storage for the next record.
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return errors.Wrap(err, "log series")
		}
	}
	if len(a.samples) > 0 {
		rec = enc.Samples(a.samples, buf)
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return errors.Wrap(err, "log samples")
		}
	}
	if len(a.exemplars) > 0 {
		rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf)
		buf = rec[:0]

		if err := a.head.wal.Log(rec); err != nil {
			return errors.Wrap(err, "log exemplars")
		}
	}
	return nil
}
 | 
						|
 | 
						|
func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar {
 | 
						|
	ret := make([]record.RefExemplar, 0, len(es))
 | 
						|
	for _, e := range es {
 | 
						|
		ret = append(ret, record.RefExemplar{
 | 
						|
			Ref:    e.ref,
 | 
						|
			T:      e.exemplar.Ts,
 | 
						|
			V:      e.exemplar.Value,
 | 
						|
			Labels: e.exemplar.Labels,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
func (a *headAppender) Commit() (err error) {
 | 
						|
	if a.closed {
 | 
						|
		return ErrAppenderClosed
 | 
						|
	}
 | 
						|
	defer func() { a.closed = true }()
 | 
						|
 | 
						|
	if err := a.log(); err != nil {
 | 
						|
		_ = a.Rollback() // Most likely the same error will happen again.
 | 
						|
		return errors.Wrap(err, "write to WAL")
 | 
						|
	}
 | 
						|
 | 
						|
	// No errors logging to WAL, so pass the exemplars along to the in memory storage.
 | 
						|
	for _, e := range a.exemplars {
 | 
						|
		s := a.head.series.getByID(e.ref)
 | 
						|
		// We don't instrument exemplar appends here, all is instrumented by storage.
 | 
						|
		if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
 | 
						|
			if err == storage.ErrOutOfOrderExemplar {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	defer a.head.metrics.activeAppenders.Dec()
 | 
						|
	defer a.head.putAppendBuffer(a.samples)
 | 
						|
	defer a.head.putSeriesBuffer(a.sampleSeries)
 | 
						|
	defer a.head.putExemplarBuffer(a.exemplars)
 | 
						|
	defer a.head.iso.closeAppend(a.appendID)
 | 
						|
 | 
						|
	total := len(a.samples)
 | 
						|
	var series *memSeries
 | 
						|
	for i, s := range a.samples {
 | 
						|
		series = a.sampleSeries[i]
 | 
						|
		series.Lock()
 | 
						|
		ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
 | 
						|
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
 | 
						|
		series.pendingCommit = false
 | 
						|
		series.Unlock()
 | 
						|
 | 
						|
		if !ok {
 | 
						|
			total--
 | 
						|
			a.head.metrics.outOfOrderSamples.Inc()
 | 
						|
		}
 | 
						|
		if chunkCreated {
 | 
						|
			a.head.metrics.chunks.Inc()
 | 
						|
			a.head.metrics.chunksCreated.Inc()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	total += len(a.histograms) // TODO: different metric?
 | 
						|
	for i, s := range a.histograms {
 | 
						|
		series = a.histogramSeries[i]
 | 
						|
		series.Lock()
 | 
						|
		a.head.hasHistograms.Store(true)
 | 
						|
		ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper)
 | 
						|
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
 | 
						|
		series.pendingCommit = false
 | 
						|
		series.Unlock()
 | 
						|
 | 
						|
		if !ok {
 | 
						|
			total--
 | 
						|
			a.head.metrics.outOfOrderSamples.Inc()
 | 
						|
		}
 | 
						|
		if chunkCreated {
 | 
						|
			a.head.metrics.chunks.Inc()
 | 
						|
			a.head.metrics.chunksCreated.Inc()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	a.head.metrics.samplesAppended.Add(float64(total))
 | 
						|
	a.head.updateMinMaxTime(a.mint, a.maxt)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// append adds the sample (t, v) to the series. The caller also has to provide
 | 
						|
// the appendID for isolation. (The appendID can be zero, which results in no
 | 
						|
// isolation for this append.)
 | 
						|
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
 | 
						|
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
 | 
						|
	c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper)
 | 
						|
	if !sampleInOrder {
 | 
						|
		return sampleInOrder, chunkCreated
 | 
						|
	}
 | 
						|
 | 
						|
	s.app.Append(t, v)
 | 
						|
 | 
						|
	c.maxTime = t
 | 
						|
 | 
						|
	s.sampleBuf[0] = s.sampleBuf[1]
 | 
						|
	s.sampleBuf[1] = s.sampleBuf[2]
 | 
						|
	s.sampleBuf[2] = s.sampleBuf[3]
 | 
						|
	s.sampleBuf[3] = sample{t: t, v: v}
 | 
						|
 | 
						|
	if appendID > 0 {
 | 
						|
		s.txs.add(appendID)
 | 
						|
	}
 | 
						|
 | 
						|
	return true, chunkCreated
 | 
						|
}
 | 
						|
 | 
						|
// appendHistogram adds the sparse histogram.
 | 
						|
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
 | 
						|
func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
 | 
						|
	c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper)
 | 
						|
	if !sampleInOrder {
 | 
						|
		return sampleInOrder, chunkCreated
 | 
						|
	}
 | 
						|
 | 
						|
	if !chunkCreated {
 | 
						|
		// Head controls the execution of recoding, so that we own the proper chunk reference afterwards
 | 
						|
		app, _ := s.app.(*chunkenc.HistoAppender)
 | 
						|
		posInterjections, negInterjections, ok := app.Appendable(sh)
 | 
						|
		// we have 3 cases here
 | 
						|
		// !ok -> we need to cut a new chunk
 | 
						|
		// ok but we have interjections -> existing chunk needs recoding before we can append our histogram
 | 
						|
		// ok and no interjections -> chunk is ready to support our histogram
 | 
						|
		if !ok {
 | 
						|
			c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper)
 | 
						|
			chunkCreated = true
 | 
						|
		} else if len(posInterjections) > 0 || len(negInterjections) > 0 {
 | 
						|
			// new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
 | 
						|
			chunk, app := app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans)
 | 
						|
			s.headChunk = &memChunk{
 | 
						|
				minTime: s.headChunk.minTime,
 | 
						|
				maxTime: s.headChunk.maxTime,
 | 
						|
				chunk:   chunk,
 | 
						|
			}
 | 
						|
			s.app = app
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	s.app.AppendHistogram(t, sh)
 | 
						|
 | 
						|
	c.maxTime = t
 | 
						|
 | 
						|
	s.histBuf[0] = s.histBuf[1]
 | 
						|
	s.histBuf[1] = s.histBuf[2]
 | 
						|
	s.histBuf[2] = s.histBuf[3]
 | 
						|
	s.histBuf[3] = hist{t: t, h: sh}
 | 
						|
 | 
						|
	if appendID > 0 {
 | 
						|
		s.txs.add(appendID)
 | 
						|
	}
 | 
						|
 | 
						|
	return true, chunkCreated
 | 
						|
}
 | 
						|
 | 
						|
// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
//
// It returns the head chunk the sample at t should go into, whether the
// sample is in order, and whether a new head chunk was cut during the call.
// When sampleInOrder is false the sample must not be appended.
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) (c *memChunk, sampleInOrder, chunkCreated bool) {
	// Based on Gorilla white papers this offers near-optimal compression ratio
	// so anything bigger than this has diminishing returns and increases
	// the time range within which we have to decompress all samples.
	const samplesPerChunk = 120

	c = s.head()

	if c == nil {
		if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
			// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it.
			return c, false, false
		}
		// There is no chunk in this series yet, create the first chunk for the sample.
		c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
		chunkCreated = true
	}
	// Read after the possible cut above, so this is the sample count of
	// whichever chunk c now refers to.
	numSamples := c.chunk.NumSamples()

	// Out of order sample.
	if c.maxTime >= t {
		return c, false, chunkCreated
	}
	// If we reach 25% of a chunk's desired sample count, set a definitive time
	// at which to start the next chunk.
	// At latest it must happen at the timestamp set when the chunk was cut.
	if numSamples == samplesPerChunk/4 {
		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
	}
	// The sample falls at or past the planned chunk boundary: start a new chunk.
	if t >= s.nextAt {
		c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
		chunkCreated = true
	}
	return c, true, chunkCreated
}
 | 
						|
 | 
						|
// computeChunkEndTime estimates the end timestamp based on the beginning of a
// chunk, its current timestamp and the upper bound up to which we insert data.
// It assumes that the time range is 1/4 full.
// Assuming that the samples will keep arriving at the same rate, it will make the
// remaining n chunks within this chunk range (before max) equally sized.
func computeChunkEndTime(start, cur, max int64) int64 {
	span := max - start
	elapsed := cur - start + 1

	// Number of chunks of the projected size that fit into the range.
	n := span / (elapsed * 4)
	if n > 1 {
		return start + span/n
	}
	return max
}
 | 
						|
 | 
						|
func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
 | 
						|
	s.mmapCurrentHeadChunk(chunkDiskMapper)
 | 
						|
 | 
						|
	s.headChunk = &memChunk{
 | 
						|
		minTime: mint,
 | 
						|
		maxTime: math.MinInt64,
 | 
						|
	}
 | 
						|
 | 
						|
	if chunkenc.IsValidEncoding(e) {
 | 
						|
		var err error
 | 
						|
		s.headChunk.chunk, err = chunkenc.NewEmptyChunk(e)
 | 
						|
		if err != nil {
 | 
						|
			panic(err) // This should never happen.
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		s.headChunk.chunk = chunkenc.NewXORChunk()
 | 
						|
	}
 | 
						|
 | 
						|
	// Set upper bound on when the next chunk must be started. An earlier timestamp
 | 
						|
	// may be chosen dynamically at a later point.
 | 
						|
	s.nextAt = rangeForTimestamp(mint, s.chunkRange)
 | 
						|
 | 
						|
	app, err := s.headChunk.chunk.Appender()
 | 
						|
	if err != nil {
 | 
						|
		panic(err)
 | 
						|
	}
 | 
						|
	s.app = app
 | 
						|
	return s.headChunk
 | 
						|
}
 | 
						|
 | 
						|
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {
 | 
						|
	if s.headChunk == nil || s.headChunk.chunk.NumSamples() == 0 {
 | 
						|
		// There is no head chunk, so nothing to m-map here.
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk)
 | 
						|
	if err != nil {
 | 
						|
		if err != chunks.ErrChunkDiskMapperClosed {
 | 
						|
			panic(err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
 | 
						|
		ref:        chunkRef,
 | 
						|
		numSamples: uint16(s.headChunk.chunk.NumSamples()),
 | 
						|
		minTime:    s.headChunk.minTime,
 | 
						|
		maxTime:    s.headChunk.maxTime,
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (a *headAppender) Rollback() (err error) {
 | 
						|
	if a.closed {
 | 
						|
		return ErrAppenderClosed
 | 
						|
	}
 | 
						|
	defer func() { a.closed = true }()
 | 
						|
	defer a.head.metrics.activeAppenders.Dec()
 | 
						|
	defer a.head.iso.closeAppend(a.appendID)
 | 
						|
	defer a.head.putSeriesBuffer(a.sampleSeries)
 | 
						|
 | 
						|
	var series *memSeries
 | 
						|
	for i := range a.samples {
 | 
						|
		series = a.sampleSeries[i]
 | 
						|
		series.Lock()
 | 
						|
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
 | 
						|
		series.pendingCommit = false
 | 
						|
		series.Unlock()
 | 
						|
	}
 | 
						|
	a.head.putAppendBuffer(a.samples)
 | 
						|
	a.head.putExemplarBuffer(a.exemplars)
 | 
						|
	a.samples = nil
 | 
						|
	a.exemplars = nil
 | 
						|
 | 
						|
	// Series are created in the head memory regardless of rollback. Thus we have
 | 
						|
	// to log them to the WAL in any case.
 | 
						|
	return a.log()
 | 
						|
}
 |