mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-11-04 10:21:02 +01:00 
			
		
		
		
	Benchmarks show slight cpu/allocs improvements. benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Head/n="1"-4 269978625 235305110 -12.84% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 129739974 121646193 -6.24% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 123826274 122056253 -1.43% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 126962188 130038235 +2.42% BenchmarkPostingsForMatchers/Head/i=~".*"-4 6423653989 5991126455 -6.73% BenchmarkPostingsForMatchers/Head/i=~".+"-4 6934647521 7033370634 +1.42% BenchmarkPostingsForMatchers/Head/i=~""-4 1177781285 1121497736 -4.78% BenchmarkPostingsForMatchers/Head/i!=""-4 7033680256 7246094991 +3.02% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 293702332 287440212 -2.13% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 307628268 307039964 -0.19% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 512247746 480003862 -6.29% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 361199794 367066917 +1.62% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 478863761 476037784 -0.59% BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 103394659 102902098 -0.48% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 482552781 475453903 -1.47% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 559257389 589297047 +5.37% BenchmarkPostingsForMatchers/Block/n="1"-4 36492 37012 +1.42% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 557788 611903 +9.70% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 554443 573814 +3.49% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 553227 553826 +0.11% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113855090 111707221 -1.89% BenchmarkPostingsForMatchers/Block/i=~".+"-4 133994674 136520728 +1.89% BenchmarkPostingsForMatchers/Block/i=~""-4 38138091 36299898 -4.82% BenchmarkPostingsForMatchers/Block/i!=""-4 28861213 27396723 -5.07% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112699941 
110853868 -1.64% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 113198026 111389742 -1.60% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 28994069 27363804 -5.62% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 29709406 28589223 -3.77% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 134695119 135736971 +0.77% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 26783286 25826928 -3.57% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 134733254 134116739 -0.46% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 160713937 158802768 -1.19% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Head/n="1"-4 36 36 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 38 38 +0.00% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 38 38 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 42 40 -4.76% BenchmarkPostingsForMatchers/Head/i=~".*"-4 61 59 -3.28% BenchmarkPostingsForMatchers/Head/i=~".+"-4 100088 100087 -0.00% BenchmarkPostingsForMatchers/Head/i=~""-4 100053 100051 -0.00% BenchmarkPostingsForMatchers/Head/i!=""-4 100087 100085 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 44 42 -4.55% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 50 48 -4.00% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 100076 100074 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 100077 100075 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 100077 100074 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 11167 11165 -0.02% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 100082 100080 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 111265 111261 -0.00% BenchmarkPostingsForMatchers/Block/n="1"-4 6 6 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 11 11 +0.00% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 11 11 +0.00% 
BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 13 -13.33% BenchmarkPostingsForMatchers/Block/i=~".*"-4 12 10 -16.67% BenchmarkPostingsForMatchers/Block/i=~".+"-4 100040 100038 -0.00% BenchmarkPostingsForMatchers/Block/i=~""-4 100045 100043 -0.00% BenchmarkPostingsForMatchers/Block/i!=""-4 100041 100039 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 17 15 -11.76% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 23 21 -8.70% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 100046 100044 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 100050 100048 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 100049 100047 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 11150 11148 -0.02% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 100055 100053 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 111238 111234 -0.00% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Head/n="1"-4 10887816 10887817 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j="foo"-4 5456648 5456648 +0.00% BenchmarkPostingsForMatchers/Head/j="foo",n="1"-4 5456648 5456648 +0.00% BenchmarkPostingsForMatchers/Head/n="1",j!="foo"-4 5456792 5456712 -0.00% BenchmarkPostingsForMatchers/Head/i=~".*"-4 258254408 258254328 -0.00% BenchmarkPostingsForMatchers/Head/i=~".+"-4 273912888 273912904 +0.00% BenchmarkPostingsForMatchers/Head/i=~""-4 17266680 17266600 -0.00% BenchmarkPostingsForMatchers/Head/i!=""-4 273912416 273912336 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",j="foo"-4 7062578 7062498 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".*",i!="2",j="foo"-4 7062770 7062690 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!=""-4 28152346 28152266 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i!="",j="foo"-4 22721178 22721098 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",j="foo"-4 22721336 22721224 -0.00% 
BenchmarkPostingsForMatchers/Head/n="1",i=~"1.+",j="foo"-4 3623804 3623733 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!="2",j="foo"-4 22721480 22721400 -0.00% BenchmarkPostingsForMatchers/Head/n="1",i=~".+",i!~"2.*",j="foo"-4 24816652 24816444 -0.00% BenchmarkPostingsForMatchers/Block/n="1"-4 296 296 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 424 424 +0.00% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 424 424 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 1544 1464 -5.18% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1606114 1606045 -0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 17264709 17264629 -0.00% BenchmarkPostingsForMatchers/Block/i=~""-4 17264780 17264696 -0.00% BenchmarkPostingsForMatchers/Block/i!=""-4 17264680 17264600 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1606253 1606165 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1606445 1606348 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 17264808 17264728 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 17264936 17264856 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 17264965 17264885 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3148262 3148182 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 17265141 17265061 -0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 20416944 20416784 -0.00% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
		
			
				
	
	
		
			1957 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1957 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2017 The Prometheus Authors
 | 
						||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						||
// you may not use this file except in compliance with the License.
 | 
						||
// You may obtain a copy of the License at
 | 
						||
//
 | 
						||
// http://www.apache.org/licenses/LICENSE-2.0
 | 
						||
//
 | 
						||
// Unless required by applicable law or agreed to in writing, software
 | 
						||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						||
// See the License for the specific language governing permissions and
 | 
						||
// limitations under the License.
 | 
						||
 | 
						||
package tsdb
 | 
						||
 | 
						||
import (
 | 
						||
	"fmt"
 | 
						||
	"math"
 | 
						||
	"runtime"
 | 
						||
	"sort"
 | 
						||
	"strings"
 | 
						||
	"sync"
 | 
						||
	"sync/atomic"
 | 
						||
	"time"
 | 
						||
 | 
						||
	"github.com/go-kit/kit/log"
 | 
						||
	"github.com/go-kit/kit/log/level"
 | 
						||
	"github.com/oklog/ulid"
 | 
						||
	"github.com/pkg/errors"
 | 
						||
	"github.com/prometheus/client_golang/prometheus"
 | 
						||
	"github.com/prometheus/prometheus/pkg/labels"
 | 
						||
	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | 
						||
	"github.com/prometheus/prometheus/tsdb/chunks"
 | 
						||
	"github.com/prometheus/prometheus/tsdb/index"
 | 
						||
	"github.com/prometheus/prometheus/tsdb/record"
 | 
						||
	"github.com/prometheus/prometheus/tsdb/tombstones"
 | 
						||
	"github.com/prometheus/prometheus/tsdb/wal"
 | 
						||
)
 | 
						||
 | 
						||
var (
 | 
						||
	// ErrNotFound is returned if a looked up resource was not found.
 | 
						||
	ErrNotFound = errors.Errorf("not found")
 | 
						||
 | 
						||
	// ErrOutOfOrderSample is returned if an appended sample has a
 | 
						||
	// timestamp smaller than the most recent sample.
 | 
						||
	ErrOutOfOrderSample = errors.New("out of order sample")
 | 
						||
 | 
						||
	// ErrAmendSample is returned if an appended sample has the same timestamp
 | 
						||
	// as the most recent sample but a different value.
 | 
						||
	ErrAmendSample = errors.New("amending sample")
 | 
						||
 | 
						||
	// ErrOutOfBounds is returned if an appended sample is out of the
 | 
						||
	// writable time range.
 | 
						||
	ErrOutOfBounds = errors.New("out of bounds")
 | 
						||
 | 
						||
	// emptyTombstoneReader is a no-op Tombstone Reader.
 | 
						||
	// This is used by head to satisfy the Tombstones() function call.
 | 
						||
	emptyTombstoneReader = tombstones.NewMemTombstones()
 | 
						||
)
 | 
						||
 | 
						||
// Head handles reads and writes of time series data within a time window.
 | 
						||
type Head struct {
 | 
						||
	// Keep all 64bit atomically accessed variables at the top of this struct.
 | 
						||
	// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
 | 
						||
	chunkRange       int64
 | 
						||
	numSeries        uint64
 | 
						||
	minTime, maxTime int64 // Current min and max of the samples included in the head.
 | 
						||
	minValidTime     int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
 | 
						||
	lastSeriesID     uint64
 | 
						||
 | 
						||
	metrics    *headMetrics
 | 
						||
	wal        *wal.WAL
 | 
						||
	logger     log.Logger
 | 
						||
	appendPool sync.Pool
 | 
						||
	seriesPool sync.Pool
 | 
						||
	bytesPool  sync.Pool
 | 
						||
 | 
						||
	// All series addressable by their ID or hash.
 | 
						||
	series *stripeSeries
 | 
						||
 | 
						||
	symMtx  sync.RWMutex
 | 
						||
	symbols map[string]struct{}
 | 
						||
	values  map[string]stringset // label names to possible values
 | 
						||
 | 
						||
	deletedMtx sync.Mutex
 | 
						||
	deleted    map[uint64]int // Deleted series, and what WAL segment they must be kept until.
 | 
						||
 | 
						||
	postings *index.MemPostings // postings lists for terms
 | 
						||
 | 
						||
	cardinalityMutex      sync.Mutex
 | 
						||
	cardinalityCache      *index.PostingsStats // posting stats cache which will expire after 30sec
 | 
						||
	lastPostingsStatsCall time.Duration        // last posting stats call (PostingsCardinalityStats()) time for caching
 | 
						||
}
 | 
						||
 | 
						||
