mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-24 22:11:02 +02:00 
			
		
		
		
	* Disable isolation in isolation struct Signed-off-by: darshanime <deathbullet@gmail.com> * Run tsdb tests with isolation disabled Signed-off-by: darshanime <deathbullet@gmail.com> * Check for isolation disabled in isoState.Close() Signed-off-by: darshanime <deathbullet@gmail.com> * use t.Skip to skip isolation tests when disabled Signed-off-by: darshanime <deathbullet@gmail.com> * address review comments Signed-off-by: darshanime <deathbullet@gmail.com> * fix test for defaultIsolationState Signed-off-by: darshanime <deathbullet@gmail.com> * Change flag name. Set flag in DB. Do not init txRing. Close isoState. Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Test disabled isolation in CircleCI test_go Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Skip isolation related tests in db_test.go Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
		
			
				
	
	
		
			503 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			503 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2021 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package tsdb
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"math"
 | |
| 	"sort"
 | |
| 
 | |
| 	"github.com/go-kit/log/level"
 | |
| 	"github.com/pkg/errors"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/model/labels"
 | |
| 	"github.com/prometheus/prometheus/storage"
 | |
| 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | |
| 	"github.com/prometheus/prometheus/tsdb/chunks"
 | |
| 	"github.com/prometheus/prometheus/tsdb/index"
 | |
| )
 | |
| 
 | |
| func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
 | |
| 	return h.exemplars.ExemplarQuerier(ctx)
 | |
| }
 | |
| 
 | |
| // Index returns an IndexReader against the block.
 | |
| func (h *Head) Index() (IndexReader, error) {
 | |
| 	return h.indexRange(math.MinInt64, math.MaxInt64), nil
 | |
| }
 | |
| 
 | |
| func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
 | |
| 	if hmin := h.MinTime(); hmin > mint {
 | |
| 		mint = hmin
 | |
| 	}
 | |
| 	return &headIndexReader{head: h, mint: mint, maxt: maxt}
 | |
| }
 | |
| 
 | |
| type headIndexReader struct {
 | |
| 	head       *Head
 | |
| 	mint, maxt int64
 | |
| }
 | |
| 
 | |
| func (h *headIndexReader) Close() error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (h *headIndexReader) Symbols() index.StringIter {
 | |
| 	return h.head.postings.Symbols()
 | |
| }
 | |
| 
 | |
| // SortedLabelValues returns label values present in the head for the
 | |
| // specific label name that are within the time range mint to maxt.
 | |
| // If matchers are specified the returned result set is reduced
 | |
| // to label values of metrics matching the matchers.
 | |
| func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
 | |
| 	values, err := h.LabelValues(name, matchers...)
 | |
| 	if err == nil {
 | |
| 		sort.Strings(values)
 | |
| 	}
 | |
| 	return values, err
 | |
| }
 | |
| 
 | |
| // LabelValues returns label values present in the head for the
 | |
| // specific label name that are within the time range mint to maxt.
 | |
| // If matchers are specified the returned result set is reduced
 | |
| // to label values of metrics matching the matchers.
 | |
| func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
 | |
| 	if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
 | |
| 		return []string{}, nil
 | |
| 	}
 | |
| 
 | |
| 	if len(matchers) == 0 {
 | |
| 		return h.head.postings.LabelValues(name), nil
 | |
| 	}
 | |
| 
 | |
| 	return labelValuesWithMatchers(h, name, matchers...)
 | |
| }
 | |
| 
 | |
| // LabelNames returns all the unique label names present in the head
 | |
| // that are within the time range mint to maxt.
 | |
| func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) {
 | |
| 	if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
 | |
| 		return []string{}, nil
 | |
| 	}
 | |
| 
 | |
| 	if len(matchers) == 0 {
 | |
| 		labelNames := h.head.postings.LabelNames()
 | |
| 		sort.Strings(labelNames)
 | |
| 		return labelNames, nil
 | |
| 	}
 | |
| 
 | |
| 	return labelNamesWithMatchers(h, matchers...)
 | |
| }
 | |
| 
 | |
| // Postings returns the postings list iterator for the label pairs.
 | |
| func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
 | |
| 	res := make([]index.Postings, 0, len(values))
 | |
| 	for _, value := range values {
 | |
| 		res = append(res, h.head.postings.Get(name, value))
 | |
| 	}
 | |
| 	return index.Merge(res...), nil
 | |
| }
 | |
| 
 | |
| func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
 | |
| 	series := make([]*memSeries, 0, 128)
 | |
| 
 | |
| 	// Fetch all the series only once.
 | |
