Merge pull request #16333 from colega/fix-series-create-gc-race

fix: race condition between series creation and garbage collection
This commit is contained in:
Bryan Boreham 2025-04-17 12:15:11 +01:00 committed by GitHub
commit a11772234d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 159 additions and 65 deletions

View File

@ -5041,7 +5041,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
// Verify that the in-memory ooo chunk is empty. // Verify that the in-memory ooo chunk is empty.
checkEmptyOOOChunk := func(lbls labels.Labels) { checkEmptyOOOChunk := func(lbls labels.Labels) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.ooo) require.Nil(t, ms.ooo)
@ -5085,7 +5085,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
// Verify that the in-memory ooo chunk is not empty. // Verify that the in-memory ooo chunk is not empty.
checkNonEmptyOOOChunk := func(lbls labels.Labels) { checkNonEmptyOOOChunk := func(lbls labels.Labels) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples()) require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
@ -5246,7 +5246,7 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen
// Checking that ooo chunk is not empty. // Checking that ooo chunk is not empty.
for _, lbls := range []labels.Labels{series1, series2} { for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples()) require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
@ -5274,7 +5274,7 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen
// Checking that ooo chunk is empty. // Checking that ooo chunk is empty.
for _, lbls := range []labels.Labels{series1, series2} { for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.ooo) require.Nil(t, ms.ooo)
@ -5357,7 +5357,7 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen
// Checking that ooo chunk is not empty. // Checking that ooo chunk is not empty.
for _, lbls := range []labels.Labels{series1, series2} { for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples()) require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
@ -5385,7 +5385,7 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen
// Checking that ooo chunk is empty. // Checking that ooo chunk is empty.
for _, lbls := range []labels.Labels{series1, series2} { for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.ooo) require.Nil(t, ms.ooo)
@ -5467,7 +5467,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
// Checking that there are some ooo m-map chunks. // Checking that there are some ooo m-map chunks.
for _, lbls := range []labels.Labels{series1, series2} { for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Len(t, ms.ooo.oooMmappedChunks, 2) require.Len(t, ms.ooo.oooMmappedChunks, 2)
@ -5486,7 +5486,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
// Check ooo m-map chunks again. // Check ooo m-map chunks again.
for _, lbls := range []labels.Labels{series1, series2} { for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Len(t, ms.ooo.oooMmappedChunks, 2) require.Len(t, ms.ooo.oooMmappedChunks, 2)
@ -5526,7 +5526,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
// Checking that ooo chunk is empty in Head. // Checking that ooo chunk is empty in Head.
for _, lbls := range []labels.Labels{series1, series2} { for _, lbls := range []labels.Labels{series1, series2} {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.ooo) require.Nil(t, ms.ooo)
@ -6835,7 +6835,7 @@ func testOOODisabled(t *testing.T, scenario sampleTypeScenario) {
_, err = os.ReadDir(path.Join(db.Dir(), wlog.WblDirName)) _, err = os.ReadDir(path.Join(db.Dir(), wlog.WblDirName))
require.True(t, os.IsNotExist(err)) require.True(t, os.IsNotExist(err))
ms, created, err := db.head.getOrCreate(s1.Hash(), s1) ms, created, err := db.head.getOrCreate(s1.Hash(), s1, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.NotNil(t, ms) require.NotNil(t, ms)
@ -6908,7 +6908,7 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
oooMint, oooMaxt := minutes(195), minutes(260) oooMint, oooMaxt := minutes(195), minutes(260)
// Collect the samples only present in the ooo m-map chunks. // Collect the samples only present in the ooo m-map chunks.
ms, created, err := db.head.getOrCreate(s1.Hash(), s1) ms, created, err := db.head.getOrCreate(s1.Hash(), s1, false)
require.False(t, created) require.False(t, created)
require.NoError(t, err) require.NoError(t, err)
var s1MmapSamples []chunks.Sample var s1MmapSamples []chunks.Sample
@ -7088,7 +7088,7 @@ func TestOOOHistogramCompactionWithCounterResets(t *testing.T) {
// Verify that the in-memory ooo chunk is empty. // Verify that the in-memory ooo chunk is empty.
checkEmptyOOOChunk := func(lbls labels.Labels) { checkEmptyOOOChunk := func(lbls labels.Labels) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.ooo) require.Nil(t, ms.ooo)
@ -7270,7 +7270,7 @@ func TestOOOHistogramCompactionWithCounterResets(t *testing.T) {
// Verify that the in-memory ooo chunk is not empty. // Verify that the in-memory ooo chunk is not empty.
checkNonEmptyOOOChunk := func(lbls labels.Labels) { checkNonEmptyOOOChunk := func(lbls labels.Labels) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples()) require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
@ -7594,7 +7594,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) {
require.Len(t, db.Blocks(), 3) require.Len(t, db.Blocks(), 3)
// Check that the ooo chunks were removed. // Check that the ooo chunks were removed.
ms, created, err := db.head.getOrCreate(series1.Hash(), series1) ms, created, err := db.head.getOrCreate(series1.Hash(), series1, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.ooo) require.Nil(t, ms.ooo)

View File

@ -1734,7 +1734,7 @@ func (h *Head) String() string {
return "head" return "head"
} }
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) { func (h *Head) getOrCreate(hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create // Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create
// a new series on every sample inserted via Add(), which causes allocations // a new series on every sample inserted via Add(), which causes allocations
// and makes our series IDs rather random and harder to compress in postings. // and makes our series IDs rather random and harder to compress in postings.
@ -1746,17 +1746,17 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
// Optimistically assume that we are the first one to create the series. // Optimistically assume that we are the first one to create the series.
id := chunks.HeadSeriesRef(h.lastSeriesID.Inc()) id := chunks.HeadSeriesRef(h.lastSeriesID.Inc())
return h.getOrCreateWithID(id, hash, lset) return h.getOrCreateWithID(id, hash, lset, pendingCommit)
} }
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
shardHash := uint64(0) shardHash := uint64(0)
if h.opts.EnableSharding { if h.opts.EnableSharding {
shardHash = labels.StableHash(lset) shardHash = labels.StableHash(lset)
} }
return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled) return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit)
}) })
if err != nil { if err != nil {
return nil, false, err return nil, false, err
@ -2197,12 +2197,13 @@ type memSeriesOOOFields struct {
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]. firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
} }
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries { func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled, pendingCommit bool) *memSeries {
s := &memSeries{ s := &memSeries{
lset: lset, lset: lset,
ref: id, ref: id,
nextAt: math.MinInt64, nextAt: math.MinInt64,
shardHash: shardHash, shardHash: shardHash,
pendingCommit: pendingCommit,
} }
if !isolationDisabled { if !isolationDisabled {
s.txs = newTxRing(0) s.txs = newTxRing(0)

View File

@ -319,7 +319,8 @@ type headAppender struct {
headMaxt int64 // We track it here to not take the lock for every sample appended. headMaxt int64 // We track it here to not take the lock for every sample appended.
oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample. oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample.
series []record.RefSeries // New series held by this appender. seriesRefs []record.RefSeries // New series records held by this appender.
series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs)
samples []record.RefSample // New float samples held by this appender. samples []record.RefSample // New float samples held by this appender.
sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
histograms []record.RefHistogramSample // New histogram samples held by this appender. histograms []record.RefHistogramSample // New histogram samples held by this appender.
@ -461,15 +462,16 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo
if l, dup := lset.HasDuplicateLabelNames(); dup { if l, dup := lset.HasDuplicateLabelNames(); dup {
return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
} }
s, created, err = a.head.getOrCreate(lset.Hash(), lset) s, created, err = a.head.getOrCreate(lset.Hash(), lset, true)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
if created { if created {
a.series = append(a.series, record.RefSeries{ a.seriesRefs = append(a.seriesRefs, record.RefSeries{
Ref: s.ref, Ref: s.ref,
Labels: lset, Labels: lset,
}) })
a.series = append(a.series, s)
} }
return s, created, nil return s, created, nil
} }
@ -907,8 +909,8 @@ func (a *headAppender) log() error {
var rec []byte var rec []byte
var enc record.Encoder var enc record.Encoder
if len(a.series) > 0 { if len(a.seriesRefs) > 0 {
rec = enc.Series(a.series, buf) rec = enc.Series(a.seriesRefs, buf)
buf = rec[:0] buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil { if err := a.head.wal.Log(rec); err != nil {
@ -1426,6 +1428,14 @@ func (a *headAppender) commitMetadata() {
} }
} }
func (a *headAppender) unmarkCreatedSeriesAsPendingCommit() {
for _, s := range a.series {
s.Lock()
s.pendingCommit = false
s.Unlock()
}
}
// Commit writes to the WAL and adds the data to the Head. // Commit writes to the WAL and adds the data to the Head.
// TODO(codesome): Refactor this method to reduce indentation and make it more readable. // TODO(codesome): Refactor this method to reduce indentation and make it more readable.
func (a *headAppender) Commit() (err error) { func (a *headAppender) Commit() (err error) {
@ -1479,6 +1489,8 @@ func (a *headAppender) Commit() (err error) {
a.commitHistograms(acc) a.commitHistograms(acc)
a.commitFloatHistograms(acc) a.commitFloatHistograms(acc)
a.commitMetadata() a.commitMetadata()
// Unmark all series as pending commit after all samples have been committed.
a.unmarkCreatedSeriesAsPendingCommit()
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected)) a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected)) a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
@ -1952,6 +1964,7 @@ func (a *headAppender) Rollback() (err error) {
defer a.head.metrics.activeAppenders.Dec() defer a.head.metrics.activeAppenders.Dec()
defer a.head.iso.closeAppend(a.appendID) defer a.head.iso.closeAppend(a.appendID)
defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.putSeriesBuffer(a.sampleSeries)
defer a.unmarkCreatedSeriesAsPendingCommit()
var series *memSeries var series *memSeries
for i := range a.samples { for i := range a.samples {

View File

@ -43,7 +43,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
defer h.Close() defer h.Close()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i))) h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)), false)
} }
} }
@ -62,7 +62,7 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
i := count.Inc() i := count.Inc()
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i)))) h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))), false)
} }
}) })
} }
@ -82,7 +82,7 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
defer h.Close() defer h.Close()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i))) h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)), false)
} }
} }

