diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 0870a4b111..1da34b16ac 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -322,7 +322,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { } func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head { - head, err := NewHead(nil, nil, w, DefaultBlockDuration, chunkDir, nil, DefaultStripeSize, nil) + head, err := NewHead(nil, nil, w, DefaultBlockDuration, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(tb, err) app := head.Appender(context.Background()) diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index bb61841692..0cd05eb77f 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" ) // BlockWriter is a block writer that allows appending and flushing series to disk. @@ -67,7 +68,7 @@ func (w *BlockWriter) initHead() error { } w.chunkDir = chunkDir - h, err := NewHead(nil, w.logger, nil, w.blockSize, w.chunkDir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, w.logger, nil, w.blockSize, w.chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) if err != nil { return errors.Wrap(err, "tsdb.NewHead") } diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 1709f8353a..d82c12d33f 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -40,7 +40,6 @@ const ( MagicHeadChunks = 0x0130BC91 headChunksFormatV1 = 1 - writeBufferSize = 4 * 1024 * 1024 // 4 MiB. ) var ( @@ -63,6 +62,12 @@ const ( // MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data. // Max because the uvarint size can be smaller. MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunksFormatVersionSize + MaxChunkLengthFieldSize + CRCSize + // MinWriteBufferSize is the minimum write buffer size allowed. + MinWriteBufferSize = 64 * 1024 // 64KB. + // MaxWriteBufferSize is the maximum write buffer size allowed. + MaxWriteBufferSize = 8 * 1024 * 1024 // 8 MiB. + // DefaultWriteBufferSize is the default write buffer size. + DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB. ) // corruptionErr is an error that's returned when corruption is encountered. @@ -82,7 +87,8 @@ type ChunkDiskMapper struct { curFileNumBytes atomic.Int64 // Bytes written in current open file. /// Writer. - dir *os.File + dir *os.File + writeBufferSize int curFile *os.File // File being written to. curFileSequence int // Index of current open file being appended to. @@ -121,7 +127,15 @@ type mmappedChunkFile struct { // using the default head chunk file duration. // NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper // to set the maxt of all the file. -func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error) { +func NewChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*ChunkDiskMapper, error) { + // Validate write buffer size. + if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize { + return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxHeadChunkFileSize, writeBufferSize) + } + if writeBufferSize%1024 != 0 { + return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize) + } + if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } @@ -131,10 +145,11 @@ func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error } m := &ChunkDiskMapper{ - dir: dirFile, - pool: pool, - crc32: newCRC32(), - chunkBuffer: newChunkBuffer(), + dir: dirFile, + pool: pool, + writeBufferSize: writeBufferSize, + crc32: newCRC32(), + chunkBuffer: newChunkBuffer(), } if m.pool == nil { @@ -273,7 +288,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c // if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size; // so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer). - if len(chk.Bytes())+MaxHeadChunkMetaSize < writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) { + if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) { if err := cdm.flushBuffer(); err != nil { return 0, err } @@ -313,7 +328,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c cdm.chunkBuffer.put(chkRef, chk) - if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize { + if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize { // The chunk was bigger than the buffer itself. // Flushing to not keep partial chunks in buffer. if err := cdm.flushBuffer(); err != nil { @@ -382,7 +397,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { if cdm.chkWriter != nil { cdm.chkWriter.Reset(newFile) } else { - cdm.chkWriter = bufio.NewWriterSize(newFile, writeBufferSize) + cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize) } cdm.closers[cdm.curFileSequence] = mmapFile diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index 75afe14ba9..43aa0484b4 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -133,7 +133,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { // Testing IterateAllChunks method. dir := hrw.dir.Name() require.NoError(t, hrw.Close()) - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool()) + hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) idx := 0 @@ -223,7 +223,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Restarted. var err error - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool()) + hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) @@ -316,7 +316,7 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { // Restarting checks for unsequential files. var err error - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool()) + hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) verifyFiles([]int{3, 4, 5, 6, 7}) } @@ -337,7 +337,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { require.NoError(t, hrw.Close()) // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool()) + hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) // Forcefully failing IterateAllChunks. @@ -394,7 +394,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { require.NoError(t, f.Close()) // Open chunk disk mapper again, corrupt file should be removed. - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool()) + hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) @@ -425,7 +425,7 @@ func testChunkDiskMapper(t *testing.T) *ChunkDiskMapper { require.NoError(t, os.RemoveAll(tmpdir)) }) - hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool()) + hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 59873fbd3a..377ccb5531 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1093,7 +1093,7 @@ func BenchmarkCompactionFromHead(b *testing.B) { defer func() { require.NoError(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(b, err) for ln := 0; ln < labelNames; ln++ { app := h.Appender(context.Background()) diff --git a/tsdb/db.go b/tsdb/db.go index 81669829de..f63437fc6a 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" _ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met. @@ -67,14 +68,15 @@ var ( // millisecond precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wal.DefaultSegmentSize, - RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), - MinBlockDuration: DefaultBlockDuration, - MaxBlockDuration: DefaultBlockDuration, - NoLockfile: false, - AllowOverlappingBlocks: false, - WALCompression: false, - StripeSize: DefaultStripeSize, + WALSegmentSize: wal.DefaultSegmentSize, + RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), + MinBlockDuration: DefaultBlockDuration, + MaxBlockDuration: DefaultBlockDuration, + NoLockfile: false, + AllowOverlappingBlocks: false, + WALCompression: false, + StripeSize: DefaultStripeSize, + HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, } } @@ -122,6 +124,9 @@ type Options struct { // Typically it is in milliseconds. MaxBlockDuration int64 + // HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper. + HeadChunksWriteBufferSize int + // SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. // It is always a no-op in Prometheus and mainly meant for external users who import TSDB. SeriesLifecycleCallback SeriesLifecycleCallback @@ -328,7 +333,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { if err != nil { return err } - head, err := NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil) + head, err := NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) if err != nil { return err } @@ -381,7 +386,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue blocks[i] = b } - head, err := NewHead(nil, db.logger, nil, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil) + head, err := NewHead(nil, db.logger, nil, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) if err != nil { return nil, err } @@ -399,7 +404,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue if err != nil { return nil, err } - head, err = NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil) + head, err = NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) if err != nil { return nil, err } @@ -531,7 +536,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { if opts.StripeSize <= 0 { opts.StripeSize = DefaultStripeSize } - + if opts.HeadChunksWriteBufferSize <= 0 { + opts.HeadChunksWriteBufferSize = chunks.DefaultWriteBufferSize + } if opts.MinBlockDuration <= 0 { opts.MinBlockDuration = DefaultBlockDuration } @@ -642,7 +649,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } } - db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize, opts.SeriesLifecycleCallback) + db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.HeadChunksWriteBufferSize, opts.StripeSize, opts.SeriesLifecycleCallback) if err != nil { return nil, err } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 7d70119f46..6bedae9f35 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -193,9 +193,9 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { require.Equal(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet) } -// TestNoPanicAfterWALCorrutpion ensures that querying the db after a WAL corruption doesn't cause a panic. +// TestNoPanicAfterWALCorruption ensures that querying the db after a WAL corruption doesn't cause a panic. // https://github.com/prometheus/prometheus/issues/7548 -func TestNoPanicAfterWALCorrutpion(t *testing.T) { +func TestNoPanicAfterWALCorruption(t *testing.T) { db := openTestDB(t, &Options{WALSegmentSize: 32 * 1024}, nil) // Append until the first mmaped head chunk. diff --git a/tsdb/head.go b/tsdb/head.go index 7ad23df872..342073dd6c 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -295,7 +295,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings // stripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series. -func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, chkPool chunkenc.Pool, chkWriteBufferSize, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) { if l == nil { l = log.NewNopLogger() } @@ -328,12 +328,12 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int h.lastWALTruncationTime.Store(math.MinInt64) h.metrics = newHeadMetrics(h, r) - if pool == nil { - pool = chunkenc.NewPool() + if chkPool == nil { + chkPool = chunkenc.NewPool() } var err error - h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), pool) + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), chkPool, chkWriteBufferSize) if err != nil { return nil, err } diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index 6e746e8c81..ea7afc63cb 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -23,6 +23,7 @@ import ( "go.uber.org/atomic" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunks" ) func BenchmarkHeadStripeSeriesCreate(b *testing.B) { @@ -32,7 +33,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) { require.NoError(b, os.RemoveAll(chunkDir)) }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(b, err) defer h.Close() @@ -48,7 +49,7 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { require.NoError(b, os.RemoveAll(chunkDir)) }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(b, err) defer h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 992946a3db..5f4f59892f 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -47,7 +47,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) require.NoError(t, err) - h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(t, err) require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) @@ -191,7 +191,7 @@ func BenchmarkLoadWAL(b *testing.B) { // Load the WAL. for i := 0; i < b.N; i++ { - h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(b, err) h.Init(0) } @@ -302,7 +302,7 @@ func TestHead_WALMultiRef(t *testing.T) { w, err = wal.New(nil, nil, w.Dir(), false) require.NoError(t, err) - head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize, nil) + head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(t, err) require.NoError(t, head.Init(0)) defer func() { @@ -421,7 +421,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() // This is usually taken from the Head, but passing manually here. - chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool()) + chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) require.NoError(t, err) defer func() { require.NoError(t, chunkDiskMapper.Close()) @@ -583,7 +583,7 @@ func TestHeadDeleteSimple(t *testing.T) { // Compare the samples for both heads - before and after the reloadBlocks. reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks. require.NoError(t, err) - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize, nil) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(t, err) require.NoError(t, reloadedHead.Init(0)) @@ -963,7 +963,7 @@ func TestMemSeries_append(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() // This is usually taken from the Head, but passing manually here. - chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool()) + chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) require.NoError(t, err) defer func() { require.NoError(t, chunkDiskMapper.Close()) @@ -1265,7 +1265,7 @@ func TestWalRepair_DecodingError(t *testing.T) { require.NoError(t, w.Log(test.rec)) } - h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(t, err) require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) initErr := h.Init(math.MinInt64) @@ -1320,7 +1320,7 @@ func TestHeadReadWriterRepair(t *testing.T) { w, err := wal.New(nil, nil, walDir, false) require.NoError(t, err) - h, err := NewHead(nil, nil, w, chunkRange, dir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, w, chunkRange, dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(t, err) require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal)) require.NoError(t, h.Init(math.MinInt64)) @@ -1550,7 +1550,7 @@ func TestMemSeriesIsolation(t *testing.T) { wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false) require.NoError(t, err) - hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, DefaultStripeSize, nil) + hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) defer func() { require.NoError(t, hb.Close()) }() require.NoError(t, err) require.NoError(t, hb.Init(0)) diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 0674a3693a..dc4df74c58 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunks" ) // Make entries ~50B in size, to emulate real-world high cardinality. @@ -37,7 +38,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) { defer func() { require.NoError(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(b, err) defer func() { require.NoError(b, h.Close()) @@ -146,7 +147,7 @@ func BenchmarkQuerierSelect(b *testing.B) { defer func() { require.NoError(b, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(b, err) defer h.Close() app := h.Appender(context.Background()) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 7b983c0223..dc027f8213 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -407,7 +407,7 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - h, err := NewHead(nil, nil, nil, 2*time.Hour.Milliseconds(), "", nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, nil, 2*time.Hour.Milliseconds(), "", nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(t, err) defer h.Close() @@ -1550,7 +1550,7 @@ func TestPostingsForMatchers(t *testing.T) { defer func() { require.NoError(t, os.RemoveAll(chunkDir)) }() - h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil) require.NoError(t, err) defer func() { require.NoError(t, h.Close()) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index fbb7f3aace..6b81eb7328 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -56,6 +56,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/util/teststorage" ) @@ -2126,7 +2127,7 @@ func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) { retErr = err } }() - h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, tsdb.DefaultStripeSize, nil) + h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, chunks.DefaultWriteBufferSize, tsdb.DefaultStripeSize, nil) return h.Stats(statsByLabelName), nil }