From 10a3c7220b66939d85c3c860e8c97e797bda9d46 Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Mon, 10 Jun 2024 14:24:17 +0200
Subject: [PATCH] `MemPostings.PostingsForLabelMatching()`: don't hold the mutex while matching (#14286)

* MemPostings.PostingsForLabelMatching: let mutex go

This changes the `MemPostings.PostingsForLabelMatching` implementation to stop holding the read mutex while matching the label values.

We've seen that this method can be slow when the matcher is expensive, which is why we even added a context expiration check.

However, there are critical processes that might be waiting on this mutex: writes (adding new series) and compaction (deleting the garbage-collected ones), so we should avoid holding it for a long period of time.

Given that we've copied the values to a slice anyway, there's no need to hold the lock while matching.

Signed-off-by: Oleg Zaytsev
---
 tsdb/index/postings.go      | 74 ++++++++++++++++++++++++++-----------
 tsdb/index/postings_test.go | 22 +++++++++++
 2 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index 937d1287b8..6b654f6b5b 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -425,16 +425,62 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 }
 
 func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
-	p.mtx.RLock()
+	// We'll copy the values into a slice and then match over that,
+	// this way we don't need to hold the mutex while we're matching,
+	// which can be slow (seconds) if the match function is a huge regex.
+	// Holding this lock prevents new series from being added (slows down the write path)
+	// and blocks the compaction process.
+	vals := p.labelValues(name)
+	for i, count := 0, 1; i < len(vals); count++ {
+		if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
+			return ErrPostings(ctx.Err())
+		}
 
-	e := p.m[name]
-	if len(e) == 0 {
-		p.mtx.RUnlock()
+		if match(vals[i]) {
+			i++
+			continue
+		}
+
+		// Didn't match: bring the last value to this position, make the slice shorter and check again.
+		// The order of the slice doesn't matter as it comes from a map iteration.
+		vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
+	}
+
+	// If none matched (or this label had no values), no need to grab the lock again.
+	if len(vals) == 0 {
 		return EmptyPostings()
 	}
 
-	// Benchmarking shows that first copying the values into a slice and then matching over that is
-	// faster than matching over the map keys directly, at least on AMD64.
+	// Now `vals` only contains the values that matched, get their postings.
+	its := make([]Postings, 0, len(vals))
+	p.mtx.RLock()
+	e := p.m[name]
+	for _, v := range vals {
+		if refs, ok := e[v]; ok {
+			// Some of the values may have been garbage-collected in the meantime: this is fine, we'll just skip them.
+			// If we didn't let the mutex go, we'd have these postings here, but they would be pointing nowhere
+			// because there would be a `MemPostings.Delete()` call waiting for the lock to delete these labels,
+			// because the series were deleted already.
+			its = append(its, NewListPostings(refs))
+		}
+	}
+	// Let the mutex go before merging.
+	p.mtx.RUnlock()
+
+	return Merge(ctx, its...)
+}
+
+// labelValues returns a slice of label values for the given label name.
+// It will take the read lock.
+func (p *MemPostings) labelValues(name string) []string {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	e := p.m[name]
+	if len(e) == 0 {
+		return nil
+	}
+
 	vals := make([]string, 0, len(e))
 	for v, srs := range e {
 		if len(srs) > 0 {
@@ -442,21 +488,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
 		}
 	}
 
-	var its []Postings
-	count := 1
-	for _, v := range vals {
-		if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
-			p.mtx.RUnlock()
-			return ErrPostings(ctx.Err())
-		}
-		count++
-		if match(v) {
-			its = append(its, NewListPostings(e[v]))
-		}
-	}
-	p.mtx.RUnlock()
-
-	return Merge(ctx, its...)
+	return vals
 }
 
 // ExpandPostings returns the postings expanded as a slice.
diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go
index 562aef457e..4f34cc47ea 100644
--- a/tsdb/index/postings_test.go
+++ b/tsdb/index/postings_test.go
@@ -1435,6 +1435,28 @@ func BenchmarkMemPostings_PostingsForLabelMatching(b *testing.B) {
 	}
 }
 
+func TestMemPostings_PostingsForLabelMatching(t *testing.T) {
+	mp := NewMemPostings()
+	mp.Add(1, labels.FromStrings("foo", "1"))
+	mp.Add(2, labels.FromStrings("foo", "2"))
+	mp.Add(3, labels.FromStrings("foo", "3"))
+	mp.Add(4, labels.FromStrings("foo", "4"))
+
+	isEven := func(v string) bool {
+		iv, err := strconv.Atoi(v)
+		if err != nil {
+			panic(err)
+		}
+		return iv%2 == 0
+	}
+
+	p := mp.PostingsForLabelMatching(context.Background(), "foo", isEven)
+	require.NoError(t, p.Err())
+	refs, err := ExpandPostings(p)
+	require.NoError(t, err)
+	require.Equal(t, []storage.SeriesRef{2, 4}, refs)
+}
+
 func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
 	memP := NewMemPostings()
 	seriesCount := 10 * checkContextEveryNIterations