type headMetrics struct {
 | 
						||
	activeAppenders         prometheus.Gauge
 | 
						||
	series                  prometheus.GaugeFunc
 | 
						||
	seriesCreated           prometheus.Counter
 | 
						||
	seriesRemoved           prometheus.Counter
 | 
						||
	seriesNotFound          prometheus.Counter
 | 
						||
	chunks                  prometheus.Gauge
 | 
						||
	chunksCreated           prometheus.Counter
 | 
						||
	chunksRemoved           prometheus.Counter
 | 
						||
	gcDuration              prometheus.Summary
 | 
						||
	minTime                 prometheus.GaugeFunc
 | 
						||
	maxTime                 prometheus.GaugeFunc
 | 
						||
	samplesAppended         prometheus.Counter
 | 
						||
	walTruncateDuration     prometheus.Summary
 | 
						||
	walCorruptionsTotal     prometheus.Counter
 | 
						||
	headTruncateFail        prometheus.Counter
 | 
						||
	headTruncateTotal       prometheus.Counter
 | 
						||
	checkpointDeleteFail    prometheus.Counter
 | 
						||
	checkpointDeleteTotal   prometheus.Counter
 | 
						||
	checkpointCreationFail  prometheus.Counter
 | 
						||
	checkpointCreationTotal prometheus.Counter
 | 
						||
}
 | 
						||
 | 
						||
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 | 
						||
	m := &headMetrics{}
 | 
						||
 | 
						||
	m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
 | 
						||
		Name: "prometheus_tsdb_head_active_appenders",
 | 
						||
		Help: "Number of currently active appender transactions",
 | 
						||
	})
 | 
						||
	m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 | 
						||
		Name: "prometheus_tsdb_head_series",
 | 
						||
		Help: "Total number of series in the head block.",
 | 
						||
	}, func() float64 {
 | 
						||
		return float64(h.NumSeries())
 | 
						||
	})
 | 
						||
	m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_head_series_created_total",
 | 
						||
		Help: "Total number of series created in the head",
 | 
						||
	})
 | 
						||
	m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_head_series_removed_total",
 | 
						||
		Help: "Total number of series removed in the head",
 | 
						||
	})
 | 
						||
	m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_head_series_not_found_total",
 | 
						||
		Help: "Total number of requests for series that were not found.",
 | 
						||
	})
 | 
						||
	m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
 | 
						||
		Name: "prometheus_tsdb_head_chunks",
 | 
						||
		Help: "Total number of chunks in the head block.",
 | 
						||
	})
 | 
						||
	m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_head_chunks_created_total",
 | 
						||
		Help: "Total number of chunks created in the head",
 | 
						||
	})
 | 
						||
	m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_head_chunks_removed_total",
 | 
						||
		Help: "Total number of chunks removed in the head",
 | 
						||
	})
 | 
						||
	m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
 | 
						||
		Name:       "prometheus_tsdb_head_gc_duration_seconds",
 | 
						||
		Help:       "Runtime of garbage collection in the head block.",
 | 
						||
		Objectives: map[float64]float64{},
 | 
						||
	})
 | 
						||
	m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 | 
						||
		Name: "prometheus_tsdb_head_max_time",
 | 
						||
		Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
 | 
						||
	}, func() float64 {
 | 
						||
		return float64(h.MaxTime())
 | 
						||
	})
 | 
						||
	m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 | 
						||
		Name: "prometheus_tsdb_head_min_time",
 | 
						||
		Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
 | 
						||
	}, func() float64 {
 | 
						||
		return float64(h.MinTime())
 | 
						||
	})
 | 
						||
	m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
 | 
						||
		Name:       "prometheus_tsdb_wal_truncate_duration_seconds",
 | 
						||
		Help:       "Duration of WAL truncation.",
 | 
						||
		Objectives: map[float64]float64{},
 | 
						||
	})
 | 
						||
	m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_wal_corruptions_total",
 | 
						||
		Help: "Total number of WAL corruptions.",
 | 
						||
	})
 | 
						||
	m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_head_samples_appended_total",
 | 
						||
		Help: "Total number of appended samples.",
 | 
						||
	})
 | 
						||
	m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_head_truncations_failed_total",
 | 
						||
		Help: "Total number of head truncations that failed.",
 | 
						||
	})
 | 
						||
	m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_head_truncations_total",
 | 
						||
		Help: "Total number of head truncations attempted.",
 | 
						||
	})
 | 
						||
	m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
 | 
						||
		Help: "Total number of checkpoint deletions that failed.",
 | 
						||
	})
 | 
						||
	m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_checkpoint_deletions_total",
 | 
						||
		Help: "Total number of checkpoint deletions attempted.",
 | 
						||
	})
 | 
						||
	m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_checkpoint_creations_failed_total",
 | 
						||
		Help: "Total number of checkpoint creations that failed.",
 | 
						||
	})
 | 
						||
	m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
 | 
						||
		Name: "prometheus_tsdb_checkpoint_creations_total",
 | 
						||
		Help: "Total number of checkpoint creations attempted.",
 | 
						||
	})
 | 
						||
 | 
						||
	if r != nil {
 | 
						||
		r.MustRegister(
 | 
						||
			m.activeAppenders,
 | 
						||
			m.chunks,
 | 
						||
			m.chunksCreated,
 | 
						||
			m.chunksRemoved,
 | 
						||
			m.series,
 | 
						||
			m.seriesCreated,
 | 
						||
			m.seriesRemoved,
 | 
						||
			m.seriesNotFound,
 | 
						||
			m.minTime,
 | 
						||
			m.maxTime,
 | 
						||
			m.gcDuration,
 | 
						||
			m.walTruncateDuration,
 | 
						||
			m.walCorruptionsTotal,
 | 
						||
			m.samplesAppended,
 | 
						||
			m.headTruncateFail,
 | 
						||
			m.headTruncateTotal,
 | 
						||
			m.checkpointDeleteFail,
 | 
						||
			m.checkpointDeleteTotal,
 | 
						||
			m.checkpointCreationFail,
 | 
						||
			m.checkpointCreationTotal,
 | 
						||
		)
 | 
						||
	}
 | 
						||
	return m
 | 
						||
}
 | 
						||
 | 
						||
// cardinalityCacheExpirationTime is how long a cached postings cardinality
// result is served before it is recomputed.
const cardinalityCacheExpirationTime = 30 * time.Second
 | 
						||
 | 
						||
// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names.
 | 
						||
func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats {
 | 
						||
	h.cardinalityMutex.Lock()
 | 
						||
	defer h.cardinalityMutex.Unlock()
 | 
						||
	currentTime := time.Duration(time.Now().Unix()) * time.Second
 | 
						||
	seconds := currentTime - h.lastPostingsStatsCall
 | 
						||
	if seconds > cardinalityCacheExpirationTime {
 | 
						||
		h.cardinalityCache = nil
 | 
						||
	}
 | 
						||
	if h.cardinalityCache != nil {
 | 
						||
		return h.cardinalityCache
 | 
						||
	}
 | 
						||
	h.cardinalityCache = h.postings.Stats(statsByLabelName)
 | 
						||
	h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second
 | 
						||
 | 
						||
	return h.cardinalityCache
 | 
						||
}
 | 
						||
 | 
						||
// NewHead opens the head block in dir.
 | 
						||
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) {
 | 
						||
	if l == nil {
 | 
						||
		l = log.NewNopLogger()
 | 
						||
	}
 | 
						||
	if chunkRange < 1 {
 | 
						||
		return nil, errors.Errorf("invalid chunk range %d", chunkRange)
 | 
						||
	}
 | 
						||
	h := &Head{
 | 
						||
		wal:        wal,
 | 
						||
		logger:     l,
 | 
						||
		chunkRange: chunkRange,
 | 
						||
		minTime:    math.MaxInt64,
 | 
						||
		maxTime:    math.MinInt64,
 | 
						||
		series:     newStripeSeries(),
 | 
						||
		values:     map[string]stringset{},
 | 
						||
		symbols:    map[string]struct{}{},
 | 
						||
		postings:   index.NewUnorderedMemPostings(),
 | 
						||
		deleted:    map[uint64]int{},
 | 
						||
	}
 | 
						||
	h.metrics = newHeadMetrics(h, r)
 | 
						||
 | 
						||
	return h, nil
 | 
						||
}
 | 
						||
 | 
						||
// processWALSamples adds a partition of samples it receives to the head and passes
 | 
						||
// them on to other workers.
 | 
						||
// Samples before the mint timestamp are discarded.
 | 
						||
