From 501bc6419ee54ee5617d75c3bc16ecf36d94c552 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 29 Jan 2024 12:57:27 +0100 Subject: [PATCH] Add ShardedPostings() support to TSDB (#10421) This PR is a reference implementation of the proposal described in #10420. In addition to what described in #10420, in this PR I've introduced labels.StableHash(). The idea is to offer an hashing function which doesn't change over time, and that's used by query sharding in order to get a stable behaviour over time. The implementation of labels.StableHash() is the hashing function used by Prometheus before stringlabels, and what's used by Grafana Mimir for query sharding (because built before stringlabels was a thing). Follow up work As mentioned in #10420, if this PR is accepted I'm also open to upload another foundamental piece used by Grafana Mimir query sharding to accelerate the query execution: an optional, configurable and fast in-memory cache for the series hashes. Signed-off-by: Marco Pracucci --- model/labels/sharding.go | 47 ++++++++++++ model/labels/sharding_stringlabels.go | 54 ++++++++++++++ model/labels/sharding_test.go | 32 ++++++++ storage/interface.go | 14 ++++ tsdb/block.go | 9 +++ tsdb/db.go | 5 ++ tsdb/head.go | 23 ++++-- tsdb/head_read.go | 32 ++++++++ tsdb/head_read_test.go | 2 +- tsdb/head_test.go | 101 ++++++++++++++++++++++---- tsdb/index/index.go | 37 +++++++++- tsdb/index/index_test.go | 101 ++++++++++++++++++++++++++ tsdb/ooo_head_read.go | 4 + tsdb/querier.go | 9 +++ tsdb/querier_test.go | 25 +++++++ 15 files changed, 475 insertions(+), 20 deletions(-) create mode 100644 model/labels/sharding.go create mode 100644 model/labels/sharding_stringlabels.go create mode 100644 model/labels/sharding_test.go diff --git a/model/labels/sharding.go b/model/labels/sharding.go new file mode 100644 index 0000000000..6b4119860a --- /dev/null +++ b/model/labels/sharding.go @@ -0,0 +1,47 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !stringlabels + +package labels + +import ( + "github.com/cespare/xxhash/v2" +) + +// StableHash is a labels hashing implementation which is guaranteed to not change over time. +// This function should be used whenever labels hashing backward compatibility must be guaranteed. +func StableHash(ls Labels) uint64 { + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + for i, v := range ls { + if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB+ do not allocate whole entry. + h := xxhash.New() + _, _ = h.Write(b) + for _, v := range ls[i:] { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(seps) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(seps) + } + return h.Sum64() + } + + b = append(b, v.Name...) + b = append(b, seps[0]) + b = append(b, v.Value...) + b = append(b, seps[0]) + } + return xxhash.Sum64(b) +} diff --git a/model/labels/sharding_stringlabels.go b/model/labels/sharding_stringlabels.go new file mode 100644 index 0000000000..3ad2027d8c --- /dev/null +++ b/model/labels/sharding_stringlabels.go @@ -0,0 +1,54 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build stringlabels + +package labels + +import ( + "github.com/cespare/xxhash/v2" +) + +// StableHash is a labels hashing implementation which is guaranteed to not change over time. +// This function should be used whenever labels hashing backward compatibility must be guaranteed. +func StableHash(ls Labels) uint64 { + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + var h *xxhash.Digest + for i := 0; i < len(ls.data); { + var v Label + v.Name, i = decodeString(ls.data, i) + v.Value, i = decodeString(ls.data, i) + if h == nil && len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB+, switch to Write API. Copy in the values up to this point. + h = xxhash.New() + _, _ = h.Write(b) + } + if h != nil { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(seps) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(seps) + continue + } + + b = append(b, v.Name...) + b = append(b, seps[0]) + b = append(b, v.Value...) + b = append(b, seps[0]) + } + if h != nil { + return h.Sum64() + } + return xxhash.Sum64(b) +} diff --git a/model/labels/sharding_test.go b/model/labels/sharding_test.go new file mode 100644 index 0000000000..78e3047509 --- /dev/null +++ b/model/labels/sharding_test.go @@ -0,0 +1,32 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package labels + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestStableHash tests that StableHash is stable. +// The hashes this test asserts should not be changed. +func TestStableHash(t *testing.T) { + for expectedHash, lbls := range map[uint64]Labels{ + 0xef46db3751d8e999: EmptyLabels(), + 0x347c8ee7a9e29708: FromStrings("hello", "world"), + 0xcbab40540f26097d: FromStrings(MetricName, "metric", "label", "value"), + } { + require.Equal(t, expectedHash, StableHash(lbls)) + } +} diff --git a/storage/interface.go b/storage/interface.go index 675e44c0ee..347e779b56 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -197,6 +197,20 @@ type SelectHints struct { By bool // Indicate whether it is without or by. Range int64 // Range vector selector range in milliseconds. + // ShardCount is the total number of shards that series should be split into + // at query time. Then, only series in the ShardIndex shard will be returned + // by the query. + // + // ShardCount equal to 0 means that sharding is disabled. + ShardCount uint64 + + // ShardIndex is the series shard index to query. The index must be between 0 and ShardCount-1. + // When ShardCount is set to a value > 0, then a query will only process series within the + // ShardIndex's shard. + // + // Series are sharded by "labels stable hash" mod "ShardCount". + ShardIndex uint64 + // DisableTrimming allows to disable trimming of matching series chunks based on query Start and End time. // When disabled, the result may contain samples outside the queried time range but Select() performances // may be improved. diff --git a/tsdb/block.go b/tsdb/block.go index e2562de03c..7833e187ff 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -81,6 +81,11 @@ type IndexReader interface { // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings + // ShardedPostings returns a postings list filtered by the provided shardIndex + // out of shardCount. For a given posting, its shard MUST be computed hashing + // the series labels mod shardCount, using a hash function which is consistent over time. + ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings + // Series populates the given builder and chunk metas for the series identified // by the reference. // Returns storage.ErrNotFound if the ref does not resolve to a known series. @@ -517,6 +522,10 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings { return r.ir.SortedPostings(p) } +func (r blockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + return r.ir.ShardedPostings(p, shardIndex, shardCount) +} + func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { if err := r.ir.Series(ref, builder, chks); err != nil { return fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err) diff --git a/tsdb/db.go b/tsdb/db.go index 2436fab2ac..e9265c55e5 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -84,6 +84,7 @@ func DefaultOptions() *Options { HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, OutOfOrderCapMax: DefaultOutOfOrderCapMax, EnableOverlappingCompaction: true, + EnableSharding: false, } } @@ -186,6 +187,9 @@ type Options struct { // they'd rather keep overlapping blocks and let another component do the overlapping compaction later. // For Prometheus, this will always be true. EnableOverlappingCompaction bool + + // EnableSharding enables query sharding support in TSDB. + EnableSharding bool } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -875,6 +879,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms) headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow) headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax) + headOpts.EnableSharding = opts.EnableSharding if opts.WALReplayConcurrency > 0 { headOpts.WALReplayConcurrency = opts.WALReplayConcurrency } diff --git a/tsdb/head.go b/tsdb/head.go index cdcd3ea568..86ab7fe06c 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -176,6 +176,9 @@ type HeadOptions struct { // The default value is GOMAXPROCS. // If it is set to a negative value or zero, the default value is used. WALReplayConcurrency int + + // EnableSharding enables ShardedPostings() support in the Head. + EnableSharding bool } const ( @@ -1663,7 +1666,12 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, h.opts.IsolationDisabled) + shardHash := uint64(0) + if h.opts.EnableSharding { + shardHash = labels.StableHash(lset) + } + + return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled) }) if err != nil { return nil, false, err @@ -2022,6 +2030,10 @@ type memSeries struct { lset labels.Labels meta *metadata.Metadata + // Series labels hash to use for sharding purposes. The value is always 0 when sharding has not + // been explicitly enabled in TSDB. + shardHash uint64 + // Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps. // When compaction runs, chunks get moved into a block and all pointers are shifted like so: // @@ -2071,11 +2083,12 @@ type memSeriesOOOFields struct { firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]. } -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - nextAt: math.MinInt64, + lset: lset, + ref: id, + nextAt: math.MinInt64, + shardHash: shardHash, } if !isolationDisabled { s.txs = newTxRing(0) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 29f845c7bd..457d3e1c47 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -149,7 +149,35 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { return index.NewListPostings(ep) } +// ShardedPostings implements IndexReader. This function returns an failing postings list if sharding +// has not been enabled in the Head. +func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + if !h.head.opts.EnableSharding { + return index.ErrPostings(errors.New("sharding is disabled")) + } + + out := make([]storage.SeriesRef, 0, 128) + + for p.Next() { + s := h.head.series.getByID(chunks.HeadSeriesRef(p.At())) + if s == nil { + level.Debug(h.head.logger).Log("msg", "Looked up series not found") + continue + } + + // Check if the series belong to the shard. + if s.shardHash%shardCount != shardIndex { + continue + } + + out = append(out, storage.SeriesRef(s.ref)) + } + + return index.NewListPostings(out) +} + // Series returns the series for the given reference. +// Chunks are skipped if chks is nil. func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s := h.head.series.getByID(chunks.HeadSeriesRef(ref)) @@ -159,6 +187,10 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB } builder.Assign(s.lset) + if chks == nil { + return nil + } + s.Lock() defer s.Unlock() diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index b063512019..de97d70a56 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -526,7 +526,7 @@ func TestMemSeries_chunk(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - series := newMemSeries(labels.EmptyLabels(), 1, true) + series := newMemSeries(labels.EmptyLabels(), 1, 0, true) if tc.setup != nil { tc.setup(t, series, chunkDiskMapper) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 83abeacbeb..90e187b58c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -52,20 +52,30 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" ) -func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) { - dir := t.TempDir() - wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) - require.NoError(t, err) - +// newTestHeadDefaultOptions returns the HeadOptions that should be used by default in unit tests. +func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions { opts := DefaultHeadOptions() opts.ChunkRange = chunkRange - opts.ChunkDirRoot = dir opts.EnableExemplarStorage = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) opts.EnableNativeHistograms.Store(true) if oooEnabled { opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) } + return opts +} + +func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) { + return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled)) +} + +func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts *HeadOptions) (*Head, *wlog.WL) { + dir := t.TempDir() + wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) + require.NoError(t, err) + + // Override the chunks dir with the testing one. + opts.ChunkDirRoot = dir h, err := NewHead(nil, nil, wal, nil, opts, nil) require.NoError(t, err) @@ -342,7 +352,7 @@ func BenchmarkLoadWLs(b *testing.B) { } for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. - s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled) s.append(c.mmappedChunkT, 42, 0, cOpts) // There's only one head chunk because only a single sample is appended. mmapChunks() // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with @@ -912,7 +922,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { }, } - s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled) + s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled) for i := 0; i < 4000; i += 5 { ok, _ := s.append(int64(i), float64(i), 0, cOpts) @@ -1053,7 +1063,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { require.NoError(t, chunkDiskMapper.Close()) }() - series := newMemSeries(labels.EmptyLabels(), 1, true) + series := newMemSeries(labels.EmptyLabels(), 1, 0, true) cOpts := chunkOpts{ chunkDiskMapper: chunkDiskMapper, @@ -1631,7 +1641,7 @@ func TestMemSeries_append(t *testing.T) { samplesPerChunk: DefaultSamplesPerChunk, } - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) // Add first two samples at the very end of a chunk range and the next two // on and after it. @@ -1692,7 +1702,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { samplesPerChunk: DefaultSamplesPerChunk, } - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) histograms := tsdbutil.GenerateTestHistograms(4) histogramWithOneMoreBucket := histograms[3].Copy() @@ -1754,7 +1764,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { samplesPerChunk: samplesPerChunk, } - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) // At this slow rate, we will fill the chunk in two block durations. slowRate := (DefaultBlockDuration * 2) / samplesPerChunk @@ -2900,6 +2910,71 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) { } } +func TestHeadShardedPostings(t *testing.T) { + headOpts := newTestHeadDefaultOptions(1000, false) + headOpts.EnableSharding = true + head, _ := newTestHeadWithOptions(t, wlog.CompressionNone, headOpts) + defer func() { + require.NoError(t, head.Close()) + }() + + ctx := context.Background() + + // Append some series. + app := head.Appender(ctx) + for i := 0; i < 100; i++ { + _, err := app.Append(0, labels.FromStrings("unique", fmt.Sprintf("value%d", i), "const", "1"), 100, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + ir := head.indexRange(0, 200) + + // List all postings for a given label value. This is what we expect to get + // in output from all shards. + p, err := ir.Postings(ctx, "const", "1") + require.NoError(t, err) + + var expected []storage.SeriesRef + for p.Next() { + expected = append(expected, p.At()) + } + require.NoError(t, p.Err()) + require.NotEmpty(t, expected) + + // Query the same postings for each shard. + const shardCount = uint64(4) + actualShards := make(map[uint64][]storage.SeriesRef) + actualPostings := make([]storage.SeriesRef, 0, len(expected)) + + for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { + p, err = ir.Postings(ctx, "const", "1") + require.NoError(t, err) + + p = ir.ShardedPostings(p, shardIndex, shardCount) + for p.Next() { + ref := p.At() + + actualShards[shardIndex] = append(actualShards[shardIndex], ref) + actualPostings = append(actualPostings, ref) + } + require.NoError(t, p.Err()) + } + + // We expect the postings merged out of shards is the exact same of the non sharded ones. + require.ElementsMatch(t, expected, actualPostings) + + // We expect the series in each shard are the expected ones. + for shardIndex, ids := range actualShards { + for _, id := range ids { + var lbls labels.ScratchBuilder + + require.NoError(t, ir.Series(id, &lbls, nil)) + require.Equal(t, shardIndex, labels.StableHash(lbls.Labels())%shardCount) + } + } +} + func TestErrReuseAppender(t *testing.T) { head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { @@ -3038,7 +3113,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { samplesPerChunk: DefaultSamplesPerChunk, } - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled) for i := 0; i < 7; i++ { ok, _ := s.append(int64(i), float64(i), 0, cOpts) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index c2ca581f7c..36b8878bc4 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1744,6 +1744,33 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// ShardedPostings returns a postings list filtered by the provided shardIndex out of shardCount. +func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Postings { + var ( + out = make([]storage.SeriesRef, 0, 128) + bufLbls = labels.ScratchBuilder{} + ) + + for p.Next() { + id := p.At() + + // Get the series labels (no chunks). + err := r.Series(id, &bufLbls, nil) + if err != nil { + return ErrPostings(fmt.Errorf("series %d not found", id)) + } + + // Check if the series belong to the shard. + if labels.StableHash(bufLbls.Labels())%shardCount != shardIndex { + continue + } + + out = append(out, id) + } + + return NewListPostings(out) +} + // Size returns the size of an index file. func (r *Reader) Size() int64 { return int64(r.b.Len()) @@ -1864,9 +1891,12 @@ func (dec *Decoder) LabelValueFor(ctx context.Context, b []byte, label string) ( // Series decodes a series entry from the given byte slice into builder and chks. // Previous contents of builder can be overwritten - make sure you copy before retaining. +// Skips reading chunks metadata if chks is nil. func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { builder.Reset() - *chks = (*chks)[:0] + if chks != nil { + *chks = (*chks)[:0] + } d := encoding.Decbuf{B: b} @@ -1892,6 +1922,11 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu builder.Add(ln, lv) } + // Skip reading chunks metadata if chks is nil. + if chks == nil { + return d.Err() + } + // Read the chunks meta data. k = d.Uvarint() diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 369d337384..ef88870355 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -242,6 +242,58 @@ func TestIndexRW_Postings(t *testing.T) { }, labelIndices) require.NoError(t, ir.Close()) + + t.Run("ShardedPostings()", func(t *testing.T) { + ir, err := NewFileReader(fn) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, ir.Close()) + }) + + // List all postings for a given label value. This is what we expect to get + // in output from all shards. + p, err = ir.Postings(ctx, "a", "1") + require.NoError(t, err) + + var expected []storage.SeriesRef + for p.Next() { + expected = append(expected, p.At()) + } + require.NoError(t, p.Err()) + require.NotEmpty(t, expected) + + // Query the same postings for each shard. + const shardCount = uint64(4) + actualShards := make(map[uint64][]storage.SeriesRef) + actualPostings := make([]storage.SeriesRef, 0, len(expected)) + + for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { + p, err = ir.Postings(ctx, "a", "1") + require.NoError(t, err) + + p = ir.ShardedPostings(p, shardIndex, shardCount) + for p.Next() { + ref := p.At() + + actualShards[shardIndex] = append(actualShards[shardIndex], ref) + actualPostings = append(actualPostings, ref) + } + require.NoError(t, p.Err()) + } + + // We expect the postings merged out of shards is the exact same of the non sharded ones. + require.ElementsMatch(t, expected, actualPostings) + + // We expect the series in each shard are the expected ones. + for shardIndex, ids := range actualShards { + for _, id := range ids { + var lbls labels.ScratchBuilder + + require.NoError(t, ir.Series(id, &lbls, nil)) + require.Equal(t, shardIndex, labels.StableHash(lbls.Labels())%shardCount) + } + } + }) } func TestPostingsMany(t *testing.T) { @@ -565,6 +617,55 @@ func TestSymbols(t *testing.T) { require.NoError(t, iter.Err()) } +func BenchmarkReader_ShardedPostings(b *testing.B) { + const ( + numSeries = 10000 + numShards = 16 + ) + + dir, err := os.MkdirTemp("", "benchmark_reader_sharded_postings") + require.NoError(b, err) + defer func() { + require.NoError(b, os.RemoveAll(dir)) + }() + + ctx := context.Background() + + // Generate an index. + fn := filepath.Join(dir, indexFilename) + + iw, err := NewWriter(ctx, fn) + require.NoError(b, err) + + for i := 1; i <= numSeries; i++ { + require.NoError(b, iw.AddSymbol(fmt.Sprintf("%10d", i))) + } + require.NoError(b, iw.AddSymbol("const")) + require.NoError(b, iw.AddSymbol("unique")) + + for i := 1; i <= numSeries; i++ { + require.NoError(b, iw.AddSeries(storage.SeriesRef(i), + labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)))) + } + + require.NoError(b, iw.Close()) + + b.ResetTimer() + + // Create a reader to read back all postings from the index. + ir, err := NewFileReader(fn) + require.NoError(b, err) + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + allPostings, err := ir.Postings(ctx, "const", fmt.Sprintf("%10d", 1)) + require.NoError(b, err) + + ir.ShardedPostings(allPostings, uint64(n%numShards), numShards) + } +} + func TestDecoder_Postings_WrongInput(t *testing.T) { _, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) require.Error(t, err) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 440130f7db..3a801bf98b 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -440,6 +440,10 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P return p } +func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount) +} + func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef) } diff --git a/tsdb/querier.go b/tsdb/querier.go index a692c98f1a..ab2c53d70d 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -131,11 +131,15 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora mint := q.mint maxt := q.maxt disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 p, err := PostingsForMatchers(ctx, q.index, ms...) if err != nil { return storage.ErrSeriesSet(err) } + if sharded { + p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + } if sortSeries { p = q.index.SortedPostings(p) } @@ -171,6 +175,8 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints * mint := q.mint maxt := q.maxt disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 + if hints != nil { mint = hints.Start maxt = hints.End @@ -180,6 +186,9 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints * if err != nil { return storage.ErrChunkSeriesSet(err) } + if sharded { + p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + } if sortSeries { p = q.index.SortedPostings(p) } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index fcedc54621..c2bd717082 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2326,6 +2326,27 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings { return index.NewListPostings(ep) } +func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + out := make([]storage.SeriesRef, 0, 128) + + for p.Next() { + ref := p.At() + s, ok := m.series[ref] + if !ok { + continue + } + + // Check if the series belong to the shard. + if s.l.Hash()%shardCount != shardIndex { + continue + } + + out = append(out, ref) + } + + return index.NewListPostings(out) +} + func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok { @@ -3272,6 +3293,10 @@ func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings { return index.EmptyPostings() } +func (m mockMatcherIndex) ShardedPostings(ps index.Postings, shardIndex, shardCount uint64) index.Postings { + return ps +} + func (m mockMatcherIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { return nil }