diff --git a/model/labels/sharding.go b/model/labels/sharding.go new file mode 100644 index 0000000000..6b4119860a --- /dev/null +++ b/model/labels/sharding.go @@ -0,0 +1,47 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !stringlabels + +package labels + +import ( + "github.com/cespare/xxhash/v2" +) + +// StableHash is a labels hashing implementation which is guaranteed to not change over time. +// This function should be used whenever labels hashing backward compatibility must be guaranteed. +func StableHash(ls Labels) uint64 { + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + for i, v := range ls { + if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB+ do not allocate whole entry. + h := xxhash.New() + _, _ = h.Write(b) + for _, v := range ls[i:] { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(seps) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(seps) + } + return h.Sum64() + } + + b = append(b, v.Name...) + b = append(b, seps[0]) + b = append(b, v.Value...) + b = append(b, seps[0]) + } + return xxhash.Sum64(b) +} diff --git a/model/labels/sharding_stringlabels.go b/model/labels/sharding_stringlabels.go new file mode 100644 index 0000000000..3ad2027d8c --- /dev/null +++ b/model/labels/sharding_stringlabels.go @@ -0,0 +1,54 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build stringlabels + +package labels + +import ( + "github.com/cespare/xxhash/v2" +) + +// StableHash is a labels hashing implementation which is guaranteed to not change over time. +// This function should be used whenever labels hashing backward compatibility must be guaranteed. +func StableHash(ls Labels) uint64 { + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + var h *xxhash.Digest + for i := 0; i < len(ls.data); { + var v Label + v.Name, i = decodeString(ls.data, i) + v.Value, i = decodeString(ls.data, i) + if h == nil && len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB+, switch to Write API. Copy in the values up to this point. + h = xxhash.New() + _, _ = h.Write(b) + } + if h != nil { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(seps) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(seps) + continue + } + + b = append(b, v.Name...) + b = append(b, seps[0]) + b = append(b, v.Value...) + b = append(b, seps[0]) + } + if h != nil { + return h.Sum64() + } + return xxhash.Sum64(b) +} diff --git a/model/labels/sharding_test.go b/model/labels/sharding_test.go new file mode 100644 index 0000000000..78e3047509 --- /dev/null +++ b/model/labels/sharding_test.go @@ -0,0 +1,32 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package labels + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestStableHash tests that StableHash is stable. +// The hashes this test asserts should not be changed. +func TestStableHash(t *testing.T) { + for expectedHash, lbls := range map[uint64]Labels{ + 0xef46db3751d8e999: EmptyLabels(), + 0x347c8ee7a9e29708: FromStrings("hello", "world"), + 0xcbab40540f26097d: FromStrings(MetricName, "metric", "label", "value"), + } { + require.Equal(t, expectedHash, StableHash(lbls)) + } +} diff --git a/storage/interface.go b/storage/interface.go index 675e44c0ee..347e779b56 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -197,6 +197,20 @@ type SelectHints struct { By bool // Indicate whether it is without or by. Range int64 // Range vector selector range in milliseconds. + // ShardCount is the total number of shards that series should be split into + // at query time. Then, only series in the ShardIndex shard will be returned + // by the query. + // + // ShardCount equal to 0 means that sharding is disabled. + ShardCount uint64 + + // ShardIndex is the series shard index to query. The index must be between 0 and ShardCount-1. + // When ShardCount is set to a value > 0, then a query will only process series within the + // ShardIndex's shard. + // + // Series are sharded by "labels stable hash" mod "ShardCount". + ShardIndex uint64 + // DisableTrimming allows to disable trimming of matching series chunks based on query Start and End time. // When disabled, the result may contain samples outside the queried time range but Select() performances // may be improved. diff --git a/tsdb/block.go b/tsdb/block.go index e2562de03c..7833e187ff 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -81,6 +81,11 @@ type IndexReader interface { // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings + // ShardedPostings returns a postings list filtered by the provided shardIndex + // out of shardCount. For a given posting, its shard MUST be computed hashing + // the series labels mod shardCount, using a hash function which is consistent over time. + ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings + // Series populates the given builder and chunk metas for the series identified // by the reference. // Returns storage.ErrNotFound if the ref does not resolve to a known series. @@ -517,6 +522,10 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { return r.ir.SortedPostings(p) } +func (r blockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + return r.ir.ShardedPostings(p, shardIndex, shardCount) +} + func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { if err := r.ir.Series(ref, builder, chks); err != nil { return fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err) diff --git a/tsdb/db.go b/tsdb/db.go index 2436fab2ac..e9265c55e5 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -84,6 +84,7 @@ func DefaultOptions() *Options { HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, OutOfOrderCapMax: DefaultOutOfOrderCapMax, EnableOverlappingCompaction: true, + EnableSharding: false, } } @@ -186,6 +187,9 @@ type Options struct { // they'd rather keep overlapping blocks and let another component do the overlapping compaction later. // For Prometheus, this will always be true. EnableOverlappingCompaction bool + + // EnableSharding enables query sharding support in TSDB. + EnableSharding bool } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -875,6 +879,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms) headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow) headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax) + headOpts.EnableSharding = opts.EnableSharding if opts.WALReplayConcurrency > 0 { headOpts.WALReplayConcurrency = opts.WALReplayConcurrency } diff --git a/tsdb/head.go b/tsdb/head.go index cdcd3ea568..86ab7fe06c 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -176,6 +176,9 @@ type HeadOptions struct { // The default value is GOMAXPROCS. // If it is set to a negative value or zero, the default value is used. WALReplayConcurrency int + + // EnableSharding enables ShardedPostings() support in the Head. + EnableSharding bool } const ( @@ -1663,7 +1666,12 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, h.opts.IsolationDisabled) + shardHash := uint64(0) + if h.opts.EnableSharding { + shardHash = labels.StableHash(lset) + } + + return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled) }) if err != nil { return nil, false, err @@ -2022,6 +2030,10 @@ type memSeries struct { lset labels.Labels meta *metadata.Metadata + // Series labels hash to use for sharding purposes. The value is always 0 when sharding has not + // been explicitly enabled in TSDB. + shardHash uint64 + // Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps. // When compaction runs, chunks get moved into a block and all pointers are shifted like so: // @@ -2071,11 +2083,12 @@ type memSeriesOOOFields struct { firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]. } -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - nextAt: math.MinInt64, + lset: lset, + ref: id, + nextAt: math.MinInt64, + shardHash: shardHash, } if !isolationDisabled { s.txs = newTxRing(0) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 29f845c7bd..457d3e1c47 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -149,7 +149,35 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { return index.NewListPostings(ep) } +// ShardedPostings implements IndexReader. This function returns an failing postings list if sharding +// has not been enabled in the Head. +func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + if !h.head.opts.EnableSharding { + return index.ErrPostings(errors.New("sharding is disabled")) + } + + out := make([]storage.SeriesRef, 0, 128) + + for p.Next() { + s := h.head.series.getByID(chunks.HeadSeriesRef(p.At())) + if s == nil { + level.Debug(h.head.logger).Log("msg", "Looked up series not found") + continue + } + + // Check if the series belong to the shard. + if s.shardHash%shardCount != shardIndex { + continue + } + + out = append(out, storage.SeriesRef(s.ref)) + } + + return index.NewListPostings(out) +} + // Series returns the series for the given reference. +// Chunks are skipped if chks is nil. func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s := h.head.series.getByID(chunks.HeadSeriesRef(ref)) @@ -159,6 +187,10 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB } builder.Assign(s.lset) + if chks == nil { + return nil + } + s.Lock() defer s.Unlock() diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index b063512019..de97d70a56 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -526,7 +526,7 @@ func TestMemSeries_chunk(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - series := newMemSeries(labels.EmptyLabels(), 1, true) + series := newMemSeries(labels.EmptyLabels(), 1, 0, true) if tc.setup != nil { tc.setup(t, series, chunkDiskMapper) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 83abeacbeb..90e187b58c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -52,20 +52,30 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" ) -func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) { - dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) - require.NoError(t, err) - +// newTestHeadDefaultOptions returns the HeadOptions that should be used by default in unit tests. +func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions { opts := DefaultHeadOptions() opts.ChunkRange = chunkRange - opts.ChunkDirRoot = dir opts.EnableExemplarStorage = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) opts.EnableNativeHistograms.Store(true) if oooEnabled { opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) } + return opts +} + +func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) { + return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled)) +} + +func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts *HeadOptions) (*Head, *wlog.WL) { + dir := t.TempDir() + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) + require.NoError(t, err) + + // Override the chunks dir with the testing one. + opts.ChunkDirRoot = dir h, err := NewHead(nil, nil, wal, nil, opts, nil) require.NoError(t, err) @@ -342,7 +352,7 @@ func BenchmarkLoadWLs(b *testing.B) { } for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. - s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled) s.append(c.mmappedChunkT, 42, 0, cOpts) // There's only one head chunk because only a single sample is appended. mmapChunks() // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with @@ -912,7 +922,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { }, } - s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled) + s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled) for i := 0; i < 4000; i += 5 { ok, _ := s.append(int64(i), float64(i), 0, cOpts) @@ -1053,7 +1063,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - series := newMemSeries(labels.EmptyLabels(), 1, true) + series := newMemSeries(labels.EmptyLabels(), 1, 0, true) cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, @@ -1631,7 +1641,7 @@ func TestMemSeries_append(t *testing.T) { samplesPerChunk: DefaultSamplesPerChunk, } - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) // Add first two samples at the very end of a chunk range and the next two // on and after it. @@ -1692,7 +1702,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { samplesPerChunk: DefaultSamplesPerChunk, } - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) histograms := tsdbutil.GenerateTestHistograms(4) histogramWithOneMoreBucket := histograms[3].Copy() @@ -1754,7 +1764,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { samplesPerChunk: samplesPerChunk, } - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) // At this slow rate, we will fill the chunk in two block durations. slowRate := (DefaultBlockDuration * 2) / samplesPerChunk @@ -2900,6 +2910,71 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) { } } +func TestHeadShardedPostings(t *testing.T) { + headOpts := newTestHeadDefaultOptions(1000, false) + headOpts.EnableSharding = true + head, _ := newTestHeadWithOptions(t, wlog.CompressionNone, headOpts) + defer func() { + require.NoError(t, head.Close()) + }() + + ctx := context.Background() + + // Append some series. + app := head.Appender(ctx) + for i := 0; i < 100; i++ { + _, err := app.Append(0, labels.FromStrings("unique", fmt.Sprintf("value%d", i), "const", "1"), 100, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + ir := head.indexRange(0, 200) + + // List all postings for a given label value. This is what we expect to get + // in output from all shards. + p, err := ir.Postings(ctx, "const", "1") + require.NoError(t, err) + + var expected []storage.SeriesRef + for p.Next() { + expected = append(expected, p.At()) + } + require.NoError(t, p.Err()) + require.NotEmpty(t, expected) + + // Query the same postings for each shard. + const shardCount = uint64(4) + actualShards := make(map[uint64][]storage.SeriesRef) + actualPostings := make([]storage.SeriesRef, 0, len(expected)) + + for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { + p, err = ir.Postings(ctx, "const", "1") + require.NoError(t, err) + + p = ir.ShardedPostings(p, shardIndex, shardCount) + for p.Next() { + ref := p.At() + + actualShards[shardIndex] = append(actualShards[shardIndex], ref) + actualPostings = append(actualPostings, ref) + } + require.NoError(t, p.Err()) + } + + // We expect the postings merged out of shards is the exact same of the non sharded ones. + require.ElementsMatch(t, expected, actualPostings) + + // We expect the series in each shard are the expected ones. + for shardIndex, ids := range actualShards { + for _, id := range ids { + var lbls labels.ScratchBuilder + + require.NoError(t, ir.Series(id, &lbls, nil)) + require.Equal(t, shardIndex, labels.StableHash(lbls.Labels())%shardCount) + } + } +} + func TestErrReuseAppender(t *testing.T) { head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { @@ -3038,7 +3113,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { samplesPerChunk: DefaultSamplesPerChunk, } - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) for i := 0; i < 7; i++ { ok, _ := s.append(int64(i), float64(i), 0, cOpts) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index c2ca581f7c..36b8878bc4 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1744,6 +1744,33 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// ShardedPostings returns a postings list filtered by the provided shardIndex out of shardCount. +func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Postings { + var ( + out = make([]storage.SeriesRef, 0, 128) + bufLbls = labels.ScratchBuilder{} + ) + + for p.Next() { + id := p.At() + + // Get the series labels (no chunks). + err := r.Series(id, &bufLbls, nil) + if err != nil { + return ErrPostings(fmt.Errorf("series %d not found", id)) + } + + // Check if the series belong to the shard. + if labels.StableHash(bufLbls.Labels())%shardCount != shardIndex { + continue + } + + out = append(out, id) + } + + return NewListPostings(out) +} + // Size returns the size of an index file. func (r *Reader) Size() int64 { return int64(r.b.Len()) @@ -1864,9 +1891,12 @@ func (dec *Decoder) LabelValueFor(ctx context.Context, b []byte, label string) ( // Series decodes a series entry from the given byte slice into builder and chks. // Previous contents of builder can be overwritten - make sure you copy before retaining. +// Skips reading chunks metadata if chks is nil. func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { builder.Reset() - *chks = (*chks)[:0] + if chks != nil { + *chks = (*chks)[:0] + } d := encoding.Decbuf{B: b} @@ -1892,6 +1922,11 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu builder.Add(ln, lv) } + // Skip reading chunks metadata if chks is nil. + if chks == nil { + return d.Err() + } + // Read the chunks meta data. k = d.Uvarint() diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 369d337384..ef88870355 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -242,6 +242,58 @@ func TestIndexRW_Postings(t *testing.T) { }, labelIndices) require.NoError(t, ir.Close()) + + t.Run("ShardedPostings()", func(t *testing.T) { + ir, err := NewFileReader(fn) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, ir.Close()) + }) + + // List all postings for a given label value. This is what we expect to get + // in output from all shards. + p, err = ir.Postings(ctx, "a", "1") + require.NoError(t, err) + + var expected []storage.SeriesRef + for p.Next() { + expected = append(expected, p.At()) + } + require.NoError(t, p.Err()) + require.NotEmpty(t, expected) + + // Query the same postings for each shard. + const shardCount = uint64(4) + actualShards := make(map[uint64][]storage.SeriesRef) + actualPostings := make([]storage.SeriesRef, 0, len(expected)) + + for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { + p, err = ir.Postings(ctx, "a", "1") + require.NoError(t, err) + + p = ir.ShardedPostings(p, shardIndex, shardCount) + for p.Next() { + ref := p.At() + + actualShards[shardIndex] = append(actualShards[shardIndex], ref) + actualPostings = append(actualPostings, ref) + } + require.NoError(t, p.Err()) + } + + // We expect the postings merged out of shards is the exact same of the non sharded ones. + require.ElementsMatch(t, expected, actualPostings) + + // We expect the series in each shard are the expected ones. + for shardIndex, ids := range actualShards { + for _, id := range ids { + var lbls labels.ScratchBuilder + + require.NoError(t, ir.Series(id, &lbls, nil)) + require.Equal(t, shardIndex, labels.StableHash(lbls.Labels())%shardCount) + } + } + }) } func TestPostingsMany(t *testing.T) { @@ -565,6 +617,55 @@ func TestSymbols(t *testing.T) { require.NoError(t, iter.Err()) } +func BenchmarkReader_ShardedPostings(b *testing.B) { + const ( + numSeries = 10000 + numShards = 16 + ) + + dir, err := os.MkdirTemp("", "benchmark_reader_sharded_postings") + require.NoError(b, err) + defer func() { + require.NoError(b, os.RemoveAll(dir)) + }() + + ctx := context.Background() + + // Generate an index. + fn := filepath.Join(dir, indexFilename) + + iw, err := NewWriter(ctx, fn) + require.NoError(b, err) + + for i := 1; i <= numSeries; i++ { + require.NoError(b, iw.AddSymbol(fmt.Sprintf("%10d", i))) + } + require.NoError(b, iw.AddSymbol("const")) + require.NoError(b, iw.AddSymbol("unique")) + + for i := 1; i <= numSeries; i++ { + require.NoError(b, iw.AddSeries(storage.SeriesRef(i), + labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)))) + } + + require.NoError(b, iw.Close()) + + b.ResetTimer() + + // Create a reader to read back all postings from the index. + ir, err := NewFileReader(fn) + require.NoError(b, err) + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + allPostings, err := ir.Postings(ctx, "const", fmt.Sprintf("%10d", 1)) + require.NoError(b, err) + + ir.ShardedPostings(allPostings, uint64(n%numShards), numShards) + } +} + func TestDecoder_Postings_WrongInput(t *testing.T) { _, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) require.Error(t, err) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 440130f7db..3a801bf98b 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -440,6 +440,10 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P return p } +func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount) +} + func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef) } diff --git a/tsdb/querier.go b/tsdb/querier.go index a692c98f1a..ab2c53d70d 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -131,11 +131,15 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora mint := q.mint maxt := q.maxt disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 p, err := PostingsForMatchers(ctx, q.index, ms...) if err != nil { return storage.ErrSeriesSet(err) } + if sharded { + p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + } if sortSeries { p = q.index.SortedPostings(p) } @@ -171,6 +175,8 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints * mint := q.mint maxt := q.maxt disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 + if hints != nil { mint = hints.Start maxt = hints.End @@ -180,6 +186,9 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints * if err != nil { return storage.ErrChunkSeriesSet(err) } + if sharded { + p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + } if sortSeries { p = q.index.SortedPostings(p) } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index fcedc54621..c2bd717082 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2326,6 +2326,27 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings { return index.NewListPostings(ep) } +func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + out := make([]storage.SeriesRef, 0, 128) + + for p.Next() { + ref := p.At() + s, ok := m.series[ref] + if !ok { + continue + } + + // Check if the series belong to the shard. + if s.l.Hash()%shardCount != shardIndex { + continue + } + + out = append(out, ref) + } + + return index.NewListPostings(out) +} + func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok { @@ -3272,6 +3293,10 @@ func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings { return index.EmptyPostings() } +func (m mockMatcherIndex) ShardedPostings(ps index.Postings, shardIndex, shardCount uint64) index.Postings { + return ps +} + func (m mockMatcherIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { return nil }