mirror of
https://github.com/prometheus/prometheus.git
synced 2025-08-05 21:57:09 +02:00
fix(chunkenc): appending histograms with empty buckets (#16893)
* test(chunkenc): appending histograms with empty buckets and gaps Append such native histograms that have empty buckets and gaps between the indexes of those buckets. There is a special case for appending counter native histograms to a chunk in TSDB: if we append a histogram that is missing some buckets that are already in chunk, then usually that's a counter reset. However if the missing bucket is empty, meaning its value is 0, then we don't consider it missing. For this case to trigger , we need to write empty buckets into the chunk. Normally native histograms are compacted when we emit them , so this is very rare and compact make sure that there are no multiple continuous empty buckets with gaps between them. The code that I've added in #14513 did not take into account that you can bypass compact and write histograms with many empty buckets, with gaps between them. These are still valid, so the code has to account for them. Main fix in the expandIntSpansAndBuckets and expandFloatSpansAndBuckets function. I've also refactored them for clarity. Consequently needed to fix insert and adjustForInserts to also allow gaps between inserts. I've added some new test cases (data driven test would be nice here, too many cases). And removed the deprecated old function that didn't deal with empty buckets at all. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> Signed-off-by: George Krajcsovits <krajorama@users.noreply.github.com> Co-authored-by: Björn Rabenstein <beorn@grafana.com>
This commit is contained in:
parent
13b55ffc81
commit
3f59fe1a80
@ -343,7 +343,7 @@ func (a *FloatHistogramAppender) appendable(h *histogram.FloatHistogram) (
|
||||
// original deltas to a new set of deltas to match a new span layout that adds
|
||||
// buckets, we simply need to generate a list of inserts.
|
||||
//
|
||||
// Note: Within expandSpansForward we don't have to worry about the changes to the
|
||||
// Note: Within expandFloatSpansAndBuckets we don't have to worry about the changes to the
|
||||
// spans themselves, thanks to the iterators we get to work with the more useful
|
||||
// bucket indices (which of course directly correspond to the buckets we have to
|
||||
// adjust).
|
||||
@ -378,6 +378,48 @@ func expandFloatSpansAndBuckets(a, b []histogram.Span, aBuckets []xorValue, bBuc
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
|
||||
// addInsert updates the current Insert with a new insert at the given
|
||||
// bucket index (otherIdx).
|
||||
addInsert := func(inserts []Insert, insert *Insert, otherIdx int) []Insert {
|
||||
if insert.num == 0 {
|
||||
// First insert.
|
||||
insert.bucketIdx = otherIdx
|
||||
} else if insert.bucketIdx+insert.num != otherIdx {
|
||||
// Insert is not continuous from previous insert.
|
||||
inserts = append(inserts, *insert)
|
||||
insert.num = 0
|
||||
insert.bucketIdx = otherIdx
|
||||
}
|
||||
insert.num++
|
||||
return inserts
|
||||
}
|
||||
|
||||
advanceA := func() {
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx].value
|
||||
}
|
||||
}
|
||||
|
||||
advanceB := func() {
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
}
|
||||
|
||||
loop:
|
||||
for {
|
||||
switch {
|
||||
@ -389,105 +431,37 @@ loop:
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
// Finish WIP insert for a and reset.
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
|
||||
// Finish WIP insert for b and reset.
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
|
||||
aIdx, aOK = ai.Next()
|
||||
bIdx, bOK = bi.Next()
|
||||
aInter.pos++ // Advance potential insert position.
|
||||
aCountIdx++ // Advance absolute bucket count index for a.
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx].value
|
||||
}
|
||||
bInter.pos++ // Advance potential insert position.
|
||||
bCountIdx++ // Advance absolute bucket count index for b.
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
advanceA()
|
||||
advanceB()
|
||||
|
||||
continue
|
||||
case aIdx < bIdx: // b misses a bucket index that is in a.
|
||||
// This is ok if the count in a is 0, in which case we make a note to
|
||||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++ // Mark that we need to insert a bucket in b.
|
||||
bInter.bucketIdx = aIdx
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx].value
|
||||
}
|
||||
bInserts = addInsert(bInserts, &bInter, aIdx)
|
||||
advanceA()
|
||||
continue
|
||||
}
|
||||
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
||||
return nil, nil, false
|
||||
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
bInter.bucketIdx = bIdx
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
aInserts = addInsert(aInserts, &aInter, bIdx)
|
||||
advanceB()
|
||||
}
|
||||
case aOK && !bOK: // b misses a value that is in a.
|
||||
// This is ok if the count in a is 0, in which case we make a note to
|
||||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++
|
||||
bInter.bucketIdx = aIdx
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++ // Advance potential insert position.
|
||||
// Update absolute bucket counts for a.
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount = aBuckets[aCountIdx].value
|
||||
}
|
||||
bInserts = addInsert(bInserts, &bInter, aIdx)
|
||||
advanceA()
|
||||
continue
|
||||
}
|
||||
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
||||
return nil, nil, false
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
bInter.bucketIdx = bIdx
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++ // Advance potential insert position.
|
||||
// Update absolute bucket counts for b.
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
aInserts = addInsert(aInserts, &aInter, bIdx)
|
||||
advanceB()
|
||||
default: // Both iterators ran out. We're done.
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
@ -782,7 +756,7 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
|
||||
// The histogram needs to be expanded to have the extra empty buckets
|
||||
// of the chunk.
|
||||
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
|
||||
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
|
||||
// No new buckets from the histogram, so the spans of the appender can accommodate the new buckets.
|
||||
// However we need to make a copy in case the input is sharing spans from an iterator.
|
||||
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
|
||||
copy(h.PositiveSpans, a.pSpans)
|
||||
|
@ -195,7 +195,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
|
||||
require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1))
|
||||
}
|
||||
|
||||
// Mimics the scenario described for expandSpansForward.
|
||||
// Mimics the scenario described for expandFloatSpansAndBuckets.
|
||||
func TestFloatHistogramChunkBucketChanges(t *testing.T) {
|
||||
c := Chunk(NewFloatHistogramChunk())
|
||||
|
||||
@ -1420,3 +1420,45 @@ func assertFirstFloatHistogramSampleHint(t *testing.T, chunk Chunk, expected his
|
||||
_, v := it.AtFloatHistogram(nil)
|
||||
require.Equal(t, expected, v.CounterResetHint)
|
||||
}
|
||||
|
||||
func TestFloatHistogramEmptyBucketsWithGaps(t *testing.T) {
|
||||
h1 := &histogram.FloatHistogram{
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: -19, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []float64{0, 0, 0, 0},
|
||||
}
|
||||
require.NoError(t, h1.Validate())
|
||||
|
||||
c := NewFloatHistogramChunk()
|
||||
app, err := c.Appender()
|
||||
require.NoError(t, err)
|
||||
_, _, _, err = app.AppendFloatHistogram(nil, 1, h1, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
h2 := &histogram.FloatHistogram{
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: -19, Length: 1},
|
||||
{Offset: 4, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
},
|
||||
PositiveBuckets: []float64{0, 0, 0},
|
||||
}
|
||||
require.NoError(t, h2.Validate())
|
||||
|
||||
newC, recoded, _, err := app.AppendFloatHistogram(nil, 2, h2, false)
|
||||
require.NoError(t, err)
|
||||
require.True(t, recoded)
|
||||
require.NotNil(t, newC)
|
||||
|
||||
it := newC.Iterator(nil)
|
||||
require.Equal(t, ValFloatHistogram, it.Next())
|
||||
_, h := it.AtFloatHistogram(nil)
|
||||
require.NoError(t, h.Validate())
|
||||
require.Equal(t, ValFloatHistogram, it.Next())
|
||||
_, h = it.AtFloatHistogram(nil)
|
||||
require.NoError(t, h.Validate())
|
||||
require.Equal(t, ValNone, it.Next())
|
||||
require.NoError(t, it.Err())
|
||||
}
|
||||
|
@ -374,7 +374,7 @@ func (a *HistogramAppender) appendable(h *histogram.Histogram) (
|
||||
// original deltas to a new set of deltas to match a new span layout that adds
|
||||
// buckets, we simply need to generate a list of inserts.
|
||||
//
|
||||
// Note: Within expandSpansForward we don't have to worry about the changes to the
|
||||
// Note: Within expandIntSpansAndBuckets we don't have to worry about the changes to the
|
||||
// spans themselves, thanks to the iterators we get to work with the more useful
|
||||
// bucket indices (which of course directly correspond to the buckets we have to
|
||||
// adjust).
|
||||
@ -409,6 +409,48 @@ func expandIntSpansAndBuckets(a, b []histogram.Span, aBuckets, bBuckets []int64)
|
||||
bCount = bBuckets[bCountIdx]
|
||||
}
|
||||
|
||||
// addInsert updates the current Insert with a new insert at the given
|
||||
// bucket index (otherIdx).
|
||||
addInsert := func(inserts []Insert, insert *Insert, otherIdx int) []Insert {
|
||||
if insert.num == 0 {
|
||||
// First insert.
|
||||
insert.bucketIdx = otherIdx
|
||||
} else if insert.bucketIdx+insert.num != otherIdx {
|
||||
// Insert is not continuous from previous insert.
|
||||
inserts = append(inserts, *insert)
|
||||
insert.num = 0
|
||||
insert.bucketIdx = otherIdx
|
||||
}
|
||||
insert.num++
|
||||
return inserts
|
||||
}
|
||||
|
||||
advanceA := func() {
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount += aBuckets[aCountIdx]
|
||||
}
|
||||
}
|
||||
|
||||
advanceB := func() {
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount += bBuckets[bCountIdx]
|
||||
}
|
||||
}
|
||||
|
||||
loop:
|
||||
for {
|
||||
switch {
|
||||
@ -420,105 +462,37 @@ loop:
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
// Finish WIP insert for a and reset.
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
|
||||
// Finish WIP insert for b and reset.
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
|
||||
aIdx, aOK = ai.Next()
|
||||
bIdx, bOK = bi.Next()
|
||||
aInter.pos++ // Advance potential insert position.
|
||||
aCountIdx++ // Advance absolute bucket count index for a.
|
||||
if aOK {
|
||||
aCount += aBuckets[aCountIdx]
|
||||
}
|
||||
bInter.pos++ // Advance potential insert position.
|
||||
bCountIdx++ // Advance absolute bucket count index for b.
|
||||
if bOK {
|
||||
bCount += bBuckets[bCountIdx]
|
||||
}
|
||||
advanceA()
|
||||
advanceB()
|
||||
|
||||
continue
|
||||
case aIdx < bIdx: // b misses a bucket index that is in a.
|
||||
// This is ok if the count in a is 0, in which case we make a note to
|
||||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++ // Mark that we need to insert a bucket in b.
|
||||
bInter.bucketIdx = aIdx
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount += aBuckets[aCountIdx]
|
||||
}
|
||||
bInserts = addInsert(bInserts, &bInter, aIdx)
|
||||
advanceA()
|
||||
continue
|
||||
}
|
||||
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
||||
return nil, nil, false
|
||||
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
aInter.bucketIdx = bIdx
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount += bBuckets[bCountIdx]
|
||||
}
|
||||
aInserts = addInsert(aInserts, &aInter, bIdx)
|
||||
advanceB()
|
||||
}
|
||||
case aOK && !bOK: // b misses a value that is in a.
|
||||
// This is ok if the count in a is 0, in which case we make a note to
|
||||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++
|
||||
bInter.bucketIdx = aIdx
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
aInter.num = 0
|
||||
}
|
||||
aIdx, aOK = ai.Next()
|
||||
aInter.pos++ // Advance potential insert position.
|
||||
// Update absolute bucket counts for a.
|
||||
aCountIdx++
|
||||
if aOK {
|
||||
aCount += aBuckets[aCountIdx]
|
||||
}
|
||||
bInserts = addInsert(bInserts, &bInter, aIdx)
|
||||
advanceA()
|
||||
continue
|
||||
}
|
||||
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
||||
return nil, nil, false
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
aInter.bucketIdx = bIdx
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
bIdx, bOK = bi.Next()
|
||||
bInter.pos++ // Advance potential insert position.
|
||||
// Update absolute bucket counts for b.
|
||||
bCountIdx++
|
||||
if bOK {
|
||||
bCount += bBuckets[bCountIdx]
|
||||
}
|
||||
aInserts = addInsert(aInserts, &aInter, bIdx)
|
||||
advanceB()
|
||||
default: // Both iterators ran out. We're done.
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
@ -823,7 +797,7 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
|
||||
// The histogram needs to be expanded to have the extra empty buckets
|
||||
// of the chunk.
|
||||
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
|
||||
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
|
||||
// No new buckets from the histogram, so the spans of the appender can accommodate the new buckets.
|
||||
// However we need to make a copy in case the input is sharing spans from an iterator.
|
||||
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
|
||||
copy(h.PositiveSpans, a.pSpans)
|
||||
|
@ -284,101 +284,12 @@ type Insert struct {
|
||||
bucketIdx int
|
||||
}
|
||||
|
||||
// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or
|
||||
// expandFloatSpansAndBuckets instead.
|
||||
// expandSpansForward is left here for reference.
|
||||
// expandSpansForward returns the inserts to expand the bucket spans 'a' so that
|
||||
// they match the spans in 'b'. 'b' must cover the same or more buckets than
|
||||
// 'a', otherwise the function will return false.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// Let's say the old buckets look like this:
|
||||
//
|
||||
// span syntax: [offset, length]
|
||||
// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1]
|
||||
// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15]
|
||||
// raw values 6 3 3 2 4 5 1
|
||||
// deltas 6 -3 0 -1 2 1 -4
|
||||
//
|
||||
// But now we introduce a new bucket layout. (Carefully chosen example where we
|
||||
// have a span appended, one unchanged[*], one prepended, and two merge - in
|
||||
// that order.)
|
||||
//
|
||||
// [*] unchanged in terms of which bucket indices they represent. but to achieve
|
||||
// that, their offset needs to change if "disrupted" by spans changing ahead of
|
||||
// them
|
||||
//
|
||||
// \/ this one is "unchanged"
|
||||
// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ]
|
||||
// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
|
||||
// raw values 6 3 0 3 0 0 2 4 5 0 1
|
||||
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
|
||||
// delta mods: / \ / \ / \
|
||||
//
|
||||
// Note for histograms with delta-encoded buckets: Whenever any new buckets are
|
||||
// introduced, the subsequent "old" bucket needs to readjust its delta to the
|
||||
// new base of 0. Thus, for the caller who wants to transform the set of
|
||||
// original deltas to a new set of deltas to match a new span layout that adds
|
||||
// buckets, we simply need to generate a list of inserts.
|
||||
//
|
||||
// Note: Within expandSpansForward we don't have to worry about the changes to the
|
||||
// spans themselves, thanks to the iterators we get to work with the more useful
|
||||
// bucket indices (which of course directly correspond to the buckets we have to
|
||||
// adjust).
|
||||
func expandSpansForward(a, b []histogram.Span) (forward []Insert, ok bool) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
var inserts []Insert
|
||||
|
||||
// When inter.num becomes > 0, this becomes a valid insert that should
|
||||
// be yielded when we finish a streak of new buckets.
|
||||
var inter Insert
|
||||
|
||||
av, aOK := ai.Next()
|
||||
bv, bOK := bi.Next()
|
||||
loop:
|
||||
for {
|
||||
switch {
|
||||
case aOK && bOK:
|
||||
switch {
|
||||
case av == bv: // Both have an identical value. move on!
|
||||
// Finish WIP insert and reset.
|
||||
if inter.num > 0 {
|
||||
inserts = append(inserts, inter)
|
||||
}
|
||||
inter.num = 0
|
||||
av, aOK = ai.Next()
|
||||
bv, bOK = bi.Next()
|
||||
inter.pos++
|
||||
case av < bv: // b misses a value that is in a.
|
||||
return inserts, false
|
||||
case av > bv: // a misses a value that is in b. Forward b and recompare.
|
||||
inter.num++
|
||||
bv, bOK = bi.Next()
|
||||
}
|
||||
case aOK && !bOK: // b misses a value that is in a.
|
||||
return inserts, false
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
inter.num++
|
||||
bv, bOK = bi.Next()
|
||||
default: // Both iterators ran out. We're done.
|
||||
if inter.num > 0 {
|
||||
inserts = append(inserts, inter)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
return inserts, true
|
||||
}
|
||||
|
||||
// expandSpansBothWays is similar to expandSpansForward, but now b may also
|
||||
// cover an entirely different set of buckets. The function returns the
|
||||
// “forward” inserts to expand 'a' to also cover all the buckets exclusively
|
||||
// covered by 'b', and it returns the “backward” inserts to expand 'b' to also
|
||||
// cover all the buckets exclusively covered by 'a'.
|
||||
// expandSpansBothWays is similar to expandFloatSpansAndBuckets and
|
||||
// expandIntSpansAndBuckets, but now b may also cover an entirely different set
|
||||
// of buckets and counter resets are ignored. The function returns the “forward”
|
||||
// inserts to expand 'a' to also cover all the buckets exclusively covered by
|
||||
// 'b', and it returns the “backward” inserts to expand 'b' to also cover all
|
||||
// the buckets exclusively covered by 'a'.
|
||||
func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mergedSpans []histogram.Span) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
@ -488,14 +399,24 @@ func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV {
|
||||
ii int // The next insert to process.
|
||||
)
|
||||
for i, d := range in {
|
||||
if ii < len(inserts) && i == inserts[ii].pos {
|
||||
if ii >= len(inserts) || i != inserts[ii].pos {
|
||||
// No inserts at this position, the original delta is still valid.
|
||||
out[oi] = d
|
||||
oi++
|
||||
v += d
|
||||
continue
|
||||
}
|
||||
// Process inserts.
|
||||
firstInsert := true
|
||||
for ii < len(inserts) && i == inserts[ii].pos {
|
||||
// We have an insert!
|
||||
// Add insert.num new delta values such that their
|
||||
// bucket values equate 0. When deltas==false, it means
|
||||
// that it is an absolute value. So we set it to 0
|
||||
// directly.
|
||||
if deltas {
|
||||
if deltas && firstInsert {
|
||||
out[oi] = -v
|
||||
firstInsert = false // No need to go to 0 in further inserts.
|
||||
} else {
|
||||
out[oi] = 0
|
||||
}
|
||||
@ -505,32 +426,30 @@ func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV {
|
||||
oi++
|
||||
}
|
||||
ii++
|
||||
|
||||
// Now save the value from the input. The delta value we
|
||||
// should save is the original delta value + the last
|
||||
// value of the point before the insert (to undo the
|
||||
// delta that was introduced by the insert). When
|
||||
// deltas==false, it means that it is an absolute value,
|
||||
// so we set it directly to the value in the 'in' slice.
|
||||
if deltas {
|
||||
out[oi] = d + v
|
||||
} else {
|
||||
out[oi] = d
|
||||
}
|
||||
oi++
|
||||
v = d + v
|
||||
continue
|
||||
}
|
||||
// If there was no insert, the original delta is still valid.
|
||||
out[oi] = d
|
||||
// Now save the value from the input. The delta value we
|
||||
// should save is the original delta value + the last
|
||||
// value of the point before the insert (to undo the
|
||||
// delta that was introduced by the insert). When
|
||||
// deltas==false, it means that it is an absolute value,
|
||||
// so we set it directly to the value in the 'in' slice.
|
||||
if deltas {
|
||||
out[oi] = d + v
|
||||
} else {
|
||||
out[oi] = d
|
||||
}
|
||||
oi++
|
||||
v += d
|
||||
}
|
||||
switch ii {
|
||||
case len(inserts):
|
||||
// All inserts processed. Nothing more to do.
|
||||
case len(inserts) - 1:
|
||||
// One more insert to process at the end.
|
||||
// Insert empty buckets at the end.
|
||||
for ii < len(inserts) {
|
||||
if inserts[ii].pos < len(in) {
|
||||
panic("leftover inserts must be after the current buckets")
|
||||
}
|
||||
// Add insert.num new delta values such that their
|
||||
// bucket values equate 0. When deltas==false, it means
|
||||
// that it is an absolute value. So we set it to 0
|
||||
// directly.
|
||||
if deltas {
|
||||
out[oi] = -v
|
||||
} else {
|
||||
@ -541,8 +460,8 @@ func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV {
|
||||
out[oi] = 0
|
||||
oi++
|
||||
}
|
||||
default:
|
||||
panic("unprocessed inserts left")
|
||||
ii++
|
||||
v = 0
|
||||
}
|
||||
return out
|
||||
}
|
||||
@ -628,7 +547,7 @@ func adjustForInserts(spans []histogram.Span, inserts []Insert) (mergedSpans []h
|
||||
}
|
||||
}
|
||||
for i < len(inserts) {
|
||||
addBucket(inserts[i].bucketIdx)
|
||||
addBucket(insertIdx)
|
||||
consumeInsert()
|
||||
}
|
||||
return
|
||||
|
@ -78,7 +78,7 @@ func TestBucketIterator(t *testing.T) {
|
||||
},
|
||||
idxs: []int{100, 101, 102, 103, 112, 113, 114, 115, 116, 117, 118, 119},
|
||||
},
|
||||
// The below 2 sets ore the ones described in expandSpansForward's comments.
|
||||
// The below 2 sets ore the ones described in expandFloatSpansAndBuckets's comments.
|
||||
{
|
||||
spans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
@ -111,12 +111,13 @@ func TestBucketIterator(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompareSpansAndInsert(t *testing.T) {
|
||||
func TestExpandSpansBothWaysAndInsert(t *testing.T) {
|
||||
scenarios := []struct {
|
||||
description string
|
||||
spansA, spansB []histogram.Span
|
||||
fInserts, bInserts []Insert
|
||||
bucketsIn, bucketsOut []int64
|
||||
mergedSpans []histogram.Span
|
||||
}{
|
||||
{
|
||||
description: "single prepend at the beginning",
|
||||
@ -134,6 +135,9 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
},
|
||||
bucketsIn: []int64{6, -3, 0},
|
||||
bucketsOut: []int64{0, 6, -3, 0},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -11, Length: 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "single append at the end",
|
||||
@ -151,6 +155,9 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
},
|
||||
bucketsIn: []int64{6, -3, 0},
|
||||
bucketsOut: []int64{6, -3, 0, -3},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -10, Length: 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "double prepend at the beginning",
|
||||
@ -168,6 +175,9 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
},
|
||||
bucketsIn: []int64{6, -3, 0},
|
||||
bucketsOut: []int64{0, 0, 6, -3, 0},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -12, Length: 5},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "double append at the end",
|
||||
@ -185,6 +195,9 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
},
|
||||
bucketsIn: []int64{6, -3, 0},
|
||||
bucketsOut: []int64{6, -3, 0, -3, 0},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -10, Length: 5},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "double prepond at the beginning and double append at the end",
|
||||
@ -206,6 +219,9 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
},
|
||||
bucketsIn: []int64{6, -3, 0},
|
||||
bucketsOut: []int64{0, 0, 6, -3, 0, -3, 0},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -12, Length: 7},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "single removal of bucket at the start",
|
||||
@ -218,6 +234,11 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
bInserts: []Insert{
|
||||
{pos: 0, num: 1},
|
||||
},
|
||||
bucketsIn: []int64{1, 2, -1, 2},
|
||||
bucketsOut: []int64{1, 2, -1, 2},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -10, Length: 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "single removal of bucket in the middle",
|
||||
@ -231,6 +252,11 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
bInserts: []Insert{
|
||||
{pos: 2, num: 1},
|
||||
},
|
||||
bucketsIn: []int64{1, 2, -1, 2},
|
||||
bucketsOut: []int64{1, 2, -1, 2},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -10, Length: 4},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "single removal of bucket at the end",
|
||||
@ -243,6 +269,11 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
bInserts: []Insert{
|
||||
{pos: 3, num: 1},
|
||||
},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -10, Length: 4},
|
||||
},
|
||||
bucketsIn: []int64{1, 2, -1, 2},
|
||||
bucketsOut: []int64{1, 2, -1, 2},
|
||||
},
|
||||
{
|
||||
description: "as described in doc comment",
|
||||
@ -275,6 +306,12 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
},
|
||||
bucketsIn: []int64{6, -3, 0, -1, 2, 1, -4},
|
||||
bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 1, Length: 1},
|
||||
{Offset: 1, Length: 4},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "both forward and backward inserts, complex case",
|
||||
@ -324,27 +361,51 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
num: 1,
|
||||
},
|
||||
},
|
||||
bucketsIn: []int64{1, 2, -1, 2, 0, 3, 1},
|
||||
bucketsOut: []int64{1, 2, -3, 2, -2, 0, 4, 0, 3, -7, 8},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 1, Length: 1},
|
||||
{Offset: 1, Length: 4},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "inserts with gaps",
|
||||
spansA: []histogram.Span{
|
||||
{Offset: -19, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
spansB: []histogram.Span{
|
||||
{Offset: -19, Length: 1},
|
||||
{Offset: 4, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
},
|
||||
fInserts: []Insert{
|
||||
{pos: 4, num: 2},
|
||||
},
|
||||
bInserts: []Insert{
|
||||
{pos: 1, num: 3},
|
||||
},
|
||||
bucketsIn: []int64{1, 2, -1, 1},
|
||||
bucketsOut: []int64{1, 2, -1, 1, -3, 0},
|
||||
mergedSpans: []histogram.Span{
|
||||
{Offset: -19, Length: 2},
|
||||
{Offset: 1, Length: 3},
|
||||
{Offset: 3, Length: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, s := range scenarios {
|
||||
t.Run(s.description, func(t *testing.T) {
|
||||
if len(s.bInserts) > 0 {
|
||||
fInserts, bInserts, _ := expandSpansBothWays(s.spansA, s.spansB)
|
||||
require.Equal(t, s.fInserts, fInserts)
|
||||
require.Equal(t, s.bInserts, bInserts)
|
||||
}
|
||||
|
||||
inserts, valid := expandSpansForward(s.spansA, s.spansB)
|
||||
if len(s.bInserts) > 0 {
|
||||
require.False(t, valid, "compareScan unexpectedly returned true")
|
||||
return
|
||||
}
|
||||
require.True(t, valid, "compareScan unexpectedly returned false")
|
||||
require.Equal(t, s.fInserts, inserts)
|
||||
fInserts, bInserts, m := expandSpansBothWays(s.spansA, s.spansB)
|
||||
require.Equal(t, s.fInserts, fInserts)
|
||||
require.Equal(t, s.bInserts, bInserts)
|
||||
require.Equal(t, s.mergedSpans, m)
|
||||
|
||||
gotBuckets := make([]int64, len(s.bucketsOut))
|
||||
insert(s.bucketsIn, gotBuckets, inserts, true)
|
||||
insert(s.bucketsIn, gotBuckets, fInserts, true)
|
||||
require.Equal(t, s.bucketsOut, gotBuckets)
|
||||
|
||||
floatBucketsIn := make([]float64, len(s.bucketsIn))
|
||||
@ -362,7 +423,7 @@ func TestCompareSpansAndInsert(t *testing.T) {
|
||||
floatBucketsOut[i] = float64(last)
|
||||
}
|
||||
gotFloatBuckets := make([]float64, len(floatBucketsOut))
|
||||
insert(floatBucketsIn, gotFloatBuckets, inserts, false)
|
||||
insert(floatBucketsIn, gotFloatBuckets, fInserts, false)
|
||||
require.Equal(t, floatBucketsOut, gotFloatBuckets)
|
||||
})
|
||||
}
|
||||
@ -599,3 +660,259 @@ func TestSpansFromBidirectionalCompareSpans(t *testing.T) {
|
||||
require.Equal(t, c.exp, act)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandIntOrFloatSpansAndBuckets(t *testing.T) {
|
||||
testCases := map[string]struct {
|
||||
spansA []histogram.Span
|
||||
bucketsA []int64
|
||||
spansB []histogram.Span
|
||||
bucketsB []int64
|
||||
|
||||
expectReset bool
|
||||
expectForwardInserts []Insert
|
||||
expectBackwardInserts []Insert
|
||||
expectMergedSpans []histogram.Span
|
||||
expectBucketsA []int64
|
||||
expectBucketsB []int64
|
||||
}{
|
||||
"empty": {
|
||||
spansA: []histogram.Span{},
|
||||
bucketsA: []int64{},
|
||||
spansB: []histogram.Span{},
|
||||
bucketsB: []int64{},
|
||||
expectReset: false,
|
||||
expectForwardInserts: nil,
|
||||
expectBackwardInserts: nil,
|
||||
expectMergedSpans: []histogram.Span{},
|
||||
expectBucketsA: []int64{},
|
||||
expectBucketsB: []int64{},
|
||||
},
|
||||
"single bucket reset to none": {
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsA: []int64{1},
|
||||
spansB: []histogram.Span{},
|
||||
bucketsB: []int64{},
|
||||
expectReset: true,
|
||||
},
|
||||
"single bucket reset to lower": {
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsA: []int64{2},
|
||||
spansB: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsB: []int64{1},
|
||||
expectReset: true,
|
||||
},
|
||||
"single bucket increase": {
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsA: []int64{1},
|
||||
spansB: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsB: []int64{2},
|
||||
expectReset: false,
|
||||
expectForwardInserts: nil,
|
||||
expectBackwardInserts: nil,
|
||||
expectMergedSpans: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
expectBucketsA: []int64{1},
|
||||
expectBucketsB: []int64{2},
|
||||
},
|
||||
"distinct new buckets and increase": {
|
||||
// A: ___1_____
|
||||
// B: 22_22___2
|
||||
// B': 22_22___2
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsA: []int64{1},
|
||||
spansB: []histogram.Span{{Offset: -2, Length: 2}, {Offset: 1, Length: 2}, {Offset: 3, Length: 1}},
|
||||
bucketsB: []int64{2, 0, 0, 0, 0},
|
||||
expectReset: false,
|
||||
expectForwardInserts: []Insert{{pos: 0, num: 2, bucketIdx: -2}, {pos: 1, num: 1, bucketIdx: 2}, {pos: 1, num: 1, bucketIdx: 6}},
|
||||
expectBackwardInserts: nil,
|
||||
expectMergedSpans: []histogram.Span{{Offset: -2, Length: 2}, {Offset: 1, Length: 2}, {Offset: 3, Length: 1}},
|
||||
expectBucketsA: []int64{0, 0, 1, -1, 0},
|
||||
expectBucketsB: []int64{2, 0, 0, 0, 0},
|
||||
},
|
||||
"distinct new buckets but reset": {
|
||||
// A: ___2_____
|
||||
// B: 11_11___1
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsA: []int64{2},
|
||||
spansB: []histogram.Span{{Offset: -2, Length: 2}, {Offset: 1, Length: 2}, {Offset: 3, Length: 1}},
|
||||
bucketsB: []int64{1, 0, 0, 0, 0},
|
||||
expectReset: true,
|
||||
},
|
||||
"distinct new buckets but missing": {
|
||||
// A: ___2_____
|
||||
// B: 11__1___1
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsA: []int64{2},
|
||||
spansB: []histogram.Span{{Offset: -2, Length: 2}, {Offset: 2, Length: 1}, {Offset: 3, Length: 1}},
|
||||
bucketsB: []int64{1, 0, 0, 0},
|
||||
expectReset: true,
|
||||
},
|
||||
"distinct new buckets and missing an empty bucket": {
|
||||
// A: _0__
|
||||
// B: ___1
|
||||
// B': _0_1
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 1}},
|
||||
bucketsA: []int64{0},
|
||||
spansB: []histogram.Span{{Offset: 3, Length: 1}},
|
||||
bucketsB: []int64{1},
|
||||
expectReset: false,
|
||||
expectForwardInserts: []Insert{{pos: 1, num: 1, bucketIdx: 3}},
|
||||
expectBackwardInserts: []Insert{{pos: 0, num: 1, bucketIdx: 1}},
|
||||
expectMergedSpans: []histogram.Span{{Offset: 1, Length: 1}, {Offset: 1, Length: 1}},
|
||||
expectBucketsA: []int64{0, 0},
|
||||
expectBucketsB: []int64{0, 1},
|
||||
},
|
||||
"distinct new buckets and missing multiple empty buckets": {
|
||||
// Idx: 01234567890123
|
||||
// A: _000_00__0__00
|
||||
// B; ________1_____
|
||||
// B': _000_00_10__00
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 2, Length: 1}, {Offset: 2, Length: 2}},
|
||||
bucketsA: []int64{0, 0, 0, 0, 0, 0, 0, 0},
|
||||
spansB: []histogram.Span{{Offset: 8, Length: 1}},
|
||||
bucketsB: []int64{1},
|
||||
expectReset: false,
|
||||
expectForwardInserts: []Insert{{pos: 5, num: 1, bucketIdx: 8}},
|
||||
expectBackwardInserts: []Insert{{pos: 0, num: 3, bucketIdx: 1}, {pos: 0, num: 2, bucketIdx: 5}, {pos: 1, num: 1, bucketIdx: 9}, {pos: 1, num: 2, bucketIdx: 12}},
|
||||
expectMergedSpans: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 2, Length: 2}},
|
||||
expectBucketsA: []int64{0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
expectBucketsB: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0},
|
||||
},
|
||||
"overlap new buckets and missing multiple empty buckets": {
|
||||
// Idx: 01234567890123
|
||||
// A: _000_00_10__00
|
||||
// B; ________2_____
|
||||
// B': _000_00_20__00
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 2, Length: 2}},
|
||||
bucketsA: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0},
|
||||
spansB: []histogram.Span{{Offset: 8, Length: 1}},
|
||||
bucketsB: []int64{2},
|
||||
expectReset: false,
|
||||
expectForwardInserts: nil,
|
||||
expectBackwardInserts: []Insert{{pos: 0, num: 3, bucketIdx: 1}, {pos: 0, num: 2, bucketIdx: 5}, {pos: 1, num: 1, bucketIdx: 9}, {pos: 1, num: 2, bucketIdx: 12}},
|
||||
expectMergedSpans: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 2, Length: 2}},
|
||||
expectBucketsA: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0},
|
||||
expectBucketsB: []int64{0, 0, 0, 0, 0, 2, -2, 0, 0},
|
||||
},
|
||||
"overlap new buckets and missing multiple empty buckets with 0 length/offset spans": {
|
||||
// Idx: 01234567890123
|
||||
// A: _000_00_10__00
|
||||
// B; ________2_____
|
||||
// B': _000_00_20__00
|
||||
spansA: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 1, Length: 0}, {Offset: 1, Length: 2}},
|
||||
bucketsA: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0},
|
||||
spansB: []histogram.Span{{Offset: 1, Length: 0}, {Offset: 7, Length: 1}},
|
||||
bucketsB: []int64{2},
|
||||
expectReset: false,
|
||||
expectForwardInserts: nil,
|
||||
expectBackwardInserts: []Insert{{pos: 0, num: 3, bucketIdx: 1}, {pos: 0, num: 2, bucketIdx: 5}, {pos: 1, num: 1, bucketIdx: 9}, {pos: 1, num: 2, bucketIdx: 12}},
|
||||
expectMergedSpans: []histogram.Span{{Offset: 1, Length: 3}, {Offset: 1, Length: 2}, {Offset: 1, Length: 2}, {Offset: 2, Length: 2}},
|
||||
expectBucketsA: []int64{0, 0, 0, 0, 0, 1, -1, 0, 0},
|
||||
expectBucketsB: []int64{0, 0, 0, 0, 0, 2, -2, 0, 0},
|
||||
},
|
||||
"new empty buckets between filled buckets": {
|
||||
// A: 11212332____1__1
|
||||
// B: 122323321__11__1
|
||||
// A': 112123320__01__1
|
||||
// B': 122323321__11__1
|
||||
spansA: []histogram.Span{{Offset: -51, Length: 8}, {Offset: 11, Length: 1}, {Offset: 14, Length: 1}},
|
||||
bucketsA: []int64{1, 0, 1, -1, 1, 1, 0, -1, -1, 0},
|
||||
spansB: []histogram.Span{{Offset: -51, Length: 9}, {Offset: 9, Length: 2}, {Offset: 14, Length: 1}},
|
||||
bucketsB: []int64{1, 1, 0, 1, -1, 1, 0, -1, -1, 0, 0, 0},
|
||||
expectReset: false,
|
||||
expectForwardInserts: []Insert{{pos: 8, num: 1, bucketIdx: -43}, {pos: 8, num: 1, bucketIdx: -33}},
|
||||
expectMergedSpans: []histogram.Span{{Offset: -51, Length: 9}, {Offset: 9, Length: 2}, {Offset: 14, Length: 1}},
|
||||
expectBucketsA: []int64{1, 0, 1, -1, 1, 1, 0, -1, -2, 0, 1, 0},
|
||||
// 1 0 1 -1 1 1 0 -1 -2 -2 1 0
|
||||
|
||||
expectBucketsB: []int64{1, 1, 0, 1, -1, 1, 0, -1, -1, 0, 0, 0},
|
||||
},
|
||||
"real example 1": {
|
||||
// I- 6543210987654321
|
||||
// A: 0__2_______0__
|
||||
// B: _0130_____00_0
|
||||
// A': 00020_____00_0
|
||||
// B': 00130_____00_0
|
||||
spansA: []histogram.Span{{Offset: -16, Length: 1}, {Offset: 2, Length: 1}, {Offset: 7, Length: 1}},
|
||||
bucketsA: []int64{0, 2, -2},
|
||||
spansB: []histogram.Span{{Offset: -15, Length: 4}, {Offset: 5, Length: 2}, {Offset: 1, Length: 1}},
|
||||
bucketsB: []int64{0, 1, 2, -3, 0, 0, 0},
|
||||
expectReset: false,
|
||||
expectForwardInserts: []Insert{{pos: 1, num: 2, bucketIdx: -15}, {pos: 2, num: 1, bucketIdx: -12}, {pos: 2, num: 1, bucketIdx: -6}, {pos: 3, num: 1, bucketIdx: -3}},
|
||||
expectBackwardInserts: []Insert{{pos: 0, num: 1, bucketIdx: -16}},
|
||||
expectMergedSpans: []histogram.Span{{Offset: -16, Length: 5}, {Offset: 5, Length: 2}, {Offset: 1, Length: 1}},
|
||||
expectBucketsA: []int64{0, 0, 0, 2, -2, 0, 0, 0},
|
||||
expectBucketsB: []int64{0, 0, 1, 2, -3, 0, 0, 0},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
// Sanity check.
|
||||
require.Len(t, tc.bucketsA, countSpans(tc.spansA))
|
||||
require.Len(t, tc.bucketsB, countSpans(tc.spansB))
|
||||
require.Len(t, tc.expectBucketsA, countSpans(tc.expectMergedSpans))
|
||||
require.Len(t, tc.expectBucketsB, countSpans(tc.expectMergedSpans))
|
||||
|
||||
t.Run("integers", func(t *testing.T) {
|
||||
fInserts, bInserts, ok := expandIntSpansAndBuckets(tc.spansA, tc.spansB, tc.bucketsA, tc.bucketsB)
|
||||
if tc.expectReset {
|
||||
require.False(t, ok)
|
||||
return
|
||||
}
|
||||
require.Equal(t, tc.expectForwardInserts, fInserts, "forward inserts")
|
||||
require.Equal(t, tc.expectBackwardInserts, bInserts, "backward inserts")
|
||||
|
||||
gotBspans := adjustForInserts(tc.spansB, bInserts)
|
||||
require.Equal(t, tc.expectMergedSpans, gotBspans)
|
||||
|
||||
gotAbuckets := make([]int64, len(tc.expectBucketsA))
|
||||
insert(tc.bucketsA, gotAbuckets, fInserts, true)
|
||||
require.Equal(t, tc.expectBucketsA, gotAbuckets)
|
||||
|
||||
gotBbuckets := make([]int64, len(tc.expectBucketsB))
|
||||
insert(tc.bucketsB, gotBbuckets, bInserts, true)
|
||||
require.Equal(t, tc.expectBucketsB, gotBbuckets)
|
||||
})
|
||||
|
||||
t.Run("floats", func(t *testing.T) {
|
||||
aXorValues := make([]xorValue, len(tc.bucketsA))
|
||||
absolute := float64(0)
|
||||
for i, v := range tc.bucketsA {
|
||||
absolute += float64(v)
|
||||
aXorValues[i].value = absolute
|
||||
}
|
||||
|
||||
makeFloatBuckets := func(in []int64) []float64 {
|
||||
out := make([]float64, len(in))
|
||||
absolute = float64(0)
|
||||
for i, v := range in {
|
||||
absolute += float64(v)
|
||||
out[i] = absolute
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
bFloatBuckets := makeFloatBuckets(tc.bucketsB)
|
||||
|
||||
fInserts, bInserts, ok := expandFloatSpansAndBuckets(tc.spansA, tc.spansB, aXorValues, bFloatBuckets)
|
||||
if tc.expectReset {
|
||||
require.False(t, ok)
|
||||
return
|
||||
}
|
||||
require.Equal(t, tc.expectForwardInserts, fInserts, "forward inserts")
|
||||
require.Equal(t, tc.expectBackwardInserts, bInserts, "backward inserts")
|
||||
|
||||
gotBspans := adjustForInserts(tc.spansB, bInserts)
|
||||
require.Equal(t, tc.expectMergedSpans, gotBspans)
|
||||
|
||||
gotAbuckets := make([]float64, len(tc.expectBucketsA))
|
||||
insert(makeFloatBuckets(tc.bucketsA), gotAbuckets, fInserts, false)
|
||||
require.Equal(t, makeFloatBuckets(tc.expectBucketsA), gotAbuckets)
|
||||
|
||||
gotBbuckets := make([]float64, len(tc.expectBucketsB))
|
||||
insert(makeFloatBuckets(tc.bucketsB), gotBbuckets, bInserts, false)
|
||||
require.Equal(t, makeFloatBuckets(tc.expectBucketsB), gotBbuckets)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -206,7 +206,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
|
||||
require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1))
|
||||
}
|
||||
|
||||
// Mimics the scenario described for expandSpansForward.
|
||||
// Mimics the scenario described for expandIntSpansAndBuckets.
|
||||
func TestHistogramChunkBucketChanges(t *testing.T) {
|
||||
c := Chunk(NewHistogramChunk())
|
||||
|
||||
@ -1776,3 +1776,45 @@ func assertFirstIntHistogramSampleHint(t *testing.T, chunk Chunk, expected histo
|
||||
_, v := it.AtHistogram(nil)
|
||||
require.Equal(t, expected, v.CounterResetHint)
|
||||
}
|
||||
|
||||
func TestIntHistogramEmptyBucketsWithGaps(t *testing.T) {
|
||||
h1 := &histogram.Histogram{
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: -19, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []int64{0, 0, 0, 0},
|
||||
}
|
||||
require.NoError(t, h1.Validate())
|
||||
|
||||
c := NewHistogramChunk()
|
||||
app, err := c.Appender()
|
||||
require.NoError(t, err)
|
||||
_, _, _, err = app.AppendHistogram(nil, 1, h1, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
h2 := &histogram.Histogram{
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: -19, Length: 1},
|
||||
{Offset: 4, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
},
|
||||
PositiveBuckets: []int64{0, 0, 0},
|
||||
}
|
||||
require.NoError(t, h2.Validate())
|
||||
|
||||
newC, recoded, _, err := app.AppendHistogram(nil, 2, h2, false)
|
||||
require.NoError(t, err)
|
||||
require.True(t, recoded)
|
||||
require.NotNil(t, newC)
|
||||
|
||||
it := newC.Iterator(nil)
|
||||
require.Equal(t, ValHistogram, it.Next())
|
||||
_, h := it.AtFloatHistogram(nil)
|
||||
require.NoError(t, h.Validate())
|
||||
require.Equal(t, ValHistogram, it.Next())
|
||||
_, h = it.AtFloatHistogram(nil)
|
||||
require.NoError(t, h.Validate())
|
||||
require.Equal(t, ValNone, it.Next())
|
||||
require.NoError(t, it.Err())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user