| 	for p.Next() {
 | |
| 		s := h.head.series.getByID(chunks.HeadSeriesRef(p.At()))
 | |
| 		if s == nil {
 | |
| 			level.Debug(h.head.logger).Log("msg", "Looked up series not found")
 | |
| 		} else {
 | |
| 			series = append(series, s)
 | |
| 		}
 | |
| 	}
 | |
| 	if err := p.Err(); err != nil {
 | |
| 		return index.ErrPostings(errors.Wrap(err, "expand postings"))
 | |
| 	}
 | |
| 
 | |
| 	sort.Slice(series, func(i, j int) bool {
 | |
| 		return labels.Compare(series[i].lset, series[j].lset) < 0
 | |
| 	})
 | |
| 
 | |
| 	// Convert back to list.
 | |
| 	ep := make([]storage.SeriesRef, 0, len(series))
 | |
| 	for _, p := range series {
 | |
| 		ep = append(ep, storage.SeriesRef(p.ref))
 | |
| 	}
 | |
| 	return index.NewListPostings(ep)
 | |
| }
 | |
| 
 | |
| // Series returns the series for the given reference.
 | |
| func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chks *[]chunks.Meta) error {
 | |
| 	s := h.head.series.getByID(chunks.HeadSeriesRef(ref))
 | |
| 
 | |
| 	if s == nil {
 | |
| 		h.head.metrics.seriesNotFound.Inc()
 | |
| 		return storage.ErrNotFound
 | |
| 	}
 | |
| 	*lbls = append((*lbls)[:0], s.lset...)
 | |
| 
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 
 | |
| 	*chks = (*chks)[:0]
 | |
| 
 | |
| 	for i, c := range s.mmappedChunks {
 | |
| 		// Do not expose chunks that are outside of the specified range.
 | |
| 		if !c.OverlapsClosedInterval(h.mint, h.maxt) {
 | |
| 			continue
 | |
| 		}
 | |
| 		*chks = append(*chks, chunks.Meta{
 | |
| 			MinTime: c.minTime,
 | |
| 			MaxTime: c.maxTime,
 | |
| 			Ref:     chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
 | |
| 		})
 | |
| 	}
 | |
| 	if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) {
 | |
| 		*chks = append(*chks, chunks.Meta{
 | |
| 			MinTime: s.headChunk.minTime,
 | |
| 			MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to).
 | |
| 			Ref:     chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))),
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // headChunkID returns the HeadChunkID corresponding to .mmappedChunks[pos]
 | |
| func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
 | |
| 	return chunks.HeadChunkID(pos) + s.firstChunkID
 | |
| }
 | |
| 
 | |
| // LabelValueFor returns label value for the given label name in the series referred to by ID.
 | |
| func (h *headIndexReader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
 | |
| 	memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id))
 | |
| 	if memSeries == nil {
 | |
| 		return "", storage.ErrNotFound
 | |
| 	}
 | |
| 
 | |
| 	value := memSeries.lset.Get(label)
 | |
| 	if value == "" {
 | |
| 		return "", storage.ErrNotFound
 | |
| 	}
 | |
| 
 | |
| 	return value, nil
 | |
| }
 | |
| 
 | |
| // LabelNamesFor returns all the label names for the series referred to by IDs.
 | |
| // The names returned are sorted.
 | |
| func (h *headIndexReader) LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) {
 | |
| 	namesMap := make(map[string]struct{})
 | |
| 	for _, id := range ids {
 | |
| 		memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id))
 | |
| 		if memSeries == nil {
 | |
| 			return nil, storage.ErrNotFound
 | |
| 		}
 | |
| 		for _, lbl := range memSeries.lset {
 | |
| 			namesMap[lbl.Name] = struct{}{}
 | |
| 		}
 | |
| 	}
 | |
| 	names := make([]string, 0, len(namesMap))
 | |
| 	for name := range namesMap {
 | |
| 		names = append(names, name)
 | |
| 	}
 | |
| 	sort.Strings(names)
 | |
| 	return names, nil
 | |
| }
 | |
| 
 | |
| // Chunks returns a ChunkReader against the block.
 | |
| func (h *Head) Chunks() (ChunkReader, error) {
 | |
| 	return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64))
 | |
| }
 | |
| 
 | |
