diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 1c6acf5713..9eaf6e28ed 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -155,6 +155,113 @@ func (h *Histogram) CumulativeBucketIterator() BucketIterator { return &cumulativeBucketIterator{h: h, posSpansIdx: -1} } +// Equals returns true if the given histogram matches exactly. +// Exact match is when there are no new buckets (even empty) and no missing buckets, +// and all the bucket values match. Spans can have different empty length spans in between, +// but they must represent the same bucket layout to match. +func (h *Histogram) Equals(h2 *Histogram) bool { + if h2 == nil { + return false + } + + if h.Schema != h2.Schema || h.ZeroThreshold != h2.ZeroThreshold || + h.ZeroCount != h2.ZeroCount || h.Count != h2.Count || h.Sum != h2.Sum { + return false + } + + if !spansMatch(h.PositiveSpans, h2.PositiveSpans) { + return false + } + if !spansMatch(h.NegativeSpans, h2.NegativeSpans) { + return false + } + + if !bucketsMatch(h.PositiveBuckets, h2.PositiveBuckets) { + return false + } + if !bucketsMatch(h.NegativeBuckets, h2.NegativeBuckets) { + return false + } + + return true +} + +// spansMatch returns true if both spans represent the same bucket layout +// after combining zero length spans with the next non-zero length span. +func spansMatch(s1, s2 []Span) bool { + if len(s1) == 0 && len(s2) == 0 { + return true + } + + s1idx, s2idx := 0, 0 + for { + if s1idx >= len(s1) { + return allEmptySpans(s2[s2idx:]) + } + if s2idx >= len(s2) { + return allEmptySpans(s1[s1idx:]) + } + + currS1, currS2 := s1[s1idx], s2[s2idx] + s1idx++ + s2idx++ + if currS1.Length == 0 { + // This span is zero length, so we add consecutive such spans + // until we find a non-zero span. + for ; s1idx < len(s1) && s1[s1idx].Length == 0; s1idx++ { + currS1.Offset += s1[s1idx].Offset + } + if s1idx < len(s1) { + currS1.Offset += s1[s1idx].Offset + currS1.Length = s1[s1idx].Length + s1idx++ + } + } + if currS2.Length == 0 { + // This span is zero length, so we add consecutive such spans + // until we find a non-zero span. + for ; s2idx < len(s2) && s2[s2idx].Length == 0; s2idx++ { + currS2.Offset += s2[s2idx].Offset + } + if s2idx < len(s2) { + currS2.Offset += s2[s2idx].Offset + currS2.Length = s2[s2idx].Length + s2idx++ + } + } + + if currS1.Length == 0 && currS2.Length == 0 { + // The last spans of both set are zero length. Previous spans match. + return true + } + + if currS1.Offset != currS2.Offset || currS1.Length != currS2.Length { + return false + } + } +} + +func allEmptySpans(s []Span) bool { + for _, ss := range s { + if ss.Length > 0 { + return false + } + } + return true +} + +func bucketsMatch(b1, b2 []int64) bool { + if len(b1) != len(b2) { + return false + } + for i, b := range b1 { + if b != b2[i] { + return false + } + } + return true +} + // ToFloat returns a FloatHistogram representation of the Histogram. It is a // deep copy (e.g. spans are not shared). func (h *Histogram) ToFloat() *FloatHistogram { diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index 151dacdb8e..3b136edcee 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -410,3 +410,117 @@ func TestHistogramToFloat(t *testing.T) { require.Equal(t, h.String(), fh.String()) } + +func TestHistogramMatches(t *testing.T) { + h1 := Histogram{ + Schema: 3, + Count: 61, + Sum: 2.7, + ZeroThreshold: 0.1, + ZeroCount: 42, + PositiveSpans: []Span{ + {Offset: 0, Length: 4}, + {Offset: 10, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + NegativeSpans: []Span{ + {Offset: 0, Length: 4}, + {Offset: 10, Length: 3}, + }, + NegativeBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + } + + h2 := h1.Copy() + require.True(t, h1.Equals(h2)) + + // Changed spans but same layout. + h2.PositiveSpans = append(h2.PositiveSpans, Span{Offset: 5}) + h2.NegativeSpans = append(h2.NegativeSpans, Span{Offset: 2}) + require.True(t, h1.Equals(h2)) + require.True(t, h2.Equals(&h1)) + // Adding empty spans in between. + h2.PositiveSpans[1].Offset = 6 + h2.PositiveSpans = []Span{ + h2.PositiveSpans[0], + {Offset: 1}, + {Offset: 3}, + h2.PositiveSpans[1], + h2.PositiveSpans[2], + } + h2.NegativeSpans[1].Offset = 5 + h2.NegativeSpans = []Span{ + h2.NegativeSpans[0], + {Offset: 2}, + {Offset: 3}, + h2.NegativeSpans[1], + h2.NegativeSpans[2], + } + require.True(t, h1.Equals(h2)) + require.True(t, h2.Equals(&h1)) + + // All mismatches. + require.False(t, h1.Equals(nil)) + + h2.Schema = 1 + require.False(t, h1.Equals(h2)) + + h2 = h1.Copy() + h2.Count++ + require.False(t, h1.Equals(h2)) + + h2 = h1.Copy() + h2.Sum++ + require.False(t, h1.Equals(h2)) + + h2 = h1.Copy() + h2.ZeroThreshold++ + require.False(t, h1.Equals(h2)) + + h2 = h1.Copy() + h2.ZeroCount++ + require.False(t, h1.Equals(h2)) + + // Changing value of buckets. + h2 = h1.Copy() + h2.PositiveBuckets[len(h2.PositiveBuckets)-1]++ + require.False(t, h1.Equals(h2)) + h2 = h1.Copy() + h2.NegativeBuckets[len(h2.NegativeBuckets)-1]++ + require.False(t, h1.Equals(h2)) + + // Changing bucket layout. + h2 = h1.Copy() + h2.PositiveSpans[1].Offset++ + require.False(t, h1.Equals(h2)) + h2 = h1.Copy() + h2.NegativeSpans[1].Offset++ + require.False(t, h1.Equals(h2)) + + // Adding an empty bucket. + h2 = h1.Copy() + h2.PositiveSpans[0].Offset-- + h2.PositiveSpans[0].Length++ + h2.PositiveBuckets = append([]int64{0}, h2.PositiveBuckets...) + require.False(t, h1.Equals(h2)) + h2 = h1.Copy() + h2.NegativeSpans[0].Offset-- + h2.NegativeSpans[0].Length++ + h2.NegativeBuckets = append([]int64{0}, h2.NegativeBuckets...) + require.False(t, h1.Equals(h2)) + + // Adding new bucket. + h2 = h1.Copy() + h2.PositiveSpans = append(h2.PositiveSpans, Span{ + Offset: 1, + Length: 1, + }) + h2.PositiveBuckets = append(h2.PositiveBuckets, 1) + require.False(t, h1.Equals(h2)) + h2 = h1.Copy() + h2.NegativeSpans = append(h2.NegativeSpans, Span{ + Offset: 1, + Length: 1, + }) + h2.NegativeBuckets = append(h2.NegativeBuckets, 1) + require.False(t, h1.Equals(h2)) +} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 1384a46134..e6afccea51 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -475,9 +475,37 @@ func TestAmendDatapointCausesError(t *testing.T) { require.NoError(t, app.Commit()) app = db.Appender(ctx) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 0) + require.NoError(t, err) _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 1) require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err) require.NoError(t, app.Rollback()) + + h := histogram.Histogram{ + Schema: 3, + Count: 61, + Sum: 2.7, + ZeroThreshold: 0.1, + ZeroCount: 42, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 10, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + } + + app = db.Appender(ctx) + _, err = app.AppendHistogram(0, labels.Labels{{Name: "a", Value: "c"}}, 0, h.Copy()) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + app = db.Appender(ctx) + _, err = app.AppendHistogram(0, labels.Labels{{Name: "a", Value: "c"}}, 0, h.Copy()) + require.NoError(t, err) + h.Schema = 2 + _, err = app.AppendHistogram(0, labels.Labels{{Name: "a", Value: "c"}}, 0, h.Copy()) + require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err) + require.NoError(t, app.Rollback()) } func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index ef31bd8bf3..037b034077 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -392,12 +392,12 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error { if t < c.maxTime { return storage.ErrOutOfOrderSample } - // TODO(beorn7): do it for histogram. + // We are allowing exact duplicates as we can encounter them in valid cases // like federation and erroring out at that time would be extremely noisy. - //if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { - // return storage.ErrDuplicateSampleForTimestamp - //} + if !h.Equals(s.sampleBuf[3].h) { + return storage.ErrDuplicateSampleForTimestamp + } return nil }