mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-17 02:26:56 +02:00
* Added Chunk versions of all iterating methods. It all starts in Querier/ChunkQuerier. The plan is that Storage will implement both chunked and samples. * Added Seek to chunks.Iterator interface for iterating over chunks. * Mock, NewTestSeries, SampleSeriesIterator and ChunkSeriesIterator are now available from the storage package and are reused instead of being recreated in many places. NewConcreteSeries was created to replace concreteSeries. * NewMergeChunkQuerier was added; both this and NewMergeQuerier now use genericMergeQuerier to share the code. Generic code was added. * Both Compactor and block Querier use *exactly* the same iterators (blockChunkSeriesSet). * Added some TODOs for further simplifications in next PRs. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
168 lines
4.3 KiB
Go
168 lines
4.3 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"math"
|
|
"sort"
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
|
)
|
|
|
|
type ConcreteSeries struct {
|
|
labels labels.Labels
|
|
SampleIteratorFn func() chunkenc.Iterator
|
|
}
|
|
|
|
func NewTestSeries(lset labels.Labels, samples ...[]tsdbutil.Sample) *ConcreteSeries {
|
|
return &ConcreteSeries{
|
|
labels: lset,
|
|
SampleIteratorFn: func() chunkenc.Iterator {
|
|
var list tsdbutil.SampleSlice
|
|
for _, l := range samples {
|
|
list = append(list, l...)
|
|
}
|
|
return NewSampleIterator(list)
|
|
},
|
|
}
|
|
}
|
|
|
|
func NewSeriesFromSamples(lset labels.Labels, samples tsdbutil.Samples) Series {
|
|
return &ConcreteSeries{
|
|
labels: lset,
|
|
SampleIteratorFn: func() chunkenc.Iterator {
|
|
return NewSampleIterator(samples)
|
|
},
|
|
}
|
|
}
|
|
|
|
// Labels returns the label set stored in the series.
func (s *ConcreteSeries) Labels() labels.Labels {
	return s.labels
}
|
|
// Iterator returns whatever SampleIteratorFn produces for this call.
func (s *ConcreteSeries) Iterator() chunkenc.Iterator {
	newIter := s.SampleIteratorFn
	return newIter()
}
|
|
|
|
type SampleSeriesIterator struct {
|
|
samples tsdbutil.Samples
|
|
idx int
|
|
}
|
|
|
|
func NewSampleIterator(samples tsdbutil.Samples) chunkenc.Iterator {
|
|
return &SampleSeriesIterator{samples: samples, idx: -1}
|
|
}
|
|
|
|
func (it *SampleSeriesIterator) At() (int64, float64) {
|
|
s := it.samples.Get(it.idx)
|
|
return s.T(), s.V()
|
|
}
|
|
|
|
func (it *SampleSeriesIterator) Next() bool {
|
|
it.idx++
|
|
return it.idx < it.samples.Len()
|
|
}
|
|
|
|
func (it *SampleSeriesIterator) Seek(t int64) bool {
|
|
if it.idx == -1 {
|
|
it.idx = 0
|
|
}
|
|
// Do binary search between current position and end.
|
|
it.idx = sort.Search(it.samples.Len()-it.idx, func(i int) bool {
|
|
s := it.samples.Get(i + it.idx)
|
|
return s.T() >= t
|
|
})
|
|
|
|
return it.idx < it.samples.Len()
|
|
}
|
|
|
|
// Err always returns nil.
func (it *SampleSeriesIterator) Err() error {
	return nil
}
|
|
|
|
type ChunkConcreteSeries struct {
|
|
labels labels.Labels
|
|
ChunkIteratorFn func() chunks.Iterator
|
|
}
|
|
|
|
func NewTestChunkSeries(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkConcreteSeries {
|
|
var chks []chunks.Meta
|
|
|
|
return &ChunkConcreteSeries{
|
|
labels: lset,
|
|
ChunkIteratorFn: func() chunks.Iterator {
|
|
// Inefficient chunks encoding implementation, not caring about chunk size.
|
|
for _, s := range samples {
|
|
chks = append(chks, tsdbutil.ChunkFromSamples(s))
|
|
}
|
|
return NewListChunkSeriesIterator(chks...)
|
|
},
|
|
}
|
|
}
|
|
|
|
// Labels returns the label set stored in the series.
func (s *ChunkConcreteSeries) Labels() labels.Labels {
	return s.labels
}
|
|
// Iterator returns whatever ChunkIteratorFn produces for this call.
func (s *ChunkConcreteSeries) Iterator() chunks.Iterator {
	newIter := s.ChunkIteratorFn
	return newIter()
}
|
|
|
|
type ChunkReader interface {
|
|
Chunk(ref uint64) (chunkenc.Chunk, error)
|
|
}
|
|
|
|
type ChunkSeriesIterator struct {
|
|
chks []chunks.Meta
|
|
|
|
idx int
|
|
}
|
|
|
|
func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator {
|
|
return &ChunkSeriesIterator{chks: chks, idx: -1}
|
|
}
|
|
|
|
func (it *ChunkSeriesIterator) At() chunks.Meta {
|
|
return it.chks[it.idx]
|
|
}
|
|
|
|
func (it *ChunkSeriesIterator) Next() bool {
|
|
it.idx++
|
|
return it.idx < len(it.chks)
|
|
}
|
|
|
|
// Err always returns nil.
func (it *ChunkSeriesIterator) Err() error {
	return nil
}
|
|
|
|
func NewListChunkSeries(lset labels.Labels, chks ...chunks.Meta) ChunkSeries {
|
|
return &ChunkConcreteSeries{
|
|
labels: lset,
|
|
ChunkIteratorFn: func() chunks.Iterator {
|
|
// Inefficient chunks encoding implementation, not caring about chunk size.
|
|
return NewListChunkSeriesIterator(chks...)
|
|
},
|
|
}
|
|
}
|
|
|
|
func ExpandSamples(iter chunkenc.Iterator) ([]tsdbutil.Sample, error) {
|
|
var result []tsdbutil.Sample
|
|
for iter.Next() {
|
|
t, v := iter.At()
|
|
// NaNs can't be compared normally, so substitute for another value.
|
|
if math.IsNaN(v) {
|
|
v = -42
|
|
}
|
|
result = append(result, sample{t, v})
|
|
}
|
|
return result, iter.Err()
|
|
}
|
|
|
|
func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) {
|
|
var result []chunks.Meta
|
|
for iter.Next() {
|
|
result = append(result, iter.At())
|
|
}
|
|
return result, iter.Err()
|
|
}
|