| func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) {
 | |
| 	h.closedMtx.Lock()
 | |
| 	defer h.closedMtx.Unlock()
 | |
| 	if h.closed {
 | |
| 		return nil, errors.New("can't read from a closed head")
 | |
| 	}
 | |
| 	if hmin := h.MinTime(); hmin > mint {
 | |
| 		mint = hmin
 | |
| 	}
 | |
| 	return &headChunkReader{
 | |
| 		head:     h,
 | |
| 		mint:     mint,
 | |
| 		maxt:     maxt,
 | |
| 		isoState: is,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| type headChunkReader struct {
 | |
| 	head       *Head
 | |
| 	mint, maxt int64
 | |
| 	isoState   *isolationState
 | |
| }
 | |
| 
 | |
| func (h *headChunkReader) Close() error {
 | |
| 	h.isoState.Close()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Chunk returns the chunk for the reference number.
 | |
| func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
 | |
| 	sid, cid := chunks.HeadChunkRef(ref).Unpack()
 | |
| 
 | |
| 	s := h.head.series.getByID(sid)
 | |
| 	// This means that the series has been garbage collected.
 | |
| 	if s == nil {
 | |
| 		return nil, storage.ErrNotFound
 | |
| 	}
 | |
| 
 | |
| 	s.Lock()
 | |
| 	c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper)
 | |
| 	if err != nil {
 | |
| 		s.Unlock()
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		if garbageCollect {
 | |
| 			// Set this to nil so that Go GC can collect it after it has been used.
 | |
| 			c.chunk = nil
 | |
| 			s.memChunkPool.Put(c)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// This means that the chunk is outside the specified range.
 | |
| 	if !c.OverlapsClosedInterval(h.mint, h.maxt) {
 | |
| 		s.Unlock()
 | |
| 		return nil, storage.ErrNotFound
 | |
| 	}
 | |
| 	s.Unlock()
 | |
| 
 | |
| 	return &safeChunk{
 | |
| 		Chunk:           c.chunk,
 | |
| 		s:               s,
 | |
| 		cid:             cid,
 | |
| 		isoState:        h.isoState,
 | |
| 		chunkDiskMapper: h.head.chunkDiskMapper,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
 | |
| // If garbageCollect is true, it means that the returned *memChunk
 | |
| // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
 | |
| func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
 | |
| 	// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
 | |
| 	// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
 | |
| 	// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
 | |
| 	// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
 | |
| 	ix := int(id) - int(s.firstChunkID)
 | |
| 	if ix < 0 || ix > len(s.mmappedChunks) {
 | |
| 		return nil, false, storage.ErrNotFound
 | |
| 	}
 | |
| 	if ix == len(s.mmappedChunks) {
 | |
| 		if s.headChunk == nil {
 | |
| 			return nil, false, errors.New("invalid head chunk")
 | |
| 		}
 | |
| 		return s.headChunk, false, nil
 | |
| 	}
 | |
| 	chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
 | |
| 	if err != nil {
 | |
| 		if _, ok := err.(*chunks.CorruptionErr); ok {
 | |
| 			panic(err)
 | |
| 		}
 | |
| 		return nil, false, err
 | |
| 	}
 | |
| 	mc := s.memChunkPool.Get().(*memChunk)
 | |
| 	mc.chunk = chk
 | |
| 	mc.minTime = s.mmappedChunks[ix].minTime
 | |
| 	mc.maxTime = s.mmappedChunks[ix].maxTime
 | |
| 	return mc, true, nil
 | |
| }
 | |
| 
 | |
| type safeChunk struct {
 | |
| 	chunkenc.Chunk
 | |
| 	s               *memSeries
 | |
| 	cid             chunks.HeadChunkID
 | |
| 	isoState        *isolationState
 | |
| 	chunkDiskMapper *chunks.ChunkDiskMapper
 | |
| }
 | |
| 
 | |
| func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
 | |
| 	c.s.Lock()
 | |
| 	it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter)
 | |
| 	c.s.Unlock()
 | |
| 	return it
 | |
| }
 | |
| 
 | |
| // iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
 | |
| // It is unsafe to call this concurrently with s.append(...) without holding the series lock.
 | |
| func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
 | |
| 	c, garbageCollect, err := s.chunk(id, chunkDiskMapper)
 | |
| 	// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
 | |
| 	// series's chunk, which got then garbage collected before it got
 | |
| 	// accessed.  We must ensure to not garbage collect as long as any
 | |
| 	// readers still hold a reference.
 | |
| 	if err != nil {
 | |
| 		return chunkenc.NewNopIterator()
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		if garbageCollect {
 | |
| 			// Set this to nil so that Go GC can collect it after it has been used.
 | |
| 			// This should be done always at the end.
 | |
| 			c.chunk = nil
 | |
| 			s.memChunkPool.Put(c)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	ix := int(id) - int(s.firstChunkID)
 | |
| 
 | |
| 	numSamples := c.chunk.NumSamples()
 | |
| 	stopAfter := numSamples
 | |
| 
 | |
| 	if isoState != nil && !isoState.IsolationDisabled() {
 | |
| 		totalSamples := 0    // Total samples in this series.
 | |
| 		previousSamples := 0 // Samples before this chunk.
 | |
| 
 | |
| 		for j, d := range s.mmappedChunks {
 | |
| 			totalSamples += int(d.numSamples)
 | |
| 			if j < ix {
 | |
| 				previousSamples += int(d.numSamples)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if s.headChunk != nil {
 | |
| 			totalSamples += s.headChunk.chunk.NumSamples()
 | |
| 		}
 | |
| 
 | |
| 		// Removing the extra transactionIDs that are relevant for samples that
 | |
| 		// come after this chunk, from the total transactionIDs.
 | |
| 		appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples))
 | |
| 
 | |
| 		// Iterate over the appendIDs, find the first one that the isolation state says not
 | |
| 		// to return.
 | |
| 		it := s.txs.iterator()
 | |
| 		for index := 0; index < appendIDsToConsider; index++ {
 | |
| 			appendID := it.At()
 | |
| 			if appendID <= isoState.maxAppendID { // Easy check first.
 | |
| 				if _, ok := isoState.incompleteAppends[appendID]; !ok {
 | |
| 					it.Next()
 | |
| 					continue
 | |
| 				}
 | |
| 			}
 | |
| 			stopAfter = numSamples - (appendIDsToConsider - index)
 | |
| 			if stopAfter < 0 {
 | |
| 				stopAfter = 0 // Stopped in a previous chunk.
 | |
| 			}
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if stopAfter == 0 {
 | |
| 		return chunkenc.NewNopIterator()
 | |
| 	}
 | |
| 
 | |
| 	if int(id)-int(s.firstChunkID) < len(s.mmappedChunks) {
 | |
| 		if stopAfter == numSamples {
 | |
| 			return c.chunk.Iterator(it)
 | |
| 		}
 | |
| 		if msIter, ok := it.(*stopIterator); ok {
 | |
| 			msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
 | |
| 			msIter.i = -1
 | |
| 			msIter.stopAfter = stopAfter
 | |
| 			return msIter
 | |
| 		}
 | |
| 		return &stopIterator{
 | |
| 			Iterator:  c.chunk.Iterator(it),
 | |
| 			i:         -1,
 | |
| 			stopAfter: stopAfter,
 | |
| 		}
 | |
| 	}
 | |
| 	// Serve the last 4 samples for the last chunk from the sample buffer
 | |
| 	// as their compressed bytes may be mutated by added samples.
 | |
| 	if msIter, ok := it.(*memSafeIterator); ok {
 | |
| 		msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
 | |
| 		msIter.i = -1
 | |
| 		msIter.total = numSamples
 | |
| 		msIter.stopAfter = stopAfter
 | |
| 		msIter.buf = s.sampleBuf
 | |
| 		return msIter
 | |
| 	}
 | |
| 	return &memSafeIterator{
 | |
| 		stopIterator: stopIterator{
 | |
| 			Iterator:  c.chunk.Iterator(it),
 | |
| 			i:         -1,
 | |
| 			stopAfter: stopAfter,
 | |
| 		},
 | |
| 		total: numSamples,
 | |
| 		buf:   s.sampleBuf,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // memSafeIterator returns values from the wrapped stopIterator
 | |
| // except the last 4, which come from buf.
 | |
| type memSafeIterator struct {
 | |
| 	stopIterator
 | |
| 
 | |
| 	total int
 | |
| 	buf   [4]sample
 | |
| }
 | |
| 
 | |
| func (it *memSafeIterator) Seek(t int64) bool {
 | |
| 	if it.Err() != nil {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	ts, _ := it.At()
 | |
| 
 | |
| 	for t > ts || it.i == -1 {
 | |
| 		if !it.Next() {
 | |
| 			return false
 | |
| 		}
 | |
| 		ts, _ = it.At()
 | |
| 	}
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (it *memSafeIterator) Next() bool {
 | |
| 	if it.i+1 >= it.stopAfter {
 | |
| 		return false
 | |
| 	}
 | |
| 	it.i++
 | |
| 	if it.total-it.i > 4 {
 | |
| 		return it.Iterator.Next()
 | |
| 	}
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (it *memSafeIterator) At() (int64, float64) {
 | |
| 	if it.total-it.i > 4 {
 | |
| 		return it.Iterator.At()
 | |
| 	}
 | |
| 	s := it.buf[4-(it.total-it.i)]
 | |
| 	return s.t, s.v
 | |
| }
 | |
| 
 | |
| // stopIterator wraps an Iterator, but only returns the first
 | |
| // stopAfter values, if initialized with i=-1.
 | |
| type stopIterator struct {
 | |
| 	chunkenc.Iterator
 | |
| 
 | |
| 	i, stopAfter int
 | |
| }
 | |
| 
 | |
| func (it *stopIterator) Next() bool {
 | |
| 	if it.i+1 >= it.stopAfter {
 | |
| 		return false
 | |
| 	}
 | |
| 	it.i++
 | |
| 	return it.Iterator.Next()
 | |
| }
 |