View File

@ -382,7 +382,7 @@ func TestMemSeries_chunk(t *testing.T) {
require.NoError(t, chunkDiskMapper.Close()) require.NoError(t, chunkDiskMapper.Close())
}() }()
series := newMemSeries(labels.EmptyLabels(), 1, 0, true) series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false)
if tc.setup != nil { if tc.setup != nil {
tc.setup(t, series, chunkDiskMapper) tc.setup(t, series, chunkDiskMapper)

View File

@ -102,7 +102,7 @@ func BenchmarkCreateSeries(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for _, s := range series { for _, s := range series {
h.getOrCreate(s.Labels().Hash(), s.Labels()) h.getOrCreate(s.Labels().Hash(), s.Labels(), false)
} }
} }
@ -374,7 +374,7 @@ func BenchmarkLoadWLs(b *testing.B) {
} }
for k := 0; k < c.batches*c.seriesPerBatch; k++ { for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time. // Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled) s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false)
s.append(c.mmappedChunkT, 42, 0, cOpts) s.append(c.mmappedChunkT, 42, 0, cOpts)
// There's only one head chunk because only a single sample is appended. mmapChunks() // There's only one head chunk because only a single sample is appended. mmapChunks()
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
@ -895,7 +895,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
{ {
name: "keep series still in the head", name: "keep series still in the head",
prepare: func(t *testing.T, h *Head) { prepare: func(t *testing.T, h *Head) {
_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls) _, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false)
require.NoError(t, err) require.NoError(t, err)
}, },
seriesRef: chunks.HeadSeriesRef(existingRef), seriesRef: chunks.HeadSeriesRef(existingRef),
@ -971,6 +971,86 @@ func TestHead_ActiveAppenders(t *testing.T) {
require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.activeAppenders)) require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.activeAppenders))
} }
func TestHead_RaceBetweenSeriesCreationAndGC(t *testing.T) {
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() { _ = head.Close() })
require.NoError(t, head.Init(0))
const totalSeries = 100_000
series := make([]labels.Labels, totalSeries)
for i := 0; i < totalSeries; i++ {
series[i] = labels.FromStrings("foo", strconv.Itoa(i))
}
done := atomic.NewBool(false)
go func() {
defer done.Store(true)
app := head.Appender(context.Background())
defer func() {
if err := app.Commit(); err != nil {
t.Errorf("Failed to commit: %v", err)
}
}()
for i := 0; i < totalSeries; i++ {
_, err := app.Append(0, series[i], 100, 1)
if err != nil {
t.Errorf("Failed to append: %v", err)
return
}
}
}()
// Don't check the atomic.Bool on all iterations in order to perform more gc iterations and make the race condition more likely.
for i := 1; i%128 != 0 || !done.Load(); i++ {
head.gc()
}
require.Equal(t, totalSeries, int(head.NumSeries()))
}
func TestHead_CanGarbagecollectSeriesCreatedWithoutSamples(t *testing.T) {
for op, finishTxn := range map[string]func(app storage.Appender) error{
"after commit": func(app storage.Appender) error { return app.Commit() },
"after rollback": func(app storage.Appender) error { return app.Rollback() },
} {
t.Run(op, func(t *testing.T) {
chunkRange := time.Hour.Milliseconds()
head, _ := newTestHead(t, chunkRange, compression.None, true)
t.Cleanup(func() { _ = head.Close() })
require.NoError(t, head.Init(0))
firstSampleTime := 10 * chunkRange
{
// Append first sample, it should init head max time to firstSampleTime.
app := head.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings("lbl", "ok"), firstSampleTime, 1)
require.NoError(t, err)
require.NoError(t, app.Commit())
require.Equal(t, 1, int(head.NumSeries()))
}
// Append a sample in a time range that is not covered by the chunk range,
// We would create series first and then append no sample.
app := head.Appender(context.Background())
invalidSampleTime := firstSampleTime - chunkRange
_, err := app.Append(0, labels.FromStrings("foo", "bar"), invalidSampleTime, 2)
require.Error(t, err)
// These are our assumptions: we're not testing them, we're just checking them to make debugging a failed
// test easier if someone refactors the code and breaks these assumptions.
// If these assumptions fail after a refactor, feel free to remove them but make sure that the test is still what we intended to test.
require.NotErrorIs(t, err, storage.ErrOutOfBounds, "Failed to append sample shouldn't take the shortcut that returns storage.ErrOutOfBounds")
require.ErrorIs(t, err, storage.ErrTooOldSample, "Failed to append sample should return storage.ErrTooOldSample, because OOO window was enabled but this sample doesn't fall into it.")
// Do commit or rollback, depending on what we're testing.
require.NoError(t, finishTxn(app))
// Garbage-collect, since we finished the transaction and series has no samples, it should be collectable.
head.gc()
require.Equal(t, 1, int(head.NumSeries()))
})
}
}
func TestHead_UnknownWALRecord(t *testing.T) { func TestHead_UnknownWALRecord(t *testing.T) {
head, w := newTestHead(t, 1000, compression.None, false) head, w := newTestHead(t, 1000, compression.None, false)
w.Log([]byte{255, 42}) w.Log([]byte{255, 42})
@ -1025,7 +1105,7 @@ func BenchmarkHead_Truncate(b *testing.B) {
} }
allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...) allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i]) s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i], false)
s.mmappedChunks = []*mmappedChunk{ s.mmappedChunks = []*mmappedChunk{
{minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)}, {minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)},
} }
@ -1062,10 +1142,10 @@ func TestHead_Truncate(t *testing.T) {
ctx := context.Background() ctx := context.Background()
s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"), false)
s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1")) s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"), false)
s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2")) s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"), false)
s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"), false)
s1.mmappedChunks = []*mmappedChunk{ s1.mmappedChunks = []*mmappedChunk{
{minTime: 0, maxTime: 999}, {minTime: 0, maxTime: 999},
@ -1162,7 +1242,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
}, },
} }
s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled) s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false)
for i := 0; i < 4000; i += 5 { for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, cOpts) ok, _ := s.append(int64(i), float64(i), 0, cOpts)
@ -1303,7 +1383,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
require.NoError(t, chunkDiskMapper.Close()) require.NoError(t, chunkDiskMapper.Close())
}() }()
series := newMemSeries(labels.EmptyLabels(), 1, 0, true) series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false)
cOpts := chunkOpts{ cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper, chunkDiskMapper: chunkDiskMapper,
@ -1877,7 +1957,7 @@ func TestMemSeries_append(t *testing.T) {
samplesPerChunk: DefaultSamplesPerChunk, samplesPerChunk: DefaultSamplesPerChunk,
} }
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
// Add first two samples at the very end of a chunk range and the next two // Add first two samples at the very end of a chunk range and the next two
// on and after it. // on and after it.
@ -1938,7 +2018,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
samplesPerChunk: DefaultSamplesPerChunk, samplesPerChunk: DefaultSamplesPerChunk,
} }
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
histograms := tsdbutil.GenerateTestHistograms(4) histograms := tsdbutil.GenerateTestHistograms(4)
histogramWithOneMoreBucket := histograms[3].Copy() histogramWithOneMoreBucket := histograms[3].Copy()
@ -2000,7 +2080,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
samplesPerChunk: samplesPerChunk, samplesPerChunk: samplesPerChunk,
} }
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
// At this slow rate, we will fill the chunk in two block durations. // At this slow rate, we will fill the chunk in two block durations.
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@ -2051,7 +2131,7 @@ func TestGCChunkAccess(t *testing.T) {
h.initTime(0) h.initTime(0)
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
// Appending 2 samples for the first chunk. // Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, cOpts) ok, chunkCreated := s.append(0, 0, 0, cOpts)
@ -2110,7 +2190,7 @@ func TestGCSeriesAccess(t *testing.T) {
h.initTime(0) h.initTime(0)
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
// Appending 2 samples for the first chunk. // Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, cOpts) ok, chunkCreated := s.append(0, 0, 0, cOpts)
@ -2463,7 +2543,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
samplesPerChunk: DefaultSamplesPerChunk, samplesPerChunk: DefaultSamplesPerChunk,
} }
s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
require.True(t, created, "series was not created") require.True(t, created, "series was not created")
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
@ -2823,7 +2903,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
samplesPerChunk: DefaultSamplesPerChunk, samplesPerChunk: DefaultSamplesPerChunk,
} }
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
ok, _ := s.append(0, 0, 0, cOpts) ok, _ := s.append(0, 0, 0, cOpts)
require.True(t, ok, "Series append failed.") require.True(t, ok, "Series append failed.")
@ -3406,7 +3486,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
samplesPerChunk: DefaultSamplesPerChunk, samplesPerChunk: DefaultSamplesPerChunk,
} }
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
ok, _ := s.append(int64(i), float64(i), 0, cOpts) ok, _ := s.append(int64(i), float64(i), 0, cOpts)
@ -4716,7 +4796,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) { checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) {
expHeaders = append(expHeaders, newHeaders...) expHeaders = append(expHeaders, newHeaders...)
ms, _, err := head.getOrCreate(l.Hash(), l) ms, _, err := head.getOrCreate(l.Hash(), l, false)
require.NoError(t, err) require.NoError(t, err)
ms.mmapChunks(head.chunkDiskMapper) ms.mmapChunks(head.chunkDiskMapper)
require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk. require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk.
@ -4843,7 +4923,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) {
checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) { checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) {
expChunks = append(expChunks, newChunks...) expChunks = append(expChunks, newChunks...)
ms, _, err := head.getOrCreate(l.Hash(), l) ms, _, err := head.getOrCreate(l.Hash(), l, false)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks)) require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks))
@ -4986,7 +5066,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
var expResult []chunks.Sample var expResult []chunks.Sample
checkExpChunks := func(count int) { checkExpChunks := func(count int) {
ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls) ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.NotNil(t, ms) require.NotNil(t, ms)
@ -5290,7 +5370,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, h.Init(0)) // Replay happens here. require.NoError(t, h.Init(0)) // Replay happens here.
// Get the ooo samples from the Head. // Get the ooo samples from the Head.
ms, ok, err := h.getOrCreate(l.Hash(), l) ms, ok, err := h.getOrCreate(l.Hash(), l, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, ok) require.False(t, ok)
require.NotNil(t, ms) require.NotNil(t, ms)
@ -5359,7 +5439,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
appendSample(mins) appendSample(mins)
} }
ms, ok, err := h.getOrCreate(l.Hash(), l) ms, ok, err := h.getOrCreate(l.Hash(), l, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, ok) require.False(t, ok)
require.NotNil(t, ms) require.NotNil(t, ms)
@ -5387,7 +5467,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, h.Init(0)) // Replay happens here. require.NoError(t, h.Init(0)) // Replay happens here.
// Get the mmap chunks from the Head. // Get the mmap chunks from the Head.
ms, ok, err = h.getOrCreate(l.Hash(), l) ms, ok, err = h.getOrCreate(l.Hash(), l, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, ok) require.False(t, ok)
require.NotNil(t, ms) require.NotNil(t, ms)
@ -5442,7 +5522,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
require.Greater(t, prom_testutil.ToFloat64(h.metrics.chunksCreated), 4.0) require.Greater(t, prom_testutil.ToFloat64(h.metrics.chunksCreated), 4.0)
series, created, err := h.getOrCreate(seriesLabels.Hash(), seriesLabels) series, created, err := h.getOrCreate(seriesLabels.Hash(), seriesLabels, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created, "should already exist") require.False(t, created, "should already exist")
require.NotNil(t, series, "should return the series we created above") require.NotNil(t, series, "should return the series we created above")
@ -5459,7 +5539,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
series, created, err = h.getOrCreate(seriesLabels.Hash(), seriesLabels) series, created, err = h.getOrCreate(seriesLabels.Hash(), seriesLabels, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created, "should already exist") require.False(t, created, "should already exist")
require.NotNil(t, series, "should return the series we created above") require.NotNil(t, series, "should return the series we created above")
@ -5656,7 +5736,7 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap
} }
verifyOOOSamples := func(lbls labels.Labels, expSamples int) { verifyOOOSamples := func(lbls labels.Labels, expSamples int) {
ms, created, err := h.getOrCreate(lbls.Hash(), lbls) ms, created, err := h.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.NotNil(t, ms) require.NotNil(t, ms)
@ -5667,7 +5747,7 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap
} }
verifyInOrderSamples := func(lbls labels.Labels, expSamples int) { verifyInOrderSamples := func(lbls labels.Labels, expSamples int) {
ms, created, err := h.getOrCreate(lbls.Hash(), lbls) ms, created, err := h.getOrCreate(lbls.Hash(), lbls, false)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.NotNil(t, ms) require.NotNil(t, ms)
@ -5795,7 +5875,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
checkHeaders := func() { checkHeaders := func() {
head.mmapHeadChunks() head.mmapHeadChunks()
ms, _, err := head.getOrCreate(l.Hash(), l) ms, _, err := head.getOrCreate(l.Hash(), l, false)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, ms.mmappedChunks, 3) require.Len(t, ms.mmappedChunks, 3)
expHeaders := []chunkenc.CounterResetHeader{ expHeaders := []chunkenc.CounterResetHeader{
@ -5870,7 +5950,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
appendHistogram(hists[4]) appendHistogram(hists[4])
checkHeaders := func() { checkHeaders := func() {
ms, _, err := head.getOrCreate(l.Hash(), l) ms, _, err := head.getOrCreate(l.Hash(), l, false)
require.NoError(t, err) require.NoError(t, err)
head.mmapHeadChunks() head.mmapHeadChunks()
require.Len(t, ms.mmappedChunks, 3) require.Len(t, ms.mmappedChunks, 3)

View File

@ -254,7 +254,7 @@ Outer:
switch v := d.(type) { switch v := d.(type) {
case []record.RefSeries: case []record.RefSeries:
for _, walSeries := range v { for _, walSeries := range v {
mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels) mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false)
if err != nil { if err != nil {
seriesCreationErr = err seriesCreationErr = err
break Outer break Outer
@ -1577,7 +1577,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
localRefSeries := shardedRefSeries[idx] localRefSeries := shardedRefSeries[idx]
for csr := range rc { for csr := range rc {
series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset) series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset, false)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return

View File

@ -306,7 +306,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
}() }()
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
s1, _, _ := h.getOrCreate(s1ID, s1Lset) s1, _, _ := h.getOrCreate(s1ID, s1Lset, false)
s1.ooo = &memSeriesOOOFields{} s1.ooo = &memSeriesOOOFields{}
// define our expected chunks, by looking at the expected ChunkIntervals and setting... // define our expected chunks, by looking at the expected ChunkIntervals and setting...