diff --git a/block.go b/block.go index 59a9d4dc18..8456cb3762 100644 --- a/block.go +++ b/block.go @@ -283,8 +283,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { return ErrClosing } - pr := newPostingsReader(pb.indexr) - p, absent, err := pr.Select(ms...) + p, absent, err := PostingsForMatchers(pb.indexr, ms...) if err != nil { return errors.Wrap(err, "select series") } diff --git a/compact.go b/compact.go index 48d77f72c8..2dc939bc1e 100644 --- a/compact.go +++ b/compact.go @@ -453,7 +453,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // of the provided blocks. It returns meta information for the new block. func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { var ( - set compactionSet + set ChunkSeriesSet allSymbols = make(map[string]struct{}, 1<<16) closers = []io.Closer{} ) @@ -597,18 +597,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return nil } -type compactionSet interface { - Next() bool - At() (labels.Labels, []ChunkMeta, Intervals) - Err() error -} - type compactionSeriesSet struct { p Postings index IndexReader chunks ChunkReader tombstones TombstoneReader - series SeriesSet l labels.Labels c []ChunkMeta @@ -679,7 +672,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) { } type compactionMerger struct { - a, b compactionSet + a, b ChunkSeriesSet aok, bok bool l labels.Labels @@ -692,7 +685,7 @@ type compactionSeries struct { chunks []*ChunkMeta } -func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) { +func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) { c := &compactionMerger{ a: a, b: b, diff --git a/head.go b/head.go index d8614add7e..d149cb1d9c 100644 --- a/head.go +++ b/head.go @@ -574,8 +574,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := h.indexRange(mint, maxt) - pr := newPostingsReader(ir) - p, absent, err := pr.Select(ms...) + p, absent, err := PostingsForMatchers(ir, ms...) if err != nil { return errors.Wrap(err, "select series") } diff --git a/querier.go b/querier.go index faa29ecda8..37672c7156 100644 --- a/querier.go +++ b/querier.go @@ -151,22 +151,13 @@ type blockQuerier struct { } func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) { - pr := newPostingsReader(q.index) - - p, absent, err := pr.Select(ms...) + base, err := LookupChunkSeries(q.index, q.tombstones, ms...) if err != nil { return nil, err } - return &blockSeriesSet{ set: &populatedChunkSeries{ - set: &baseChunkSeries{ - p: p, - index: q.index, - absent: absent, - - tombstones: q.tombstones, - }, + set: base, chunks: q.chunks, mint: q.mint, maxt: q.maxt, @@ -208,16 +199,10 @@ func (q *blockQuerier) Close() error { return merr.Err() } -// postingsReader is used to select matching postings from an IndexReader. -type postingsReader struct { - index IndexReader -} - -func newPostingsReader(i IndexReader) *postingsReader { - return &postingsReader{index: i} -} - -func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error) { +// PostingsForMatchers assembles a single postings iterator against the index reader +// based on the given matchers. It returns a list of label names that must be manually +// checked to not exist in series the postings list points to. +func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) { var ( its []Postings absent []string @@ -229,16 +214,13 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error absent = append(absent, m.Name()) continue } - it, err := r.selectSingle(m) + it, err := postingsForMatcher(index, m) if err != nil { return nil, nil, err } its = append(its, it) } - - p := Intersect(its...) - - return r.index.SortedPostings(p), absent, nil + return index.SortedPostings(Intersect(its...)), absent, nil } // tuplesByPrefix uses binary search to find prefix matches within ts. @@ -272,17 +254,17 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) return matches, nil } -func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) { +func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) { // Fast-path for equal matching. if em, ok := m.(*labels.EqualMatcher); ok { - it, err := r.index.Postings(em.Name(), em.Value()) + it, err := index.Postings(em.Name(), em.Value()) if err != nil { return nil, err } return it, nil } - tpls, err := r.index.LabelValues(m.Name()) + tpls, err := index.LabelValues(m.Name()) if err != nil { return nil, err } @@ -313,7 +295,7 @@ func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) { var rit []Postings for _, v := range res { - it, err := r.index.Postings(m.Name(), v) + it, err := index.Postings(m.Name(), v) if err != nil { return nil, err } @@ -435,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool { return true } -type chunkSeriesSet interface { +type ChunkSeriesSet interface { Next() bool At() (labels.Labels, []ChunkMeta, Intervals) Err() error @@ -455,6 +437,24 @@ type baseChunkSeries struct { err error } +// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet +// over them. It drops chunks based on tombstones in the given reader. +func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { + if tr == nil { + tr = EmptyTombstoneReader() + } + p, absent, err := PostingsForMatchers(ir, ms...) + if err != nil { + return nil, err + } + return &baseChunkSeries{ + p: p, + index: ir, + tombstones: tr, + absent: absent, + }, nil +} + func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { return s.lset, s.chks, s.intervals } @@ -518,7 +518,7 @@ Outer: // with known chunk references. It filters out chunks that do not fit the // given time range. type populatedChunkSeries struct { - set chunkSeriesSet + set ChunkSeriesSet chunks ChunkReader mint, maxt int64 @@ -575,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool { // blockSeriesSet is a set of series from an inverted index query. type blockSeriesSet struct { - set chunkSeriesSet + set ChunkSeriesSet err error cur Series