diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 371079eb6b..61fc7dd450 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -5041,7 +5041,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
 
 	// Verify that the in-memory ooo chunk is empty.
 	checkEmptyOOOChunk := func(lbls labels.Labels) {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Nil(t, ms.ooo)
@@ -5085,7 +5085,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
 
 	// Verify that the in-memory ooo chunk is not empty.
 	checkNonEmptyOOOChunk := func(lbls labels.Labels) {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
@@ -5246,7 +5246,7 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen
 
 	// Checking that ooo chunk is not empty.
 	for _, lbls := range []labels.Labels{series1, series2} {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
@@ -5274,7 +5274,7 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen
 
 	// Checking that ooo chunk is empty.
 	for _, lbls := range []labels.Labels{series1, series2} {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Nil(t, ms.ooo)
@@ -5357,7 +5357,7 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen
 
 	// Checking that ooo chunk is not empty.
 	for _, lbls := range []labels.Labels{series1, series2} {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
@@ -5385,7 +5385,7 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen
 
 	// Checking that ooo chunk is empty.
 	for _, lbls := range []labels.Labels{series1, series2} {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Nil(t, ms.ooo)
@@ -5467,7 +5467,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
 
 	// Checking that there are some ooo m-map chunks.
 	for _, lbls := range []labels.Labels{series1, series2} {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Len(t, ms.ooo.oooMmappedChunks, 2)
@@ -5486,7 +5486,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
 
 	// Check ooo m-map chunks again.
 	for _, lbls := range []labels.Labels{series1, series2} {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Len(t, ms.ooo.oooMmappedChunks, 2)
@@ -5526,7 +5526,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
 
 	// Checking that ooo chunk is empty in Head.
 	for _, lbls := range []labels.Labels{series1, series2} {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Nil(t, ms.ooo)
@@ -6835,7 +6835,7 @@ func testOOODisabled(t *testing.T, scenario sampleTypeScenario) {
 	_, err = os.ReadDir(path.Join(db.Dir(), wlog.WblDirName))
 	require.True(t, os.IsNotExist(err))
 
-	ms, created, err := db.head.getOrCreate(s1.Hash(), s1)
+	ms, created, err := db.head.getOrCreate(s1.Hash(), s1, false)
 	require.NoError(t, err)
 	require.False(t, created)
 	require.NotNil(t, ms)
@@ -6908,7 +6908,7 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
 	oooMint, oooMaxt := minutes(195), minutes(260)
 
 	// Collect the samples only present in the ooo m-map chunks.
-	ms, created, err := db.head.getOrCreate(s1.Hash(), s1)
+	ms, created, err := db.head.getOrCreate(s1.Hash(), s1, false)
 	require.False(t, created)
 	require.NoError(t, err)
 	var s1MmapSamples []chunks.Sample
@@ -7088,7 +7088,7 @@ func TestOOOHistogramCompactionWithCounterResets(t *testing.T) {
 
 	// Verify that the in-memory ooo chunk is empty.
 	checkEmptyOOOChunk := func(lbls labels.Labels) {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Nil(t, ms.ooo)
@@ -7270,7 +7270,7 @@ func TestOOOHistogramCompactionWithCounterResets(t *testing.T) {
 
 	// Verify that the in-memory ooo chunk is not empty.
 	checkNonEmptyOOOChunk := func(lbls labels.Labels) {
-		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
@@ -7594,7 +7594,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) {
 	require.Len(t, db.Blocks(), 3)
 
 	// Check that the ooo chunks were removed.
-	ms, created, err := db.head.getOrCreate(series1.Hash(), series1)
+	ms, created, err := db.head.getOrCreate(series1.Hash(), series1, false)
 	require.NoError(t, err)
 	require.False(t, created)
 	require.Nil(t, ms.ooo)
diff --git a/tsdb/head.go b/tsdb/head.go
index a9d4220cbd..80104ec603 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1720,7 +1720,7 @@ func (h *Head) String() string {
 	return "head"
 }
 
-func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
+func (h *Head) getOrCreate(hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
 	// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create
 	// a new series on every sample inserted via Add(), which causes allocations
 	// and makes our series IDs rather random and harder to compress in postings.
@@ -1732,17 +1732,17 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
 	// Optimistically assume that we are the first one to create the series.
 	id := chunks.HeadSeriesRef(h.lastSeriesID.Inc())
 
-	return h.getOrCreateWithID(id, hash, lset)
+	return h.getOrCreateWithID(id, hash, lset, pendingCommit)
 }
 
-func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
+func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
 	s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
 		shardHash := uint64(0)
 		if h.opts.EnableSharding {
 			shardHash = labels.StableHash(lset)
 		}
 
-		return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled)
+		return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit)
 	})
 	if err != nil {
 		return nil, false, err
 	}
@@ -2183,12 +2183,13 @@ type memSeriesOOOFields struct {
 	firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
 }
 
-func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries {
+func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled, pendingCommit bool) *memSeries {
 	s := &memSeries{
-		lset:      lset,
-		ref:       id,
-		nextAt:    math.MinInt64,
-		shardHash: shardHash,
+		lset:          lset,
+		ref:           id,
+		nextAt:        math.MinInt64,
+		shardHash:     shardHash,
+		pendingCommit: pendingCommit,
 	}
 	if !isolationDisabled {
 		s.txs = newTxRing(0)
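
For context: `Head.gc()` (not touched by this diff) keeps any series whose `pendingCommit` flag is set. With this change, series created through an appender are born with the flag raised and only lose it when the transaction finishes, so a series that never receives a sample is protected mid-transaction but still collectable afterwards. Below is a minimal, self-contained sketch of that lifecycle; `collectable` is a hypothetical condensation of the real GC condition, and `memSeries` is reduced to the two fields that matter here:

```go
package main

import "sync"

// Reduced stand-in for tsdb's memSeries: only the fields relevant to
// this change are kept; everything else is elided.
type memSeries struct {
	sync.Mutex
	pendingCommit bool // Set while an appender still owns this series.
	hasData       bool // Stand-in for "has any chunks/samples".
}

// collectable is a hypothetical condensation of the check Head.gc()
// applies: keep the series if it has data or an open transaction.
func (s *memSeries) collectable() bool {
	s.Lock()
	defer s.Unlock()
	return !s.hasData && !s.pendingCommit
}

func main() {
	// headAppender.getOrCreate now calls head.getOrCreate(..., true),
	// so the series is protected from GC for the whole transaction.
	s := &memSeries{pendingCommit: true}
	println(s.collectable()) // false: the appender still owns it.

	// Commit() and Rollback() both call unmarkCreatedSeriesAsPendingCommit,
	// after which a series that never received a sample can be dropped.
	s.Lock()
	s.pendingCommit = false
	s.Unlock()
	println(s.collectable()) // true
}
```
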
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 322ff65022..bb20593ca5 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -319,7 +319,8 @@ type headAppender struct {
 	headMaxt      int64 // We track it here to not take the lock for every sample appended.
 	oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample.
 
-	series       []record.RefSeries // New series held by this appender.
+	seriesRefs []record.RefSeries // New series records held by this appender.
+	series     []*memSeries       // New series held by this appender (indices correspond to those of seriesRefs).
 	samples      []record.RefSample // New float samples held by this appender.
 	sampleSeries []*memSeries       // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
 	histograms   []record.RefHistogramSample // New histogram samples held by this appender.
@@ -461,15 +462,16 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo
 	if l, dup := lset.HasDuplicateLabelNames(); dup {
 		return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
 	}
-	s, created, err = a.head.getOrCreate(lset.Hash(), lset)
+	s, created, err = a.head.getOrCreate(lset.Hash(), lset, true)
 	if err != nil {
 		return nil, false, err
 	}
 	if created {
-		a.series = append(a.series, record.RefSeries{
+		a.seriesRefs = append(a.seriesRefs, record.RefSeries{
 			Ref:    s.ref,
 			Labels: lset,
 		})
+		a.series = append(a.series, s)
 	}
 	return s, created, nil
 }
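
The appender now maintains two parallel slices: `seriesRefs` holds the plain records that `log()` encodes into the WAL, while `series` holds the live `*memSeries` pointers that Commit/Rollback must unmark. Both are appended to in the same place, exactly once per created series, so index i of one always describes index i of the other. A toy illustration of that invariant, with simplified placeholder types rather than the real ones:

```go
package main

import "fmt"

type refSeries struct{ ref uint64 } // plays the role of record.RefSeries
type series struct{ pendingCommit bool }

func main() {
	var seriesRefs []refSeries // analogous to headAppender.seriesRefs
	var created []*series      // analogous to headAppender.series

	// Append to both slices together, and only on creation, so
	// seriesRefs[i] always describes created[i].
	for ref := uint64(1); ref <= 3; ref++ {
		s := &series{pendingCommit: true}
		seriesRefs = append(seriesRefs, refSeries{ref: ref})
		created = append(created, s)
	}

	// The WAL path only needs the records; the unmark path only needs
	// the pointers. Neither has to search for the other.
	for i, s := range created {
		s.pendingCommit = false
		fmt.Printf("ref=%d pendingCommit=%v\n", seriesRefs[i].ref, s.pendingCommit)
	}
}
```
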
@@ -907,8 +909,8 @@ func (a *headAppender) log() error {
 	var rec []byte
 	var enc record.Encoder
 
-	if len(a.series) > 0 {
-		rec = enc.Series(a.series, buf)
+	if len(a.seriesRefs) > 0 {
+		rec = enc.Series(a.seriesRefs, buf)
 		buf = rec[:0]
 
 		if err := a.head.wal.Log(rec); err != nil {
@@ -1426,6 +1428,14 @@ func (a *headAppender) commitMetadata() {
 	}
 }
 
+func (a *headAppender) unmarkCreatedSeriesAsPendingCommit() {
+	for _, s := range a.series {
+		s.Lock()
+		s.pendingCommit = false
+		s.Unlock()
+	}
+}
+
 // Commit writes to the WAL and adds the data to the Head.
 // TODO(codesome): Refactor this method to reduce indentation and make it more readable.
 func (a *headAppender) Commit() (err error) {
@@ -1479,6 +1489,8 @@ func (a *headAppender) Commit() (err error) {
 	a.commitHistograms(acc)
 	a.commitFloatHistograms(acc)
 	a.commitMetadata()
+	// Unmark all series as pending commit after all samples have been committed.
+	a.unmarkCreatedSeriesAsPendingCommit()
 
 	a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
 	a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
@@ -1968,6 +1980,7 @@ func (a *headAppender) Rollback() (err error) {
 		series.pendingCommit = false
 		series.Unlock()
 	}
+	a.unmarkCreatedSeriesAsPendingCommit()
 	a.head.putAppendBuffer(a.samples)
 	a.head.putExemplarBuffer(a.exemplars)
 	a.head.putHistogramBuffer(a.histograms)
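
Both exit paths now clear the flag: `Commit()` does it only after every commit helper has run, so a concurrent `gc()` never collects a series whose samples are still being moved into the head, and `Rollback()` does it in addition to its existing loop over sample-holding series (which does not cover series created without any queued sample). A condensed outline of the two paths, with everything but the relevant calls elided; this is illustrative, not the real method bodies:

```go
package main

type series struct{ pendingCommit bool }

type appender struct {
	created []*series // headAppender.series: everything this txn created
}

// unmark plays the role of unmarkCreatedSeriesAsPendingCommit.
func (a *appender) unmark() {
	for _, s := range a.created {
		s.pendingCommit = false
	}
}

func (a *appender) Commit() {
	// ... write WAL record, commit floats/histograms/metadata ...
	a.unmark() // last: nothing is pending once the data is in the head
}

func (a *appender) Rollback() {
	// ... unmark series with queued samples, drop the queues ...
	a.unmark() // also covers series created without any sample
}

func main() {
	a := &appender{created: []*series{{pendingCommit: true}}}
	a.Commit()
	println(a.created[0].pendingCommit) // false
}
```
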
diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go
index 0ffc75abaf..ff9b22ec60 100644
--- a/tsdb/head_bench_test.go
+++ b/tsdb/head_bench_test.go
@@ -43,7 +43,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
 	defer h.Close()
 
 	for i := 0; i < b.N; i++ {
-		h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
+		h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)), false)
 	}
 }
 
@@ -62,7 +62,7 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
 			i := count.Inc()
-			h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))))
+			h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))), false)
 		}
 	})
 }
@@ -82,7 +82,7 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
 	defer h.Close()
 
 	for i := 0; i < b.N; i++ {
-		h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
+		h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)), false)
 	}
 }
diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go
index 6dd4c0ff55..ae506c1d8e 100644
--- a/tsdb/head_read_test.go
+++ b/tsdb/head_read_test.go
@@ -382,7 +382,7 @@ func TestMemSeries_chunk(t *testing.T) {
 				require.NoError(t, chunkDiskMapper.Close())
 			}()
 
-			series := newMemSeries(labels.EmptyLabels(), 1, 0, true)
+			series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false)
 
 			if tc.setup != nil {
 				tc.setup(t, series, chunkDiskMapper)
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 1650c62027..0e9fac18be 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -102,7 +102,7 @@ func BenchmarkCreateSeries(b *testing.B) {
 	b.ResetTimer()
 
 	for _, s := range series {
-		h.getOrCreate(s.Labels().Hash(), s.Labels())
+		h.getOrCreate(s.Labels().Hash(), s.Labels(), false)
 	}
 }
 
@@ -374,7 +374,7 @@ func BenchmarkLoadWLs(b *testing.B) {
 				}
 				for k := 0; k < c.batches*c.seriesPerBatch; k++ {
 					// Create one mmapped chunk per series, with one sample at the given time.
-					s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled)
+					s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false)
 					s.append(c.mmappedChunkT, 42, 0, cOpts)
 					// There's only one head chunk because only a single sample is appended. mmapChunks()
 					// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
@@ -895,7 +895,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
 		{
 			name: "keep series still in the head",
 			prepare: func(t *testing.T, h *Head) {
-				_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls)
+				_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false)
 				require.NoError(t, err)
 			},
 			seriesRef: chunks.HeadSeriesRef(existingRef),
@@ -1008,6 +1008,49 @@ func TestHead_RaceBetweenSeriesCreationAndGC(t *testing.T) {
 	require.Equal(t, totalSeries, int(head.NumSeries()))
 }
 
+func TestHead_CanGarbagecollectSeriesCreatedWithoutSamples(t *testing.T) {
+	for op, finishTxn := range map[string]func(app storage.Appender) error{
+		"after commit":   func(app storage.Appender) error { return app.Commit() },
+		"after rollback": func(app storage.Appender) error { return app.Rollback() },
+	} {
+		t.Run(op, func(t *testing.T) {
+			chunkRange := time.Hour.Milliseconds()
+			head, _ := newTestHead(t, chunkRange, compression.None, true)
+			t.Cleanup(func() { _ = head.Close() })
+
+			require.NoError(t, head.Init(0))
+
+			firstSampleTime := 10 * chunkRange
+			{
+				// Append the first sample; it should initialize the head max time to firstSampleTime.
+				app := head.Appender(context.Background())
+				_, err := app.Append(0, labels.FromStrings("lbl", "ok"), firstSampleTime, 1)
+				require.NoError(t, err)
+				require.NoError(t, app.Commit())
+				require.Equal(t, 1, int(head.NumSeries()))
+			}
+
+			// Try to append a sample at a time that is not covered by the chunk range:
+			// the series is created first, but then no sample is appended.
+			app := head.Appender(context.Background())
+			invalidSampleTime := firstSampleTime - chunkRange
+			_, err := app.Append(0, labels.FromStrings("foo", "bar"), invalidSampleTime, 2)
+			require.Error(t, err)
+			// These are our assumptions: we're not testing them, we're just checking them to make debugging a failed
+			// test easier if someone refactors the code and breaks these assumptions.
+			// If these assumptions fail after a refactor, feel free to remove them, but make sure that the test still tests what we intended to test.
+			require.NotErrorIs(t, err, storage.ErrOutOfBounds, "Failed to append sample shouldn't take the shortcut that returns storage.ErrOutOfBounds")
+			require.ErrorIs(t, err, storage.ErrTooOldSample, "Failed to append sample should return storage.ErrTooOldSample, because OOO window was enabled but this sample doesn't fall into it.")
+			// Do commit or rollback, depending on what we're testing.
+			require.NoError(t, finishTxn(app))
+
+			// Garbage-collect: since the transaction has finished and the series has no samples, it should be collectable.
+			head.gc()
+			require.Equal(t, 1, int(head.NumSeries()))
+		})
+	}
+}
+
 func TestHead_UnknownWALRecord(t *testing.T) {
 	head, w := newTestHead(t, 1000, compression.None, false)
 	w.Log([]byte{255, 42})
@@ -1062,7 +1105,7 @@ func BenchmarkHead_Truncate(b *testing.B) {
 		}
 		allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
 
-		s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i])
+		s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i], false)
 		s.mmappedChunks = []*mmappedChunk{
 			{minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)},
 		}
@@ -1099,10 +1142,10 @@ func TestHead_Truncate(t *testing.T) {
 
 	ctx := context.Background()
 
-	s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
-	s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"))
-	s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"))
-	s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
+	s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"), false)
+	s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"), false)
+	s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"), false)
+	s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"), false)
 
 	s1.mmappedChunks = []*mmappedChunk{
 		{minTime: 0, maxTime: 999},
@@ -1199,7 +1242,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 		},
 	}
 
-	s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled)
+	s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false)
 
 	for i := 0; i < 4000; i += 5 {
 		ok, _ := s.append(int64(i), float64(i), 0, cOpts)
@@ -1340,7 +1383,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
 			require.NoError(t, chunkDiskMapper.Close())
 		}()
 
-		series := newMemSeries(labels.EmptyLabels(), 1, 0, true)
+		series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false)
 
 		cOpts := chunkOpts{
 			chunkDiskMapper: chunkDiskMapper,
@@ -1914,7 +1957,7 @@ func TestMemSeries_append(t *testing.T) {
 		samplesPerChunk: DefaultSamplesPerChunk,
 	}
 
-	s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
+	s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
 
 	// Add first two samples at the very end of a chunk range and the next two
 	// on and after it.
@@ -1975,7 +2018,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
 		samplesPerChunk: DefaultSamplesPerChunk,
 	}
 
-	s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
+	s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
 
 	histograms := tsdbutil.GenerateTestHistograms(4)
 	histogramWithOneMoreBucket := histograms[3].Copy()
@@ -2037,7 +2080,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
 		samplesPerChunk: samplesPerChunk,
 	}
 
-	s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
+	s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
 
 	// At this slow rate, we will fill the chunk in two block durations.
 	slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@@ -2088,7 +2131,7 @@ func TestGCChunkAccess(t *testing.T) {
 
 	h.initTime(0)
 
-	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
+	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
 
 	// Appending 2 samples for the first chunk.
 	ok, chunkCreated := s.append(0, 0, 0, cOpts)
@@ -2147,7 +2190,7 @@ func TestGCSeriesAccess(t *testing.T) {
 
 	h.initTime(0)
 
-	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
+	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
 
 	// Appending 2 samples for the first chunk.
 	ok, chunkCreated := s.append(0, 0, 0, cOpts)
@@ -2500,7 +2543,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
 		samplesPerChunk: DefaultSamplesPerChunk,
 	}
 
-	s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
+	s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
 	require.True(t, created, "series was not created")
 
 	for i := 0; i < 7; i++ {
@@ -2860,7 +2903,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
 		samplesPerChunk: DefaultSamplesPerChunk,
 	}
 
-	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
+	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
 
 	ok, _ := s.append(0, 0, 0, cOpts)
 	require.True(t, ok, "Series append failed.")
@@ -3443,7 +3486,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
 		samplesPerChunk: DefaultSamplesPerChunk,
 	}
 
-	s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
+	s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
 
 	for i := 0; i < 7; i++ {
 		ok, _ := s.append(int64(i), float64(i), 0, cOpts)
@@ -4753,7 +4796,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
 	checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) {
 		expHeaders = append(expHeaders, newHeaders...)
 
-		ms, _, err := head.getOrCreate(l.Hash(), l)
+		ms, _, err := head.getOrCreate(l.Hash(), l, false)
 		require.NoError(t, err)
 		ms.mmapChunks(head.chunkDiskMapper)
 		require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk.
@@ -4880,7 +4923,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) {
 	checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) {
 		expChunks = append(expChunks, newChunks...)
 
-		ms, _, err := head.getOrCreate(l.Hash(), l)
+		ms, _, err := head.getOrCreate(l.Hash(), l, false)
 		require.NoError(t, err)
 
 		require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks))
@@ -5023,7 +5066,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
 
 	var expResult []chunks.Sample
 	checkExpChunks := func(count int) {
-		ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.NotNil(t, ms)
@@ -5327,7 +5370,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
 	require.NoError(t, h.Init(0)) // Replay happens here.
 
 	// Get the ooo samples from the Head.
-	ms, ok, err := h.getOrCreate(l.Hash(), l)
+	ms, ok, err := h.getOrCreate(l.Hash(), l, false)
 	require.NoError(t, err)
 	require.False(t, ok)
 	require.NotNil(t, ms)
@@ -5396,7 +5439,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
 		appendSample(mins)
 	}
 
-	ms, ok, err := h.getOrCreate(l.Hash(), l)
+	ms, ok, err := h.getOrCreate(l.Hash(), l, false)
 	require.NoError(t, err)
 	require.False(t, ok)
 	require.NotNil(t, ms)
@@ -5424,7 +5467,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
 	require.NoError(t, h.Init(0)) // Replay happens here.
 
 	// Get the mmap chunks from the Head.
-	ms, ok, err = h.getOrCreate(l.Hash(), l)
+	ms, ok, err = h.getOrCreate(l.Hash(), l, false)
 	require.NoError(t, err)
 	require.False(t, ok)
 	require.NotNil(t, ms)
@@ -5479,7 +5522,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
 	require.NoError(t, app.Commit())
 	require.Greater(t, prom_testutil.ToFloat64(h.metrics.chunksCreated), 4.0)
 
-	series, created, err := h.getOrCreate(seriesLabels.Hash(), seriesLabels)
+	series, created, err := h.getOrCreate(seriesLabels.Hash(), seriesLabels, false)
 	require.NoError(t, err)
 	require.False(t, created, "should already exist")
 	require.NotNil(t, series, "should return the series we created above")
@@ -5496,7 +5539,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, h.Init(0))
 
-	series, created, err = h.getOrCreate(seriesLabels.Hash(), seriesLabels)
+	series, created, err = h.getOrCreate(seriesLabels.Hash(), seriesLabels, false)
 	require.NoError(t, err)
 	require.False(t, created, "should already exist")
 	require.NotNil(t, series, "should return the series we created above")
@@ -5693,7 +5736,7 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap
 	}
 
 	verifyOOOSamples := func(lbls labels.Labels, expSamples int) {
-		ms, created, err := h.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := h.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.NotNil(t, ms)
@@ -5704,7 +5747,7 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap
 	}
 
 	verifyInOrderSamples := func(lbls labels.Labels, expSamples int) {
-		ms, created, err := h.getOrCreate(lbls.Hash(), lbls)
+		ms, created, err := h.getOrCreate(lbls.Hash(), lbls, false)
 		require.NoError(t, err)
 		require.False(t, created)
 		require.NotNil(t, ms)
@@ -5832,7 +5875,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
 
 	checkHeaders := func() {
 		head.mmapHeadChunks()
-		ms, _, err := head.getOrCreate(l.Hash(), l)
+		ms, _, err := head.getOrCreate(l.Hash(), l, false)
 		require.NoError(t, err)
 		require.Len(t, ms.mmappedChunks, 3)
 		expHeaders := []chunkenc.CounterResetHeader{
@@ -5907,7 +5950,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
 	appendHistogram(hists[4])
 
 	checkHeaders := func() {
-		ms, _, err := head.getOrCreate(l.Hash(), l)
+		ms, _, err := head.getOrCreate(l.Hash(), l, false)
 		require.NoError(t, err)
 		head.mmapHeadChunks()
 		require.Len(t, ms.mmappedChunks, 3)
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 3450eb9b87..926af84603 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -254,7 +254,7 @@ Outer:
 		switch v := d.(type) {
 		case []record.RefSeries:
 			for _, walSeries := range v {
-				mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels)
+				mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false)
 				if err != nil {
 					seriesCreationErr = err
 					break Outer
@@ -1577,7 +1577,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
 			localRefSeries := shardedRefSeries[idx]
 
 			for csr := range rc {
-				series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
+				series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset, false)
 				if err != nil {
 					errChan <- err
 					return
diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go
index d2c490f221..d49c7d8fc3 100644
--- a/tsdb/ooo_head_read_test.go
+++ b/tsdb/ooo_head_read_test.go
@@ -306,7 +306,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
 	}()
 	require.NoError(t, h.Init(0))
 
-	s1, _, _ := h.getOrCreate(s1ID, s1Lset)
+	s1, _, _ := h.getOrCreate(s1ID, s1Lset, false)
 	s1.ooo = &memSeriesOOOFields{}
 
 	// define our expected chunks, by looking at the expected ChunkIntervals and setting...