diff --git a/.circleci/config.yml b/.circleci/config.yml index dafcbea5f5..5584864062 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -36,6 +36,7 @@ jobs: GOOPTS: "-p 2" GOMAXPROCS: "2" GO111MODULE: "on" + - run: go test ./tsdb/ -test.tsdb-isolation=false - prometheus/check_proto: version: "3.15.8" - prometheus/store_artifact: @@ -93,6 +94,7 @@ jobs: steps: - checkout - run: go test ./tsdb/... + - run: go test ./tsdb/ -test.tsdb-isolation=false test_mixins: executor: golang diff --git a/tsdb/db.go b/tsdb/db.go index 5a04e19777..f00bd39f9b 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -79,6 +79,7 @@ func DefaultOptions() *Options { WALCompression: false, StripeSize: DefaultStripeSize, HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, + IsolationDisabled: defaultIsolationDisabled, } } @@ -143,7 +144,7 @@ type Options struct { // mainly meant for external users who import TSDB. BlocksToDelete BlocksToDeleteFunc - // Enables the in memory exemplar storage,. + // Enables the in memory exemplar storage. EnableExemplarStorage bool // Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster. @@ -152,6 +153,9 @@ type Options struct { // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage. MaxExemplars int64 + + // Disables isolation between reads and in-flight appends. + IsolationDisabled bool } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -705,6 +709,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.EnableExemplarStorage = opts.EnableExemplarStorage headOpts.MaxExemplars.Store(opts.MaxExemplars) headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown + if opts.IsolationDisabled { + // We only override this flag if isolation is disabled at DB level. We use the default otherwise. + headOpts.IsolationDisabled = opts.IsolationDisabled + } db.head, err = NewHead(r, l, wlog, headOpts, stats.Head) if err != nil { return nil, err diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 4448c65230..a6e85f90cb 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -17,6 +17,7 @@ import ( "bufio" "context" "encoding/binary" + "flag" "fmt" "hash/crc32" "io/ioutil" @@ -54,6 +55,11 @@ import ( ) func TestMain(m *testing.M) { + var isolationEnabled bool + flag.BoolVar(&isolationEnabled, "test.tsdb-isolation", true, "enable isolation") + flag.Parse() + defaultIsolationDisabled = !isolationEnabled + goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func1"), goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func2")) } @@ -2407,6 +2413,10 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { } func TestDBCannotSeePartialCommits(t *testing.T) { + if defaultIsolationDisabled { + t.Skip("skipping test since tsdb isolation is disabled") + } + tmpdir, _ := ioutil.TempDir("", "test") defer func() { require.NoError(t, os.RemoveAll(tmpdir)) @@ -2477,6 +2487,10 @@ func TestDBCannotSeePartialCommits(t *testing.T) { } func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { + if defaultIsolationDisabled { + t.Skip("skipping test since tsdb isolation is disabled") + } + tmpdir, _ := ioutil.TempDir("", "test") defer func() { require.NoError(t, os.RemoveAll(tmpdir)) diff --git a/tsdb/head.go b/tsdb/head.go index b4806addcb..dff2b0c675 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -52,6 +52,9 @@ var ( // ErrAppenderClosed is returned if an appender has already be successfully // rolled back or committed. ErrAppenderClosed = errors.New("appender closed") + + // defaultIsolationDisabled is true if isolation is disabled by default. + defaultIsolationDisabled = false ) // Head handles reads and writes of time series data within a time window. @@ -131,6 +134,8 @@ type HeadOptions struct { SeriesCallback SeriesLifecycleCallback EnableExemplarStorage bool EnableMemorySnapshotOnShutdown bool + + IsolationDisabled bool } func DefaultHeadOptions() *HeadOptions { @@ -141,6 +146,7 @@ func DefaultHeadOptions() *HeadOptions { ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, StripeSize: DefaultStripeSize, SeriesCallback: &noopSeriesLifecycleCallback{}, + IsolationDisabled: defaultIsolationDisabled, } } @@ -230,12 +236,13 @@ func (h *Head) resetInMemoryState() error { return err } + h.iso = newIsolation(h.opts.IsolationDisabled) + h.exemplarMetrics = em h.exemplars = es h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback) h.postings = index.NewUnorderedMemPostings() h.tombstones = tombstones.NewMemTombstones() - h.iso = newIsolation() h.deleted = map[chunks.HeadSeriesRef]int{} h.chunkRange.Store(h.opts.ChunkRange) h.minTime.Store(math.MaxInt64) @@ -1226,7 +1233,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool) + return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool, h.opts.IsolationDisabled) }) if err != nil { return nil, false, err @@ -1503,18 +1510,21 @@ type memSeries struct { memChunkPool *sync.Pool + // txs is nil if isolation is disabled. txs *txRing } -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool, isolationDisabled bool) *memSeries { s := &memSeries{ lset: lset, ref: id, chunkRange: chunkRange, nextAt: math.MinInt64, - txs: newTxRing(4), memChunkPool: memChunkPool, } + if !isolationDisabled { + s.txs = newTxRing(4) + } return s } @@ -1567,7 +1577,9 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { // cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after // acquiring lock. func (s *memSeries) cleanupAppendIDsBelow(bound uint64) { - s.txs.cleanupAppendIDsBelow(bound) + if s.txs != nil { + s.txs.cleanupAppendIDsBelow(bound) + } } func (s *memSeries) head() *memChunk { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index f270cb79e9..4a14785d46 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -524,7 +524,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} - if appendID > 0 { + if appendID > 0 && s.txs != nil { s.txs.add(appendID) } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 965db74bff..8d34a54439 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -361,7 +361,7 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch numSamples := c.chunk.NumSamples() stopAfter := numSamples - if isoState != nil { + if isoState != nil && !isoState.IsolationDisabled() { totalSamples := 0 // Total samples in this series. previousSamples := 0 // Samples before this chunk. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 99a0cdb0d7..f9030f5ea0 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -60,6 +60,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. opts.ChunkDirRoot = dir opts.EnableExemplarStorage = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) + h, err := NewHead(nil, nil, wlog, opts, nil) require.NoError(t, err) @@ -227,7 +228,7 @@ func BenchmarkLoadWAL(b *testing.B) { require.NoError(b, err) for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. - s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, nil) + s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, nil, defaultIsolationDisabled) s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper) s.mmapCurrentHeadChunk(chunkDiskMapper) } @@ -551,7 +552,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { }, } - s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool) + s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool, defaultIsolationDisabled) for i := 0; i < 4000; i += 5 { ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) @@ -1089,7 +1090,7 @@ func TestMemSeries_append(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - s := newMemSeries(labels.Labels{}, 1, 500, nil) + s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled) // Add first two samples at the very end of a chunk range and the next two // on and after it. @@ -1547,6 +1548,10 @@ func TestAddDuplicateLabelName(t *testing.T) { } func TestMemSeriesIsolation(t *testing.T) { + if defaultIsolationDisabled { + t.Skip("skipping test since tsdb isolation is disabled") + } + // Put a series, select it. GC it and then access it. lastValue := func(h *Head, maxAppendID uint64) int { idx, err := h.Index() @@ -1718,6 +1723,10 @@ func TestMemSeriesIsolation(t *testing.T) { } func TestIsolationRollback(t *testing.T) { + if defaultIsolationDisabled { + t.Skip("skipping test since tsdb isolation is disabled") + } + // Rollback after a failed append and test if the low watermark has progressed anyway. hb, _ := newTestHead(t, 1000, false) defer func() { @@ -1746,6 +1755,10 @@ func TestIsolationRollback(t *testing.T) { } func TestIsolationLowWatermarkMonotonous(t *testing.T) { + if defaultIsolationDisabled { + t.Skip("skipping test since tsdb isolation is disabled") + } + hb, _ := newTestHead(t, 1000, false) defer func() { require.NoError(t, hb.Close()) @@ -1779,6 +1792,10 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { } func TestIsolationAppendIDZeroIsNoop(t *testing.T) { + if defaultIsolationDisabled { + t.Skip("skipping test since tsdb isolation is disabled") + } + h, _ := newTestHead(t, 1000, false) defer func() { require.NoError(t, h.Close()) @@ -1800,6 +1817,10 @@ func TestHeadSeriesChunkRace(t *testing.T) { } func TestIsolationWithoutAdd(t *testing.T) { + if defaultIsolationDisabled { + t.Skip("skipping test since tsdb isolation is disabled") + } + hb, _ := newTestHead(t, 1000, false) defer func() { require.NoError(t, hb.Close()) @@ -2255,7 +2276,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - s := newMemSeries(labels.Labels{}, 1, 500, nil) + s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled) for i := 0; i < 7; i++ { ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) diff --git a/tsdb/isolation.go b/tsdb/isolation.go index ca6ce1980f..4919bfe915 100644 --- a/tsdb/isolation.go +++ b/tsdb/isolation.go @@ -39,6 +39,10 @@ func (i *isolationState) Close() { i.prev.next = i.next } +func (i *isolationState) IsolationDisabled() bool { + return i.isolation.disabled +} + type isolationAppender struct { appendID uint64 prev *isolationAppender @@ -63,9 +67,11 @@ type isolation struct { readMtx sync.RWMutex // All current in use isolationStates. This is a doubly-linked list. readsOpen *isolationState + // If true, writes are not tracked while reads are still tracked. + disabled bool } -func newIsolation() *isolation { +func newIsolation(disabled bool) *isolation { isoState := &isolationState{} isoState.next = isoState isoState.prev = isoState @@ -78,6 +84,7 @@ func newIsolation() *isolation { appendsOpen: map[uint64]*isolationAppender{}, appendsOpenList: appender, readsOpen: isoState, + disabled: disabled, appendersPool: sync.Pool{New: func() interface{} { return &isolationAppender{} }}, } } @@ -85,12 +92,20 @@ func newIsolation() *isolation { // lowWatermark returns the appendID below which we no longer need to track // which appends were from which appendID. func (i *isolation) lowWatermark() uint64 { + if i.disabled { + return 0 + } + i.appendMtx.RLock() // Take appendMtx first. defer i.appendMtx.RUnlock() return i.lowWatermarkLocked() } func (i *isolation) lowWatermarkLocked() uint64 { + if i.disabled { + return 0 + } + i.readMtx.RLock() defer i.readMtx.RUnlock() if i.readsOpen.prev != i.readsOpen { @@ -106,6 +121,8 @@ func (i *isolation) lowWatermarkLocked() uint64 { func (i *isolation) State(mint, maxt int64) *isolationState { i.appendMtx.RLock() // Take append mutex before read mutex. defer i.appendMtx.RUnlock() + + // We need to track the reads even when isolation is disabled. isoState := &isolationState{ maxAppendID: i.appendsOpenList.appendID, lowWatermark: i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId. @@ -124,6 +141,7 @@ func (i *isolation) State(mint, maxt int64) *isolationState { isoState.next = i.readsOpen.next i.readsOpen.next.prev = isoState i.readsOpen.next = isoState + return isoState } @@ -146,6 +164,10 @@ func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) { // ID. The first ID returned is 1. // Also returns the low watermark, to keep lock/unlock operations down. func (i *isolation) newAppendID() (uint64, uint64) { + if i.disabled { + return 0, 0 + } + i.appendMtx.Lock() defer i.appendMtx.Unlock() @@ -165,6 +187,10 @@ func (i *isolation) newAppendID() (uint64, uint64) { } func (i *isolation) lastAppendID() uint64 { + if i.disabled { + return 0 + } + i.appendMtx.RLock() defer i.appendMtx.RUnlock() @@ -172,6 +198,10 @@ func (i *isolation) lastAppendID() uint64 { } func (i *isolation) closeAppend(appendID uint64) { + if i.disabled { + return + } + i.appendMtx.Lock() defer i.appendMtx.Unlock() diff --git a/tsdb/isolation_test.go b/tsdb/isolation_test.go index e41e3f5a2f..6e27e06fc3 100644 --- a/tsdb/isolation_test.go +++ b/tsdb/isolation_test.go @@ -23,7 +23,7 @@ import ( func BenchmarkIsolation(b *testing.B) { for _, goroutines := range []int{10, 100, 1000, 10000} { b.Run(strconv.Itoa(goroutines), func(b *testing.B) { - iso := newIsolation() + iso := newIsolation(false) wg := sync.WaitGroup{} start := make(chan struct{}) @@ -53,7 +53,7 @@ func BenchmarkIsolation(b *testing.B) { func BenchmarkIsolationWithState(b *testing.B) { for _, goroutines := range []int{10, 100, 1000, 10000} { b.Run(strconv.Itoa(goroutines), func(b *testing.B) { - iso := newIsolation() + iso := newIsolation(false) wg := sync.WaitGroup{} start := make(chan struct{})