func (h *Head) processWALSamples(
 | 
						||
	minValidTime int64,
 | 
						||
	input <-chan []record.RefSample, output chan<- []record.RefSample,
 | 
						||
) (unknownRefs uint64) {
 | 
						||
	defer close(output)
 | 
						||
 | 
						||
	// Mitigate lock contention in getByID.
 | 
						||
	refSeries := map[uint64]*memSeries{}
 | 
						||
 | 
						||
	mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
 | 
						||
 | 
						||
	for samples := range input {
 | 
						||
		for _, s := range samples {
 | 
						||
			if s.T < minValidTime {
 | 
						||
				continue
 | 
						||
			}
 | 
						||
			ms := refSeries[s.Ref]
 | 
						||
			if ms == nil {
 | 
						||
				ms = h.series.getByID(s.Ref)
 | 
						||
				if ms == nil {
 | 
						||
					unknownRefs++
 | 
						||
					continue
 | 
						||
				}
 | 
						||
				refSeries[s.Ref] = ms
 | 
						||
			}
 | 
						||
			_, chunkCreated := ms.append(s.T, s.V)
 | 
						||
			if chunkCreated {
 | 
						||
				h.metrics.chunksCreated.Inc()
 | 
						||
				h.metrics.chunks.Inc()
 | 
						||
			}
 | 
						||
			if s.T > maxt {
 | 
						||
				maxt = s.T
 | 
						||
			}
 | 
						||
			if s.T < mint {
 | 
						||
				mint = s.T
 | 
						||
			}
 | 
						||
		}
 | 
						||
		output <- samples
 | 
						||
	}
 | 
						||
	h.updateMinMaxTime(mint, maxt)
 | 
						||
 | 
						||
	return unknownRefs
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) updateMinMaxTime(mint, maxt int64) {
 | 
						||
	for {
 | 
						||
		lt := h.MinTime()
 | 
						||
		if mint >= lt {
 | 
						||
			break
 | 
						||
		}
 | 
						||
		if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) {
 | 
						||
			break
 | 
						||
		}
 | 
						||
	}
 | 
						||
	for {
 | 
						||
		ht := h.MaxTime()
 | 
						||
		if maxt <= ht {
 | 
						||
			break
 | 
						||
		}
 | 
						||
		if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) {
 | 
						||
			break
 | 
						||
		}
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
 | 
						||
	// Track number of samples that referenced a series we don't know about
 | 
						||
	// for error reporting.
 | 
						||
	var unknownRefs uint64
 | 
						||
 | 
						||
	// Start workers that each process samples for a partition of the series ID space.
 | 
						||
	// They are connected through a ring of channels which ensures that all sample batches
 | 
						||
	// read from the WAL are processed in order.
 | 
						||
	var (
 | 
						||
		wg      sync.WaitGroup
 | 
						||
		n       = runtime.GOMAXPROCS(0)
 | 
						||
		inputs  = make([]chan []record.RefSample, n)
 | 
						||
		outputs = make([]chan []record.RefSample, n)
 | 
						||
	)
 | 
						||
	wg.Add(n)
 | 
						||
 | 
						||
	defer func() {
 | 
						||
		// For CorruptionErr ensure to terminate all workers before exiting.
 | 
						||
		if _, ok := err.(*wal.CorruptionErr); ok {
 | 
						||
			for i := 0; i < n; i++ {
 | 
						||
				close(inputs[i])
 | 
						||
				for range outputs[i] {
 | 
						||
				}
 | 
						||
			}
 | 
						||
			wg.Wait()
 | 
						||
		}
 | 
						||
	}()
 | 
						||
 | 
						||
	for i := 0; i < n; i++ {
 | 
						||
		outputs[i] = make(chan []record.RefSample, 300)
 | 
						||
		inputs[i] = make(chan []record.RefSample, 300)
 | 
						||
 | 
						||
		go func(input <-chan []record.RefSample, output chan<- []record.RefSample) {
 | 
						||
			unknown := h.processWALSamples(h.minValidTime, input, output)
 | 
						||
			atomic.AddUint64(&unknownRefs, unknown)
 | 
						||
			wg.Done()
 | 
						||
		}(inputs[i], outputs[i])
 | 
						||
	}
 | 
						||
 | 
						||
	var (
 | 
						||
		dec       record.Decoder
 | 
						||
		allStones = tombstones.NewMemTombstones()
 | 
						||
		shards    = make([][]record.RefSample, n)
 | 
						||
	)
 | 
						||
	defer func() {
 | 
						||
		if err := allStones.Close(); err != nil {
 | 
						||
			level.Warn(h.logger).Log("msg", "closing  memTombstones during wal read", "err", err)
 | 
						||
		}
 | 
						||
	}()
 | 
						||
 | 
						||
	var (
 | 
						||
		decoded    = make(chan interface{}, 10)
 | 
						||
		errCh      = make(chan error, 1)
 | 
						||
		seriesPool = sync.Pool{
 | 
						||
			New: func() interface{} {
 | 
						||
				return []record.RefSeries{}
 | 
						||
			},
 | 
						||
		}
 | 
						||
		samplesPool = sync.Pool{
 | 
						||
			New: func() interface{} {
 | 
						||
				return []record.RefSample{}
 | 
						||
			},
 | 
						||
		}
 | 
						||
		tstonesPool = sync.Pool{
 | 
						||
			New: func() interface{} {
 | 
						||
				return []tombstones.Stone{}
 | 
						||
			},
 | 
						||
		}
 | 
						||
	)
 | 
						||
	go func() {
 | 
						||
		defer close(decoded)
 | 
						||
		for r.Next() {
 | 
						||
			rec := r.Record()
 | 
						||
			switch dec.Type(rec) {
 | 
						||
			case record.Series:
 | 
						||
				series := seriesPool.Get().([]record.RefSeries)[:0]
 | 
						||
				series, err = dec.Series(rec, series)
 | 
						||
				if err != nil {
 | 
						||
					errCh <- &wal.CorruptionErr{
 | 
						||
						Err:     errors.Wrap(err, "decode series"),
 | 
						||
						Segment: r.Segment(),
 | 
						||
						Offset:  r.Offset(),
 | 
						||
					}
 | 
						||
					return
 | 
						||
				}
 | 
						||
				decoded <- series
 | 
						||
			case record.Samples:
 | 
						||
				samples := samplesPool.Get().([]record.RefSample)[:0]
 | 
						||
				samples, err = dec.Samples(rec, samples)
 | 
						||
				if err != nil {
 | 
						||
					errCh <- &wal.CorruptionErr{
 | 
						||
						Err:     errors.Wrap(err, "decode samples"),
 | 
						||
						Segment: r.Segment(),
 | 
						||
						Offset:  r.Offset(),
 | 
						||
					}
 | 
						||
					return
 | 
						||
				}
 | 
						||
				decoded <- samples
 | 
						||
			case record.Tombstones:
 | 
						||
				tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
 | 
						||
				tstones, err = dec.Tombstones(rec, tstones)
 | 
						||
				if err != nil {
 | 
						||
					errCh <- &wal.CorruptionErr{
 | 
						||
						Err:     errors.Wrap(err, "decode tombstones"),
 | 
						||
						Segment: r.Segment(),
 | 
						||
						Offset:  r.Offset(),
 | 
						||
					}
 | 
						||
					return
 | 
						||
				}
 | 
						||
				decoded <- tstones
 | 
						||
			default:
 | 
						||
				errCh <- &wal.CorruptionErr{
 | 
						||
					Err:     errors.Errorf("invalid record type %v", dec.Type(rec)),
 | 
						||
					Segment: r.Segment(),
 | 
						||
					Offset:  r.Offset(),
 | 
						||
				}
 | 
						||
				return
 | 
						||
			}
 | 
						||
		}
 | 
						||
	}()
 | 
						||
 | 
						||
	for d := range decoded {
 | 
						||
		switch v := d.(type) {
 | 
						||
		case []record.RefSeries:
 | 
						||
			for _, s := range v {
 | 
						||
				series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
 | 
						||
 | 
						||
				if !created {
 | 
						||
					// There's already a different ref for this series.
 | 
						||
					multiRef[s.Ref] = series.ref
 | 
						||
				}
 | 
						||
 | 
						||
				if h.lastSeriesID < s.Ref {
 | 
						||
					h.lastSeriesID = s.Ref
 | 
						||
				}
 | 
						||
			}
 | 
						||
			//lint:ignore SA6002 relax staticcheck verification.
 | 
						||
			seriesPool.Put(v)
 | 
						||
		case []record.RefSample:
 | 
						||
			samples := v
 | 
						||
			// We split up the samples into chunks of 5000 samples or less.
 | 
						||
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
 | 
						||
			// cause thousands of very large in flight buffers occupying large amounts
 | 
						||
			// of unused memory.
 | 
						||
			for len(samples) > 0 {
 | 
						||
				m := 5000
 | 
						||
				if len(samples) < m {
 | 
						||
					m = len(samples)
 | 
						||
				}
 | 
						||
				for i := 0; i < n; i++ {
 | 
						||
					var buf []record.RefSample
 | 
						||
					select {
 | 
						||
					case buf = <-outputs[i]:
 | 
						||
					default:
 | 
						||
					}
 | 
						||
					shards[i] = buf[:0]
 | 
						||
				}
 | 
						||
				for _, sam := range samples[:m] {
 | 
						||
					if r, ok := multiRef[sam.Ref]; ok {
 | 
						||
						sam.Ref = r
 | 
						||
					}
 | 
						||
					mod := sam.Ref % uint64(n)
 | 
						||
					shards[mod] = append(shards[mod], sam)
 | 
						||
				}
 | 
						||
				for i := 0; i < n; i++ {
 | 
						||
					inputs[i] <- shards[i]
 | 
						||
				}
 | 
						||
				samples = samples[m:]
 | 
						||
			}
 | 
						||
			//lint:ignore SA6002 relax staticcheck verification.
 | 
						||
			samplesPool.Put(v)
 | 
						||
		case []tombstones.Stone:
 | 
						||
			for _, s := range v {
 | 
						||
				for _, itv := range s.Intervals {
 | 
						||
					if itv.Maxt < h.minValidTime {
 | 
						||
						continue
 | 
						||
					}
 | 
						||
					if m := h.series.getByID(s.Ref); m == nil {
 | 
						||
						unknownRefs++
 | 
						||
						continue
 | 
						||
					}
 | 
						||
					allStones.AddInterval(s.Ref, itv)
 | 
						||
				}
 | 
						||
			}
 | 
						||
			//lint:ignore SA6002 relax staticcheck verification.
 | 
						||
			tstonesPool.Put(v)
 | 
						||
		default:
 | 
						||
			panic(fmt.Errorf("unexpected decoded type: %T", d))
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	select {
 | 
						||
	case err := <-errCh:
 | 
						||
		return err
 | 
						||
	default:
 | 
						||
	}
 | 
						||
 | 
						||
	// Signal termination to each worker and wait for it to close its output channel.
 | 
						||
	for i := 0; i < n; i++ {
 | 
						||
		close(inputs[i])
 | 
						||
		for range outputs[i] {
 | 
						||
		}
 | 
						||
	}
 | 
						||
	wg.Wait()
 | 
						||
 | 
						||
	if r.Err() != nil {
 | 
						||
		return errors.Wrap(r.Err(), "read records")
 | 
						||
	}
 | 
						||
 | 
						||
	if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error {
 | 
						||
		return h.chunkRewrite(ref, dranges)
 | 
						||
	}); err != nil {
 | 
						||
		return errors.Wrap(r.Err(), "deleting samples from tombstones")
 | 
						||
	}
 | 
						||
 | 
						||
	if unknownRefs > 0 {
 | 
						||
		level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
 | 
						||
	}
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// Init loads data from the write ahead log and prepares the head for writes.
 | 
						||
// It should be called before using an appender so that
 | 
						||
// limits the ingested samples to the head min valid time.
 | 
						||
func (h *Head) Init(minValidTime int64) error {
 | 
						||
	h.minValidTime = minValidTime
 | 
						||
	defer h.postings.EnsureOrder()
 | 
						||
	defer h.gc() // After loading the wal remove the obsolete data from the head.
 | 
						||
 | 
						||
	if h.wal == nil {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
 | 
						||
	level.Info(h.logger).Log("msg", "replaying WAL, this may take awhile")
 | 
						||
	// Backfill the checkpoint first if it exists.
 | 
						||
	dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
 | 
						||
	if err != nil && err != record.ErrNotFound {
 | 
						||
		return errors.Wrap(err, "find last checkpoint")
 | 
						||
	}
 | 
						||
	multiRef := map[uint64]uint64{}
 | 
						||
	if err == nil {
 | 
						||
		sr, err := wal.NewSegmentsReader(dir)
 | 
						||
		if err != nil {
 | 
						||
			return errors.Wrap(err, "open checkpoint")
 | 
						||
		}
 | 
						||
		defer func() {
 | 
						||
			if err := sr.Close(); err != nil {
 | 
						||
				level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
 | 
						||
			}
 | 
						||
		}()
 | 
						||
 | 
						||
		// A corrupted checkpoint is a hard error for now and requires user
 | 
						||
		// intervention. There's likely little data that can be recovered anyway.
 | 
						||
		if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil {
 | 
						||
			return errors.Wrap(err, "backfill checkpoint")
 | 
						||
		}
 | 
						||
		startFrom++
 | 
						||
		level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
 | 
						||
	}
 | 
						||
 | 
						||
	// Find the last segment.
 | 
						||
	_, last, err := h.wal.Segments()
 | 
						||
	if err != nil {
 | 
						||
		return errors.Wrap(err, "finding WAL segments")
 | 
						||
	}
 | 
						||
 | 
						||
	// Backfill segments from the most recent checkpoint onwards.
 | 
						||
	for i := startFrom; i <= last; i++ {
 | 
						||
		s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
 | 
						||
		if err != nil {
 | 
						||
			return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
 | 
						||
		}
 | 
						||
 | 
						||
		sr := wal.NewSegmentBufReader(s)
 | 
						||
		err = h.loadWAL(wal.NewReader(sr), multiRef)
 | 
						||
		if err := sr.Close(); err != nil {
 | 
						||
			level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
 | 
						||
		}
 | 
						||
		if err != nil {
 | 
						||
			return err
 | 
						||
		}
 | 
						||
		level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// Truncate removes old data before mint from the head.
 | 
						||
func (h *Head) Truncate(mint int64) (err error) {
 | 
						||
	defer func() {
 | 
						||
		if err != nil {
 | 
						||
			h.metrics.headTruncateFail.Inc()
 | 
						||
		}
 | 
						||
	}()
 | 
						||
	initialize := h.MinTime() == math.MaxInt64
 | 
						||
 | 
						||
	if h.MinTime() >= mint && !initialize {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
	atomic.StoreInt64(&h.minTime, mint)
 | 
						||
	atomic.StoreInt64(&h.minValidTime, mint)
 | 
						||
 | 
						||
	// Ensure that max time is at least as high as min time.
 | 
						||
	for h.MaxTime() < mint {
 | 
						||
		atomic.CompareAndSwapInt64(&h.maxTime, h.MaxTime(), mint)
 | 
						||
	}
 | 
						||
 | 
						||
	// This was an initial call to Truncate after loading blocks on startup.
 | 
						||
	// We haven't read back the WAL yet, so do not attempt to truncate it.
 | 
						||
	if initialize {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
 | 
						||
	h.metrics.headTruncateTotal.Inc()
 | 
						||
	start := time.Now()
 | 
						||
 | 
						||
	h.gc()
 | 
						||
	level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
 | 
						||
	h.metrics.gcDuration.Observe(time.Since(start).Seconds())
 | 
						||
 | 
						||
	if h.wal == nil {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
	start = time.Now()
 | 
						||
 | 
						||
	first, last, err := h.wal.Segments()
 | 
						||
	if err != nil {
 | 
						||
		return errors.Wrap(err, "get segment range")
 | 
						||
	}
 | 
						||
	// Start a new segment, so low ingestion volume TSDB don't have more WAL than
 | 
						||
	// needed.
 | 
						||
	err = h.wal.NextSegment()
 | 
						||
	if err != nil {
 | 
						||
		return errors.Wrap(err, "next segment")
 | 
						||
	}
 | 
						||
	last-- // Never consider last segment for checkpoint.
 | 
						||
	if last < 0 {
 | 
						||
		return nil // no segments yet.
 | 
						||
	}
 | 
						||
	// The lower third of segments should contain mostly obsolete samples.
 | 
						||
	// If we have less than three segments, it's not worth checkpointing yet.
 | 
						||
	last = first + (last-first)/3
 | 
						||
	if last <= first {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
 | 
						||
	keep := func(id uint64) bool {
 | 
						||
		if h.series.getByID(id) != nil {
 | 
						||
			return true
 | 
						||
		}
 | 
						||
		h.deletedMtx.Lock()
 | 
						||
		_, ok := h.deleted[id]
 | 
						||
		h.deletedMtx.Unlock()
 | 
						||
		return ok
 | 
						||
	}
 | 
						||
	h.metrics.checkpointCreationTotal.Inc()
 | 
						||
	if _, err = wal.Checkpoint(h.wal, first, last, keep, mint); err != nil {
 | 
						||
		h.metrics.checkpointCreationFail.Inc()
 | 
						||
		return errors.Wrap(err, "create checkpoint")
 | 
						||
	}
 | 
						||
	if err := h.wal.Truncate(last + 1); err != nil {
 | 
						||
		// If truncating fails, we'll just try again at the next checkpoint.
 | 
						||
		// Leftover segments will just be ignored in the future if there's a checkpoint
 | 
						||
		// that supersedes them.
 | 
						||
		level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
 | 
						||
	}
 | 
						||
 | 
						||
	// The checkpoint is written and segments before it is truncated, so we no
 | 
						||
	// longer need to track deleted series that are before it.
 | 
						||
	h.deletedMtx.Lock()
 | 
						||
	for ref, segment := range h.deleted {
 | 
						||
		if segment < first {
 | 
						||
			delete(h.deleted, ref)
 | 
						||
		}
 | 
						||
	}
 | 
						||
	h.deletedMtx.Unlock()
 | 
						||
 | 
						||
	h.metrics.checkpointDeleteTotal.Inc()
 | 
						||
	if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
 | 
						||
		// Leftover old checkpoints do not cause problems down the line beyond
 | 
						||
		// occupying disk space.
 | 
						||
		// They will just be ignored since a higher checkpoint exists.
 | 
						||
		level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
 | 
						||
		h.metrics.checkpointDeleteFail.Inc()
 | 
						||
	}
 | 
						||
	h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
 | 
						||
 | 
						||
	level.Info(h.logger).Log("msg", "WAL checkpoint complete",
 | 
						||
		"first", first, "last", last, "duration", time.Since(start))
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// initTime initializes a head with the first timestamp. This only needs to be called
 | 
						||
// for a completely fresh head with an empty WAL.
 | 
						||
// Returns true if the initialization took an effect.
 | 
						||
func (h *Head) initTime(t int64) (initialized bool) {
 | 
						||
	if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
 | 
						||
		return false
 | 
						||
	}
 | 
						||
	// Ensure that max time is initialized to at least the min time we just set.
 | 
						||
	// Concurrent appenders may already have set it to a higher value.
 | 
						||
	atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t)
 | 
						||
 | 
						||
	return true
 | 
						||
}
 | 
						||
 | 
						||
type rangeHead struct {
 | 
						||
	head       *Head
 | 
						||
	mint, maxt int64
 | 
						||
}
 | 
						||
 | 
						||
func (h *rangeHead) Index() (IndexReader, error) {
 | 
						||
	return h.head.indexRange(h.mint, h.maxt), nil
 | 
						||
}
 | 
						||
 | 
						||
func (h *rangeHead) Chunks() (ChunkReader, error) {
 | 
						||
	return h.head.chunksRange(h.mint, h.maxt), nil
 | 
						||
}
 | 
						||
 | 
						||
func (h *rangeHead) Tombstones() (tombstones.Reader, error) {
 | 
						||
	return emptyTombstoneReader, nil
 | 
						||
}
 | 
						||
 | 
						||
func (h *rangeHead) MinTime() int64 {
 | 
						||
	return h.mint
 | 
						||
}
 | 
						||
 | 
						||
func (h *rangeHead) MaxTime() int64 {
 | 
						||
	return h.maxt
 | 
						||
}
 | 
						||
 | 
						||
func (h *rangeHead) NumSeries() uint64 {
 | 
						||
	return h.head.NumSeries()
 | 
						||
}
 | 
						||
 | 
						||
func (h *rangeHead) Meta() BlockMeta {
 | 
						||
	return BlockMeta{
 | 
						||
		MinTime: h.MinTime(),
 | 
						||
		MaxTime: h.MaxTime(),
 | 
						||
		ULID:    h.head.Meta().ULID,
 | 
						||
		Stats: BlockStats{
 | 
						||
			NumSeries: h.NumSeries(),
 | 
						||
		},
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// initAppender is a helper to initialize the time bounds of the head
 | 
						||
// upon the first sample it receives.
 | 
						||
type initAppender struct {
 | 
						||
	app  Appender
 | 
						||
	head *Head
 | 
						||
}
 | 
						||
 | 
						||
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
 | 
						||
	if a.app != nil {
 | 
						||
		return a.app.Add(lset, t, v)
 | 
						||
	}
 | 
						||
	a.head.initTime(t)
 | 
						||
	a.app = a.head.appender()
 | 
						||
 | 
						||
	return a.app.Add(lset, t, v)
 | 
						||
}
 | 
						||
 | 
						||
func (a *initAppender) AddFast(ref uint64, t int64, v float64) error {
 | 
						||
	if a.app == nil {
 | 
						||
		return ErrNotFound
 | 
						||
	}
 | 
						||
	return a.app.AddFast(ref, t, v)
 | 
						||
}
 | 
						||
 | 
						||
func (a *initAppender) Commit() error {
 | 
						||
	if a.app == nil {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
	return a.app.Commit()
 | 
						||
}
 | 
						||
 | 
						||
func (a *initAppender) Rollback() error {
 | 
						||
	if a.app == nil {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
	return a.app.Rollback()
 | 
						||
}
 | 
						||
 | 
						||
// Appender returns a new Appender on the database.
 | 
						||
func (h *Head) Appender() Appender {
 | 
						||
	h.metrics.activeAppenders.Inc()
 | 
						||
 | 
						||
	// The head cache might not have a starting point yet. The init appender
 | 
						||
	// picks up the first appended timestamp as the base.
 | 
						||
	if h.MinTime() == math.MaxInt64 {
 | 
						||
		return &initAppender{head: h}
 | 
						||
	}
 | 
						||
	return h.appender()
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) appender() *headAppender {
 | 
						||
	return &headAppender{
 | 
						||
		head: h,
 | 
						||
		// Set the minimum valid time to whichever is greater the head min valid time or the compaction window.
 | 
						||
		// This ensures that no samples will be added within the compaction window to avoid races.
 | 
						||
		minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2),
 | 
						||
		mint:         math.MaxInt64,
 | 
						||
		maxt:         math.MinInt64,
 | 
						||
		samples:      h.getAppendBuffer(),
 | 
						||
		sampleSeries: h.getSeriesBuffer(),
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// max returns the larger of a and b.
func max(a, b int64) int64 {
	if b >= a {
		return b
	}
	return a
}
 | 
						||
 | 
						||
func (h *Head) getAppendBuffer() []record.RefSample {
 | 
						||
	b := h.appendPool.Get()
 | 
						||
	if b == nil {
 | 
						||
		return make([]record.RefSample, 0, 512)
 | 
						||
	}
 | 
						||
	return b.([]record.RefSample)
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) putAppendBuffer(b []record.RefSample) {
 | 
						||
	//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 | 
						||
	h.appendPool.Put(b[:0])
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) getSeriesBuffer() []*memSeries {
 | 
						||
	b := h.seriesPool.Get()
 | 
						||
	if b == nil {
 | 
						||
		return make([]*memSeries, 0, 512)
 | 
						||
	}
 | 
						||
	return b.([]*memSeries)
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) putSeriesBuffer(b []*memSeries) {
 | 
						||
	//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 | 
						||
	h.seriesPool.Put(b[:0])
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) getBytesBuffer() []byte {
 | 
						||
	b := h.bytesPool.Get()
 | 
						||
	if b == nil {
 | 
						||
		return make([]byte, 0, 1024)
 | 
						||
	}
 | 
						||
	return b.([]byte)
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) putBytesBuffer(b []byte) {
 | 
						||
	//lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
 | 
						||
	h.bytesPool.Put(b[:0])
 | 
						||
}
 | 
						||
 | 
						||
type headAppender struct {
 | 
						||
	head         *Head
 | 
						||
	minValidTime int64 // No samples below this timestamp are allowed.
 | 
						||
	mint, maxt   int64
 | 
						||
 | 
						||
	series       []record.RefSeries
 | 
						||
	samples      []record.RefSample
 | 
						||
	sampleSeries []*memSeries
 | 
						||
}
 | 
						||
 | 
						||
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
 | 
						||
	if t < a.minValidTime {
 | 
						||
		return 0, ErrOutOfBounds
 | 
						||
	}
 | 
						||
 | 
						||
	// Ensure no empty labels have gotten through.
 | 
						||
	lset = lset.WithoutEmpty()
 | 
						||
 | 
						||
	s, created := a.head.getOrCreate(lset.Hash(), lset)
 | 
						||
	if created {
 | 
						||
		a.series = append(a.series, record.RefSeries{
 | 
						||
			Ref:    s.ref,
 | 
						||
			Labels: lset,
 | 
						||
		})
 | 
						||
	}
 | 
						||
	return s.ref, a.AddFast(s.ref, t, v)
 | 
						||
}
 | 
						||
 | 
						||
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
 | 
						||
	if t < a.minValidTime {
 | 
						||
		return ErrOutOfBounds
 | 
						||
	}
 | 
						||
 | 
						||
	s := a.head.series.getByID(ref)
 | 
						||
	if s == nil {
 | 
						||
		return errors.Wrap(ErrNotFound, "unknown series")
 | 
						||
	}
 | 
						||
	s.Lock()
 | 
						||
	if err := s.appendable(t, v); err != nil {
 | 
						||
		s.Unlock()
 | 
						||
		return err
 | 
						||
	}
 | 
						||
	s.pendingCommit = true
 | 
						||
	s.Unlock()
 | 
						||
 | 
						||
	if t < a.mint {
 | 
						||
		a.mint = t
 | 
						||
	}
 | 
						||
	if t > a.maxt {
 | 
						||
		a.maxt = t
 | 
						||
	}
 | 
						||
 | 
						||
	a.samples = append(a.samples, record.RefSample{
 | 
						||
		Ref: ref,
 | 
						||
		T:   t,
 | 
						||
		V:   v,
 | 
						||
	})
 | 
						||
	a.sampleSeries = append(a.sampleSeries, s)
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
func (a *headAppender) log() error {
 | 
						||
	if a.head.wal == nil {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
 | 
						||
	buf := a.head.getBytesBuffer()
 | 
						||
	defer func() { a.head.putBytesBuffer(buf) }()
 | 
						||
 | 
						||
	var rec []byte
 | 
						||
	var enc record.Encoder
 | 
						||
 | 
						||
	if len(a.series) > 0 {
 | 
						||
		rec = enc.Series(a.series, buf)
 | 
						||
		buf = rec[:0]
 | 
						||
 | 
						||
		if err := a.head.wal.Log(rec); err != nil {
 | 
						||
			return errors.Wrap(err, "log series")
 | 
						||
		}
 | 
						||
	}
 | 
						||
	if len(a.samples) > 0 {
 | 
						||
		rec = enc.Samples(a.samples, buf)
 | 
						||
		buf = rec[:0]
 | 
						||
 | 
						||
		if err := a.head.wal.Log(rec); err != nil {
 | 
						||
			return errors.Wrap(err, "log samples")
 | 
						||
		}
 | 
						||
	}
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
func (a *headAppender) Commit() error {
 | 
						||
	defer a.head.metrics.activeAppenders.Dec()
 | 
						||
	defer a.head.putAppendBuffer(a.samples)
 | 
						||
	defer a.head.putSeriesBuffer(a.sampleSeries)
 | 
						||
 | 
						||
	if err := a.log(); err != nil {
 | 
						||
		return errors.Wrap(err, "write to WAL")
 | 
						||
	}
 | 
						||
 | 
						||
	total := len(a.samples)
 | 
						||
	var series *memSeries
 | 
						||
	for i, s := range a.samples {
 | 
						||
		series = a.sampleSeries[i]
 | 
						||
		series.Lock()
 | 
						||
		ok, chunkCreated := series.append(s.T, s.V)
 | 
						||
		series.pendingCommit = false
 | 
						||
		series.Unlock()
 | 
						||
 | 
						||
		if !ok {
 | 
						||
			total--
 | 
						||
		}
 | 
						||
		if chunkCreated {
 | 
						||
			a.head.metrics.chunks.Inc()
 | 
						||
			a.head.metrics.chunksCreated.Inc()
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	a.head.metrics.samplesAppended.Add(float64(total))
 | 
						||
	a.head.updateMinMaxTime(a.mint, a.maxt)
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
func (a *headAppender) Rollback() error {
 | 
						||
	a.head.metrics.activeAppenders.Dec()
 | 
						||
	var series *memSeries
 | 
						||
	for i := range a.samples {
 | 
						||
		series = a.sampleSeries[i]
 | 
						||
		series.Lock()
 | 
						||
		series.pendingCommit = false
 | 
						||
		series.Unlock()
 | 
						||
	}
 | 
						||
	a.head.putAppendBuffer(a.samples)
 | 
						||
 | 
						||
	// Series are created in the head memory regardless of rollback. Thus we have
 | 
						||
	// to log them to the WAL in any case.
 | 
						||
	a.samples = nil
 | 
						||
	return a.log()
 | 
						||
}
 | 
						||
 | 
						||
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
 | 
						||
// label matchers.
 | 
						||
func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
 | 
						||
	// Do not delete anything beyond the currently valid range.
 | 
						||
	mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime())
 | 
						||
 | 
						||
	ir := h.indexRange(mint, maxt)
 | 
						||
 | 
						||
	p, err := PostingsForMatchers(ir, ms...)
 | 
						||
	if err != nil {
 | 
						||
		return errors.Wrap(err, "select series")
 | 
						||
	}
 | 
						||
 | 
						||
	var stones []tombstones.Stone
 | 
						||
	dirty := false
 | 
						||
	for p.Next() {
 | 
						||
		series := h.series.getByID(p.At())
 | 
						||
 | 
						||
		t0, t1 := series.minTime(), series.maxTime()
 | 
						||
		if t0 == math.MinInt64 || t1 == math.MinInt64 {
 | 
						||
			continue
 | 
						||
		}
 | 
						||
		// Delete only until the current values and not beyond.
 | 
						||
		t0, t1 = clampInterval(mint, maxt, t0, t1)
 | 
						||
		if h.wal != nil {
 | 
						||
			stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}})
 | 
						||
		}
 | 
						||
		if err := h.chunkRewrite(p.At(), tombstones.Intervals{{Mint: t0, Maxt: t1}}); err != nil {
 | 
						||
			return errors.Wrap(err, "delete samples")
 | 
						||
		}
 | 
						||
		dirty = true
 | 
						||
	}
 | 
						||
	if p.Err() != nil {
 | 
						||
		return p.Err()
 | 
						||
	}
 | 
						||
	var enc record.Encoder
 | 
						||
	if h.wal != nil {
 | 
						||
		// Although we don't store the stones in the head
 | 
						||
		// we need to write them to the WAL to mark these as deleted
 | 
						||
		// after a restart while loading the WAL.
 | 
						||
		if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
 | 
						||
			return err
 | 
						||
		}
 | 
						||
	}
 | 
						||
	if dirty {
 | 
						||
		h.gc()
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// chunkRewrite re-writes the chunks which overlaps with deleted ranges
 | 
						||
// and removes the samples in the deleted ranges.
 | 
						||
// Chunks is deleted if no samples are left at the end.
 | 
						||
func (h *Head) chunkRewrite(ref uint64, dranges tombstones.Intervals) (err error) {
 | 
						||
	if len(dranges) == 0 {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
 | 
						||
	ms := h.series.getByID(ref)
 | 
						||
	ms.Lock()
 | 
						||
	defer ms.Unlock()
 | 
						||
	if len(ms.chunks) == 0 {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
 | 
						||
	metas := ms.chunksMetas()
 | 
						||
	mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime
 | 
						||
	it := newChunkSeriesIterator(metas, dranges, mint, maxt)
 | 
						||
 | 
						||
	ms.reset()
 | 
						||
	for it.Next() {
 | 
						||
		t, v := it.At()
 | 
						||
		ok, _ := ms.append(t, v)
 | 
						||
		if !ok {
 | 
						||
			level.Warn(h.logger).Log("msg", "failed to add sample during delete")
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// gc removes data before the minimum timestamp from the head.
 | 
						||
func (h *Head) gc() {
 | 
						||
	// Only data strictly lower than this timestamp must be deleted.
 | 
						||
	mint := h.MinTime()
 | 
						||
 | 
						||
	// Drop old chunks and remember series IDs and hashes if they can be
 | 
						||
	// deleted entirely.
 | 
						||
	deleted, chunksRemoved := h.series.gc(mint)
 | 
						||
	seriesRemoved := len(deleted)
 | 
						||
 | 
						||
	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
 | 
						||
	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
 | 
						||
	h.metrics.chunks.Sub(float64(chunksRemoved))
 | 
						||
	// Using AddUint64 to subtract series removed.
 | 
						||
	// See: https://golang.org/pkg/sync/atomic/#AddUint64.
 | 
						||
	atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1))
 | 
						||
 | 
						||
	// Remove deleted series IDs from the postings lists.
 | 
						||
	h.postings.Delete(deleted)
 | 
						||
 | 
						||
	if h.wal != nil {
 | 
						||
		_, last, _ := h.wal.Segments()
 | 
						||
		h.deletedMtx.Lock()
 | 
						||
		// Keep series records until we're past segment 'last'
 | 
						||
		// because the WAL will still have samples records with
 | 
						||
		// this ref ID. If we didn't keep these series records then
 | 
						||
		// on start up when we replay the WAL, or any other code
 | 
						||
		// that reads the WAL, wouldn't be able to use those
 | 
						||
		// samples since we would have no labels for that ref ID.
 | 
						||
		for ref := range deleted {
 | 
						||
			h.deleted[ref] = last
 | 
						||
		}
 | 
						||
		h.deletedMtx.Unlock()
 | 
						||
	}
 | 
						||
 | 
						||
	// Rebuild symbols and label value indices from what is left in the postings terms.
 | 
						||
	symbols := make(map[string]struct{}, len(h.symbols))
 | 
						||
	values := make(map[string]stringset, len(h.values))
 | 
						||
 | 
						||
	if err := h.postings.Iter(func(t labels.Label, _ index.Postings) error {
 | 
						||
		symbols[t.Name] = struct{}{}
 | 
						||
		symbols[t.Value] = struct{}{}
 | 
						||
 | 
						||
		ss, ok := values[t.Name]
 | 
						||
		if !ok {
 | 
						||
			ss = stringset{}
 | 
						||
			values[t.Name] = ss
 | 
						||
		}
 | 
						||
		ss.set(t.Value)
 | 
						||
		return nil
 | 
						||
	}); err != nil {
 | 
						||
		// This should never happen, as the iteration function only returns nil.
 | 
						||
		panic(err)
 | 
						||
	}
 | 
						||
 | 
						||
	h.symMtx.Lock()
 | 
						||
 | 
						||
	h.symbols = symbols
 | 
						||
	h.values = values
 | 
						||
 | 
						||
	h.symMtx.Unlock()
 | 
						||
}
 | 
						||
 | 
						||
// Tombstones returns a new reader over the head's tombstones
 | 
						||
func (h *Head) Tombstones() (tombstones.Reader, error) {
 | 
						||
	return emptyTombstoneReader, nil
 | 
						||
}
 | 
						||
 | 
						||
// Index returns an IndexReader against the block.
 | 
						||
func (h *Head) Index() (IndexReader, error) {
 | 
						||
	return h.indexRange(math.MinInt64, math.MaxInt64), nil
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
 | 
						||
	if hmin := h.MinTime(); hmin > mint {
 | 
						||
		mint = hmin
 | 
						||
	}
 | 
						||
	return &headIndexReader{head: h, mint: mint, maxt: maxt}
 | 
						||
}
 | 
						||
 | 
						||
// Chunks returns a ChunkReader against the block.
 | 
						||
func (h *Head) Chunks() (ChunkReader, error) {
 | 
						||
	return h.chunksRange(math.MinInt64, math.MaxInt64), nil
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
 | 
						||
	if hmin := h.MinTime(); hmin > mint {
 | 
						||
		mint = hmin
 | 
						||
	}
 | 
						||
	return &headChunkReader{head: h, mint: mint, maxt: maxt}
 | 
						||
}
 | 
						||
 | 
						||
// NumSeries returns the number of active series in the head.
 | 
						||
func (h *Head) NumSeries() uint64 {
 | 
						||
	return atomic.LoadUint64(&h.numSeries)
 | 
						||
}
 | 
						||
 | 
						||
// Meta returns meta information about the head.
 | 
						||
// The head is dynamic so will return dynamic results.
 | 
						||
func (h *Head) Meta() BlockMeta {
 | 
						||
	var id [16]byte
 | 
						||
	copy(id[:], "______head______")
 | 
						||
	return BlockMeta{
 | 
						||
		MinTime: h.MinTime(),
 | 
						||
		MaxTime: h.MaxTime(),
 | 
						||
		ULID:    ulid.ULID(id),
 | 
						||
		Stats: BlockStats{
 | 
						||
			NumSeries: h.NumSeries(),
 | 
						||
		},
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// MinTime returns the lowest time bound on visible data in the head.
 | 
						||
func (h *Head) MinTime() int64 {
 | 
						||
	return atomic.LoadInt64(&h.minTime)
 | 
						||
}
 | 
						||
 | 
						||
// MaxTime returns the highest timestamp seen in data of the head.
 | 
						||
func (h *Head) MaxTime() int64 {
 | 
						||
	return atomic.LoadInt64(&h.maxTime)
 | 
						||
}
 | 
						||
 | 
						||
// compactable returns whether the head has a compactable range.
 | 
						||
// The head has a compactable range when the head time range is 1.5 times the chunk range.
 | 
						||
// The 0.5 acts as a buffer of the appendable window.
 | 
						||
func (h *Head) compactable() bool {
 | 
						||
	return h.MaxTime()-h.MinTime() > h.chunkRange/2*3
 | 
						||
}
 | 
						||
 | 
						||
// Close flushes the WAL and closes the head.
 | 
						||
func (h *Head) Close() error {
 | 
						||
	if h.wal == nil {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
	return h.wal.Close()
 | 
						||
}
 | 
						||
 | 
						||
type headChunkReader struct {
 | 
						||
	head       *Head
 | 
						||
	mint, maxt int64
 | 
						||
}
 | 
						||
 | 
						||
func (h *headChunkReader) Close() error {
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
// packChunkID packs a series ID and the ID of a chunk within that series
// into a single 8 byte ID: the series ID occupies the upper 5 bytes and the
// chunk ID the lower 3 bytes.
// It panics if the series ID exceeds 5 bytes or the chunk ID exceeds 3 bytes.
func packChunkID(seriesID, chunkID uint64) uint64 {
	const (
		maxSeriesID = 1<<40 - 1
		maxChunkID  = 1<<24 - 1
	)
	switch {
	case seriesID > maxSeriesID:
		panic("series ID exceeds 5 bytes")
	case chunkID > maxChunkID:
		panic("chunk ID exceeds 3 bytes")
	}
	return seriesID<<24 | chunkID
}
 | 
						||
 | 
						||
// unpackChunkID splits a packed 8 byte ID into the series ID (upper 5
// bytes) and the chunk ID (lower 3 bytes).
func unpackChunkID(id uint64) (seriesID, chunkID uint64) {
	const chunkIDMask = 1<<24 - 1
	seriesID = id >> 24
	chunkID = id & chunkIDMask
	return seriesID, chunkID
}
 | 
						||
 | 
						||
// Chunk returns the chunk for the reference number.
 | 
						||
func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
 | 
						||
	sid, cid := unpackChunkID(ref)
 | 
						||
 | 
						||
	s := h.head.series.getByID(sid)
 | 
						||
	// This means that the series has been garbage collected.
 | 
						||
	if s == nil {
 | 
						||
		return nil, ErrNotFound
 | 
						||
	}
 | 
						||
 | 
						||
	s.Lock()
 | 
						||
	c := s.chunk(int(cid))
 | 
						||
 | 
						||
	// This means that the chunk has been garbage collected or is outside
 | 
						||
	// the specified range.
 | 
						||
	if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
 | 
						||
		s.Unlock()
 | 
						||
		return nil, ErrNotFound
 | 
						||
	}
 | 
						||
	s.Unlock()
 | 
						||
 | 
						||
	return &safeChunk{
 | 
						||
		Chunk: c.chunk,
 | 
						||
		s:     s,
 | 
						||
		cid:   int(cid),
 | 
						||
	}, nil
 | 
						||
}
 | 
						||
 | 
						||
type safeChunk struct {
 | 
						||
	chunkenc.Chunk
 | 
						||
	s   *memSeries
 | 
						||
	cid int
 | 
						||
}
 | 
						||
 | 
						||
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
 | 
						||
	c.s.Lock()
 | 
						||
	it := c.s.iterator(c.cid, reuseIter)
 | 
						||
	c.s.Unlock()
 | 
						||
	return it
 | 
						||
}
 | 
						||
 | 
						||
type headIndexReader struct {
 | 
						||
	head       *Head
 | 
						||
	mint, maxt int64
 | 
						||
}
 | 
						||
 | 
						||
func (h *headIndexReader) Close() error {
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
func (h *headIndexReader) Symbols() index.StringIter {
 | 
						||
	h.head.symMtx.RLock()
 | 
						||
	res := make([]string, 0, len(h.head.symbols))
 | 
						||
 | 
						||
	for s := range h.head.symbols {
 | 
						||
		res = append(res, s)
 | 
						||
	}
 | 
						||
	h.head.symMtx.RUnlock()
 | 
						||
 | 
						||
	sort.Strings(res)
 | 
						||
	return index.NewStringListIter(res)
 | 
						||
}
 | 
						||
 | 
						||
// LabelValues returns the possible label values
 | 
						||
func (h *headIndexReader) LabelValues(name string) ([]string, error) {
 | 
						||
	h.head.symMtx.RLock()
 | 
						||
	sl := make([]string, 0, len(h.head.values[name]))
 | 
						||
	for s := range h.head.values[name] {
 | 
						||
		sl = append(sl, s)
 | 
						||
	}
 | 
						||
	h.head.symMtx.RUnlock()
 | 
						||
	sort.Strings(sl)
 | 
						||
	return sl, nil
 | 
						||
}
 | 
						||
 | 
						||
// LabelNames returns all the unique label names present in the head.
 | 
						||
func (h *headIndexReader) LabelNames() ([]string, error) {
 | 
						||
	h.head.symMtx.RLock()
 | 
						||
	defer h.head.symMtx.RUnlock()
 | 
						||
	labelNames := make([]string, 0, len(h.head.values))
 | 
						||
	for name := range h.head.values {
 | 
						||
		if name == "" {
 | 
						||
			continue
 | 
						||
		}
 | 
						||
		labelNames = append(labelNames, name)
 | 
						||
	}
 | 
						||
	sort.Strings(labelNames)
 | 
						||
	return labelNames, nil
 | 
						||
}
 | 
						||
 | 
						||
// Postings returns the postings list iterator for the label pairs.
 | 
						||
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
 | 
						||
	res := make([]index.Postings, 0, len(values))
 | 
						||
	for _, value := range values {
 | 
						||
		res = append(res, h.head.postings.Get(name, value))
 | 
						||
	}
 | 
						||
	return index.Merge(res...), nil
 | 
						||
}
 | 
						||
 | 
						||
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
 | 
						||
	series := make([]*memSeries, 0, 128)
 | 
						||
 | 
						||
	// Fetch all the series only once.
 | 
						||
	for p.Next() {
 | 
						||
		s := h.head.series.getByID(p.At())
 | 
						||
		if s == nil {
 | 
						||
			level.Debug(h.head.logger).Log("msg", "looked up series not found")
 | 
						||
		} else {
 | 
						||
			series = append(series, s)
 | 
						||
		}
 | 
						||
	}
 | 
						||
	if err := p.Err(); err != nil {
 | 
						||
		return index.ErrPostings(errors.Wrap(err, "expand postings"))
 | 
						||
	}
 | 
						||
 | 
						||
	sort.Slice(series, func(i, j int) bool {
 | 
						||
		return labels.Compare(series[i].lset, series[j].lset) < 0
 | 
						||
	})
 | 
						||
 | 
						||
	// Convert back to list.
 | 
						||
	ep := make([]uint64, 0, len(series))
 | 
						||
	for _, p := range series {
 | 
						||
		ep = append(ep, p.ref)
 | 
						||
	}
 | 
						||
	return index.NewListPostings(ep)
 | 
						||
}
 | 
						||
 | 
						||
// Series returns the series for the given reference.
 | 
						||
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
 | 
						||
	s := h.head.series.getByID(ref)
 | 
						||
 | 
						||
	if s == nil {
 | 
						||
		h.head.metrics.seriesNotFound.Inc()
 | 
						||
		return ErrNotFound
 | 
						||
	}
 | 
						||
	*lbls = append((*lbls)[:0], s.lset...)
 | 
						||
 | 
						||
	s.Lock()
 | 
						||
	defer s.Unlock()
 | 
						||
 | 
						||
	*chks = (*chks)[:0]
 | 
						||
 | 
						||
	for i, c := range s.chunks {
 | 
						||
		// Do not expose chunks that are outside of the specified range.
 | 
						||
		if !c.OverlapsClosedInterval(h.mint, h.maxt) {
 | 
						||
			continue
 | 
						||
		}
 | 
						||
		// Set the head chunks as open (being appended to).
 | 
						||
		maxTime := c.maxTime
 | 
						||
		if s.headChunk == c {
 | 
						||
			maxTime = math.MaxInt64
 | 
						||
		}
 | 
						||
 | 
						||
		*chks = append(*chks, chunks.Meta{
 | 
						||
			MinTime: c.minTime,
 | 
						||
			MaxTime: maxTime,
 | 
						||
			Ref:     packChunkID(s.ref, uint64(s.chunkID(i))),
 | 
						||
		})
 | 
						||
	}
 | 
						||
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
 | 
						||
	// Just using `getOrSet` below would be semantically sufficient, but we'd create
 | 
						||
	// a new series on every sample inserted via Add(), which causes allocations
 | 
						||
	// and makes our series IDs rather random and harder to compress in postings.
 | 
						||
	s := h.series.getByHash(hash, lset)
 | 
						||
	if s != nil {
 | 
						||
		return s, false
 | 
						||
	}
 | 
						||
 | 
						||
	// Optimistically assume that we are the first one to create the series.
 | 
						||
	id := atomic.AddUint64(&h.lastSeriesID, 1)
 | 
						||
 | 
						||
	return h.getOrCreateWithID(id, hash, lset)
 | 
						||
}
 | 
						||
 | 
						||
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
 | 
						||
	s := newMemSeries(lset, id, h.chunkRange)
 | 
						||
 | 
						||
	s, created := h.series.getOrSet(hash, s)
 | 
						||
	if !created {
 | 
						||
		return s, false
 | 
						||
	}
 | 
						||
 | 
						||
	h.metrics.seriesCreated.Inc()
 | 
						||
	atomic.AddUint64(&h.numSeries, 1)
 | 
						||
 | 
						||
	h.postings.Add(id, lset)
 | 
						||
 | 
						||
	h.symMtx.Lock()
 | 
						||
	defer h.symMtx.Unlock()
 | 
						||
 | 
						||
	for _, l := range lset {
 | 
						||
		valset, ok := h.values[l.Name]
 | 
						||
		if !ok {
 | 
						||
			valset = stringset{}
 | 
						||
			h.values[l.Name] = valset
 | 
						||
		}
 | 
						||
		valset.set(l.Value)
 | 
						||
 | 
						||
		h.symbols[l.Name] = struct{}{}
 | 
						||
		h.symbols[l.Value] = struct{}{}
 | 
						||
	}
 | 
						||
 | 
						||
	return s, true
 | 
						||
}
 | 
						||
 | 
						||
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
 | 
						||
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
 | 
						||
// Its methods require the hash to be submitted with it to avoid re-computations throughout
 | 
						||
// the code.
 | 
						||
type seriesHashmap map[uint64][]*memSeries
 | 
						||
 | 
						||
func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
 | 
						||
	for _, s := range m[hash] {
 | 
						||
		if labels.Equal(s.lset, lset) {
 | 
						||
			return s
 | 
						||
		}
 | 
						||
	}
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
func (m seriesHashmap) set(hash uint64, s *memSeries) {
 | 
						||
	l := m[hash]
 | 
						||
	for i, prev := range l {
 | 
						||
		if labels.Equal(prev.lset, s.lset) {
 | 
						||
			l[i] = s
 | 
						||
			return
 | 
						||
		}
 | 
						||
	}
 | 
						||
	m[hash] = append(l, s)
 | 
						||
}
 | 
						||
 | 
						||
func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
 | 
						||
	var rem []*memSeries
 | 
						||
	for _, s := range m[hash] {
 | 
						||
		if !labels.Equal(s.lset, lset) {
 | 
						||
			rem = append(rem, s)
 | 
						||
		}
 | 
						||
	}
 | 
						||
	if len(rem) == 0 {
 | 
						||
		delete(m, hash)
 | 
						||
	} else {
 | 
						||
		m[hash] = rem
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
 | 
						||
// The locks are padded to not be on the same cache line. Filling the padded space
 | 
						||
// with the maps was profiled to be slower – likely due to the additional pointer
 | 
						||
// dereferences.
 | 
						||
type stripeSeries struct {
 | 
						||
	series [stripeSize]map[uint64]*memSeries
 | 
						||
	hashes [stripeSize]seriesHashmap
 | 
						||
	locks  [stripeSize]stripeLock
 | 
						||
}
 | 
						||
 | 
						||
const (
 | 
						||
	stripeSize = 1 << 14
 | 
						||
	stripeMask = stripeSize - 1
 | 
						||
)
 | 
						||
 | 
						||
type stripeLock struct {
 | 
						||
	sync.RWMutex
 | 
						||
	// Padding to avoid multiple locks being on the same cache line.
 | 
						||
	_ [40]byte
 | 
						||
}
 | 
						||
 | 
						||
func newStripeSeries() *stripeSeries {
 | 
						||
	s := &stripeSeries{}
 | 
						||
 | 
						||
	for i := range s.series {
 | 
						||
		s.series[i] = map[uint64]*memSeries{}
 | 
						||
	}
 | 
						||
	for i := range s.hashes {
 | 
						||
		s.hashes[i] = seriesHashmap{}
 | 
						||
	}
 | 
						||
	return s
 | 
						||
}
 | 
						||
 | 
						||
// gc garbage collects old chunks that are strictly before mint and removes
 | 
						||
// series entirely that have no chunks left.
 | 
						||
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 | 
						||
	var (
 | 
						||
		deleted  = map[uint64]struct{}{}
 | 
						||
		rmChunks = 0
 | 
						||
	)
 | 
						||
	// Run through all series and truncate old chunks. Mark those with no
 | 
						||
	// chunks left as deleted and store their ID.
 | 
						||
	for i := 0; i < stripeSize; i++ {
 | 
						||
		s.locks[i].Lock()
 | 
						||
 | 
						||
		for hash, all := range s.hashes[i] {
 | 
						||
			for _, series := range all {
 | 
						||
				series.Lock()
 | 
						||
				rmChunks += series.truncateChunksBefore(mint)
 | 
						||
 | 
						||
				if len(series.chunks) > 0 || series.pendingCommit {
 | 
						||
					series.Unlock()
 | 
						||
					continue
 | 
						||
				}
 | 
						||
 | 
						||
				// The series is gone entirely. We need to keep the series lock
 | 
						||
				// and make sure we have acquired the stripe locks for hash and ID of the
 | 
						||
				// series alike.
 | 
						||
				// If we don't hold them all, there's a very small chance that a series receives
 | 
						||
				// samples again while we are half-way into deleting it.
 | 
						||
				j := int(series.ref & stripeMask)
 | 
						||
 | 
						||
				if i != j {
 | 
						||
					s.locks[j].Lock()
 | 
						||
				}
 | 
						||
 | 
						||
				deleted[series.ref] = struct{}{}
 | 
						||
				s.hashes[i].del(hash, series.lset)
 | 
						||
				delete(s.series[j], series.ref)
 | 
						||
 | 
						||
				if i != j {
 | 
						||
					s.locks[j].Unlock()
 | 
						||
				}
 | 
						||
 | 
						||
				series.Unlock()
 | 
						||
			}
 | 
						||
		}
 | 
						||
 | 
						||
		s.locks[i].Unlock()
 | 
						||
	}
 | 
						||
 | 
						||
	return deleted, rmChunks
 | 
						||
}
 | 
						||
 | 
						||
func (s *stripeSeries) getByID(id uint64) *memSeries {
 | 
						||
	i := id & stripeMask
 | 
						||
 | 
						||
	s.locks[i].RLock()
 | 
						||
	series := s.series[i][id]
 | 
						||
	s.locks[i].RUnlock()
 | 
						||
 | 
						||
	return series
 | 
						||
}
 | 
						||
 | 
						||
func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
 | 
						||
	i := hash & stripeMask
 | 
						||
 | 
						||
	s.locks[i].RLock()
 | 
						||
	series := s.hashes[i].get(hash, lset)
 | 
						||
	s.locks[i].RUnlock()
 | 
						||
 | 
						||
	return series
 | 
						||
}
 | 
						||
 | 
						||
func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
 | 
						||
	i := hash & stripeMask
 | 
						||
 | 
						||
	s.locks[i].Lock()
 | 
						||
 | 
						||
	if prev := s.hashes[i].get(hash, series.lset); prev != nil {
 | 
						||
		s.locks[i].Unlock()
 | 
						||
		return prev, false
 | 
						||
	}
 | 
						||
	s.hashes[i].set(hash, series)
 | 
						||
	s.locks[i].Unlock()
 | 
						||
 | 
						||
	i = series.ref & stripeMask
 | 
						||
 | 
						||
	s.locks[i].Lock()
 | 
						||
	s.series[i][series.ref] = series
 | 
						||
	s.locks[i].Unlock()
 | 
						||
 | 
						||
	return series, true
 | 
						||
}
 | 
						||
 | 
						||
// sample is a single timestamp/value pair.
type sample struct {
	t int64
	v float64
}

// T returns the timestamp of the sample.
func (s sample) T() int64 { return s.t }

// V returns the value of the sample.
func (s sample) V() float64 { return s.v }
 | 
						||
 | 
						||
// memSeries is the in-memory representation of a series. None of its methods
 | 
						||
// are goroutine safe and it is the caller's responsibility to lock it.
 | 
						||
type memSeries struct {
 | 
						||
	sync.Mutex
 | 
						||
 | 
						||
	ref          uint64
 | 
						||
	lset         labels.Labels
 | 
						||
	chunks       []*memChunk
 | 
						||
	headChunk    *memChunk
 | 
						||
	chunkRange   int64
 | 
						||
	firstChunkID int
 | 
						||
 | 
						||
	nextAt        int64 // Timestamp at which to cut the next chunk.
 | 
						||
	sampleBuf     [4]sample
 | 
						||
	pendingCommit bool // Whether there are samples waiting to be committed to this series.
 | 
						||
 | 
						||
	app chunkenc.Appender // Current appender for the chunk.
 | 
						||
}
 | 
						||
 | 
						||
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
 | 
						||
	s := &memSeries{
 | 
						||
		lset:       lset,
 | 
						||
		ref:        id,
 | 
						||
		chunkRange: chunkRange,
 | 
						||
		nextAt:     math.MinInt64,
 | 
						||
	}
 | 
						||
	return s
 | 
						||
}
 | 
						||
 | 
						||
func (s *memSeries) minTime() int64 {
 | 
						||
	if len(s.chunks) == 0 {
 | 
						||
		return math.MinInt64
 | 
						||
	}
 | 
						||
	return s.chunks[0].minTime
 | 
						||
}
 | 
						||
 | 
						||
func (s *memSeries) maxTime() int64 {
 | 
						||
	c := s.head()
 | 
						||
	if c == nil {
 | 
						||
		return math.MinInt64
 | 
						||
	}
 | 
						||
	return c.maxTime
 | 
						||
}
 | 
						||
 | 
						||
func (s *memSeries) cut(mint int64) *memChunk {
 | 
						||
	c := &memChunk{
 | 
						||
		chunk:   chunkenc.NewXORChunk(),
 | 
						||
		minTime: mint,
 | 
						||
		maxTime: math.MinInt64,
 | 
						||
	}
 | 
						||
	s.chunks = append(s.chunks, c)
 | 
						||
	s.headChunk = c
 | 
						||
 | 
						||
	// Set upper bound on when the next chunk must be started. An earlier timestamp
 | 
						||
	// may be chosen dynamically at a later point.
 | 
						||
	s.nextAt = rangeForTimestamp(mint, s.chunkRange)
 | 
						||
 | 
						||
	app, err := c.chunk.Appender()
 | 
						||
	if err != nil {
 | 
						||
		panic(err)
 | 
						||
	}
 | 
						||
	s.app = app
 | 
						||
	return c
 | 
						||
}
 | 
						||
 | 
						||
func (s *memSeries) chunksMetas() []chunks.Meta {
 | 
						||
	metas := make([]chunks.Meta, 0, len(s.chunks))
 | 
						||
	for _, chk := range s.chunks {
 | 
						||
		metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime})
 | 
						||
	}
 | 
						||
	return metas
 | 
						||
}
 | 
						||
 | 
						||
// reset re-initialises all the variable in the memSeries except 'lset', 'ref',
 | 
						||
// and 'chunkRange', like how it would appear after 'newMemSeries(...)'.
 | 
						||
func (s *memSeries) reset() {
 | 
						||
	s.chunks = nil
 | 
						||
	s.headChunk = nil
 | 
						||
	s.firstChunkID = 0
 | 
						||
	s.nextAt = math.MinInt64
 | 
						||
	s.sampleBuf = [4]sample{}
 | 
						||
	s.pendingCommit = false
 | 
						||
	s.app = nil
 | 
						||
}
 | 
						||
 | 
						||
// appendable checks whether the given sample is valid for appending to the series.
 | 
						||
func (s *memSeries) appendable(t int64, v float64) error {
 | 
						||
	c := s.head()
 | 
						||
	if c == nil {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
 | 
						||
	if t > c.maxTime {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
	if t < c.maxTime {
 | 
						||
		return ErrOutOfOrderSample
 | 
						||
	}
 | 
						||
	// We are allowing exact duplicates as we can encounter them in valid cases
 | 
						||
	// like federation and erroring out at that time would be extremely noisy.
 | 
						||
	if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
 | 
						||
		return ErrAmendSample
 | 
						||
	}
 | 
						||
	return nil
 | 
						||
}
 | 
						||
 | 
						||
func (s *memSeries) chunk(id int) *memChunk {
 | 
						||
	ix := id - s.firstChunkID
 | 
						||
	if ix < 0 || ix >= len(s.chunks) {
 | 
						||
		return nil
 | 
						||
	}
 | 
						||
	return s.chunks[ix]
 | 
						||
}
 | 
						||
 | 
						||
func (s *memSeries) chunkID(pos int) int {
 | 
						||
	return pos + s.firstChunkID
 | 
						||
}
 | 
						||
 | 
						||
// truncateChunksBefore removes all chunks from the series that have not timestamp
 | 
						||
// at or after mint. Chunk IDs remain unchanged.
 | 
						||
func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
 | 
						||
	var k int
 | 
						||
	for i, c := range s.chunks {
 | 
						||
		if c.maxTime >= mint {
 | 
						||
			break
 | 
						||
		}
 | 
						||
		k = i + 1
 | 
						||
	}
 | 
						||
	s.chunks = append(s.chunks[:0], s.chunks[k:]...)
 | 
						||
	s.firstChunkID += k
 | 
						||
	if len(s.chunks) == 0 {
 | 
						||
		s.headChunk = nil
 | 
						||
	} else {
 | 
						||
		s.headChunk = s.chunks[len(s.chunks)-1]
 | 
						||
	}
 | 
						||
 | 
						||
	return k
 | 
						||
}
 | 
						||
 | 
						||
// append adds the sample (t, v) to the series.
 | 
						||
func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 | 
						||
	// Based on Gorilla white papers this offers near-optimal compression ratio
 | 
						||
	// so anything bigger that this has diminishing returns and increases
 | 
						||
	// the time range within which we have to decompress all samples.
 | 
						||
	const samplesPerChunk = 120
 | 
						||
 | 
						||
	c := s.head()
 | 
						||
 | 
						||
	if c == nil {
 | 
						||
		c = s.cut(t)
 | 
						||
		chunkCreated = true
 | 
						||
	}
 | 
						||
	numSamples := c.chunk.NumSamples()
 | 
						||
 | 
						||
	// Out of order sample.
 | 
						||
	if c.maxTime >= t {
 | 
						||
		return false, chunkCreated
 | 
						||
	}
 | 
						||
	// If we reach 25% of a chunk's desired sample count, set a definitive time
 | 
						||
	// at which to start the next chunk.
 | 
						||
	// At latest it must happen at the timestamp set when the chunk was cut.
 | 
						||
	if numSamples == samplesPerChunk/4 {
 | 
						||
		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
 | 
						||
	}
 | 
						||
	if t >= s.nextAt {
 | 
						||
		c = s.cut(t)
 | 
						||
		chunkCreated = true
 | 
						||
	}
 | 
						||
	s.app.Append(t, v)
 | 
						||
 | 
						||
	c.maxTime = t
 | 
						||
 | 
						||
	s.sampleBuf[0] = s.sampleBuf[1]
 | 
						||
	s.sampleBuf[1] = s.sampleBuf[2]
 | 
						||
	s.sampleBuf[2] = s.sampleBuf[3]
 | 
						||
	s.sampleBuf[3] = sample{t: t, v: v}
 | 
						||
 | 
						||
	return true, chunkCreated
 | 
						||
}
 | 
						||
 | 
						||
// computeChunkEndTime estimates the end timestamp of a chunk from its start,
// its current timestamp, and the upper bound max up to which data is
// inserted. It assumes that the time range is 1/4 full.
func computeChunkEndTime(start, cur, max int64) int64 {
	span := max - start
	parts := span / (4 * (cur - start + 1))
	if parts != 0 {
		return start + span/parts
	}
	return max
}
 | 
						||
 | 
						||
func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator {
 | 
						||
	c := s.chunk(id)
 | 
						||
	// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
 | 
						||
	// which got then garbage collected before it got accessed.
 | 
						||
	// We must ensure to not garbage collect as long as any readers still hold a reference.
 | 
						||
	if c == nil {
 | 
						||
		return chunkenc.NewNopIterator()
 | 
						||
	}
 | 
						||
 | 
						||
	if id-s.firstChunkID < len(s.chunks)-1 {
 | 
						||
		return c.chunk.Iterator(it)
 | 
						||
	}
 | 
						||
	// Serve the last 4 samples for the last chunk from the sample buffer
 | 
						||
	// as their compressed bytes may be mutated by added samples.
 | 
						||
	if msIter, ok := it.(*memSafeIterator); ok {
 | 
						||
		msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
 | 
						||
		msIter.i = -1
 | 
						||
		msIter.total = c.chunk.NumSamples()
 | 
						||
		msIter.buf = s.sampleBuf
 | 
						||
		return msIter
 | 
						||
	}
 | 
						||
	return &memSafeIterator{
 | 
						||
		Iterator: c.chunk.Iterator(it),
 | 
						||
		i:        -1,
 | 
						||
		total:    c.chunk.NumSamples(),
 | 
						||
		buf:      s.sampleBuf,
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
func (s *memSeries) head() *memChunk {
 | 
						||
	return s.headChunk
 | 
						||
}
 | 
						||
 | 
						||
type memChunk struct {
 | 
						||
	chunk            chunkenc.Chunk
 | 
						||
	minTime, maxTime int64
 | 
						||
}
 | 
						||
 | 
						||
// Returns true if the chunk overlaps [mint, maxt].
 | 
						||
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
 | 
						||
	return mc.minTime <= maxt && mint <= mc.maxTime
 | 
						||
}
 | 
						||
 | 
						||
type memSafeIterator struct {
 | 
						||
	chunkenc.Iterator
 | 
						||
 | 
						||
	i     int
 | 
						||
	total int
 | 
						||
	buf   [4]sample
 | 
						||
}
 | 
						||
 | 
						||
func (it *memSafeIterator) Next() bool {
 | 
						||
	if it.i+1 >= it.total {
 | 
						||
		return false
 | 
						||
	}
 | 
						||
	it.i++
 | 
						||
	if it.total-it.i > 4 {
 | 
						||
		return it.Iterator.Next()
 | 
						||
	}
 | 
						||
	return true
 | 
						||
}
 | 
						||
 | 
						||
func (it *memSafeIterator) At() (int64, float64) {
 | 
						||
	if it.total-it.i > 4 {
 | 
						||
		return it.Iterator.At()
 | 
						||
	}
 | 
						||
	s := it.buf[4-(it.total-it.i)]
 | 
						||
	return s.t, s.v
 | 
						||
}
 | 
						||
 | 
						||
// stringset is a set of strings.
type stringset map[string]struct{}

// set adds s to the set.
func (ss stringset) set(s string) {
	ss[s] = struct{}{}
}

// String returns the members of the set in sorted order, joined by commas.
func (ss stringset) String() string {
	members := ss.slice()
	return strings.Join(members, ",")
}

// slice returns the members of the set as a sorted slice.
func (ss stringset) slice() []string {
	members := make([]string, len(ss))
	i := 0
	for member := range ss {
		members[i] = member
		i++
	}
	sort.Strings(members)
	return members
}
 |