diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 529ed771a9..d7711a6cdb 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -327,7 +327,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { } func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head { - head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize) + head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize, nil) testutil.Ok(tb, err) app := head.Appender() diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index a733c1e990..9ed29b4135 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -875,7 +875,7 @@ func BenchmarkCompactionFromHead(b *testing.B) { defer func() { testutil.Ok(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) testutil.Ok(b, err) for ln := 0; ln < labelNames; ln++ { app := h.Appender() diff --git a/tsdb/db.go b/tsdb/db.go index 17a69b01a8..e797c95f06 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -114,6 +114,10 @@ type Options struct { // Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration. // Typically it is in milliseconds. MaxBlockDuration int64 + + // SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. + // It is always a no-op in Prometheus and mainly meant for external users who import TSDB. + SeriesLifecycleCallback SeriesLifecycleCallback } // DB handles reads and writes of time series falling into @@ -309,7 +313,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { if err != nil { return err } - head, err := NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize) + head, err := NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize, nil) if err != nil { return err } @@ -368,7 +372,7 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu blocks[i] = b } - head, err := NewHead(nil, db.logger, nil, 1, db.dir, nil, DefaultStripeSize) + head, err := NewHead(nil, db.logger, nil, 1, db.dir, nil, DefaultStripeSize, nil) if err != nil { return nil, err } @@ -386,7 +390,7 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu if err != nil { return nil, err } - head, err = NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize) + head, err = NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize, nil) if err != nil { return nil, err } @@ -599,8 +603,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } } - db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize) - + db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize, opts.SeriesLifecycleCallback) if err != nil { return nil, err } diff --git a/tsdb/head.go b/tsdb/head.go index d56f167e31..281d763a73 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -65,7 +65,8 @@ type Head struct { memChunkPool sync.Pool // All series addressable by their ID or hash. - series *stripeSeries + series *stripeSeries + seriesCallback SeriesLifecycleCallback symMtx sync.RWMutex symbols map[string]struct{} @@ -284,20 +285,23 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings // stripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series. -func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) { if l == nil { l = log.NewNopLogger() } if chunkRange < 1 { return nil, errors.Errorf("invalid chunk range %d", chunkRange) } + if seriesCallback == nil { + seriesCallback = &noopSeriesLifecycleCallback{} + } h := &Head{ wal: wal, logger: l, chunkRange: chunkRange, minTime: math.MaxInt64, maxTime: math.MinInt64, - series: newStripeSeries(stripeSize), + series: newStripeSeries(stripeSize, seriesCallback), values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), @@ -309,7 +313,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int return &memChunk{} }, }, - chunkDirRoot: chkDirRoot, + chunkDirRoot: chkDirRoot, + seriesCallback: seriesCallback, } h.metrics = newHeadMetrics(h, r) @@ -408,41 +413,13 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks n = runtime.GOMAXPROCS(0) inputs = make([]chan []record.RefSample, n) outputs = make([]chan []record.RefSample, n) - ) - wg.Add(n) - defer func() { - // For CorruptionErr ensure to terminate all workers before exiting. - if _, ok := err.(*wal.CorruptionErr); ok { - for i := 0; i < n; i++ { - close(inputs[i]) - for range outputs[i] { - } - } - wg.Wait() - } - }() - - for i := 0; i < n; i++ { - outputs[i] = make(chan []record.RefSample, 300) - inputs[i] = make(chan []record.RefSample, 300) - - go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { - unknown := h.processWALSamples(h.minValidTime, input, output) - atomic.AddUint64(&unknownRefs, unknown) - wg.Done() - }(inputs[i], outputs[i]) - } - - var ( dec record.Decoder shards = make([][]record.RefSample, n) - ) - var ( - decoded = make(chan interface{}, 10) - errCh = make(chan error, 1) - seriesPool = sync.Pool{ + decoded = make(chan interface{}, 10) + decodeErr, seriesCreationErr error + seriesPool = sync.Pool{ New: func() interface{} { return []record.RefSeries{} }, @@ -458,6 +435,32 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks }, } ) + + defer func() { + // For CorruptionErr ensure to terminate all workers before exiting. + _, ok := err.(*wal.CorruptionErr) + if ok || seriesCreationErr != nil { + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } + } + wg.Wait() + } + }() + + wg.Add(n) + for i := 0; i < n; i++ { + outputs[i] = make(chan []record.RefSample, 300) + inputs[i] = make(chan []record.RefSample, 300) + + go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { + unknown := h.processWALSamples(h.minValidTime, input, output) + atomic.AddUint64(&unknownRefs, unknown) + wg.Done() + }(inputs[i], outputs[i]) + } + go func() { defer close(decoded) for r.Next() { @@ -467,7 +470,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks series := seriesPool.Get().([]record.RefSeries)[:0] series, err = dec.Series(rec, series) if err != nil { - errCh <- &wal.CorruptionErr{ + decodeErr = &wal.CorruptionErr{ Err: errors.Wrap(err, "decode series"), Segment: r.Segment(), Offset: r.Offset(), @@ -479,7 +482,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks samples := samplesPool.Get().([]record.RefSample)[:0] samples, err = dec.Samples(rec, samples) if err != nil { - errCh <- &wal.CorruptionErr{ + decodeErr = &wal.CorruptionErr{ Err: errors.Wrap(err, "decode samples"), Segment: r.Segment(), Offset: r.Offset(), @@ -491,7 +494,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks tstones := tstonesPool.Get().([]tombstones.Stone)[:0] tstones, err = dec.Tombstones(rec, tstones) if err != nil { - errCh <- &wal.CorruptionErr{ + decodeErr = &wal.CorruptionErr{ Err: errors.Wrap(err, "decode tombstones"), Segment: r.Segment(), Offset: r.Offset(), @@ -500,7 +503,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks } decoded <- tstones default: - errCh <- &wal.CorruptionErr{ + decodeErr = &wal.CorruptionErr{ Err: errors.Errorf("invalid record type %v", dec.Type(rec)), Segment: r.Segment(), Offset: r.Offset(), @@ -510,11 +513,16 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks } }() +Outer: for d := range decoded { switch v := d.(type) { case []record.RefSeries: for _, s := range v { - series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + series, created, err := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + if err != nil { + seriesCreationErr = err + break Outer + } if created { // If this series gets a duplicate record, we don't restore its mmapped chunks, @@ -593,10 +601,14 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks } } - select { - case err := <-errCh: - return err - default: + if decodeErr != nil { + return decodeErr + } + if seriesCreationErr != nil { + // Drain the channel to unblock the goroutine. + for range decoded { + } + return seriesCreationErr } // Signal termination to each worker and wait for it to close its output channel. @@ -1084,7 +1096,10 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) } - s, created := a.head.getOrCreate(lset.Hash(), lset) + s, created, err := a.head.getOrCreate(lset.Hash(), lset) + if err != nil { + return 0, err + } if created { a.series = append(a.series, record.RefSeries{ Ref: s.ref, @@ -1611,13 +1626,13 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks return nil } -func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) { +func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) { // Just using `getOrSet` below would be semantically sufficient, but we'd create // a new series on every sample inserted via Add(), which causes allocations // and makes our series IDs rather random and harder to compress in postings. s := h.series.getByHash(hash, lset) if s != nil { - return s, false + return s, false, nil } // Optimistically assume that we are the first one to create the series. @@ -1626,12 +1641,15 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) { return h.getOrCreateWithID(id, hash, lset) } -func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) { +func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool) - s, created := h.series.getOrSet(hash, s) + s, created, err := h.series.getOrSet(hash, s) + if err != nil { + return nil, false, err + } if !created { - return s, false + return s, false, nil } h.metrics.seriesCreated.Inc() @@ -1654,7 +1672,7 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie h.symbols[l.Value] = struct{}{} } - return s, true + return s, true, nil } // seriesHashmap is a simple hashmap for memSeries by their label set. It is built @@ -1707,10 +1725,11 @@ const ( // with the maps was profiled to be slower – likely due to the additional pointer // dereferences. type stripeSeries struct { - size int - series []map[uint64]*memSeries - hashes []seriesHashmap - locks []stripeLock + size int + series []map[uint64]*memSeries + hashes []seriesHashmap + locks []stripeLock + seriesLifecycleCallback SeriesLifecycleCallback } type stripeLock struct { @@ -1719,12 +1738,13 @@ type stripeLock struct { _ [40]byte } -func newStripeSeries(stripeSize int) *stripeSeries { +func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *stripeSeries { s := &stripeSeries{ - size: stripeSize, - series: make([]map[uint64]*memSeries, stripeSize), - hashes: make([]seriesHashmap, stripeSize), - locks: make([]stripeLock, stripeSize), + size: stripeSize, + series: make([]map[uint64]*memSeries, stripeSize), + hashes: make([]seriesHashmap, stripeSize), + locks: make([]stripeLock, stripeSize), + seriesLifecycleCallback: seriesCallback, } for i := range s.series { @@ -1740,8 +1760,9 @@ func newStripeSeries(stripeSize int) *stripeSeries { // series entirely that have no chunks left. func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { var ( - deleted = map[uint64]struct{}{} - rmChunks = 0 + deleted = map[uint64]struct{}{} + deletedForCallback = []labels.Labels{} + rmChunks = 0 ) // Run through all series and truncate old chunks. Mark those with no // chunks left as deleted and store their ID. @@ -1772,6 +1793,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { deleted[series.ref] = struct{}{} s.hashes[i].del(hash, series.lset) delete(s.series[j], series.ref) + deletedForCallback = append(deletedForCallback, series.lset) if i != j { s.locks[j].Unlock() @@ -1782,6 +1804,9 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { } s.locks[i].Unlock() + + s.seriesLifecycleCallback.PostDeletion(deletedForCallback...) + deletedForCallback = deletedForCallback[:0] } return deleted, rmChunks @@ -1807,25 +1832,39 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries { return series } -func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) { - i := hash & uint64(s.size-1) +func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool, error) { + // PreCreation is called here to avoid calling it inside the lock. + // It is not necessary to call it just before creating a series, + // rather it gives a 'hint' whether to create a series or not. + createSeriesErr := s.seriesLifecycleCallback.PreCreation(series.lset) + i := hash & uint64(s.size-1) s.locks[i].Lock() if prev := s.hashes[i].get(hash, series.lset); prev != nil { s.locks[i].Unlock() - return prev, false + return prev, false, nil + } + if createSeriesErr == nil { + s.hashes[i].set(hash, series) } - s.hashes[i].set(hash, series) s.locks[i].Unlock() + if createSeriesErr != nil { + // The callback prevented creation of series. + return nil, false, createSeriesErr + } + // Setting the series in the s.hashes marks the creation of series + // as any further calls to this methods would return that series. + s.seriesLifecycleCallback.PostCreation(series.lset) + i = series.ref & uint64(s.size-1) s.locks[i].Lock() s.series[i][series.ref] = series s.locks[i].Unlock() - return series, true + return series, true, nil } type sample struct { @@ -2275,3 +2314,24 @@ type mmappedChunk struct { func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool { return mc.minTime <= maxt && mint <= mc.maxTime } + +// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. +// It is always a no-op in Prometheus and mainly meant for external users who import TSDB. +// All the callbacks should be safe to be called concurrently. +// It is upto the user to implement soft or hard consistency by making the callbacks +// atomic or non-atomic. Atomic callbacks can cause degradation performance. +type SeriesLifecycleCallback interface { + // PreCreation is called before creating a series to indicate if the series can be created. + // A non nil error means the series should not be created. + PreCreation(labels.Labels) error + // PostCreation is called after creating a series to indicate a creation of series. + PostCreation(labels.Labels) + // PostDeletion is called after deletion of series. + PostDeletion(...labels.Labels) +} + +type noopSeriesLifecycleCallback struct{} + +func (noopSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil } +func (noopSeriesLifecycleCallback) PostCreation(labels.Labels) {} +func (noopSeriesLifecycleCallback) PostDeletion(...labels.Labels) {} diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index bd0f248267..0dd30b55d6 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -31,7 +31,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) { testutil.Ok(b, os.RemoveAll(chunkDir)) }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) testutil.Ok(b, err) defer h.Close() @@ -47,7 +47,7 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { testutil.Ok(b, os.RemoveAll(chunkDir)) }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) testutil.Ok(b, err) defer h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 7ac9b599b4..fd5620bf25 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -173,7 +173,7 @@ func BenchmarkLoadWAL(b *testing.B) { // Load the WAL. for i := 0; i < b.N; i++ { - h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize) + h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize, nil) testutil.Ok(b, err) h.Init(0) } @@ -286,7 +286,7 @@ func TestHead_WALMultiRef(t *testing.T) { w, err = wal.New(nil, nil, w.Dir(), false) testutil.Ok(t, err) - head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize) + head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize, nil) testutil.Ok(t, err) testutil.Ok(t, head.Init(0)) defer func() { @@ -313,10 +313,10 @@ func TestHead_Truncate(t *testing.T) { h.initTime(0) - s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) - s2, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1")) - s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2")) - s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) + s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) + s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1")) + s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2")) + s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) s1.mmappedChunks = []*mmappedChunk{ {minTime: 0, maxTime: 999}, @@ -550,7 +550,7 @@ func TestHeadDeleteSimple(t *testing.T) { // Compare the samples for both heads - before and after the reload. reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. testutil.Ok(t, err) - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize, nil) testutil.Ok(t, err) testutil.Ok(t, reloadedHead.Init(0)) @@ -990,7 +990,7 @@ func TestGCChunkAccess(t *testing.T) { h.initTime(0) - s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) + s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) @@ -1044,7 +1044,7 @@ func TestGCSeriesAccess(t *testing.T) { h.initTime(0) - s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) + s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) @@ -1248,7 +1248,7 @@ func TestWalRepair_DecodingError(t *testing.T) { testutil.Ok(t, w.Log(test.rec)) } - h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, DefaultStripeSize) + h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, DefaultStripeSize, nil) testutil.Ok(t, err) testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) initErr := h.Init(math.MinInt64) @@ -1303,12 +1303,12 @@ func TestHeadReadWriterRepair(t *testing.T) { w, err := wal.New(nil, nil, walDir, false) testutil.Ok(t, err) - h, err := NewHead(nil, nil, w, chunkRange, dir, nil, DefaultStripeSize) + h, err := NewHead(nil, nil, w, chunkRange, dir, nil, DefaultStripeSize, nil) testutil.Ok(t, err) testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal)) testutil.Ok(t, h.Init(math.MinInt64)) - s, created := h.getOrCreate(1, labels.FromStrings("a", "1")) + s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) testutil.Assert(t, created, "series was not created") for i := 0; i < 7; i++ { @@ -1531,7 +1531,7 @@ func TestMemSeriesIsolation(t *testing.T) { wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false) testutil.Ok(t, err) - hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, DefaultStripeSize) + hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, DefaultStripeSize, nil) defer func() { testutil.Ok(t, hb.Close()) }() testutil.Ok(t, err) testutil.Ok(t, hb.Init(0)) @@ -1641,7 +1641,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { h.initTime(0) - s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) + s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) ok, _ := s.append(0, 0, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "Series append failed.") @@ -1794,7 +1794,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) testutil.Ok(t, err) - h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, DefaultStripeSize) + h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, DefaultStripeSize, nil) testutil.Ok(t, err) testutil.Ok(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 2a03852f20..6d5f04a658 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -35,7 +35,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) { defer func() { testutil.Ok(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) testutil.Ok(b, err) defer func() { testutil.Ok(b, h.Close()) @@ -136,7 +136,7 @@ func BenchmarkQuerierSelect(b *testing.B) { defer func() { testutil.Ok(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) testutil.Ok(b, err) defer h.Close() app := h.Appender() diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 8cf13249aa..ea6a99ee1c 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1862,7 +1862,7 @@ func TestPostingsForMatchers(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) testutil.Ok(t, err) defer func() { testutil.Ok(t, h.Close()) diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 8a26e91e79..2264abc754 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -33,7 +33,7 @@ type MetricSample struct { // CreateHead creates a TSDB writer head to write the sample data to. func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logger log.Logger) (*Head, error) { - head, err := NewHead(nil, logger, nil, chunkRange, chunkDir, nil, DefaultStripeSize) + head, err := NewHead(nil, logger, nil, chunkRange, chunkDir, nil, DefaultStripeSize, nil) if err != nil { return nil, err diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index a27f818ad6..0aff0350f7 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1915,7 +1915,7 @@ func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) { retErr = err } }() - h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, tsdb.DefaultStripeSize) + h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, tsdb.DefaultStripeSize, nil) return h.Stats(statsByLabelName), nil }