diff --git a/tsdb/block.go b/tsdb/block.go index e302ce8fa1..fa8e896e3b 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -112,7 +112,7 @@ type ChunkReader interface { // BlockReader provides reading access to a data block. type BlockReader interface { // Index returns an IndexReader over the block's data. - Index() (IndexReader, error) + Index(mint, maxt int64) (IndexReader, error) // Chunks returns a ChunkReader over the block's data. Chunks() (ChunkReader, error) @@ -372,7 +372,7 @@ func (pb *Block) startRead() error { } // Index returns a new IndexReader against the block data. -func (pb *Block) Index() (IndexReader, error) { +func (pb *Block) Index(mint, maxt int64) (IndexReader, error) { if err := pb.startRead(); err != nil { return nil, err } diff --git a/tsdb/compact.go b/tsdb/compact.go index 2d7376e1af..f0cf28d9b4 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -683,7 +683,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - indexr, err := b.Index() + indexr, err := b.Index(math.MinInt64, math.MaxInt64) if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) } diff --git a/tsdb/head.go b/tsdb/head.go index abf356e1ac..a727eef99c 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -763,7 +763,7 @@ func NewRangeHead(head *Head, mint, maxt int64) *RangeHead { } } -func (h *RangeHead) Index() (IndexReader, error) { +func (h *RangeHead) Index(mint, maxt int64) (IndexReader, error) { return h.head.indexRange(h.mint, h.maxt), nil } @@ -1162,8 +1162,8 @@ func (h *Head) Tombstones() (tombstones.Reader, error) { } // Index returns an IndexReader against the block. -func (h *Head) Index() (IndexReader, error) { - return h.indexRange(math.MinInt64, math.MaxInt64), nil +func (h *Head) Index(mint, maxt int64) (IndexReader, error) { + return h.indexRange(mint, maxt), nil } func (h *Head) indexRange(mint, maxt int64) *headIndexReader { @@ -1349,7 +1349,25 @@ func (h *headIndexReader) LabelNames() ([]string, error) { func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { res := make([]index.Postings, 0, len(values)) for _, value := range values { - res = append(res, h.head.postings.Get(name, value)) + p := h.head.postings.Get(name, value) + // Filter out series not in the time range, to avoid + // later on building up all the chunk metadata just to + // discard it. + filtered := []uint64{} + for p.Next() { + s := h.head.series.getByID(p.At()) + if s == nil { + level.Debug(h.head.logger).Log("msg", "looked up series not found") + continue + } + if s.minTime() <= h.maxt && s.maxTime() >= h.mint { + filtered = append(filtered, p.At()) + } + } + if p.Err() != nil { + return nil, p.Err() + } + res = append(res, index.NewListPostings(filtered)) } return index.Merge(res...), nil } diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index c1ada34171..954541395a 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "fmt" "strconv" "sync/atomic" "testing" @@ -48,3 +49,35 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { } }) } + +func BenchmarkHeadSeries(b *testing.B) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(b, err) + defer h.Close() + app := h.Appender() + numSeries := 1000000 + for i := 0; i < numSeries; i++ { + app.Add(labels.FromStrings("foo", "bar", "i", strconv.Itoa(i)), int64(i), 0) + } + testutil.Ok(b, app.Commit()) + + matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") + + for s := 1; s <= numSeries; s *= 10 { + b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) { + q, err := NewBlockQuerier(h, 0, int64(s-1)) + testutil.Ok(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ss, err := q.Select(matcher) + testutil.Ok(b, err) + for ss.Next() { + } + testutil.Ok(b, ss.Err()) + } + q.Close() + }) + + } +} diff --git a/tsdb/querier.go b/tsdb/querier.go index ed5f7351cb..a5181c3f69 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -160,7 +160,7 @@ func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms // NewBlockQuerier returns a querier against the reader. func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) { - indexr, err := b.Index() + indexr, err := b.Index(mint, maxt) if err != nil { return nil, errors.Wrapf(err, "open index reader") }