mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-31 00:11:23 +01:00 
			
		
		
		
	* tsdb: add a basic test for read/write isolation * tsdb: store the min time with isolationAppender So that we can see when appending has moved past a certain point in time. * tsdb: allow RangeHead to have isolation disabled This will be used for head compaction. * tsdb: do head compaction with isolation disabled This saves a lot of work tracking appends done while compaction is ongoing. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
		
			
				
	
	
		
			312 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			312 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package tsdb
 | |
| 
 | |
| import (
 | |
| 	"math"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
| // isolationState holds the isolation information.
 | |
| type isolationState struct {
 | |
| 	// We will ignore all appends above the max, or that are incomplete.
 | |
| 	maxAppendID       uint64
 | |
| 	incompleteAppends map[uint64]struct{}
 | |
| 	lowWatermark      uint64 // Lowest of incompleteAppends/maxAppendID.
 | |
| 	isolation         *isolation
 | |
| 	mint, maxt        int64 // Time ranges of the read.
 | |
| 
 | |
| 	// Doubly linked list of active reads.
 | |
| 	next *isolationState
 | |
| 	prev *isolationState
 | |
| }
 | |
| 
 | |
| // Close closes the state.
 | |
| func (i *isolationState) Close() {
 | |
| 	i.isolation.readMtx.Lock()
 | |
| 	defer i.isolation.readMtx.Unlock()
 | |
| 	i.next.prev = i.prev
 | |
| 	i.prev.next = i.next
 | |
| }
 | |
| 
 | |
| func (i *isolationState) IsolationDisabled() bool {
 | |
| 	return i.isolation.disabled
 | |
| }
 | |
| 
 | |
// isolationAppender is one node in a doubly linked list of appenders. It
// records the append ID and the minimum time associated with that append.
type isolationAppender struct {
	appendID uint64
	minTime  int64
	// Links to the neighbouring nodes of the list.
	prev, next *isolationAppender
}
 | |
| 
 | |
| // isolation is the global isolation state.
 | |
| type isolation struct {
 | |
| 	// Mutex for accessing lastAppendID and appendsOpen.
 | |
| 	appendMtx sync.RWMutex
 | |
| 	// Which appends are currently in progress.
 | |
| 	appendsOpen map[uint64]*isolationAppender
 | |
| 	// New appenders with higher appendID are added to the end. First element keeps lastAppendId.
 | |
| 	// appendsOpenList.next points to the first element and appendsOpenList.prev points to the last element.
 | |
| 	// If there are no appenders, both point back to appendsOpenList.
 | |
| 	appendsOpenList *isolationAppender
 | |
| 	// Pool of reusable *isolationAppender to save on allocations.
 | |
| 	appendersPool sync.Pool
 | |
| 
 | |
| 	// Mutex for accessing readsOpen.
 | |
| 	// If taking both appendMtx and readMtx, take appendMtx first.
 | |
| 	readMtx sync.RWMutex
 | |
| 	// All current in use isolationStates. This is a doubly-linked list.
 | |
| 	readsOpen *isolationState
 | |
| 	// If true, writes are not tracked while reads are still tracked.
 | |
| 	disabled bool
 | |
| }
 | |
| 
 | |
| func newIsolation(disabled bool) *isolation {
 | |
| 	isoState := &isolationState{}
 | |
| 	isoState.next = isoState
 | |
| 	isoState.prev = isoState
 | |
| 
 | |
| 	appender := &isolationAppender{}
 | |
| 	appender.next = appender
 | |
| 	appender.prev = appender
 | |
| 
 | |
| 	return &isolation{
 | |
| 		appendsOpen:     map[uint64]*isolationAppender{},
 | |
| 		appendsOpenList: appender,
 | |
| 		readsOpen:       isoState,
 | |
| 		disabled:        disabled,
 | |
| 		appendersPool:   sync.Pool{New: func() interface{} { return &isolationAppender{} }},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // lowWatermark returns the appendID below which we no longer need to track
 | |
| // which appends were from which appendID.
 | |
| func (i *isolation) lowWatermark() uint64 {
 | |
| 	if i.disabled {
 | |
| 		return 0
 | |
| 	}
 | |
| 
 | |
| 	i.appendMtx.RLock() // Take appendMtx first.
 | |
| 	defer i.appendMtx.RUnlock()
 | |
| 	return i.lowWatermarkLocked()
 | |
| }
 | |
| 
 | |
| func (i *isolation) lowWatermarkLocked() uint64 {
 | |
| 	if i.disabled {
 | |
| 		return 0
 | |
| 	}
 | |
| 
 | |
| 	i.readMtx.RLock()
 | |
| 	defer i.readMtx.RUnlock()
 | |
| 	if i.readsOpen.prev != i.readsOpen {
 | |
| 		return i.readsOpen.prev.lowWatermark
 | |
| 	}
 | |
| 
 | |
| 	// Lowest appendID from appenders, or lastAppendId.
 | |
| 	return i.appendsOpenList.next.appendID
 | |
| }
 | |
| 
 | |
| // lowestAppendTime returns the lowest minTime for any open appender,
 | |
| // or math.MaxInt64 if no open appenders.
 | |
| func (i *isolation) lowestAppendTime() int64 {
 | |
| 	var lowest int64 = math.MaxInt64
 | |
| 	i.appendMtx.RLock()
 | |
| 	defer i.appendMtx.RUnlock()
 | |
| 
 | |
| 	for a := i.appendsOpenList.next; a != i.appendsOpenList; a = a.next {
 | |
| 		if lowest > a.minTime {
 | |
| 			lowest = a.minTime
 | |
| 		}
 | |
| 	}
 | |
| 	return lowest
 | |
| }
 | |
| 
 | |
| // State returns an object used to control isolation
 | |
| // between a query and appends. Must be closed when complete.
 | |
| func (i *isolation) State(mint, maxt int64) *isolationState {
 | |
| 	i.appendMtx.RLock() // Take append mutex before read mutex.
 | |
| 	defer i.appendMtx.RUnlock()
 | |
| 
 | |
| 	// We need to track reads even when isolation is disabled, so that head
 | |
| 	// truncation can wait till reads overlapping that range have finished.
 | |
| 	isoState := &isolationState{
 | |
| 		maxAppendID:       i.appendsOpenList.appendID,
 | |
| 		lowWatermark:      i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId.
 | |
| 		incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)),
 | |
| 		isolation:         i,
 | |
| 		mint:              mint,
 | |
| 		maxt:              maxt,
 | |
| 	}
 | |
| 	for k := range i.appendsOpen {
 | |
| 		isoState.incompleteAppends[k] = struct{}{}
 | |
| 	}
 | |
| 
 | |
| 	i.readMtx.Lock()
 | |
| 	defer i.readMtx.Unlock()
 | |
| 	isoState.prev = i.readsOpen
 | |
| 	isoState.next = i.readsOpen.next
 | |
| 	i.readsOpen.next.prev = isoState
 | |
| 	i.readsOpen.next = isoState
 | |
| 
 | |
| 	return isoState
 | |
| }
 | |
| 
 | |
| // TraverseOpenReads iterates through the open reads and runs the given
 | |
| // function on those states. The given function MUST NOT mutate the isolationState.
 | |
| // The iteration is stopped when the function returns false or once all reads have been iterated.
 | |
| func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) {
 | |
| 	i.readMtx.RLock()
 | |
| 	defer i.readMtx.RUnlock()
 | |
| 	s := i.readsOpen.next
 | |
| 	for s != i.readsOpen {
 | |
| 		if !f(s) {
 | |
| 			return
 | |
| 		}
 | |
| 		s = s.next
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // newAppendID increments the transaction counter and returns a new transaction
 | |
| // ID. The first ID returned is 1.
 | |
| // Also returns the low watermark, to keep lock/unlock operations down.
 | |
| func (i *isolation) newAppendID(minTime int64) (uint64, uint64) {
 | |
| 	if i.disabled {
 | |
| 		return 0, 0
 | |
| 	}
 | |
| 
 | |
| 	i.appendMtx.Lock()
 | |
| 	defer i.appendMtx.Unlock()
 | |
| 
 | |
| 	// Last used appendID is stored in head element.
 | |
| 	i.appendsOpenList.appendID++
 | |
| 
 | |
| 	app := i.appendersPool.Get().(*isolationAppender)
 | |
| 	app.appendID = i.appendsOpenList.appendID
 | |
| 	app.minTime = minTime
 | |
| 	app.prev = i.appendsOpenList.prev
 | |
| 	app.next = i.appendsOpenList
 | |
| 
 | |
| 	i.appendsOpenList.prev.next = app
 | |
| 	i.appendsOpenList.prev = app
 | |
| 
 | |
| 	i.appendsOpen[app.appendID] = app
 | |
| 	return app.appendID, i.lowWatermarkLocked()
 | |
| }
 | |
| 
 | |
| func (i *isolation) lastAppendID() uint64 {
 | |
| 	if i.disabled {
 | |
| 		return 0
 | |
| 	}
 | |
| 
 | |
| 	i.appendMtx.RLock()
 | |
| 	defer i.appendMtx.RUnlock()
 | |
| 
 | |
| 	return i.appendsOpenList.appendID
 | |
| }
 | |
| 
 | |
| func (i *isolation) closeAppend(appendID uint64) {
 | |
| 	if i.disabled {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	i.appendMtx.Lock()
 | |
| 	defer i.appendMtx.Unlock()
 | |
| 
 | |
| 	app := i.appendsOpen[appendID]
 | |
| 	if app != nil {
 | |
| 		app.prev.next = app.next
 | |
| 		app.next.prev = app.prev
 | |
| 
 | |
| 		delete(i.appendsOpen, appendID)
 | |
| 
 | |
| 		// Clear all fields, and return to the pool.
 | |
| 		*app = isolationAppender{}
 | |
| 		i.appendersPool.Put(app)
 | |
| 	}
 | |
| }
 | |
| 
 | |
// txRing is a ring buffer of transaction (append) IDs that grows on demand.
type txRing struct {
	txIDs     []uint64
	txIDFirst int // Position of the first id in the ring.
	txIDCount int // How many ids in the ring.
}

// newTxRing returns an empty ring with room for cap IDs. A cap of 0 is
// allowed; storage is then allocated on the first add.
func newTxRing(cap int) *txRing {
	return &txRing{
		txIDs: make([]uint64, cap),
	}
}

// add appends appendID after the last ID in the ring, growing the backing
// slice first when the ring is full.
func (txr *txRing) add(appendID uint64) {
	if txr.txIDCount == len(txr.txIDs) {
		// Ring buffer is full, expand by doubling. Doubling an empty ring
		// would leave it empty (and make the modulo below divide by zero),
		// so start from a small fixed size in that case.
		newLen := txr.txIDCount * 2
		if newLen == 0 {
			newLen = 4
		}
		// Copy the IDs out in ring order so the new ring starts at index 0.
		newRing := make([]uint64, newLen)
		idx := copy(newRing, txr.txIDs[txr.txIDFirst:])
		copy(newRing[idx:], txr.txIDs[:txr.txIDFirst])
		txr.txIDs = newRing
		txr.txIDFirst = 0
	}

	txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = appendID
	txr.txIDCount++
}

// cleanupAppendIDsBelow drops IDs from the front of the ring for as long as
// they are smaller than bound. It stops at the first ID that is not below
// bound, so later IDs are kept even if they are smaller.
func (txr *txRing) cleanupAppendIDsBelow(bound uint64) {
	if len(txr.txIDs) == 0 {
		// No storage means nothing to drop; this also avoids wrapping the
		// position modulo zero.
		return
	}

	for txr.txIDCount > 0 && txr.txIDs[txr.txIDFirst] < bound {
		txr.txIDFirst++
		if txr.txIDFirst == len(txr.txIDs) {
			txr.txIDFirst = 0
		}
		txr.txIDCount--
	}
}

// iterator returns an iterator positioned at the first ID of the ring. The
// iterator reads from the ring's current backing slice.
func (txr *txRing) iterator() *txRingIterator {
	return &txRingIterator{
		pos: txr.txIDFirst,
		ids: txr.txIDs,
	}
}

// txRingIterator lets you iterate over the ring. It has no end condition:
// Next wraps around to the start of the slice instead of stopping, so the
// caller has to bound the number of steps itself.
type txRingIterator struct {
	ids []uint64

	pos int
}

// At returns the ID at the iterator's current position.
func (it *txRingIterator) At() uint64 {
	return it.ids[it.pos]
}

// Next advances the iterator by one position, wrapping around at the end of
// the slice.
func (it *txRingIterator) Next() {
	it.pos++
	if it.pos == len(it.ids) {
		it.pos = 0
	}
}
 |