diff --git a/go.mod b/go.mod index 32e2a7ce95..4f7f1e4548 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,7 @@ require ( github.com/prometheus/client_golang v0.9.1 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea - github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc + github.com/prometheus/tsdb v0.4.0 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rlmcpherson/s3gof3r v0.5.0 // indirect github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect diff --git a/go.sum b/go.sum index c9c29869ec..59f86c72c0 100644 --- a/go.sum +++ b/go.sum @@ -218,8 +218,8 @@ github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea h1:4RkbEb5XX0Wvu github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc h1:phU3kj067sczIc4fhaq5rRcH4Lp9A45MsrcQqjC+cao= -github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/prometheus/tsdb v0.4.0 h1:pXJyEi/5p6UBmOrnzsZmYxLrZjxnRlEB78/qj3+a8Gk= +github.com/prometheus/tsdb v0.4.0/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k= diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index f1750f6515..f18036216b 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -108,9 +108,6 @@ type adapter struct { // Options of the DB storage. type Options struct { - // The interval at which the write ahead log is flushed to disc. - WALFlushInterval time.Duration - // The timestamp range of head blocks after which they get persisted. // It's the minimum duration of any persisted block. MinBlockDuration model.Duration @@ -185,7 +182,6 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t } db, err := tsdb.Open(path, l, r, &tsdb.Options{ - WALFlushInterval: 10 * time.Second, WALSegmentSize: int(opts.WALSegmentSize), RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000), BlockRanges: rngs, diff --git a/vendor/github.com/prometheus/tsdb/.travis.yml b/vendor/github.com/prometheus/tsdb/.travis.yml index c03d14f533..c5012a34ac 100644 --- a/vendor/github.com/prometheus/tsdb/.travis.yml +++ b/vendor/github.com/prometheus/tsdb/.travis.yml @@ -15,10 +15,10 @@ go_import_path: github.com/prometheus/tsdb before_install: - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then choco install make; fi - + install: - - go get -v -t ./... + - make deps script: # `staticcheck` target is omitted due to linting errors - - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make check_license style unused test; fi + - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make; fi diff --git a/vendor/github.com/prometheus/tsdb/CHANGELOG.md b/vendor/github.com/prometheus/tsdb/CHANGELOG.md index 8e3c01f2c3..1847345379 100644 --- a/vendor/github.com/prometheus/tsdb/CHANGELOG.md +++ b/vendor/github.com/prometheus/tsdb/CHANGELOG.md @@ -1,8 +1,18 @@ ## master / unreleased + +## 0.4.0 - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. + - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) + - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: + - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` + - new public interface `SizeReader: Size() int64` + - `OpenBlock` signature changed to take a logger. + - [REMOVED] `PrefixMatcher` is considered unused so was removed. + - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. + - [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read. ## 0.3.1 -- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. + - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. ## 0.3.0 - [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path. @@ -11,3 +21,4 @@ - [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)` - [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field. - [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset. + -[FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc. diff --git a/vendor/github.com/prometheus/tsdb/Makefile b/vendor/github.com/prometheus/tsdb/Makefile index eff204cd2d..3458e7fa49 100644 --- a/vendor/github.com/prometheus/tsdb/Makefile +++ b/vendor/github.com/prometheus/tsdb/Makefile @@ -18,11 +18,15 @@ TSDB_BENCHMARK_NUM_METRICS ?= 1000 TSDB_BENCHMARK_DATASET ?= "$(TSDB_PROJECT_DIR)/testdata/20kseries.json" TSDB_BENCHMARK_OUTPUT_DIR ?= "$(TSDB_CLI_DIR)/benchout" -STATICCHECK_IGNORE = include Makefile.common +.PHONY: deps +deps: + @echo ">> getting dependencies" + GO111MODULE=$(GO111MODULE) $(GO) get $(GOOPTS) -t ./... + build: - @$(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR) + GO111MODULE=$(GO111MODULE) $(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR) bench: build @echo ">> running benchmark, writing result to $(TSDB_BENCHMARK_OUTPUT_DIR)" diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index e5a66bd9e9..42e11d9513 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -21,6 +21,8 @@ import ( "path/filepath" "sync" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" @@ -140,6 +142,12 @@ type Appendable interface { Appender() Appender } +// SizeReader returns the size of the object in bytes. +type SizeReader interface { + // Size returns the size in bytes. + Size() int64 +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -166,6 +174,7 @@ type BlockStats struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` + NumBytes int64 `json:"numBytes,omitempty"` } // BlockDesc describes a block by ULID and time range. @@ -182,6 +191,9 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + // Indicates that during compaction it resulted in a block without any samples + // so it should be deleted on the next reload. + Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. Parents []BlockDesc `json:"parents,omitempty"` @@ -257,7 +269,10 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { + if logger == nil { + logger = log.NewNopLogger() + } meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -272,11 +287,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return nil, err } - tr, err := readTombstones(dir) + tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + // TODO refactor to set this at block creation time as + // that would be the logical place for a block size to be calculated. + bs := blockSize(cr, ir, tsr) + meta.Stats.NumBytes = bs + err = writeMetaFile(dir, meta) + if err != nil { + level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) + } + pb := &Block{ dir: dir, meta: *meta, @@ -288,6 +312,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return pb, nil } +func blockSize(rr ...SizeReader) int64 { + var total int64 + for _, r := range rr { + if r != nil { + total += r.Size() + } + } + return total +} + // Close closes the on-disk block. It blocks as long as there are readers reading from the block. func (pb *Block) Close() error { pb.mtx.Lock() @@ -315,6 +349,9 @@ func (pb *Block) Dir() string { return pb.dir } // Meta returns meta information about the block. func (pb *Block) Meta() BlockMeta { return pb.meta } +// Size returns the number of bytes that the block takes up. +func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } + // ErrClosing is returned when a block is in the process of being closed. var ErrClosing = errors.New("block is closing") diff --git a/vendor/github.com/prometheus/tsdb/checkpoint.go b/vendor/github.com/prometheus/tsdb/checkpoint.go index d1cad4c10b..1c6239232e 100644 --- a/vendor/github.com/prometheus/tsdb/checkpoint.go +++ b/vendor/github.com/prometheus/tsdb/checkpoint.go @@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) defer sgmReader.Close() } - cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) + cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to)) cpdirtmp := cpdir + ".tmp" if err := os.MkdirAll(cpdirtmp, 0777); err != nil { @@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) return nil, errors.Wrap(err, "open checkpoint") } + // Ensures that an early return caused by an error doesn't leave any tmp files. + defer func() { + cp.Close() + os.RemoveAll(cpdirtmp) + }() + r := wal.NewReader(sgmReader) var ( diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunks.go b/vendor/github.com/prometheus/tsdb/chunks/chunks.go index 5eab23982d..8fb288384e 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks/chunks.go @@ -205,6 +205,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error { for _, c := range chks { maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding. maxLen += int64(len(c.Chunk.Bytes())) + maxLen += 4 // The 4 bytes of crc32 } newsz := w.n + maxLen @@ -284,17 +285,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { // Reader implements a SeriesReader for a serialized byte stream // of series data. type Reader struct { - // The underlying bytes holding the encoded series data. - bs []ByteSlice - - // Closers for resources behind the byte slices. - cs []io.Closer - + bs []ByteSlice // The underlying bytes holding the encoded series data. + cs []io.Closer // Closers for resources behind the byte slices. + size int64 // The total size of bytes in the reader. pool chunkenc.Pool } func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { cr := Reader{pool: pool, bs: bs, cs: cs} + var totalSize int64 for i, b := range cr.bs { if b.Len() < 4 { @@ -304,7 +303,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { return nil, errors.Errorf("invalid magic number %x", m) } + totalSize += int64(b.Len()) } + cr.size = totalSize return &cr, nil } @@ -327,9 +328,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { pool = chunkenc.NewPool() } - var bs []ByteSlice - var cs []io.Closer - + var ( + bs []ByteSlice + cs []io.Closer + ) for _, fn := range files { f, err := fileutil.OpenMmapFile(fn) if err != nil { @@ -345,6 +347,11 @@ func (s *Reader) Close() error { return closeAll(s.cs...) } +// Size returns the size of the chunks. +func (s *Reader) Size() int64 { + return s.size +} + func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( seq = int(ref >> 32) diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index f8e6ff545c..5d8155f51b 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -55,12 +55,17 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. + // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. + // When resulting Block has 0 samples + // * No block is written. + // * The source dirs are marked Deletable. + // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } @@ -186,13 +191,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { return res, nil } - // Compact any blocks that have >5% tombstones. + // Compact any blocks with big enough time range that have >5% tombstones. for i := len(dms) - 1; i >= 0; i-- { meta := dms[i].meta if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { break } - if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { return []string{dms[i].dir}, nil } @@ -347,7 +351,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if b == nil { var err error - b, err = OpenBlock(d, c.chunkPool) + b, err = OpenBlock(c.logger, d, c.chunkPool) if err != nil { return uid, err } @@ -366,15 +370,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u meta := compactBlockMetas(uid, metas...) err = c.write(dest, meta, blocks...) if err == nil { - level.Info(c.logger).Log( - "msg", "compact blocks", - "count", len(blocks), - "mint", meta.MinTime, - "maxt", meta.MaxTime, - "ulid", meta.ULID, - "sources", fmt.Sprintf("%v", uids), - "duration", time.Since(start), - ) + if meta.Stats.NumSamples == 0 { + for _, b := range bs { + b.meta.Compaction.Deletable = true + if err = writeMetaFile(b.dir, &b.meta); err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + ) + } + } + uid = ulid.ULID{} + level.Info(c.logger).Log( + "msg", "compact blocks resulted in empty block", + "count", len(blocks), + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } else { + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } return uid, nil } @@ -413,6 +436,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p return uid, err } + if meta.Stats.NumSamples == 0 { + return ulid.ULID{}, nil + } + level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) return uid, nil } @@ -490,11 +517,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } - - if err = writeMetaFile(tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } - // We are explicitly closing them here to check for error even // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to @@ -506,6 +528,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "close index writer") } + // Populated block is empty, so cleanup and exit. + if meta.Stats.NumSamples == 0 { + if err := os.RemoveAll(tmp); err != nil { + return errors.Wrap(err, "remove tmp folder after empty block failed") + } + return nil + } + + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } + // Create an empty tombstones file. if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index e5a057cbae..1349dfbd51 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -44,7 +44,6 @@ import ( // DefaultOptions used for the DB. They are sane for setups using // millisecond precision timestamps. var DefaultOptions = &Options{ - WALFlushInterval: 5 * time.Second, WALSegmentSize: wal.DefaultSegmentSize, RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5), @@ -53,15 +52,19 @@ var DefaultOptions = &Options{ // Options of the DB storage. type Options struct { - // The interval at which the write ahead log is flushed to disk. - WALFlushInterval time.Duration - // Segments (wal files) max size WALSegmentSize int // Duration of persisted data to keep. RetentionDuration uint64 + // Maximum number of bytes in blocks to be retained. + // 0 or less means disabled. + // NOTE: For proper storage calculations need to consider + // the size of the WAL folder which is not added when calculating + // the current size of the database. + MaxBytes int64 + // The sizes of the Blocks. BlockRanges []int64 @@ -131,11 +134,12 @@ type dbMetrics struct { reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter + timeRetentionCount prometheus.Counter compactionsSkipped prometheus.Counter - cutoffs prometheus.Counter - cutoffsFailed prometheus.Counter startTime prometheus.GaugeFunc tombCleanTimer prometheus.Histogram + blocksBytes prometheus.Gauge + sizeRetentionCount prometheus.Counter } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -174,18 +178,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) + m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_time_retentions_total", + Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.", + }) m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_skipped_total", Help: "Total number of skipped compactions due to disabled auto compaction.", }) - m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_total", - Help: "Number of times the database cut off block data from disk.", - }) - m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_failures_total", - Help: "Number of times the database failed to cut off block data from disk.", - }) m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_lowest_timestamp", Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.", @@ -201,6 +201,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", }) + m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_storage_blocks_bytes_total", + Help: "The number of bytes that are currently used for local storage by all blocks.", + }) + m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_size_retentions_total", + Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", + }) if r != nil { r.MustRegister( @@ -208,11 +216,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.symbolTableSize, m.reloads, m.reloadsFailed, - m.cutoffs, - m.cutoffsFailed, + m.timeRetentionCount, m.compactionsTriggered, m.startTime, m.tombCleanTimer, + m.blocksBytes, + m.sizeRetentionCount, ) } return m @@ -344,25 +353,6 @@ func (db *DB) run() { } } -func (db *DB) beyondRetention(meta *BlockMeta) bool { - if db.opts.RetentionDuration == 0 { - return false - } - - db.mtx.RLock() - blocks := db.blocks[:] - db.mtx.RUnlock() - - if len(blocks) == 0 { - return false - } - - last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - - return meta.MaxTime < mint -} - // Appender opens a new appender against the database. func (db *DB) Appender() Appender { return dbAppender{db: db, Appender: db.head.Appender()} @@ -427,7 +417,8 @@ func (db *DB) compact() (err error) { // from the block interval here. maxt: maxt - 1, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { + uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) + if err != nil { return errors.Wrap(err, "persist head block") } @@ -436,6 +427,14 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { return errors.Wrap(err, "reload blocks") } + if (uid == ulid.ULID{}) { + // Compaction resulted in an empty block. + // Head truncating during db.reload() depends on the persisted blocks and + // in this case no new block will be persisted so manually truncate the head. + if err = db.head.Truncate(maxt); err != nil { + return errors.Wrap(err, "head truncate failed (in compact)") + } + } runtime.GC() } @@ -478,8 +477,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes -// a list of block directories which should be deleted during reload. +// reload blocks and trigger head truncation if new blocks appeared. // Blocks that are obsolete due to replacement or retention will be deleted. func (db *DB) reload() (err error) { defer func() { @@ -489,112 +487,193 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - dirs, err := blockDirs(db.dir) + loadable, corrupted, err := db.openBlocks() if err != nil { - return errors.Wrap(err, "find blocks") + return err } - // We delete old blocks that have been superseded by new ones by gathering all parents - // from existing blocks. Those parents all have newer replacements and can be safely deleted - // after we loaded the other blocks. - // This makes us resilient against the process crashing towards the end of a compaction. - // Creation of a new block and deletion of its parents cannot happen atomically. By creating - // blocks with their parents, we can pick up the deletion where it left off during a crash. - var ( - blocks []*Block - corrupted = map[ulid.ULID]error{} - opened = map[ulid.ULID]struct{}{} - deleteable = map[ulid.ULID]struct{}{} - ) - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - // The block was potentially in the middle of being deleted during a crash. - // Skip it since we may delete it properly further down again. - level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir) - ulid, err2 := ulid.Parse(filepath.Base(dir)) - if err2 != nil { - level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) - continue - } - corrupted[ulid] = err - continue - } - if db.beyondRetention(meta) { - deleteable[meta.ULID] = struct{}{} - continue - } - for _, b := range meta.Compaction.Parents { - deleteable[b.ULID] = struct{}{} + deletable := db.deletableBlocks(loadable) + + // Corrupted blocks that have been replaced by parents can be safely ignored and deleted. + // This makes it resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. + // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. + for _, block := range loadable { + for _, b := range block.Meta().Compaction.Parents { + delete(corrupted, b.ULID) + deletable[b.ULID] = nil } } - // Blocks we failed to open should all be those we are want to delete anyway. - for c, err := range corrupted { - if _, ok := deleteable[c]; !ok { - return errors.Wrapf(err, "unexpected corrupted block %s", c) - } + if len(corrupted) > 0 { + return errors.Wrap(err, "unexpected corrupted block") } - // Load new blocks into memory. - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - return errors.Wrapf(err, "read meta information %s", dir) - } - // Don't load blocks that are scheduled for deletion. - if _, ok := deleteable[meta.ULID]; ok { + + // All deletable blocks should not be loaded. + var ( + bb []*Block + blocksSize int64 + ) + for _, block := range loadable { + if _, ok := deletable[block.Meta().ULID]; ok { + deletable[block.Meta().ULID] = block continue } - // See if we already have the block in memory or open it otherwise. - b, ok := db.getBlock(meta.ULID) - if !ok { - b, err = OpenBlock(dir, db.chunkPool) - if err != nil { - return errors.Wrapf(err, "open block %s", dir) - } - } - blocks = append(blocks, b) - opened[meta.ULID] = struct{}{} + bb = append(bb, block) + blocksSize += block.Size() + } - sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime + loadable = bb + db.metrics.blocksBytes.Set(float64(blocksSize)) + + sort.Slice(loadable, func(i, j int) bool { + return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime }) - if err := validateBlockSequence(blocks); err != nil { + if err := validateBlockSequence(loadable); err != nil { return errors.Wrap(err, "invalid block sequence") } - // Swap in new blocks first for subsequently created readers to be seen. - // Then close previous blocks, which may block for pending readers to complete. + // Swap new blocks first for subsequently created readers to be seen. db.mtx.Lock() oldBlocks := db.blocks - db.blocks = blocks + db.blocks = loadable db.mtx.Unlock() - // Drop old blocks from memory. for _, b := range oldBlocks { - if _, ok := opened[b.Meta().ULID]; ok { - continue - } - if err := b.Close(); err != nil { - level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + if _, ok := deletable[b.Meta().ULID]; ok { + deletable[b.Meta().ULID] = b } } - // Delete all obsolete blocks. None of them are opened any longer. - for ulid := range deleteable { - if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { - return errors.Wrapf(err, "delete obsolete block %s", ulid) - } + + if err := db.deleteBlocks(deletable); err != nil { + return err } // Garbage collect data in the head if the most recent persisted block // covers data of its current time range. - if len(blocks) == 0 { + if len(loadable) == 0 { return nil } - maxt := blocks[len(blocks)-1].Meta().MaxTime + + maxt := loadable[len(loadable)-1].Meta().MaxTime return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } +func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { + dirs, err := blockDirs(db.dir) + if err != nil { + return nil, nil, errors.Wrap(err, "find blocks") + } + + corrupted = make(map[ulid.ULID]error) + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + continue + } + + // See if we already have the block in memory or open it otherwise. + block, ok := db.getBlock(meta.ULID) + if !ok { + block, err = OpenBlock(db.logger, dir, db.chunkPool) + if err != nil { + corrupted[meta.ULID] = err + continue + } + } + blocks = append(blocks, block) + } + return blocks, corrupted, nil +} + +// deletableBlocks returns all blocks past retention policy. +func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { + deletable := make(map[ulid.ULID]*Block) + + // Sort the blocks by time - newest to oldest (largest to smallest timestamp). + // This ensures that the retentions will remove the oldest blocks. + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime + }) + + for _, block := range blocks { + if block.Meta().Compaction.Deletable { + deletable[block.Meta().ULID] = block + } + } + + for ulid, block := range db.beyondTimeRetention(blocks) { + deletable[ulid] = block + } + + for ulid, block := range db.beyondSizeRetention(blocks) { + deletable[ulid] = block + } + + return deletable +} + +func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Time retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + for i, block := range blocks { + // The difference between the first block and this block is larger than + // the retention period so any blocks after that are added as deleteable. + if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) { + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.timeRetentionCount.Inc() + break + } + } + return deleteable +} + +func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Size retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + blocksSize := int64(0) + for i, block := range blocks { + blocksSize += block.Size() + if blocksSize > db.opts.MaxBytes { + // Add this and all following blocks for deletion. + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.sizeRetentionCount.Inc() + break + } + } + return deleteable +} + +// deleteBlocks closes and deletes blocks from the disk. +// When the map contains a non nil block object it means it is loaded in memory +// so needs to be closed first as it might need to wait for pending readers to complete. +func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { + for ulid, block := range blocks { + if block != nil { + if err := block.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + } + if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { + return errors.Wrapf(err, "delete obsolete block %s", ulid) + } + } + return nil +} + // validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. func validateBlockSequence(bs []*Block) error { if len(bs) <= 1 { diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index eba1f2dc43..cbc8661f8c 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -685,6 +685,7 @@ func (h *Head) getAppendBuffer() []RefSample { } func (h *Head) putAppendBuffer(b []RefSample) { + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. h.appendPool.Put(b[:0]) } @@ -697,6 +698,7 @@ func (h *Head) getBytesBuffer() []byte { } func (h *Head) putBytesBuffer(b []byte) { + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. h.bytesPool.Put(b[:0]) } @@ -1094,25 +1096,30 @@ func (h *headIndexReader) Postings(name, value string) (index.Postings, error) { } func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { - ep := make([]uint64, 0, 128) + series := make([]*memSeries, 0, 128) + // Fetch all the series only once. for p.Next() { - ep = append(ep, p.At()) + s := h.head.series.getByID(p.At()) + if s == nil { + level.Debug(h.head.logger).Log("msg", "looked up series not found") + } else { + series = append(series, s) + } } if err := p.Err(); err != nil { return index.ErrPostings(errors.Wrap(err, "expand postings")) } - sort.Slice(ep, func(i, j int) bool { - a := h.head.series.getByID(ep[i]) - b := h.head.series.getByID(ep[j]) - - if a == nil || b == nil { - level.Debug(h.head.logger).Log("msg", "looked up series not found") - return false - } - return labels.Compare(a.lset, b.lset) < 0 + sort.Slice(series, func(i, j int) bool { + return labels.Compare(series[i].lset, series[j].lset) < 0 }) + + // Convert back to list. + ep := make([]uint64, 0, len(series)) + for _, p := range series { + ep = append(ep, p.ref) + } return index.NewListPostings(ep) } diff --git a/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go index 602498f115..9104f1cb5f 100644 --- a/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go +++ b/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go @@ -18,6 +18,8 @@ import ( "hash" "hash/crc32" "unsafe" + + "github.com/pkg/errors" ) // enbuf is a helper type to populate a byte slice with various types. @@ -86,6 +88,60 @@ type decbuf struct { e error } +// newDecbufAt returns a new decoding buffer. It expects the first 4 bytes +// after offset to hold the big endian encoded content length, followed by the contents and the expected +// checksum. +func newDecbufAt(bs ByteSlice, off int) decbuf { + if bs.Len() < off+4 { + return decbuf{e: errInvalidSize} + } + b := bs.Range(off, off+4) + l := int(binary.BigEndian.Uint32(b)) + + if bs.Len() < off+4+l+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = bs.Range(off+4, off+4+l+4) + dec := decbuf{b: b[:len(b)-4]} + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { + return decbuf{e: errInvalidChecksum} + } + return dec +} + +// decbufUvarintAt returns a new decoding buffer. It expects the first bytes +// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected +// checksum. +func newDecbufUvarintAt(bs ByteSlice, off int) decbuf { + // We never have to access this method at the far end of the byte slice. Thus just checking + // against the MaxVarintLen32 is sufficient. + if bs.Len() < off+binary.MaxVarintLen32 { + return decbuf{e: errInvalidSize} + } + b := bs.Range(off, off+binary.MaxVarintLen32) + + l, n := binary.Uvarint(b) + if n <= 0 || n > binary.MaxVarintLen32 { + return decbuf{e: errors.Errorf("invalid uvarint %d", n)} + } + + if bs.Len() < off+n+int(l)+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = bs.Range(off+n, off+n+int(l)+4) + dec := decbuf{b: b[:len(b)-4]} + + if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { + return decbuf{e: errInvalidChecksum} + } + return dec +} + func (d *decbuf) uvarint() int { return int(d.uvarint64()) } func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } diff --git a/vendor/github.com/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/tsdb/index/index.go index 6413a9fca3..74e08d4651 100644 --- a/vendor/github.com/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/tsdb/index/index.go @@ -20,6 +20,7 @@ import ( "hash" "hash/crc32" "io" + "io/ioutil" "math" "os" "path/filepath" @@ -35,9 +36,13 @@ import ( const ( // MagicIndex 4 bytes at the head of an index file. MagicIndex = 0xBAAAD700 + // HeaderLen represents number of bytes reserved of index for header. + HeaderLen = 5 - indexFormatV1 = 1 - indexFormatV2 = 2 + // FormatV1 represents 1 version of index. + FormatV1 = 1 + // FormatV2 represents 2 version of index. + FormatV2 = 2 labelNameSeperator = "\xff" ) @@ -108,7 +113,7 @@ type Writer struct { fbuf *bufio.Writer pos uint64 - toc indexTOC + toc TOC stage indexWriterStage // Reusable memory. @@ -129,13 +134,42 @@ type Writer struct { Version int } -type indexTOC struct { - symbols uint64 - series uint64 - labelIndices uint64 - labelIndicesTable uint64 - postings uint64 - postingsTable uint64 +// TOC represents index Table Of Content that states where each section of index starts. +type TOC struct { + Symbols uint64 + Series uint64 + LabelIndices uint64 + LabelIndicesTable uint64 + Postings uint64 + PostingsTable uint64 +} + +// NewTOCFromByteSlice return parsed TOC from given index byte slice. +func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { + if bs.Len() < indexTOCLen { + return nil, errInvalidSize + } + b := bs.Range(bs.Len()-indexTOCLen, bs.Len()) + + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := decbuf{b: b[:len(b)-4]} + + if d.crc32() != expCRC { + return nil, errors.Wrap(errInvalidChecksum, "read TOC") + } + + if err := d.err(); err != nil { + return nil, err + } + + return &TOC{ + Symbols: d.be64(), + Series: d.be64(), + LabelIndices: d.be64(), + LabelIndicesTable: d.be64(), + Postings: d.be64(), + PostingsTable: d.be64(), + }, nil } // NewWriter returns a new Writer to the given filename. It serializes data in format version 2. @@ -223,22 +257,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error { // Mark start of sections in table of contents. switch s { case idxStageSymbols: - w.toc.symbols = w.pos + w.toc.Symbols = w.pos case idxStageSeries: - w.toc.series = w.pos + w.toc.Series = w.pos case idxStageLabelIndex: - w.toc.labelIndices = w.pos + w.toc.LabelIndices = w.pos case idxStagePostings: - w.toc.postings = w.pos + w.toc.Postings = w.pos case idxStageDone: - w.toc.labelIndicesTable = w.pos + w.toc.LabelIndicesTable = w.pos if err := w.writeOffsetTable(w.labelIndexes); err != nil { return err } - w.toc.postingsTable = w.pos + w.toc.PostingsTable = w.pos if err := w.writeOffsetTable(w.postings); err != nil { return err } @@ -254,7 +288,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { func (w *Writer) writeMeta() error { w.buf1.reset() w.buf1.putBE32(MagicIndex) - w.buf1.putByte(indexFormatV2) + w.buf1.putByte(FormatV2) return w.write(w.buf1.get()) } @@ -346,8 +380,6 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { } sort.Strings(symbols) - const headerSize = 4 - w.buf1.reset() w.buf2.reset() @@ -438,12 +470,12 @@ const indexTOCLen = 6*8 + 4 func (w *Writer) writeTOC() error { w.buf1.reset() - w.buf1.putBE64(w.toc.symbols) - w.buf1.putBE64(w.toc.series) - w.buf1.putBE64(w.toc.labelIndices) - w.buf1.putBE64(w.toc.labelIndicesTable) - w.buf1.putBE64(w.toc.postings) - w.buf1.putBE64(w.toc.postingsTable) + w.buf1.putBE64(w.toc.Symbols) + w.buf1.putBE64(w.toc.Series) + w.buf1.putBE64(w.toc.LabelIndices) + w.buf1.putBE64(w.toc.LabelIndicesTable) + w.buf1.putBE64(w.toc.Postings) + w.buf1.putBE64(w.toc.PostingsTable) w.buf1.putHash(w.crc32) @@ -535,15 +567,14 @@ type StringTuples interface { } type Reader struct { - // The underlying byte slice holding the encoded series data. - b ByteSlice - toc indexTOC + b ByteSlice // Close that releases the underlying resources of the byte slice. c io.Closer // Cached hashmaps of section offsets. - labels map[string]uint64 + labels map[string]uint64 + // LabelName to LabelValue to offset map. postings map[string]map[string]uint64 // Cache of read symbols. Strings that are returned when reading from the // block are always backed by true strings held in here rather than @@ -551,19 +582,17 @@ type Reader struct { // prevents memory faults when applications work with read symbols after // the block has been unmapped. The older format has sparse indexes so a map // must be used, but the new format is not so we can use a slice. - symbols map[uint32]string - symbolSlice []string + symbolsV1 map[uint32]string + symbolsV2 []string + symbolsTableSize uint64 dec *Decoder - crc32 hash.Hash32 - version int } var ( errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") errInvalidChecksum = fmt.Errorf("invalid checksum") ) @@ -587,10 +616,10 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] } -// NewReader returns a new IndexReader on the given byte slice. It automatically +// NewReader returns a new index reader on the given byte slice. It automatically // handles different format versions. func NewReader(b ByteSlice) (*Reader, error) { - return newReader(b, nil) + return newReader(b, ioutil.NopCloser(nil)) } // NewFileReader returns a new index reader against the given index file. @@ -606,14 +635,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { r := &Reader{ b: b, c: c, - symbols: map[uint32]string{}, labels: map[string]uint64{}, postings: map[string]map[string]uint64{}, - crc32: newCRC32(), } // Verify header. - if b.Len() < 5 { + if r.b.Len() < HeaderLen { return nil, errors.Wrap(errInvalidSize, "index header") } if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { @@ -621,54 +648,59 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { } r.version = int(r.b.Range(4, 5)[0]) - if r.version != 1 && r.version != 2 { + if r.version != FormatV1 && r.version != FormatV2 { return nil, errors.Errorf("unknown index file version %d", r.version) } - if err := r.readTOC(); err != nil { + toc, err := NewTOCFromByteSlice(b) + if err != nil { return nil, errors.Wrap(err, "read TOC") } - if err := r.readSymbols(int(r.toc.symbols)); err != nil { + + r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols)) + if err != nil { return nil, errors.Wrap(err, "read symbols") } - var err error // Use the strings already allocated by symbols, rather than // re-allocating them again below. - symbols := make(map[string]string, len(r.symbols)+len(r.symbolSlice)) - for _, s := range r.symbols { - symbols[s] = s + // Additionally, calculate symbolsTableSize. + allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2)) + for _, s := range r.symbolsV1 { + r.symbolsTableSize += uint64(len(s) + 8) + allocatedSymbols[s] = s } - for _, s := range r.symbolSlice { - symbols[s] = s + for _, s := range r.symbolsV2 { + r.symbolsTableSize += uint64(len(s) + 8) + allocatedSymbols[s] = s } - err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error { if len(key) != 1 { - return errors.Errorf("unexpected key length %d", len(key)) + return errors.Errorf("unexpected key length for label indices table %d", len(key)) } - r.labels[symbols[key[0]]] = off + + r.labels[allocatedSymbols[key[0]]] = off return nil - }) - if err != nil { + }); err != nil { return nil, errors.Wrap(err, "read label index table") } + r.postings[""] = map[string]uint64{} - err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error { if len(key) != 2 { - return errors.Errorf("unexpected key length %d", len(key)) + return errors.Errorf("unexpected key length for posting table %d", len(key)) } if _, ok := r.postings[key[0]]; !ok { - r.postings[symbols[key[0]]] = map[string]uint64{} + r.postings[allocatedSymbols[key[0]]] = map[string]uint64{} } - r.postings[key[0]][symbols[key[1]]] = off + r.postings[key[0]][allocatedSymbols[key[1]]] = off return nil - }) - if err != nil { + }); err != nil { return nil, errors.Wrap(err, "read postings table") } - r.dec = &Decoder{lookupSymbol: r.lookupSymbol} + r.dec = &Decoder{LookupSymbol: r.lookupSymbol} return r, nil } @@ -690,7 +722,7 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { for k, e := range r.postings { for v, start := range e { - d := r.decbufAt(int(start)) + d := newDecbufAt(r.b, int(start)) if d.err() != nil { return nil, d.err() } @@ -703,121 +735,45 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { return m, nil } -func (r *Reader) readTOC() error { - if r.b.Len() < indexTOCLen { - return errInvalidSize - } - b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len()) - - expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) - d := decbuf{b: b[:len(b)-4]} - - if d.crc32() != expCRC { - return errors.Wrap(errInvalidChecksum, "read TOC") - } - - r.toc.symbols = d.be64() - r.toc.series = d.be64() - r.toc.labelIndices = d.be64() - r.toc.labelIndicesTable = d.be64() - r.toc.postings = d.be64() - r.toc.postingsTable = d.be64() - - return d.err() -} - -// decbufAt returns a new decoding buffer. It expects the first 4 bytes -// after offset to hold the big endian encoded content length, followed by the contents and the expected -// checksum. -func (r *Reader) decbufAt(off int) decbuf { - if r.b.Len() < off+4 { - return decbuf{e: errInvalidSize} - } - b := r.b.Range(off, off+4) - l := int(binary.BigEndian.Uint32(b)) - - if r.b.Len() < off+4+l+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. - b = r.b.Range(off+4, off+4+l+4) - dec := decbuf{b: b[:len(b)-4]} - - if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -// decbufUvarintAt returns a new decoding buffer. It expects the first bytes -// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected -// checksum. -func (r *Reader) decbufUvarintAt(off int) decbuf { - // We never have to access this method at the far end of the byte slice. Thus just checking - // against the MaxVarintLen32 is sufficient. - if r.b.Len() < off+binary.MaxVarintLen32 { - return decbuf{e: errInvalidSize} - } - b := r.b.Range(off, off+binary.MaxVarintLen32) - - l, n := binary.Uvarint(b) - if n <= 0 || n > binary.MaxVarintLen32 { - return decbuf{e: errors.Errorf("invalid uvarint %d", n)} - } - - if r.b.Len() < off+n+int(l)+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. - b = r.b.Range(off+n, off+n+int(l)+4) - dec := decbuf{b: b[:len(b)-4]} - - if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -// readSymbols reads the symbol table fully into memory and allocates proper strings for them. +// ReadSymbols reads the symbol table fully into memory and allocates proper strings for them. // Strings backed by the mmap'd memory would cause memory faults if applications keep using them // after the reader is closed. -func (r *Reader) readSymbols(off int) error { +func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) { if off == 0 { - return nil + return nil, nil, nil } - d := r.decbufAt(off) + d := newDecbufAt(bs, off) var ( - origLen = d.len() - cnt = d.be32int() - basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d.len()) + origLen = d.len() + cnt = d.be32int() + basePos = uint32(off) + 4 + nextPos = basePos + uint32(origLen-d.len()) + symbolSlice []string + symbols = map[uint32]string{} ) - if r.version == 2 { - r.symbolSlice = make([]string, 0, cnt) + if version == 2 { + symbolSlice = make([]string, 0, cnt) } for d.err() == nil && d.len() > 0 && cnt > 0 { s := d.uvarintStr() - if r.version == 2 { - r.symbolSlice = append(r.symbolSlice, s) + if version == FormatV2 { + symbolSlice = append(symbolSlice, s) } else { - r.symbols[nextPos] = s + symbols[nextPos] = s nextPos = basePos + uint32(origLen-d.len()) } cnt-- } - return errors.Wrap(d.err(), "read symbols") + return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols") } -// readOffsetTable reads an offset table at the given position calls f for each -// found entry.f -// If f returns an error it stops decoding and returns the received error, -func (r *Reader) readOffsetTable(off uint64, f func([]string, uint64) error) error { - d := r.decbufAt(int(off)) +// ReadOffsetTable reads an offset table and at the given position calls f for each +// found entry. If f returns an error it stops decoding and returns the received error. +func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error { + d := newDecbufAt(bs, int(off)) cnt := d.be32() for d.err() == nil && d.len() > 0 && cnt > 0 { @@ -845,10 +801,10 @@ func (r *Reader) Close() error { } func (r *Reader) lookupSymbol(o uint32) (string, error) { - if int(o) < len(r.symbolSlice) { - return r.symbolSlice[o], nil + if int(o) < len(r.symbolsV2) { + return r.symbolsV2[o], nil } - s, ok := r.symbols[o] + s, ok := r.symbolsV1[o] if !ok { return "", errors.Errorf("unknown symbol offset %d", o) } @@ -857,27 +813,20 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) { // Symbols returns a set of symbols that exist within the index. func (r *Reader) Symbols() (map[string]struct{}, error) { - res := make(map[string]struct{}, len(r.symbols)) + res := make(map[string]struct{}, len(r.symbolsV1)+len(r.symbolsV2)) - for _, s := range r.symbols { + for _, s := range r.symbolsV1 { res[s] = struct{}{} } - for _, s := range r.symbolSlice { + for _, s := range r.symbolsV2 { res[s] = struct{}{} } return res, nil } -// SymbolTableSize returns the symbol table that is used to resolve symbol references. +// SymbolTableSize returns the symbol table size in bytes. func (r *Reader) SymbolTableSize() uint64 { - var size int - for _, s := range r.symbols { - size += len(s) + 8 - } - for _, s := range r.symbolSlice { - size += len(s) + 8 - } - return uint64(size) + return r.symbolsTableSize } // LabelValues returns value tuples that exist for the given label name tuples. @@ -892,7 +841,7 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d := r.decbufAt(int(off)) + d := newDecbufAt(r.b, int(off)) nc := d.be32int() d.be32() // consume unused value entry count. @@ -916,7 +865,7 @@ func (emptyStringTuples) Len() int { return 0 } // LabelIndices returns a slice of label names for which labels or label tuples value indices exist. // NOTE: This is deprecated. Use `LabelNames()` instead. func (r *Reader) LabelIndices() ([][]string, error) { - res := [][]string{} + var res [][]string for s := range r.labels { res = append(res, strings.Split(s, labelNameSeperator)) } @@ -928,10 +877,10 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. - if r.version == 2 { + if r.version == FormatV2 { offset = id * 16 } - d := r.decbufUvarintAt(int(offset)) + d := newDecbufUvarintAt(r.b, int(offset)) if d.err() != nil { return d.err() } @@ -948,7 +897,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) { if !ok { return EmptyPostings(), nil } - d := r.decbufAt(int(off)) + d := newDecbufAt(r.b, int(off)) if d.err() != nil { return nil, errors.Wrap(d.err(), "get postings entry") } @@ -965,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// Size returns the size of an index file. +func (r *Reader) Size() int64 { + return int64(r.b.Len()) +} + // LabelNames returns all the unique label names present in the index. func (r *Reader) LabelNames() ([]string, error) { labelNamesMap := make(map[string]struct{}, len(r.labels)) @@ -1062,7 +1016,7 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. type Decoder struct { - lookupSymbol func(uint32) (string, error) + LookupSymbol func(uint32) (string, error) } // Postings returns a postings list for b and its number of elements. @@ -1090,11 +1044,11 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e return errors.Wrap(d.err(), "read series label offsets") } - ln, err := dec.lookupSymbol(lno) + ln, err := dec.LookupSymbol(lno) if err != nil { return errors.Wrap(err, "lookup label name") } - lv, err := dec.lookupSymbol(lvo) + lv, err := dec.LookupSymbol(lvo) if err != nil { return errors.Wrap(err, "lookup label value") } diff --git a/vendor/github.com/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/tsdb/index/postings.go index 1f2ee0d6f7..13df1c69a1 100644 --- a/vendor/github.com/prometheus/tsdb/index/postings.go +++ b/vendor/github.com/prometheus/tsdb/index/postings.go @@ -366,80 +366,25 @@ func Merge(its ...Postings) Postings { if len(its) == 1 { return its[0] } - l := len(its) / 2 - return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...)) -} - -type mergedPostings struct { - a, b Postings - initialized bool - aok, bok bool - cur uint64 -} - -func newMergedPostings(a, b Postings) *mergedPostings { - return &mergedPostings{a: a, b: b} -} - -func (it *mergedPostings) At() uint64 { - return it.cur -} - -func (it *mergedPostings) Next() bool { - if !it.initialized { - it.aok = it.a.Next() - it.bok = it.b.Next() - it.initialized = true + // All the uses of this function immediately expand it, so + // collect everything in a map. This is more efficient + // when there's 100ks of postings, compared to + // having a tree of merge objects. + pm := make(map[uint64]struct{}, len(its)) + for _, it := range its { + for it.Next() { + pm[it.At()] = struct{}{} + } + if it.Err() != nil { + return ErrPostings(it.Err()) + } } - - if !it.aok && !it.bok { - return false + pl := make([]uint64, 0, len(pm)) + for p := range pm { + pl = append(pl, p) } - - if !it.aok { - it.cur = it.b.At() - it.bok = it.b.Next() - return true - } - if !it.bok { - it.cur = it.a.At() - it.aok = it.a.Next() - return true - } - - acur, bcur := it.a.At(), it.b.At() - - if acur < bcur { - it.cur = acur - it.aok = it.a.Next() - } else if acur > bcur { - it.cur = bcur - it.bok = it.b.Next() - } else { - it.cur = acur - it.aok = it.a.Next() - it.bok = it.b.Next() - } - return true -} - -func (it *mergedPostings) Seek(id uint64) bool { - if it.cur >= id { - return true - } - - it.aok = it.a.Seek(id) - it.bok = it.b.Seek(id) - it.initialized = true - - return it.Next() -} - -func (it *mergedPostings) Err() error { - if it.a.Err() != nil { - return it.a.Err() - } - return it.b.Err() + sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] }) + return newListPostings(pl) } // Without returns a new postings list that contains all elements from the full list that diff --git a/vendor/github.com/prometheus/tsdb/labels/selector.go b/vendor/github.com/prometheus/tsdb/labels/selector.go index 7bc452faa6..c0c74ed526 100644 --- a/vendor/github.com/prometheus/tsdb/labels/selector.go +++ b/vendor/github.com/prometheus/tsdb/labels/selector.go @@ -15,7 +15,6 @@ package labels import ( "regexp" - "strings" ) // Selector holds constraints for matching against a label set. @@ -99,22 +98,3 @@ func (m *notMatcher) Matches(v string) bool { return !m.Matcher.Matches(v) } func Not(m Matcher) Matcher { return ¬Matcher{m} } - -// PrefixMatcher implements Matcher for labels which values matches prefix. -type PrefixMatcher struct { - name, prefix string -} - -// NewPrefixMatcher returns new Matcher for label name matching prefix. -func NewPrefixMatcher(name, prefix string) Matcher { - return &PrefixMatcher{name: name, prefix: prefix} -} - -// Name implements Matcher interface. -func (m *PrefixMatcher) Name() string { return m.name } - -// Prefix returns matching prefix. -func (m *PrefixMatcher) Prefix() string { return m.prefix } - -// Matches implements Matcher interface. -func (m *PrefixMatcher) Matches(v string) bool { return strings.HasPrefix(v, m.prefix) } diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index 7459c6bee6..4a5a40636a 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -247,37 +247,6 @@ func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings, return ix.SortedPostings(index.Intersect(its...)), nil } -// tuplesByPrefix uses binary search to find prefix matches within ts. -func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) { - var outErr error - tslen := ts.Len() - i := sort.Search(tslen, func(i int) bool { - vs, err := ts.At(i) - if err != nil { - outErr = fmt.Errorf("Failed to read tuple %d/%d: %v", i, tslen, err) - return true - } - val := vs[0] - l := len(m.Prefix()) - if l > len(vs) { - l = len(val) - } - return val[:l] >= m.Prefix() - }) - if outErr != nil { - return nil, outErr - } - var matches []string - for ; i < tslen; i++ { - vs, err := ts.At(i) - if err != nil || !m.Matches(vs[0]) { - return matches, err - } - matches = append(matches, vs[0]) - } - return matches, nil -} - func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) { // If the matcher selects an empty value, it selects all the series which don't // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 @@ -301,21 +270,13 @@ func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error } var res []string - if pm, ok := m.(*labels.PrefixMatcher); ok { - res, err = tuplesByPrefix(pm, tpls) + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) if err != nil { return nil, err } - - } else { - for i := 0; i < tpls.Len(); i++ { - vals, err := tpls.At(i) - if err != nil { - return nil, err - } - if m.Matches(vals[0]) { - res = append(res, vals[0]) - } + if m.Matches(vals[0]) { + res = append(res, vals[0]) } } @@ -620,11 +581,9 @@ func (s *populatedChunkSeries) Next() bool { // This means that the chunk has be garbage collected. Remove it from the list. if s.err == ErrNotFound { s.err = nil - // Delete in-place. - chks = append(chks[:j], chks[j+1:]...) + s.chks = append(chks[:j], chks[j+1:]...) } - return false } } diff --git a/vendor/github.com/prometheus/tsdb/staticcheck.conf b/vendor/github.com/prometheus/tsdb/staticcheck.conf new file mode 100644 index 0000000000..3266a2e297 --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/staticcheck.conf @@ -0,0 +1,2 @@ +# Enable only "legacy" staticcheck verifications. +checks = [ "SA*" ] diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go index a1f30b59c7..0781404067 100644 --- a/vendor/github.com/prometheus/tsdb/tombstones.go +++ b/vendor/github.com/prometheus/tsdb/tombstones.go @@ -113,37 +113,41 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (TombstoneReader, error) { +func readTombstones(dir string) (TombstoneReader, SizeReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), nil + return newMemTombstones(), nil, nil } else if err != nil { - return nil, err + return nil, nil, err + } + + sr := &TombstoneFile{ + size: int64(len(b)), } if len(b) < 5 { - return nil, errors.Wrap(errInvalidSize, "tombstones header") + return nil, sr, errors.Wrap(errInvalidSize, "tombstones header") } d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. if mg := d.be32(); mg != MagicTombstone { - return nil, fmt.Errorf("invalid magic number %x", mg) + return nil, sr, fmt.Errorf("invalid magic number %x", mg) } if flag := d.byte(); flag != tombstoneFormatV1 { - return nil, fmt.Errorf("invalid tombstone format %x", flag) + return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) } if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } // Verify checksum. hash := newCRC32() if _, err := hash.Write(d.get()); err != nil { - return nil, errors.Wrap(err, "write to hash") + return nil, sr, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { - return nil, errors.New("checksum did not match") + return nil, sr, errors.New("checksum did not match") } stonesMap := newMemTombstones() @@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) { mint := d.varint64() maxt := d.varint64() if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } stonesMap.addInterval(k, Interval{mint, maxt}) } - return stonesMap, nil + return stonesMap, sr, nil } type memTombstones struct { @@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } +// TombstoneFile holds information about the tombstone file. +type TombstoneFile struct { + size int64 +} + +// Size returns the tombstone file size. +func (t *TombstoneFile) Size() int64 { + return t.size +} + func (*memTombstones) Close() error { return nil } diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index 684a2fa6a9..60e1c58075 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -94,27 +94,6 @@ type WAL interface { Close() error } -// NopWAL is a WAL that does nothing. -func NopWAL() WAL { - return nopWAL{} -} - -type nopWAL struct{} - -func (nopWAL) Read( - seriesf func([]RefSeries), - samplesf func([]RefSample), - deletesf func([]Stone), -) error { - return nil -} -func (w nopWAL) Reader() WALReader { return w } -func (nopWAL) LogSeries([]RefSeries) error { return nil } -func (nopWAL) LogSamples([]RefSample) error { return nil } -func (nopWAL) LogDeletes([]Stone) error { return nil } -func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } -func (nopWAL) Close() error { return nil } - // WALReader reads entries from a WAL. type WALReader interface { Read( @@ -909,16 +888,19 @@ func (r *walReader) Read( if seriesf != nil { seriesf(v) } + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. seriesPool.Put(v[:0]) case []RefSample: if samplesf != nil { samplesf(v) } + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. samplePool.Put(v[:0]) case []Stone: if deletesf != nil { deletesf(v) } + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. deletePool.Put(v[:0]) default: level.Error(r.logger).Log("msg", "unexpected data type") diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go index 92374e3121..fd90eb90ec 100644 --- a/vendor/github.com/prometheus/tsdb/wal/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal/wal.go @@ -164,6 +164,7 @@ type WAL struct { page *page // active page stopc chan chan struct{} actorc chan func() + closed bool // To allow calling Close() more than once without blocking. fsyncDuration prometheus.Summary pageFlushes prometheus.Counter @@ -584,6 +585,10 @@ func (w *WAL) Close() (err error) { w.mtx.Lock() defer w.mtx.Unlock() + if w.closed { + return nil + } + // Flush the last page and zero out all its remaining size. // We must not flush an empty page as it would falsely signal // the segment is done if we start writing to it again after opening. @@ -603,7 +608,7 @@ func (w *WAL) Close() (err error) { if err := w.segment.Close(); err != nil { level.Error(w.logger).Log("msg", "close previous segment", "err", err) } - + w.closed = true return nil } @@ -827,28 +832,13 @@ func (r *Reader) next() (err error) { } r.rec = append(r.rec, buf[:length]...) - switch r.curRecTyp { - case recFull: - if i != 0 { - return errors.New("unexpected full record") - } - return nil - case recFirst: - if i != 0 { - return errors.New("unexpected first record") - } - case recMiddle: - if i == 0 { - return errors.New("unexpected middle record") - } - case recLast: - if i == 0 { - return errors.New("unexpected last record") - } - return nil - default: - return errors.Errorf("unexpected record type %d", r.curRecTyp) + if err := validateRecord(r.curRecTyp, i); err != nil { + return err } + if r.curRecTyp == recLast || r.curRecTyp == recFull { + return nil + } + // Only increment i for non-zero records since we use it // to determine valid content record sequences. i++ @@ -899,6 +889,226 @@ func (r *Reader) Offset() int64 { return r.total } +// NewLiveReader returns a new live reader. +func NewLiveReader(r io.Reader) *LiveReader { + return &LiveReader{rdr: r} +} + +// Reader reads WAL records from an io.Reader. It buffers partial record data for +// the next read. +type LiveReader struct { + rdr io.Reader + err error + rec []byte + hdr [recordHeaderSize]byte + buf [pageSize]byte + readIndex int // Index in buf to start at for next read. + writeIndex int // Index in buf to start at for next write. + total int64 // Total bytes processed during reading in calls to Next(). + index int // Used to track partial records, should be 0 at the start of every new record. +} + +func (r *LiveReader) Err() error { + return r.err +} + +func (r *LiveReader) TotalRead() int64 { + return r.total +} + +func (r *LiveReader) fillBuffer() error { + n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)]) + r.writeIndex += n + return err +} + +// Shift the buffer up to the read index. +func (r *LiveReader) shiftBuffer() { + copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex]) + r.readIndex = 0 + r.writeIndex = copied +} + +// Next returns true if r.rec will contain a full record. +// False does not indicate that there will never be more data to +// read for the current io.Reader. +func (r *LiveReader) Next() bool { + for { + if r.buildRecord() { + return true + } + if r.err != nil && r.err != io.EOF { + return false + } + if r.readIndex == pageSize { + r.shiftBuffer() + } + if r.writeIndex != pageSize { + if err := r.fillBuffer(); err != nil { + // We expect to get EOF, since we're reading the segment file as it's being written. + if err != io.EOF { + r.err = err + } + return false + } + } + } +} + +// Record returns the current record. +// The returned byte slice is only valid until the next call to Next. +func (r *LiveReader) Record() []byte { + return r.rec +} + +// Rebuild a full record from potentially partial records. Returns false +// if there was an error or if we weren't able to read a record for any reason. +// Returns true if we read a full record. Any record data is appeneded to +// LiveReader.rec +func (r *LiveReader) buildRecord() bool { + for { + // Check that we have data in the internal buffer to read. + if r.writeIndex <= r.readIndex { + return false + } + + // Attempt to read a record, partial or otherwise. + temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total) + r.readIndex += n + r.total += int64(n) + if err != nil { + r.err = err + return false + } + + if temp == nil { + return false + } + + rt := recType(r.hdr[0]) + + if rt == recFirst || rt == recFull { + r.rec = r.rec[:0] + } + r.rec = append(r.rec, temp...) + + if err := validateRecord(rt, r.index); err != nil { + r.err = err + r.index = 0 + return false + } + if rt == recLast || rt == recFull { + r.index = 0 + return true + } + // Only increment i for non-zero records since we use it + // to determine valid content record sequences. + r.index++ + } +} + +// Returns an error if the recType and i indicate an invalid record sequence. +// As an example, if i is > 0 because we've read some amount of a partial record +// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull +// instead of a recLast or recMiddle we would have an invalid record. +func validateRecord(typ recType, i int) error { + switch typ { + case recFull: + if i != 0 { + return errors.New("unexpected full record") + } + return nil + case recFirst: + if i != 0 { + return errors.New("unexpected first record, dropping buffer") + } + return nil + case recMiddle: + if i == 0 { + return errors.New("unexpected middle record, dropping buffer") + } + return nil + case recLast: + if i == 0 { + return errors.New("unexpected last record, dropping buffer") + } + return nil + default: + return errors.Errorf("unexpected record type %d", typ) + } +} + +// Read a sub-record (see recType) from the buffer. It could potentially +// be a full record (recFull) if the record fits within the bounds of a single page. +// Returns a byte slice of the record data read, the number of bytes read, and an error +// if there's a non-zero byte in a page term record or the record checksum fails. +// TODO(callum) the EOF errors we're returning from this function should theoretically +// never happen, add a metric for them. +func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) { + readIndex := 0 + header[0] = buf[0] + readIndex++ + total++ + + // The rest of this function is mostly from Reader.Next(). + typ := recType(header[0]) + // Gobble up zero bytes. + if typ == recPageTerm { + // We are pedantic and check whether the zeros are actually up to a page boundary. + // It's not strictly necessary but may catch sketchy state early. + k := pageSize - (total % pageSize) + if k == pageSize { + return nil, 1, nil // Initial 0 byte was last page byte. + } + + if k <= int64(len(buf)-readIndex) { + for _, v := range buf[readIndex : int64(readIndex)+k] { + readIndex++ + if v != 0 { + return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes") + } + } + return nil, readIndex, nil + } + // Not enough bytes to read the rest of the page term rec. + // This theoretically should never happen, since we're only shifting the + // internal buffer of the live reader when we read to the end of page. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + if readIndex+recordHeaderSize-1 > len(buf) { + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + copy(header[1:], buf[readIndex:readIndex+len(header[1:])]) + readIndex += recordHeaderSize - 1 + total += int64(recordHeaderSize - 1) + var ( + length = binary.BigEndian.Uint16(header[1:]) + crc = binary.BigEndian.Uint32(header[3:]) + ) + readTo := int(length) + readIndex + if readTo > len(buf) { + if (readTo - readIndex) > pageSize { + return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length)) + } + // Not enough data to read all of the record data. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + recData := buf[readIndex:readTo] + readIndex += int(length) + total += int64(length) + + // TODO(callum) what should we do here, throw out the record? We should add a metric at least. + if c := crc32.Checksum(recData, castagnoliTable); c != crc { + return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + return recData, readIndex, nil +} + func min(i, j int) int { if i < j { return i diff --git a/vendor/modules.txt b/vendor/modules.txt index a2830bae0e..fe73cb649e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -229,7 +229,7 @@ github.com/prometheus/procfs github.com/prometheus/procfs/nfs github.com/prometheus/procfs/xfs github.com/prometheus/procfs/internal/util -# github.com/prometheus/tsdb v0.3.2-0.20181219094047-6d489a1004dc +# github.com/prometheus/tsdb v0.4.0 github.com/prometheus/tsdb github.com/prometheus/tsdb/labels github.com/prometheus/tsdb/chunkenc