prometheus/storage/series_set.go
Bartlomiej Plotka af5a7d1078 Add Chunks* Iterator/Querier/Selector to storage interface.
* Added Chunk versions of all iterating methods. It all starts in Querier/ChunkQuerier. The plan is that
Storage will implement both chunked and samples.
* Added Seek to chunks.Iterator interface for iterating over chunks.
* Mock, NewTestSeries, SampleSeriesIterator and ChunkSeriesIterator are now available from storage package and reused instead of
being recreated in many places. NewConcreteSeries was created to replace concreteSeries.
* NewMergeChunkQuerier was added; both this and NewMergeQuerier are now using genericMergeQuerier to share the code. Generic code was added.
* Both Compactor and block Querier use *exactly* the same iterators. (blockChunkSeriesSet).
* Added some TODO for further simplifications in next PRs.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
2020-03-16 18:00:26 +00:00

65 lines
1.6 KiB
Go

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import "github.com/prometheus/prometheus/tsdb/chunkenc"
// chunkSetToSeriesSet adapts a ChunkSeriesSet so that it can be consumed
// through the Next/At/Err methods defined on it below: every chunk of the
// current chunk series is wrapped into its own Series, and At hands those to
// ChainedSeriesMerge.
type chunkSetToSeriesSet struct {
// ChunkSeriesSet is the wrapped set; it is embedded, and Next and Err
// delegate to it.
ChunkSeriesSet
// chkIterErr holds the error reported by the chunk iterator of the
// current series, if any. Once set, Err returns it and Next returns false.
chkIterErr error
// sameSeriesChunks holds one Series per chunk of the current series. Next
// truncates and refills it on every call, reusing the backing array.
sameSeriesChunks []Series
// bufIterator is passed to Chunk.Iterator when a chunk's samples are
// iterated. NOTE(review): nothing visible in this file assigns it, so it
// appears to always be nil here; confirm before relying on iterator reuse.
bufIterator chunkenc.Iterator
}
// NewSeriesSetFromChunkSeriesSet wraps chk in an adapter and returns it as a
// SeriesSet. The adapter starts with no recorded error and no buffered chunks.
func NewSeriesSetFromChunkSeriesSet(chk ChunkSeriesSet) SeriesSet {
	adapter := new(chunkSetToSeriesSet)
	adapter.ChunkSeriesSet = chk
	return adapter
}
// Next advances the wrapped ChunkSeriesSet to its next series and builds one
// Series per chunk of that series into c.sameSeriesChunks, for At to merge.
//
// It returns false when an error has already been recorded, when the wrapped
// set is exhausted, or when iterating the chunks of the current series fails.
// In the last case the error is stored and reported through Err.
func (c *chunkSetToSeriesSet) Next() bool {
	if c.Err() != nil || !c.ChunkSeriesSet.Next() {
		return false
	}

	// Fetch the current chunk series once; its labels are shared by every
	// per-chunk Series built below.
	series := c.ChunkSeriesSet.At()
	lset := series.Labels()
	iter := series.Iterator()

	// Reuse the backing array from the previous series.
	c.sameSeriesChunks = c.sameSeriesChunks[:0]
	for iter.Next() {
		// Snapshot the chunk now. SampleIteratorFn is only invoked after this
		// loop has drained iter, so reading iter.At() inside the closure would
		// make every Series resolve to whatever the exhausted iterator
		// reports instead of its own chunk.
		chk := iter.At()
		c.sameSeriesChunks = append(c.sameSeriesChunks, &ConcreteSeries{
			labels: lset,
			SampleIteratorFn: func() chunkenc.Iterator {
				// NOTE(review): c.bufIterator is never assigned in the visible
				// code, so this passes nil. If it is ever populated, sharing
				// one reusable iterator across chunks that are merged
				// together would need revisiting.
				return chk.Chunk.Iterator(c.bufIterator)
			},
		})
	}

	if err := iter.Err(); err != nil {
		c.chkIterErr = err
		return false
	}
	return true
}
// At returns the result of passing the per-chunk series collected by the most
// recent Next call to ChainedSeriesMerge.
func (c *chunkSetToSeriesSet) At() Series {
	merged := ChainedSeriesMerge(c.sameSeriesChunks...)
	return merged
}
// Err reports the first error seen by this set. An error recorded while
// iterating the chunks of a series takes precedence; otherwise the error of
// the wrapped ChunkSeriesSet is returned.
func (c *chunkSetToSeriesSet) Err() error {
	if err := c.chkIterErr; err != nil {
		return err
	}
	return c.ChunkSeriesSet.Err()
}