mirror of
https://github.com/prometheus/prometheus.git
synced 2025-08-05 13:47:10 +02:00
Create memSeries with pendingCommit=true
This fixes TestHead_RaceBetweenSeriesCreationAndGC. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
parent
df33f1aace
commit
e4fe8d8684
@ -5041,7 +5041,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
|
||||
|
||||
// Verify that the in-memory ooo chunk is empty.
|
||||
checkEmptyOOOChunk := func(lbls labels.Labels) {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Nil(t, ms.ooo)
|
||||
@ -5085,7 +5085,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
|
||||
|
||||
// Verify that the in-memory ooo chunk is not empty.
|
||||
checkNonEmptyOOOChunk := func(lbls labels.Labels) {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
|
||||
@ -5246,7 +5246,7 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen
|
||||
|
||||
// Checking that ooo chunk is not empty.
|
||||
for _, lbls := range []labels.Labels{series1, series2} {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
|
||||
@ -5274,7 +5274,7 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen
|
||||
|
||||
// Checking that ooo chunk is empty.
|
||||
for _, lbls := range []labels.Labels{series1, series2} {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Nil(t, ms.ooo)
|
||||
@ -5357,7 +5357,7 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen
|
||||
|
||||
// Checking that ooo chunk is not empty.
|
||||
for _, lbls := range []labels.Labels{series1, series2} {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
|
||||
@ -5385,7 +5385,7 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen
|
||||
|
||||
// Checking that ooo chunk is empty.
|
||||
for _, lbls := range []labels.Labels{series1, series2} {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Nil(t, ms.ooo)
|
||||
@ -5467,7 +5467,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
|
||||
|
||||
// Checking that there are some ooo m-map chunks.
|
||||
for _, lbls := range []labels.Labels{series1, series2} {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Len(t, ms.ooo.oooMmappedChunks, 2)
|
||||
@ -5486,7 +5486,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
|
||||
|
||||
// Check ooo m-map chunks again.
|
||||
for _, lbls := range []labels.Labels{series1, series2} {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Len(t, ms.ooo.oooMmappedChunks, 2)
|
||||
@ -5526,7 +5526,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
|
||||
|
||||
// Checking that ooo chunk is empty in Head.
|
||||
for _, lbls := range []labels.Labels{series1, series2} {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Nil(t, ms.ooo)
|
||||
@ -6835,7 +6835,7 @@ func testOOODisabled(t *testing.T, scenario sampleTypeScenario) {
|
||||
_, err = os.ReadDir(path.Join(db.Dir(), wlog.WblDirName))
|
||||
require.True(t, os.IsNotExist(err))
|
||||
|
||||
ms, created, err := db.head.getOrCreate(s1.Hash(), s1)
|
||||
ms, created, err := db.head.getOrCreate(s1.Hash(), s1, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.NotNil(t, ms)
|
||||
@ -6908,7 +6908,7 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
|
||||
oooMint, oooMaxt := minutes(195), minutes(260)
|
||||
|
||||
// Collect the samples only present in the ooo m-map chunks.
|
||||
ms, created, err := db.head.getOrCreate(s1.Hash(), s1)
|
||||
ms, created, err := db.head.getOrCreate(s1.Hash(), s1, false)
|
||||
require.False(t, created)
|
||||
require.NoError(t, err)
|
||||
var s1MmapSamples []chunks.Sample
|
||||
@ -7088,7 +7088,7 @@ func TestOOOHistogramCompactionWithCounterResets(t *testing.T) {
|
||||
|
||||
// Verify that the in-memory ooo chunk is empty.
|
||||
checkEmptyOOOChunk := func(lbls labels.Labels) {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Nil(t, ms.ooo)
|
||||
@ -7270,7 +7270,7 @@ func TestOOOHistogramCompactionWithCounterResets(t *testing.T) {
|
||||
|
||||
// Verify that the in-memory ooo chunk is not empty.
|
||||
checkNonEmptyOOOChunk := func(lbls labels.Labels) {
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
|
||||
@ -7594,7 +7594,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) {
|
||||
require.Len(t, db.Blocks(), 3)
|
||||
|
||||
// Check that the ooo chunks were removed.
|
||||
ms, created, err := db.head.getOrCreate(series1.Hash(), series1)
|
||||
ms, created, err := db.head.getOrCreate(series1.Hash(), series1, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.Nil(t, ms.ooo)
|
||||
|
19
tsdb/head.go
19
tsdb/head.go
@ -1720,7 +1720,7 @@ func (h *Head) String() string {
|
||||
return "head"
|
||||
}
|
||||
|
||||
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||||
func (h *Head) getOrCreate(hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
|
||||
// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create
|
||||
// a new series on every sample inserted via Add(), which causes allocations
|
||||
// and makes our series IDs rather random and harder to compress in postings.
|
||||
@ -1732,17 +1732,17 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
|
||||
// Optimistically assume that we are the first one to create the series.
|
||||
id := chunks.HeadSeriesRef(h.lastSeriesID.Inc())
|
||||
|
||||
return h.getOrCreateWithID(id, hash, lset)
|
||||
return h.getOrCreateWithID(id, hash, lset, pendingCommit)
|
||||
}
|
||||
|
||||
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||||
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
|
||||
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
|
||||
shardHash := uint64(0)
|
||||
if h.opts.EnableSharding {
|
||||
shardHash = labels.StableHash(lset)
|
||||
}
|
||||
|
||||
return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled)
|
||||
return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
@ -2183,12 +2183,13 @@ type memSeriesOOOFields struct {
|
||||
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
|
||||
}
|
||||
|
||||
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries {
|
||||
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled, pendingCommit bool) *memSeries {
|
||||
s := &memSeries{
|
||||
lset: lset,
|
||||
ref: id,
|
||||
nextAt: math.MinInt64,
|
||||
shardHash: shardHash,
|
||||
lset: lset,
|
||||
ref: id,
|
||||
nextAt: math.MinInt64,
|
||||
shardHash: shardHash,
|
||||
pendingCommit: pendingCommit,
|
||||
}
|
||||
if !isolationDisabled {
|
||||
s.txs = newTxRing(0)
|
||||
|
@ -319,7 +319,8 @@ type headAppender struct {
|
||||
headMaxt int64 // We track it here to not take the lock for every sample appended.
|
||||
oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample.
|
||||
|
||||
series []record.RefSeries // New series held by this appender.
|
||||
seriesRefs []record.RefSeries // New series records held by this appender.
|
||||
series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs)
|
||||
samples []record.RefSample // New float samples held by this appender.
|
||||
sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
histograms []record.RefHistogramSample // New histogram samples held by this appender.
|
||||
@ -461,15 +462,16 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo
|
||||
if l, dup := lset.HasDuplicateLabelNames(); dup {
|
||||
return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
|
||||
}
|
||||
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
|
||||
s, created, err = a.head.getOrCreate(lset.Hash(), lset, true)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if created {
|
||||
a.series = append(a.series, record.RefSeries{
|
||||
a.seriesRefs = append(a.seriesRefs, record.RefSeries{
|
||||
Ref: s.ref,
|
||||
Labels: lset,
|
||||
})
|
||||
a.series = append(a.series, s)
|
||||
}
|
||||
return s, created, nil
|
||||
}
|
||||
@ -907,8 +909,8 @@ func (a *headAppender) log() error {
|
||||
var rec []byte
|
||||
var enc record.Encoder
|
||||
|
||||
if len(a.series) > 0 {
|
||||
rec = enc.Series(a.series, buf)
|
||||
if len(a.seriesRefs) > 0 {
|
||||
rec = enc.Series(a.seriesRefs, buf)
|
||||
buf = rec[:0]
|
||||
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
@ -1426,6 +1428,14 @@ func (a *headAppender) commitMetadata() {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *headAppender) unmarkCreatedSeriesAsPendingCommit() {
|
||||
for _, s := range a.series {
|
||||
s.Lock()
|
||||
s.pendingCommit = false
|
||||
s.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Commit writes to the WAL and adds the data to the Head.
|
||||
// TODO(codesome): Refactor this method to reduce indentation and make it more readable.
|
||||
func (a *headAppender) Commit() (err error) {
|
||||
@ -1479,6 +1489,8 @@ func (a *headAppender) Commit() (err error) {
|
||||
a.commitHistograms(acc)
|
||||
a.commitFloatHistograms(acc)
|
||||
a.commitMetadata()
|
||||
// Unmark all series as pending commit after all samples have been committed.
|
||||
a.unmarkCreatedSeriesAsPendingCommit()
|
||||
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
|
||||
@ -1968,6 +1980,7 @@ func (a *headAppender) Rollback() (err error) {
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
}
|
||||
a.unmarkCreatedSeriesAsPendingCommit()
|
||||
a.head.putAppendBuffer(a.samples)
|
||||
a.head.putExemplarBuffer(a.exemplars)
|
||||
a.head.putHistogramBuffer(a.histograms)
|
||||
|
@ -43,7 +43,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
|
||||
defer h.Close()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
|
||||
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)), false)
|
||||
}
|
||||
}
|
||||
|
||||
@ -62,7 +62,7 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
i := count.Inc()
|
||||
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))))
|
||||
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(int(i))), false)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -82,7 +82,7 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
|
||||
defer h.Close()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
|
||||
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)), false)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -382,7 +382,7 @@ func TestMemSeries_chunk(t *testing.T) {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, 0, true)
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false)
|
||||
|
||||
if tc.setup != nil {
|
||||
tc.setup(t, series, chunkDiskMapper)
|
||||
|
@ -102,7 +102,7 @@ func BenchmarkCreateSeries(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
|
||||
for _, s := range series {
|
||||
h.getOrCreate(s.Labels().Hash(), s.Labels())
|
||||
h.getOrCreate(s.Labels().Hash(), s.Labels(), false)
|
||||
}
|
||||
}
|
||||
|
||||
@ -374,7 +374,7 @@ func BenchmarkLoadWLs(b *testing.B) {
|
||||
}
|
||||
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
|
||||
// Create one mmapped chunk per series, with one sample at the given time.
|
||||
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false)
|
||||
s.append(c.mmappedChunkT, 42, 0, cOpts)
|
||||
// There's only one head chunk because only a single sample is appended. mmapChunks()
|
||||
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
|
||||
@ -895,7 +895,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
|
||||
{
|
||||
name: "keep series still in the head",
|
||||
prepare: func(t *testing.T, h *Head) {
|
||||
_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls)
|
||||
_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false)
|
||||
require.NoError(t, err)
|
||||
},
|
||||
seriesRef: chunks.HeadSeriesRef(existingRef),
|
||||
@ -1008,6 +1008,49 @@ func TestHead_RaceBetweenSeriesCreationAndGC(t *testing.T) {
|
||||
require.Equal(t, totalSeries, int(head.NumSeries()))
|
||||
}
|
||||
|
||||
func TestHead_CanGarbagecollectSeriesCreatedWithoutSamples(t *testing.T) {
|
||||
for op, finishTxn := range map[string]func(app storage.Appender) error{
|
||||
"after commit": func(app storage.Appender) error { return app.Commit() },
|
||||
"after rollback": func(app storage.Appender) error { return app.Rollback() },
|
||||
} {
|
||||
t.Run(op, func(t *testing.T) {
|
||||
chunkRange := time.Hour.Milliseconds()
|
||||
head, _ := newTestHead(t, chunkRange, compression.None, true)
|
||||
t.Cleanup(func() { _ = head.Close() })
|
||||
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
firstSampleTime := 10 * chunkRange
|
||||
{
|
||||
// Append first sample, it should init head max time to firstSampleTime.
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Append(0, labels.FromStrings("lbl", "ok"), firstSampleTime, 1)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, 1, int(head.NumSeries()))
|
||||
}
|
||||
|
||||
// Append a sample in a time range that is not covered by the chunk range,
|
||||
// We would create series first and then append no sample.
|
||||
app := head.Appender(context.Background())
|
||||
invalidSampleTime := firstSampleTime - chunkRange
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), invalidSampleTime, 2)
|
||||
require.Error(t, err)
|
||||
// These are our assumptions: we're not testing them, we're just checking them to make debugging a failed
|
||||
// test easier if someone refactors the code and breaks these assumptions.
|
||||
// If these assumptions fail after a refactor, feel free to remove them but make sure that the test is still what we intended to test.
|
||||
require.NotErrorIs(t, err, storage.ErrOutOfBounds, "Failed to append sample shouldn't take the shortcut that returns storage.ErrOutOfBounds")
|
||||
require.ErrorIs(t, err, storage.ErrTooOldSample, "Failed to append sample should return storage.ErrTooOldSample, because OOO window was enabled but this sample doesn't fall into it.")
|
||||
// Do commit or rollback, depending on what we're testing.
|
||||
require.NoError(t, finishTxn(app))
|
||||
|
||||
// Garbage-collect, since we finished the transaction and series has no samples, it should be collectable.
|
||||
head.gc()
|
||||
require.Equal(t, 1, int(head.NumSeries()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHead_UnknownWALRecord(t *testing.T) {
|
||||
head, w := newTestHead(t, 1000, compression.None, false)
|
||||
w.Log([]byte{255, 42})
|
||||
@ -1062,7 +1105,7 @@ func BenchmarkHead_Truncate(b *testing.B) {
|
||||
}
|
||||
|
||||
allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
|
||||
s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i])
|
||||
s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i], false)
|
||||
s.mmappedChunks = []*mmappedChunk{
|
||||
{minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)},
|
||||
}
|
||||
@ -1099,10 +1142,10 @@ func TestHead_Truncate(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
|
||||
s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"))
|
||||
s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"))
|
||||
s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
|
||||
s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"), false)
|
||||
s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"), false)
|
||||
s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"), false)
|
||||
s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"), false)
|
||||
|
||||
s1.mmappedChunks = []*mmappedChunk{
|
||||
{minTime: 0, maxTime: 999},
|
||||
@ -1199,7 +1242,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false)
|
||||
|
||||
for i := 0; i < 4000; i += 5 {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
@ -1340,7 +1383,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, 0, true)
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, 0, true, false)
|
||||
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
@ -1914,7 +1957,7 @@ func TestMemSeries_append(t *testing.T) {
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
|
||||
|
||||
// Add first two samples at the very end of a chunk range and the next two
|
||||
// on and after it.
|
||||
@ -1975,7 +2018,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
|
||||
|
||||
histograms := tsdbutil.GenerateTestHistograms(4)
|
||||
histogramWithOneMoreBucket := histograms[3].Copy()
|
||||
@ -2037,7 +2080,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
||||
samplesPerChunk: samplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
|
||||
|
||||
// At this slow rate, we will fill the chunk in two block durations.
|
||||
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
|
||||
@ -2088,7 +2131,7 @@ func TestGCChunkAccess(t *testing.T) {
|
||||
|
||||
h.initTime(0)
|
||||
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
|
||||
|
||||
// Appending 2 samples for the first chunk.
|
||||
ok, chunkCreated := s.append(0, 0, 0, cOpts)
|
||||
@ -2147,7 +2190,7 @@ func TestGCSeriesAccess(t *testing.T) {
|
||||
|
||||
h.initTime(0)
|
||||
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
|
||||
|
||||
// Appending 2 samples for the first chunk.
|
||||
ok, chunkCreated := s.append(0, 0, 0, cOpts)
|
||||
@ -2500,7 +2543,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||
s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
|
||||
require.True(t, created, "series was not created")
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
@ -2860,7 +2903,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false)
|
||||
|
||||
ok, _ := s.append(0, 0, 0, cOpts)
|
||||
require.True(t, ok, "Series append failed.")
|
||||
@ -3443,7 +3486,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
@ -4753,7 +4796,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
|
||||
checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) {
|
||||
expHeaders = append(expHeaders, newHeaders...)
|
||||
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l, false)
|
||||
require.NoError(t, err)
|
||||
ms.mmapChunks(head.chunkDiskMapper)
|
||||
require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk.
|
||||
@ -4880,7 +4923,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) {
|
||||
checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) {
|
||||
expChunks = append(expChunks, newChunks...)
|
||||
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks))
|
||||
@ -5023,7 +5066,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
||||
|
||||
var expResult []chunks.Sample
|
||||
checkExpChunks := func(count int) {
|
||||
ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := db.Head().getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.NotNil(t, ms)
|
||||
@ -5327,7 +5370,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
|
||||
require.NoError(t, h.Init(0)) // Replay happens here.
|
||||
|
||||
// Get the ooo samples from the Head.
|
||||
ms, ok, err := h.getOrCreate(l.Hash(), l)
|
||||
ms, ok, err := h.getOrCreate(l.Hash(), l, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
require.NotNil(t, ms)
|
||||
@ -5396,7 +5439,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
|
||||
appendSample(mins)
|
||||
}
|
||||
|
||||
ms, ok, err := h.getOrCreate(l.Hash(), l)
|
||||
ms, ok, err := h.getOrCreate(l.Hash(), l, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
require.NotNil(t, ms)
|
||||
@ -5424,7 +5467,7 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
|
||||
require.NoError(t, h.Init(0)) // Replay happens here.
|
||||
|
||||
// Get the mmap chunks from the Head.
|
||||
ms, ok, err = h.getOrCreate(l.Hash(), l)
|
||||
ms, ok, err = h.getOrCreate(l.Hash(), l, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
require.NotNil(t, ms)
|
||||
@ -5479,7 +5522,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
|
||||
require.NoError(t, app.Commit())
|
||||
require.Greater(t, prom_testutil.ToFloat64(h.metrics.chunksCreated), 4.0)
|
||||
|
||||
series, created, err := h.getOrCreate(seriesLabels.Hash(), seriesLabels)
|
||||
series, created, err := h.getOrCreate(seriesLabels.Hash(), seriesLabels, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created, "should already exist")
|
||||
require.NotNil(t, series, "should return the series we created above")
|
||||
@ -5496,7 +5539,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
series, created, err = h.getOrCreate(seriesLabels.Hash(), seriesLabels)
|
||||
series, created, err = h.getOrCreate(seriesLabels.Hash(), seriesLabels, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created, "should already exist")
|
||||
require.NotNil(t, series, "should return the series we created above")
|
||||
@ -5693,7 +5736,7 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap
|
||||
}
|
||||
|
||||
verifyOOOSamples := func(lbls labels.Labels, expSamples int) {
|
||||
ms, created, err := h.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := h.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.NotNil(t, ms)
|
||||
@ -5704,7 +5747,7 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap
|
||||
}
|
||||
|
||||
verifyInOrderSamples := func(lbls labels.Labels, expSamples int) {
|
||||
ms, created, err := h.getOrCreate(lbls.Hash(), lbls)
|
||||
ms, created, err := h.getOrCreate(lbls.Hash(), lbls, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.NotNil(t, ms)
|
||||
@ -5832,7 +5875,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
||||
|
||||
checkHeaders := func() {
|
||||
head.mmapHeadChunks()
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, ms.mmappedChunks, 3)
|
||||
expHeaders := []chunkenc.CounterResetHeader{
|
||||
@ -5907,7 +5950,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
||||
appendHistogram(hists[4])
|
||||
|
||||
checkHeaders := func() {
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l, false)
|
||||
require.NoError(t, err)
|
||||
head.mmapHeadChunks()
|
||||
require.Len(t, ms.mmappedChunks, 3)
|
||||
|
@ -254,7 +254,7 @@ Outer:
|
||||
switch v := d.(type) {
|
||||
case []record.RefSeries:
|
||||
for _, walSeries := range v {
|
||||
mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels)
|
||||
mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false)
|
||||
if err != nil {
|
||||
seriesCreationErr = err
|
||||
break Outer
|
||||
@ -1577,7 +1577,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
|
||||
localRefSeries := shardedRefSeries[idx]
|
||||
|
||||
for csr := range rc {
|
||||
series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
|
||||
series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset, false)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
|
@ -306,7 +306,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
||||
}()
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
s1, _, _ := h.getOrCreate(s1ID, s1Lset)
|
||||
s1, _, _ := h.getOrCreate(s1ID, s1Lset, false)
|
||||
s1.ooo = &memSeriesOOOFields{}
|
||||
|
||||
// define our expected chunks, by looking at the expected ChunkIntervals and setting...
|
||||
|
Loading…
Reference in New Issue
Block a user