From ca93fd544b76d1abf3922cc54295549b9c06e62c Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 19 Nov 2018 15:51:14 +0530 Subject: [PATCH] /api/v1/labels endpoint for getting all label names (#4835) * vendor: update tsdb Signed-off-by: Ganesh Vernekar * /api/v1/labels endpoint Signed-off-by: Ganesh Vernekar * regex matchers for API Signed-off-by: Ganesh Vernekar * Add docs Signed-off-by: Ganesh Vernekar * Matchers behaving as OR Signed-off-by: Ganesh Vernekar * Removed the matchers Signed-off-by: Ganesh Vernekar * vendor: update tsdb using go mod Signed-off-by: Ganesh Vernekar * vendor update: tsdb Signed-off-by: Ganesh Vernekar * Added LabelNames() to storage.Querier Signed-off-by: Ganesh Vernekar * Test for api.labelNames Signed-off-by: Ganesh Vernekar * Nits Signed-off-by: Ganesh Vernekar --- docs/querying/api.md | 45 ++++- go.mod | 2 +- go.sum | 4 +- promql/engine_test.go | 1 + storage/fanout.go | 24 +++ storage/interface.go | 3 + storage/noop.go | 4 + storage/remote/read.go | 6 + storage/tsdb/tsdb.go | 1 + vendor/github.com/prometheus/tsdb/.travis.yml | 3 +- .../github.com/prometheus/tsdb/MAINTAINERS.md | 4 + vendor/github.com/prometheus/tsdb/README.md | 2 + vendor/github.com/prometheus/tsdb/block.go | 17 +- .../github.com/prometheus/tsdb/checkpoint.go | 40 ++--- vendor/github.com/prometheus/tsdb/compact.go | 56 ++++-- vendor/github.com/prometheus/tsdb/db.go | 8 +- .../prometheus/tsdb/encoding_helpers.go | 13 ++ .../prometheus/tsdb/fileutil/fileutil.go | 13 ++ .../prometheus/tsdb/fileutil/mmap.go | 13 ++ .../prometheus/tsdb/fileutil/mmap_386.go | 13 ++ .../prometheus/tsdb/fileutil/mmap_amd64.go | 13 ++ vendor/github.com/prometheus/tsdb/head.go | 160 +++++++++++------- .../prometheus/tsdb/index/encoding_helpers.go | 13 ++ .../github.com/prometheus/tsdb/index/index.go | 37 +++- .../prometheus/tsdb/labels/labels.go | 6 +- vendor/github.com/prometheus/tsdb/querier.go | 38 ++++- vendor/github.com/prometheus/tsdb/record.go | 20 +-- vendor/github.com/prometheus/tsdb/repair.go | 13 ++ .../github.com/prometheus/tsdb/tombstones.go | 14 +- vendor/github.com/prometheus/tsdb/wal/wal.go | 73 ++++---- vendor/modules.txt | 2 +- web/api/v1/api.go | 16 ++ web/api/v1/api_test.go | 45 ++++- 33 files changed, 554 insertions(+), 168 deletions(-) create mode 100644 vendor/github.com/prometheus/tsdb/MAINTAINERS.md diff --git a/docs/querying/api.md b/docs/querying/api.md index 37da527a3a..82ce78699b 100644 --- a/docs/querying/api.md +++ b/docs/querying/api.md @@ -236,6 +236,49 @@ $ curl -g 'http://localhost:9090/api/v1/series?match[]=up&match[]=process_start_ } ``` +### Getting label names + +The following endpoint returns a list of label names: + +``` +GET /api/v1/labels +POST /api/v1/labels +``` + +The `data` section of the JSON response is a list of string label names. + +Here is an example. + +```json +$ curl 'localhost:9090/api/v1/labels' +{ + "status": "success", + "data": [ + "__name__", + "call", + "code", + "config", + "dialer_name", + "endpoint", + "event", + "goversion", + "handler", + "instance", + "interval", + "job", + "le", + "listener_name", + "name", + "quantile", + "reason", + "role", + "scrape_job", + "slice", + "version" + ] +} +``` + ### Querying label values The following endpoint returns a list of label values for a provided label name: @@ -244,7 +287,7 @@ The following endpoint returns a list of label values for a provided label name: GET /api/v1/label//values ``` -The `data` section of the JSON response is a list of string label names. +The `data` section of the JSON response is a list of string label values. This example queries for all label values for the `job` label: diff --git a/go.mod b/go.mod index 99cb80948e..599796f50f 100644 --- a/go.mod +++ b/go.mod @@ -105,7 +105,7 @@ require ( github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 github.com/prometheus/procfs v0.0.0-20160411190841-abf152e5f3e9 // indirect - github.com/prometheus/tsdb v0.0.0-20181003080831-0ce41118ed20 + github.com/prometheus/tsdb v0.2.0 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rlmcpherson/s3gof3r v0.5.0 // indirect github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect diff --git a/go.sum b/go.sum index 7405efcef7..030a9a0728 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,8 @@ github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20160411190841-abf152e5f3e9 h1:IrO4Eb9oGw+GxzOhO4b2QC5EWO85Omh/4iTSPZktMm8= github.com/prometheus/procfs v0.0.0-20160411190841-abf152e5f3e9/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/tsdb v0.0.0-20181003080831-0ce41118ed20 h1:Jh/eKJuru9z9u3rUGdQ8gYc3aZmCGkjXT3gmy0Ex8W8= -github.com/prometheus/tsdb v0.0.0-20181003080831-0ce41118ed20/go.mod h1:lFf/o1J2a31WmWQbxYXfY1azJK5Xp5D8hwKMnVMBTGU= +github.com/prometheus/tsdb v0.2.0 h1:27z98vFd/gPew17nmKEbLn37exGCwc2F5EyrgScg6bk= +github.com/prometheus/tsdb v0.2.0/go.mod h1:lFf/o1J2a31WmWQbxYXfY1azJK5Xp5D8hwKMnVMBTGU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k= diff --git a/promql/engine_test.go b/promql/engine_test.go index a219af231a..61cdba430b 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -173,6 +173,7 @@ func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage. return errSeriesSet{err: q.err}, q.err } func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil } +func (*errQuerier) LabelNames() ([]string, error) { return nil, nil } func (*errQuerier) Close() error { return nil } // errSeriesSet implements storage.SeriesSet which always returns error. diff --git a/storage/fanout.go b/storage/fanout.go index 3ab994391c..af4fe1b327 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -16,10 +16,12 @@ package storage import ( "container/heap" "context" + "sort" "strings" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) @@ -280,6 +282,28 @@ func mergeTwoStringSlices(a, b []string) []string { return result } +// LabelNames returns all the unique label names present in the block in sorted order. +func (q *mergeQuerier) LabelNames() ([]string, error) { + labelNamesMap := make(map[string]struct{}) + for _, b := range q.queriers { + names, err := b.LabelNames() + if err != nil { + return nil, errors.Wrap(err, "LabelNames() from Querier") + } + for _, name := range names { + labelNamesMap[name] = struct{}{} + } + } + + labelNames := make([]string, 0, len(labelNamesMap)) + for name := range labelNamesMap { + labelNames = append(labelNames, name) + } + sort.Strings(labelNames) + + return labelNames, nil +} + // Close releases the resources of the Querier. func (q *mergeQuerier) Close() error { // TODO return multiple errors? diff --git a/storage/interface.go b/storage/interface.go index 1d120cd640..45f97d8ed7 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -57,6 +57,9 @@ type Querier interface { // LabelValues returns all potential values for a label name. LabelValues(name string) ([]string, error) + // LabelNames returns all the unique label names present in the block in sorted order. + LabelNames() ([]string, error) + // Close releases the resources of the Querier. Close() error } diff --git a/storage/noop.go b/storage/noop.go index 7bf92dbd79..a0ead036e6 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -34,6 +34,10 @@ func (noopQuerier) LabelValues(name string) ([]string, error) { return nil, nil } +func (noopQuerier) LabelNames() ([]string, error) { + return nil, nil +} + func (noopQuerier) Close() error { return nil } diff --git a/storage/remote/read.go b/storage/remote/read.go index 775471e806..7d9652f721 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -83,6 +83,12 @@ func (q *querier) LabelValues(name string) ([]string, error) { return nil, nil } +// LabelNames implements storage.Querier and is a noop. +func (q *querier) LabelNames() ([]string, error) { + // TODO implement? + return nil, nil +} + // Close implements storage.Querier and is a noop. func (q *querier) Close() error { return nil diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 92e8cadd3e..d04a47a129 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -202,6 +202,7 @@ func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storag } func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) } +func (q querier) LabelNames() ([]string, error) { return q.q.LabelNames() } func (q querier) Close() error { return q.q.Close() } type seriesSet struct { diff --git a/vendor/github.com/prometheus/tsdb/.travis.yml b/vendor/github.com/prometheus/tsdb/.travis.yml index f659ab70ad..c24773b7b1 100644 --- a/vendor/github.com/prometheus/tsdb/.travis.yml +++ b/vendor/github.com/prometheus/tsdb/.travis.yml @@ -13,6 +13,5 @@ install: - go get -v -t ./... script: - # `check_license` target is omitted due to some missing license headers # `staticcheck` target is omitted due to linting errors - - make style unused test + - make check_license style unused test diff --git a/vendor/github.com/prometheus/tsdb/MAINTAINERS.md b/vendor/github.com/prometheus/tsdb/MAINTAINERS.md new file mode 100644 index 0000000000..dcb57a80df --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/MAINTAINERS.md @@ -0,0 +1,4 @@ +Maintainers of this repository: + +* Krasi Georgiev @krasi-georgiev +* Goutham Veeramachaneni @gouthamve \ No newline at end of file diff --git a/vendor/github.com/prometheus/tsdb/README.md b/vendor/github.com/prometheus/tsdb/README.md index 737020c4df..4393d511a5 100644 --- a/vendor/github.com/prometheus/tsdb/README.md +++ b/vendor/github.com/prometheus/tsdb/README.md @@ -7,6 +7,8 @@ This repository contains the Prometheus storage layer that is used in its 2.x re A writeup of its design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/). +Based on the Gorilla TSDB [white papers](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf). + Video: [Storing 16 Bytes at Scale](https://youtu.be/b_pEevMAC3I) from [PromCon 2017](https://promcon.io/2017-munich/). See also the [format documentation](docs/format/README.md). diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 981c69eb44..7081db0772 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -83,8 +83,12 @@ type IndexReader interface { Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error // LabelIndices returns a list of string tuples for which a label value index exists. + // NOTE: This is deprecated. Use `LabelNames()` instead. LabelIndices() ([][]string, error) + // LabelNames returns all the unique label names present in the index in sorted order. + LabelNames() ([]string, error) + // Close releases the underlying resources of the reader. Close() error } @@ -407,6 +411,10 @@ func (r blockIndexReader) LabelIndices() ([][]string, error) { return ss, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) } +func (r blockIndexReader) LabelNames() ([]string, error) { + return r.b.LabelNames() +} + func (r blockIndexReader) Close() error { r.b.pendingReaders.Done() return nil @@ -449,7 +457,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := NewMemTombstones() + stones := newMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -557,13 +565,18 @@ func (pb *Block) Snapshot(dir string) error { return nil } -// Returns true if the block overlaps [mint, maxt]. +// OverlapsClosedInterval returns true if the block overlaps [mint, maxt]. func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool { // The block itself is a half-open interval // [pb.meta.MinTime, pb.meta.MaxTime). return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime } +// LabelNames returns all the unique label names present in the Block in sorted order. +func (pb *Block) LabelNames() ([]string, error) { + return pb.indexr.LabelNames() +} + func clampInterval(a, b, mint, maxt int64) (int64, int64) { if a < mint { a = mint diff --git a/vendor/github.com/prometheus/tsdb/checkpoint.go b/vendor/github.com/prometheus/tsdb/checkpoint.go index b8de5d14ac..aa8170520c 100644 --- a/vendor/github.com/prometheus/tsdb/checkpoint.go +++ b/vendor/github.com/prometheus/tsdb/checkpoint.go @@ -38,7 +38,7 @@ type CheckpointStats struct { TotalTombstones int // Processed tombstones including dropped ones. } -// LastCheckpoint returns the directory name of the most recent checkpoint. +// LastCheckpoint returns the directory name and index of the most recent checkpoint. // If dir does not contain any checkpoints, ErrNotFound is returned. func LastCheckpoint(dir string) (string, int, error) { files, err := ioutil.ReadDir(dir) @@ -55,18 +55,17 @@ func LastCheckpoint(dir string) (string, int, error) { if !fi.IsDir() { return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name()) } - k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) if err != nil { continue } - return fi.Name(), k, nil + return fi.Name(), idx, nil } return "", 0, ErrNotFound } -// DeleteCheckpoints deletes all checkpoints in dir that have an index -// below n. -func DeleteCheckpoints(dir string, n int) error { +// DeleteCheckpoints deletes all checkpoints in a directory below a given index. +func DeleteCheckpoints(dir string, maxIndex int) error { var errs MultiError files, err := ioutil.ReadDir(dir) @@ -77,8 +76,8 @@ func DeleteCheckpoints(dir string, n int) error { if !strings.HasPrefix(fi.Name(), checkpointPrefix) { continue } - k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) - if err != nil || k >= n { + index, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil || index >= maxIndex { continue } if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { @@ -90,7 +89,7 @@ func DeleteCheckpoints(dir string, n int) error { const checkpointPrefix = "checkpoint." -// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL. +// Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL. // It includes the most recent checkpoint if it exists. // All series not satisfying keep and samples below mint are dropped. // @@ -98,7 +97,7 @@ const checkpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sr io.Reader @@ -107,27 +106,28 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C // files if there is an error somewhere. var closers []io.Closer { - lastFn, k, err := LastCheckpoint(w.Dir()) + dir, idx, err := LastCheckpoint(w.Dir()) if err != nil && err != ErrNotFound { return nil, errors.Wrap(err, "find last checkpoint") } + last := idx + 1 if err == nil { - if m > k+1 { - return nil, errors.New("unexpected gap to last checkpoint") + if from > last { + return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from) } // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. - m = k + 1 + from = last - last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn)) + r, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), dir)) if err != nil { return nil, errors.Wrap(err, "open last checkpoint") } - defer last.Close() - closers = append(closers, last) - sr = last + defer r.Close() + closers = append(closers, r) + sr = r } - segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) + segsr, err := wal.NewSegmentsRangeReader(w.Dir(), from, to) if err != nil { return nil, errors.Wrap(err, "create segment reader") } @@ -141,7 +141,7 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C } } - cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n)) + cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) cpdirtmp := cpdir + ".tmp" if err := os.MkdirAll(cpdirtmp, 0777); err != nil { diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 3f5fa367c6..f8e6ff545c 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -59,7 +59,9 @@ type Compactor interface { // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). - Compact(dest string, dirs ...string) (ulid.ULID, error) + // Can optionally pass a list of already open blocks, + // to avoid having to reopen them. + Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } // LeveledCompactor implements the Compactor interface. @@ -317,26 +319,41 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { // Compact creates a new block in the compactor's directory from the blocks in the // provided directories. -func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) { +func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { var ( blocks []BlockReader bs []*Block metas []*BlockMeta uids []string ) + start := time.Now() for _, d := range dirs { - b, err := OpenBlock(d, c.chunkPool) - if err != nil { - return uid, err - } - defer b.Close() - meta, err := readMetaFile(d) if err != nil { return uid, err } + var b *Block + + // Use already open blocks if we can, to avoid + // having the index data in memory twice. + for _, o := range open { + if meta.ULID == o.Meta().ULID { + b = o + break + } + } + + if b == nil { + var err error + b, err = OpenBlock(d, c.chunkPool) + if err != nil { + return uid, err + } + defer b.Close() + } + metas = append(metas, meta) blocks = append(blocks, b) bs = append(bs, b) @@ -356,6 +373,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, "maxt", meta.MaxTime, "ulid", meta.ULID, "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), ) return uid, nil } @@ -489,7 +507,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil { + if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -524,6 +542,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { + if len(blocks) == 0 { + return errors.New("cannot populate block from no readers") + } + var ( set ChunkSeriesSet allSymbols = make(map[string]struct{}, 1<<16) @@ -595,13 +617,17 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, continue } - if len(dranges) > 0 { - // Re-encode the chunk to not have deleted values. - for i, chk := range chks { + for i, chk := range chks { + if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime { + return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d", + chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime) + } + + if len(dranges) > 0 { + // Re-encode the chunk to not have deleted values. if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) { continue } - newChunk := chunkenc.NewXORChunk() app, err := newChunk.Appender() if err != nil { @@ -617,6 +643,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, chks[i].Chunk = newChunk } } + if err := chunkw.WriteChunks(chks...); err != nil { return errors.Wrap(err, "write chunks") } @@ -791,7 +818,6 @@ func (c *compactionMerger) Next() bool { var chks []chunks.Meta d := c.compare() - // Both sets contain the current series. Chain them into a single one. if d > 0 { lset, chks, c.intervals = c.b.At() c.l = append(c.l[:0], lset...) @@ -805,8 +831,10 @@ func (c *compactionMerger) Next() bool { c.aok = c.a.Next() } else { + // Both sets contain the current series. Chain them into a single one. l, ca, ra := c.a.At() _, cb, rb := c.b.At() + for _, r := range rb { ra = ra.add(r) } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index cb02b48356..3f3ae72bb5 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -429,7 +429,7 @@ func (db *DB) compact() (err error) { default: } - if _, err := db.compactor.Compact(db.dir, plan...); err != nil { + if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { return errors.Wrapf(err, "compact %s", plan) } runtime.GC() @@ -793,7 +793,11 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { } } if maxt >= db.head.MinTime() { - blocks = append(blocks, db.head) + blocks = append(blocks, &rangeHead{ + head: db.head, + mint: mint, + maxt: maxt, + }) } sq := &querier{ diff --git a/vendor/github.com/prometheus/tsdb/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/encoding_helpers.go index ffb58b5c8b..6dd6e7c2e4 100644 --- a/vendor/github.com/prometheus/tsdb/encoding_helpers.go +++ b/vendor/github.com/prometheus/tsdb/encoding_helpers.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( diff --git a/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go b/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go index 15403c8b3c..677df8c090 100644 --- a/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go +++ b/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + // Package fileutil provides utility methods used when dealing with the filesystem in tsdb. // It is largely copied from github.com/coreos/etcd/pkg/fileutil to avoid the // dependency chain it brings with it. diff --git a/vendor/github.com/prometheus/tsdb/fileutil/mmap.go b/vendor/github.com/prometheus/tsdb/fileutil/mmap.go index a0c598254e..26fc80c585 100644 --- a/vendor/github.com/prometheus/tsdb/fileutil/mmap.go +++ b/vendor/github.com/prometheus/tsdb/fileutil/mmap.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package fileutil import ( diff --git a/vendor/github.com/prometheus/tsdb/fileutil/mmap_386.go b/vendor/github.com/prometheus/tsdb/fileutil/mmap_386.go index 156f81b638..66b9d36803 100644 --- a/vendor/github.com/prometheus/tsdb/fileutil/mmap_386.go +++ b/vendor/github.com/prometheus/tsdb/fileutil/mmap_386.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + // +build windows package fileutil diff --git a/vendor/github.com/prometheus/tsdb/fileutil/mmap_amd64.go b/vendor/github.com/prometheus/tsdb/fileutil/mmap_amd64.go index 4025dbfcb9..4b523bc67c 100644 --- a/vendor/github.com/prometheus/tsdb/fileutil/mmap_amd64.go +++ b/vendor/github.com/prometheus/tsdb/fileutil/mmap_amd64.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + // +build windows package fileutil diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 8d259fd666..2adda313d6 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -225,7 +225,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: NewMemTombstones(), + tombstones: newMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -237,22 +237,28 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( minValidTime int64, - partition, total uint64, input <-chan []RefSample, output chan<- []RefSample, ) (unknownRefs uint64) { defer close(output) + // Mitigate lock contention in getByID. + refSeries := map[uint64]*memSeries{} + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range input { for _, s := range samples { - if s.T < minValidTime || s.Ref%total != partition { + if s.T < minValidTime { continue } - ms := h.series.getByID(s.Ref) + ms := refSeries[s.Ref] if ms == nil { - unknownRefs++ - continue + ms = h.series.getByID(s.Ref) + if ms == nil { + unknownRefs++ + continue + } + refSeries[s.Ref] = ms } _, chunkCreated := ms.append(s.T, s.V) if chunkCreated { @@ -310,25 +316,22 @@ func (h *Head) loadWAL(r *wal.Reader) error { // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. var ( - wg sync.WaitGroup - n = runtime.GOMAXPROCS(0) - firstInput = make(chan []RefSample, 300) - input = firstInput + wg sync.WaitGroup + n = runtime.GOMAXPROCS(0) + inputs = make([]chan []RefSample, n) + outputs = make([]chan []RefSample, n) ) wg.Add(n) for i := 0; i < n; i++ { - output := make(chan []RefSample, 300) + outputs[i] = make(chan []RefSample, 300) + inputs[i] = make(chan []RefSample, 300) - go func(i int, input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output) + go func(input <-chan []RefSample, output chan<- []RefSample) { + unknown := h.processWALSamples(minValidTime, input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() - }(i, input, output) - - // The output feeds the next worker goroutine. For the last worker, - // it feeds the initial input again to reuse the RefSample slices. - input = output + }(inputs[i], outputs[i]) } var ( @@ -336,6 +339,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { series []RefSeries samples []RefSample tstones []Stone + err error ) for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] @@ -343,7 +347,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { switch dec.Type(rec) { case RecordSeries: - series, err := dec.Series(rec, series) + series, err = dec.Series(rec, series) if err != nil { return errors.Wrap(err, "decode series") } @@ -355,7 +359,8 @@ func (h *Head) loadWAL(r *wal.Reader) error { } } case RecordSamples: - samples, err := dec.Samples(rec, samples) + samples, err = dec.Samples(rec, samples) + s := samples if err != nil { return errors.Wrap(err, "decode samples") } @@ -364,20 +369,31 @@ func (h *Head) loadWAL(r *wal.Reader) error { // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { - n := 5000 - if len(samples) < n { - n = len(samples) + m := 5000 + if len(samples) < m { + m = len(samples) } - var buf []RefSample - select { - case buf = <-input: - default: + shards := make([][]RefSample, n) + for i := 0; i < n; i++ { + var buf []RefSample + select { + case buf = <-outputs[i]: + default: + } + shards[i] = buf[:0] } - firstInput <- append(buf[:0], samples[:n]...) - samples = samples[n:] + for _, sam := range samples[:m] { + mod := sam.Ref % uint64(n) + shards[mod] = append(shards[mod], sam) + } + for i := 0; i < n; i++ { + inputs[i] <- shards[i] + } + samples = samples[m:] } + samples = s // Keep whole slice for reuse. case RecordTombstones: - tstones, err := dec.Tombstones(rec, tstones) + tstones, err = dec.Tombstones(rec, tstones) if err != nil { return errors.Wrap(err, "decode tombstones") } @@ -397,9 +413,11 @@ func (h *Head) loadWAL(r *wal.Reader) error { return errors.Wrap(r.Err(), "read records") } - // Signal termination to first worker and wait for last one to close its output channel. - close(firstInput) - for range input { + // Signal termination to each worker and wait for it to close its output channel. + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } } wg.Wait() @@ -418,12 +436,12 @@ func (h *Head) Init() error { } // Backfill the checkpoint first if it exists. - cp, n, err := LastCheckpoint(h.wal.Dir()) + dir, startFrom, err := LastCheckpoint(h.wal.Dir()) if err != nil && err != ErrNotFound { return errors.Wrap(err, "find last checkpoint") } if err == nil { - sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp)) + sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), dir)) if err != nil { return errors.Wrap(err, "open checkpoint") } @@ -434,11 +452,11 @@ func (h *Head) Init() error { if err := h.loadWAL(wal.NewReader(sr)); err != nil { return errors.Wrap(err, "backfill checkpoint") } - n++ + startFrom++ } // Backfill segments from the last checkpoint onwards - sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1) + sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), startFrom, -1) if err != nil { return errors.Wrap(err, "open WAL segments") } @@ -493,18 +511,18 @@ func (h *Head) Truncate(mint int64) (err error) { } start = time.Now() - m, n, err := h.wal.Segments() + first, last, err := h.wal.Segments() if err != nil { return errors.Wrap(err, "get segment range") } - n-- // Never consider last segment for checkpoint. - if n < 0 { + last-- // Never consider last segment for checkpoint. + if last < 0 { return nil // no segments yet. } // The lower third of segments should contain mostly obsolete samples. // If we have less than three segments, it's not worth checkpointing yet. - n = m + (n-m)/3 - if n <= m { + last = first + (last-first)/3 + if last <= first { return nil } @@ -512,18 +530,18 @@ func (h *Head) Truncate(mint int64) (err error) { return h.series.getByID(id) != nil } h.metrics.checkpointCreationTotal.Inc() - if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil { + if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil { h.metrics.checkpointCreationFail.Inc() return errors.Wrap(err, "create checkpoint") } - if err := h.wal.Truncate(n + 1); err != nil { + if err := h.wal.Truncate(last + 1); err != nil { // If truncating fails, we'll just try again at the next checkpoint. // Leftover segments will just be ignored in the future if there's a checkpoint // that supersedes them. level.Error(h.logger).Log("msg", "truncating segments failed", "err", err) } h.metrics.checkpointDeleteTotal.Inc() - if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil { + if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil { // Leftover old checkpoints do not cause problems down the line beyond // occupying disk space. // They will just be ignored since a higher checkpoint exists. @@ -533,7 +551,7 @@ func (h *Head) Truncate(mint int64) (err error) { h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) level.Info(h.logger).Log("msg", "WAL checkpoint complete", - "low", m, "high", n, "duration", time.Since(start)) + "first", first, "last", last, "duration", time.Since(start)) return nil } @@ -1014,19 +1032,33 @@ func (h *headIndexReader) LabelValues(names ...string) (index.StringTuples, erro if len(names) != 1 { return nil, errInvalidSize } - var sl []string h.head.symMtx.RLock() - defer h.head.symMtx.RUnlock() - + sl := make([]string, 0, len(h.head.values[names[0]])) for s := range h.head.values[names[0]] { sl = append(sl, s) } + h.head.symMtx.RUnlock() sort.Strings(sl) return index.NewStringTuples(sl, len(names)) } +// LabelNames returns all the unique label names present in the head. +func (h *headIndexReader) LabelNames() ([]string, error) { + h.head.symMtx.RLock() + defer h.head.symMtx.RUnlock() + labelNames := make([]string, 0, len(h.head.values)) + for name := range h.head.values { + if name == "" { + continue + } + labelNames = append(labelNames, name) + } + sort.Strings(labelNames) + return labelNames, nil +} + // Postings returns the postings list iterator for the label pair. func (h *headIndexReader) Postings(name, value string) (index.Postings, error) { return h.head.postings.Get(name, value), nil @@ -1088,9 +1120,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks func (h *headIndexReader) LabelIndices() ([][]string, error) { h.head.symMtx.RLock() defer h.head.symMtx.RUnlock() - res := [][]string{} - for s := range h.head.values { res = append(res, []string{s}) } @@ -1313,6 +1343,14 @@ type sample struct { v float64 } +func (s sample) T() int64 { + return s.t +} + +func (s sample) V() float64 { + return s.v +} + // memSeries is the in-memory representation of a series. None of its methods // are goroutine safe and it is the caller's responsibility to lock it. type memSeries struct { @@ -1321,11 +1359,11 @@ type memSeries struct { ref uint64 lset labels.Labels chunks []*memChunk + headChunk *memChunk chunkRange int64 firstChunkID int nextAt int64 // Timestamp at which to cut the next chunk. - lastValue float64 sampleBuf [4]sample pendingCommit bool // Whether there are samples waiting to be committed to this series. @@ -1354,6 +1392,7 @@ func (s *memSeries) cut(mint int64) *memChunk { maxTime: math.MinInt64, } s.chunks = append(s.chunks, c) + s.headChunk = c // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. @@ -1392,7 +1431,7 @@ func (s *memSeries) appendable(t int64, v float64) error { } // We are allowing exact duplicates as we can encounter them in valid cases // like federation and erroring out at that time would be extremely noisy. - if math.Float64bits(s.lastValue) != math.Float64bits(v) { + if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { return ErrAmendSample } return nil @@ -1422,12 +1461,20 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { } s.chunks = append(s.chunks[:0], s.chunks[k:]...) s.firstChunkID += k + if len(s.chunks) == 0 { + s.headChunk = nil + } else { + s.headChunk = s.chunks[len(s.chunks)-1] + } return k } // append adds the sample (t, v) to the series. func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { + // Based on Gorilla white papers this offers near-optimal compression ratio + // so anything bigger that this has diminishing returns and increases + // the time range within which we have to decompress all samples. const samplesPerChunk = 120 c := s.head() @@ -1456,8 +1503,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { c.maxTime = t - s.lastValue = v - s.sampleBuf[0] = s.sampleBuf[1] s.sampleBuf[1] = s.sampleBuf[2] s.sampleBuf[2] = s.sampleBuf[3] @@ -1501,10 +1546,7 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator { } func (s *memSeries) head() *memChunk { - if len(s.chunks) == 0 { - return nil - } - return s.chunks[len(s.chunks)-1] + return s.headChunk } type memChunk struct { diff --git a/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go index 69e729791f..602498f115 100644 --- a/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go +++ b/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package index import ( diff --git a/vendor/github.com/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/tsdb/index/index.go index 17acf9ab29..133799ac96 100644 --- a/vendor/github.com/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/tsdb/index/index.go @@ -38,6 +38,8 @@ const ( indexFormatV1 = 1 indexFormatV2 = 2 + + labelNameSeperator = "\xff" ) type indexWriterSeries struct { @@ -850,9 +852,8 @@ func (r *Reader) SymbolTable() map[uint32]string { // LabelValues returns value tuples that exist for the given label name tuples. func (r *Reader) LabelValues(names ...string) (StringTuples, error) { - const sep = "\xff" - key := strings.Join(names, sep) + key := strings.Join(names, labelNameSeperator) off, ok := r.labels[key] if !ok { // XXX(fabxc): hot fix. Should return a partial data error and handle cases @@ -882,14 +883,12 @@ type emptyStringTuples struct{} func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } func (emptyStringTuples) Len() int { return 0 } -// LabelIndices returns a for which labels or label tuples value indices exist. +// LabelIndices returns a slice of label names for which labels or label tuples value indices exist. +// NOTE: This is deprecated. Use `LabelNames()` instead. func (r *Reader) LabelIndices() ([][]string, error) { - const sep = "\xff" - res := [][]string{} - for s := range r.labels { - res = append(res, strings.Split(s, sep)) + res = append(res, strings.Split(s, labelNameSeperator)) } return res, nil } @@ -935,6 +934,30 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// LabelNames returns all the unique label names present in the index. +func (r *Reader) LabelNames() ([]string, error) { + labelNamesMap := make(map[string]struct{}, len(r.labels)) + for key := range r.labels { + // 'key' contains the label names concatenated with the + // delimiter 'labelNameSeperator'. + names := strings.Split(key, labelNameSeperator) + for _, name := range names { + if name == allPostingsKey.Name { + // This is not from any metric. + // It is basically an empty label name. + continue + } + labelNamesMap[name] = struct{}{} + } + } + labelNames := make([]string, 0, len(labelNamesMap)) + for name := range labelNamesMap { + labelNames = append(labelNames, name) + } + sort.Strings(labelNames) + return labelNames, nil +} + type stringTuples struct { length int // tuple length entries []string // flattened tuple entries diff --git a/vendor/github.com/prometheus/tsdb/labels/labels.go b/vendor/github.com/prometheus/tsdb/labels/labels.go index d76ba0d08d..d1ba70b454 100644 --- a/vendor/github.com/prometheus/tsdb/labels/labels.go +++ b/vendor/github.com/prometheus/tsdb/labels/labels.go @@ -117,11 +117,13 @@ func New(ls ...Label) Labels { // FromMap returns new sorted Labels from the given map. func FromMap(m map[string]string) Labels { - l := make([]Label, 0, len(m)) + l := make(Labels, 0, len(m)) for k, v := range m { l = append(l, Label{Name: k, Value: v}) } - return New(l...) + sort.Sort(l) + + return l } // FromStrings creates new labels from pairs of strings. diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index dbd49cec84..7459c6bee6 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -33,10 +33,14 @@ type Querier interface { // LabelValues returns all potential values for a label name. LabelValues(string) ([]string, error) + // LabelValuesFor returns all potential values for a label name. // under the constraint of another label. LabelValuesFor(string, labels.Label) ([]string, error) + // LabelNames returns all the unique label names present in the block in sorted order. + LabelNames() ([]string, error) + // Close releases the resources of the Querier. Close() error } @@ -60,6 +64,28 @@ func (q *querier) LabelValues(n string) ([]string, error) { return q.lvals(q.blocks, n) } +// LabelNames returns all the unique label names present querier blocks. +func (q *querier) LabelNames() ([]string, error) { + labelNamesMap := make(map[string]struct{}) + for _, b := range q.blocks { + names, err := b.LabelNames() + if err != nil { + return nil, errors.Wrap(err, "LabelNames() from Querier") + } + for _, name := range names { + labelNamesMap[name] = struct{}{} + } + } + + labelNames := make([]string, 0, len(labelNamesMap)) + for name := range labelNamesMap { + labelNames = append(labelNames, name) + } + sort.Strings(labelNames) + + return labelNames, nil +} + func (q *querier) lvals(qs []Querier, n string) ([]string, error) { if len(qs) == 0 { return nil, nil @@ -187,6 +213,10 @@ func (q *blockQuerier) LabelValues(name string) ([]string, error) { return res, nil } +func (q *blockQuerier) LabelNames() ([]string, error) { + return q.index.LabelNames() +} + func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { return nil, fmt.Errorf("not implemented") } @@ -249,7 +279,7 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) } func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) { - // If the matcher selects an empty value, it selects all the series which dont + // If the matcher selects an empty value, it selects all the series which don't // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 if m.Matches("") { @@ -478,7 +508,7 @@ type baseChunkSeries struct { // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = NewMemTombstones() + tr = newMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { @@ -499,8 +529,8 @@ func (s *baseChunkSeries) Err() error { return s.err } func (s *baseChunkSeries) Next() bool { var ( - lset labels.Labels - chkMetas []chunks.Meta + lset = make(labels.Labels, len(s.lset)) + chkMetas = make([]chunks.Meta, len(s.chks)) err error ) diff --git a/vendor/github.com/prometheus/tsdb/record.go b/vendor/github.com/prometheus/tsdb/record.go index c8cc7a5043..364e8144d9 100644 --- a/vendor/github.com/prometheus/tsdb/record.go +++ b/vendor/github.com/prometheus/tsdb/record.go @@ -26,22 +26,16 @@ import ( type RecordType uint8 const ( - RecordInvalid RecordType = 255 - RecordSeries RecordType = 1 - RecordSamples RecordType = 2 + // RecordInvalid is returned for unrecognised WAL record types. + RecordInvalid RecordType = 255 + // RecordSeries is used to match WAL records of type Series. + RecordSeries RecordType = 1 + // RecordSamples is used to match WAL records of type Samples. + RecordSamples RecordType = 2 + // RecordTombstones is used to match WAL records of type Tombstones. RecordTombstones RecordType = 3 ) -type RecordLogger interface { - Log(recs ...[]byte) error -} - -type RecordReader interface { - Next() bool - Err() error - Record() []byte -} - // RecordDecoder decodes series, sample, and tombstone records. // The zero value is ready to use. type RecordDecoder struct { diff --git a/vendor/github.com/prometheus/tsdb/repair.go b/vendor/github.com/prometheus/tsdb/repair.go index be8c1f3ed6..fd40cbb5a9 100644 --- a/vendor/github.com/prometheus/tsdb/repair.go +++ b/vendor/github.com/prometheus/tsdb/repair.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go index 0626ac58e9..a1f30b59c7 100644 --- a/vendor/github.com/prometheus/tsdb/tombstones.go +++ b/vendor/github.com/prometheus/tsdb/tombstones.go @@ -29,7 +29,7 @@ const tombstoneFilename = "tombstones" const ( // MagicTombstone is 4 bytes at the head of a tombstone file. - MagicTombstone = 0x130BA30 + MagicTombstone = 0x0130BA30 tombstoneFormatV1 = 1 ) @@ -113,10 +113,10 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (*memTombstones, error) { +func readTombstones(dir string) (TombstoneReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return NewMemTombstones(), nil + return newMemTombstones(), nil } else if err != nil { return nil, err } @@ -146,7 +146,7 @@ func readTombstones(dir string) (*memTombstones, error) { return nil, errors.New("checksum did not match") } - stonesMap := NewMemTombstones() + stonesMap := newMemTombstones() for d.len() > 0 { k := d.uvarint64() @@ -167,7 +167,9 @@ type memTombstones struct { mtx sync.RWMutex } -func NewMemTombstones() *memTombstones { +// newMemTombstones creates new in memory TombstoneReader +// that allows adding new intervals. +func newMemTombstones() *memTombstones { return &memTombstones{intvlGroups: make(map[uint64]Intervals)} } @@ -208,7 +210,7 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } -func (memTombstones) Close() error { +func (*memTombstones) Close() error { return nil } diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go index d9a59c005c..1aae430b43 100644 --- a/vendor/github.com/prometheus/tsdb/wal/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal/wal.go @@ -46,6 +46,10 @@ const ( // before. var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) +// page is an in memory buffer used to batch disk writes. +// Records bigger than the page size are split and flushed separately. +// A flush is triggered when a single records doesn't fit the page size or +// when the next record can't fit in the remaining free page space. type page struct { alloc int flushed int @@ -92,8 +96,9 @@ func (e *CorruptionErr) Error() string { } // OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. -func OpenWriteSegment(dir string, k int) (*Segment, error) { - f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666) +func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) { + segName := SegmentName(dir, k) + f, err := os.OpenFile(segName, os.O_WRONLY|os.O_APPEND, 0666) if err != nil { return nil, err } @@ -108,6 +113,7 @@ func OpenWriteSegment(dir string, k int) (*Segment, error) { // If it was torn mid-record, a full read (which the caller should do anyway // to ensure integrity) will detect it as a corruption by the end. if d := stat.Size() % pageSize; d != 0 { + level.Warn(logger).Log("msg", "last page of the wal is torn, filling it with zeros", "segment", segName) if _, err := f.Write(make([]byte, pageSize-d)); err != nil { f.Close() return nil, errors.Wrap(err, "zero-pad torn page") @@ -225,7 +231,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi return nil, err } } else { - if w.segment, err = OpenWriteSegment(w.dir, j); err != nil { + if w.segment, err = OpenWriteSegment(w.logger, w.dir, j); err != nil { return nil, err } // Correctly initialize donePages. @@ -289,13 +295,13 @@ func (w *WAL) Repair(origErr error) error { if err != nil { return errors.Wrap(err, "list segments") } - level.Warn(w.logger).Log("msg", "deleting all segments behind corruption") + level.Warn(w.logger).Log("msg", "deleting all segments behind corruption", "segment", cerr.Segment) for _, s := range segs { - if s.n <= cerr.Segment { + if s.index <= cerr.Segment { continue } - if w.segment.i == s.n { + if w.segment.i == s.index { // The active segment needs to be removed, // close it first (Windows!). Can be closed safely // as we set the current segment to repaired file @@ -304,14 +310,14 @@ func (w *WAL) Repair(origErr error) error { return errors.Wrap(err, "close active segment") } } - if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { - return errors.Wrap(err, "delete segment") + if err := os.Remove(filepath.Join(w.dir, s.name)); err != nil { + return errors.Wrapf(err, "delete segment:%v", s.index) } } // Regardless of the corruption offset, no record reaches into the previous segment. // So we can safely repair the WAL by removing the segment and re-inserting all // its records up to the corruption. - level.Warn(w.logger).Log("msg", "rewrite corrupted segment") + level.Warn(w.logger).Log("msg", "rewrite corrupted segment", "segment", cerr.Segment) fn := SegmentName(w.dir, cerr.Segment) tmpfn := fn + ".repair" @@ -397,7 +403,7 @@ func (w *WAL) flushPage(clear bool) error { // No more data will fit into the page. Enqueue and clear it. if clear { - p.alloc = pageSize // write till end of page + p.alloc = pageSize // Write till end of page. w.pageCompletions.Inc() } n, err := w.segment.Write(p.buf[p.flushed:p.alloc]) @@ -465,13 +471,14 @@ func (w *WAL) Log(recs ...[]byte) error { } // log writes rec to the log and forces a flush of the current page if its -// the final record of a batch. +// the final record of a batch, the record is bigger than the page size or +// the current page is full. func (w *WAL) log(rec []byte, final bool) error { - // If the record is too big to fit within pages in the current + // If the record is too big to fit within the active page in the current // segment, terminate the active segment and advance to the next one. // This ensures that records do not cross segment boundaries. - left := w.page.remaining() - recordHeaderSize // Active pages. - left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages. + left := w.page.remaining() - recordHeaderSize // Free space in the active page. + left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages in the active segment. if len(rec) > left { if err := w.nextSegment(); err != nil { @@ -511,7 +518,9 @@ func (w *WAL) log(rec []byte, final bool) error { copy(buf[recordHeaderSize:], part) p.alloc += len(part) + recordHeaderSize - // If we wrote a full record, we can fit more records of the batch + // By definition when a record is split it means its size is bigger than + // the page boundary so the current page would be full and needs to be flushed. + // On contrary if we wrote a full record, we can fit more records of the batch // into the page before flushing it. if final || typ != recFull || w.page.full() { if err := w.flushPage(false); err != nil { @@ -523,9 +532,9 @@ func (w *WAL) log(rec []byte, final bool) error { return nil } -// Segments returns the range [m, n] of currently existing segments. -// If no segments are found, m and n are -1. -func (w *WAL) Segments() (m, n int, err error) { +// Segments returns the range [first, n] of currently existing segments. +// If no segments are found, first and n are -1. +func (w *WAL) Segments() (first, last int, err error) { refs, err := listSegments(w.dir) if err != nil { return 0, 0, err @@ -533,7 +542,7 @@ func (w *WAL) Segments() (m, n int, err error) { if len(refs) == 0 { return -1, -1, nil } - return refs[0].n, refs[len(refs)-1].n, nil + return refs[0].index, refs[len(refs)-1].index, nil } // Truncate drops all segments before i. @@ -549,10 +558,10 @@ func (w *WAL) Truncate(i int) (err error) { return err } for _, r := range refs { - if r.n >= i { + if r.index >= i { break } - if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil { + if err = os.Remove(filepath.Join(w.dir, r.name)); err != nil { return err } } @@ -595,8 +604,8 @@ func (w *WAL) Close() (err error) { } type segmentRef struct { - s string - n int + name string + index int } func listSegments(dir string) (refs []segmentRef, err error) { @@ -613,11 +622,11 @@ func listSegments(dir string) (refs []segmentRef, err error) { if len(refs) > 0 && k > last+1 { return nil, errors.New("segments are not sequential") } - refs = append(refs, segmentRef{s: fn, n: k}) + refs = append(refs, segmentRef{name: fn, index: k}) last = k } sort.Slice(refs, func(i, j int) bool { - return refs[i].n < refs[j].n + return refs[i].index < refs[j].index }) return refs, nil } @@ -628,8 +637,8 @@ func NewSegmentsReader(dir string) (io.ReadCloser, error) { } // NewSegmentsRangeReader returns a new reader over the given WAL segment range. -// If m or n are -1, the range is open on the respective end. -func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { +// If first or last are -1, the range is open on the respective end. +func NewSegmentsRangeReader(dir string, first, last int) (io.ReadCloser, error) { refs, err := listSegments(dir) if err != nil { return nil, err @@ -637,13 +646,13 @@ func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { var segs []*Segment for _, r := range refs { - if m >= 0 && r.n < m { + if first >= 0 && r.index < first { continue } - if n >= 0 && r.n > n { + if last >= 0 && r.index > last { break } - s, err := OpenReadSegment(filepath.Join(dir, r.s)) + s, err := OpenReadSegment(filepath.Join(dir, r.name)) if err != nil { return nil, err } @@ -745,6 +754,10 @@ func (r *Reader) next() (err error) { // Gobble up zero bytes. if typ == recPageTerm { + // recPageTerm is a single byte that indicates that the rest of the page is padded. + // If it's the first byte in a page, buf is too small and we have to resize buf to fit pageSize-1 bytes. + buf = r.buf[1:] + // We are pedantic and check whether the zeros are actually up // to a page boundary. // It's not strictly necessary but may catch sketchy state early. diff --git a/vendor/modules.txt b/vendor/modules.txt index 54007bc467..0a180ded10 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -186,7 +186,7 @@ github.com/prometheus/common/route github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg # github.com/prometheus/procfs v0.0.0-20160411190841-abf152e5f3e9 github.com/prometheus/procfs -# github.com/prometheus/tsdb v0.0.0-20181003080831-0ce41118ed20 +# github.com/prometheus/tsdb v0.2.0 github.com/prometheus/tsdb github.com/prometheus/tsdb/labels github.com/prometheus/tsdb/chunkenc diff --git a/web/api/v1/api.go b/web/api/v1/api.go index c2f4c471af..d1d40e078e 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -228,6 +228,8 @@ func (api *API) Register(r *route.Router) { r.Get("/query_range", wrap(api.queryRange)) r.Post("/query_range", wrap(api.queryRange)) + r.Get("/labels", wrap(api.labelNames)) + r.Post("/labels", wrap(api.labelNames)) r.Get("/label/:name/values", wrap(api.labelValues)) r.Get("/series", wrap(api.series)) @@ -390,6 +392,20 @@ func returnAPIError(err error) *apiError { return &apiError{errorExec, err} } +func (api *API) labelNames(r *http.Request) (interface{}, *apiError, func()) { + q, err := api.Queryable.Querier(r.Context(), math.MinInt64, math.MaxInt64) + if err != nil { + return nil, &apiError{errorExec, err}, nil + } + defer q.Close() + + names, err := q.LabelNames() + if err != nil { + return nil, &apiError{errorExec, err}, nil + } + return names, nil, nil +} + func (api *API) labelValues(r *http.Request) (interface{}, *apiError, func()) { ctx := r.Context() name := route.Param(ctx, "name") diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 58d661192e..8ed0264a87 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -248,9 +248,9 @@ func TestEndpoints(t *testing.T) { QueryEngine: suite.QueryEngine(), targetRetriever: testTargetRetriever{}, alertmanagerRetriever: testAlertmanagerRetriever{}, + flagsMap: sampleFlagMap, now: func() time.Time { return now }, config: func() config.Config { return samplePrometheusCfg }, - flagsMap: sampleFlagMap, ready: func(f http.HandlerFunc) http.HandlerFunc { return f }, rulesRetriever: algr, } @@ -301,15 +301,51 @@ func TestEndpoints(t *testing.T) { QueryEngine: suite.QueryEngine(), targetRetriever: testTargetRetriever{}, alertmanagerRetriever: testAlertmanagerRetriever{}, + flagsMap: sampleFlagMap, now: func() time.Time { return now }, config: func() config.Config { return samplePrometheusCfg }, - flagsMap: sampleFlagMap, ready: func(f http.HandlerFunc) http.HandlerFunc { return f }, rulesRetriever: algr, } testEndpoints(t, api, false) }) + +} + +func TestLabelNames(t *testing.T) { + // TestEndpoints doesn't have enough label names to test api.labelNames + // endpoint properly. Hence we test it separately. + suite, err := promql.NewTest(t, ` + load 1m + test_metric1{foo1="bar", baz="abc"} 0+100x100 + test_metric1{foo2="boo"} 1+0x100 + test_metric2{foo="boo"} 1+0x100 + test_metric2{foo="boo", xyz="qwerty"} 1+0x100 + `) + testutil.Ok(t, err) + defer suite.Close() + testutil.Ok(t, suite.Run()) + + api := &API{ + Queryable: suite.Storage(), + } + request := func(m string) (*http.Request, error) { + if m == http.MethodPost { + r, err := http.NewRequest(m, "http://example.com", nil) + r.Header.Set("Content-Type", "application/x-www-form-urlencoded") + return r, err + } + return http.NewRequest(m, "http://example.com", nil) + } + for _, method := range []string{http.MethodGet, http.MethodPost} { + ctx := context.Background() + req, err := request(method) + testutil.Ok(t, err) + resp, apiErr, _ := api.labelNames(req.WithContext(ctx)) + assertAPIError(t, apiErr, "") + assertAPIResponse(t, resp, []string{"__name__", "baz", "foo", "foo1", "foo2", "xyz"}) + } } func setupRemote(s storage.Storage) *httptest.Server { @@ -776,6 +812,11 @@ func testEndpoints(t *testing.T, api *API, testLabelAPI bool) { }, errType: errorBadData, }, + // Label names. + { + endpoint: api.labelNames, + response: []string{"__name__", "foo"}, + }, }...) }