From 51ce1cc7ff71bd738dc25fa88a8799e5fe5b91de Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Wed, 28 Mar 2018 18:33:41 +0100 Subject: [PATCH] db: Fixed validateBlockSequence. Signed-off-by: Bartek Plotka --- db.go | 78 +++++++++++++++++++++++++++++++++++++++++++++--------- db_test.go | 72 +++++++++++++++++++++++++------------------------ 2 files changed, 102 insertions(+), 48 deletions(-) diff --git a/db.go b/db.go index 6326014795..975d7717e3 100644 --- a/db.go +++ b/db.go @@ -555,8 +555,9 @@ func (db *DB) reload(deleteable ...string) (err error) { return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } +// ValidateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. func validateBlockSequence(bs []*Block) error { - if len(bs) == 0 { + if len(bs) <= 1 { return nil } @@ -565,31 +566,82 @@ func validateBlockSequence(bs []*Block) error { metas = append(metas, b.meta) } - overlappedBlocks := ValidateBlockSequence(metas) - if len(overlappedBlocks) == 0 { + overlaps := OverlappingBlocks(metas) + if len(overlaps) == 0 { return nil } - return errors.Errorf("block time ranges overlap (%v)", overlappedBlocks) + return errors.Errorf("block time ranges overlap (%v)", overlaps) } -// ValidateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. -func ValidateBlockSequence(bm []BlockMeta) [][]BlockMeta { - if len(bm) == 0 { +// OverlappingBlocks returns all overlapping blocks from given meta files. +func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) { + if len(bm) <= 1 { return nil } sort.Slice(bm, func(i, j int) bool { return bm[i].MinTime < bm[j].MinTime }) - prev := bm[0] - for _, b := range bm[1:] { - if b.MinTime < prev.MaxTime { - return [][]BlockMeta{{b, prev}} + for i, b := range bm[1:] { + prev := bm[i] + if b.MinTime >= prev.MaxTime { + continue } - //prev = b + // prev overlaps with b. + + overlap := []BlockMeta{prev} + + // Check if prev overlaps with something else. + for j, fb := range bm[i+1:] { + if fb.MinTime >= prev.MaxTime { + break + } + + if fb.MinTime >= bm[i+j].MaxTime { + // fb overlaps with prev, but fb does not overlap with previous block. Pack in separate group. + overlaps = append(overlaps, overlap) + overlap = []BlockMeta{prev} + } + + overlap = append(overlap, fb) + } + overlaps = append(overlaps, overlap) } - return nil + + if len(overlaps) < 2 { + return overlaps + } + + // Deduplicate cases like {a, b, c} {b, c} into just {a, b, c} + newOverlaps := [][]BlockMeta{overlaps[0]} + for i, overlap := range overlaps[1:] { + prev := overlaps[i] + + // Check if prev contains all overlap elements. + found := false + for _, o := range overlap { + found = false + for _, p := range prev { + if p.ULID.Compare(o.ULID) == 0 { + found = true + break + } + } + if !found { + break + } + } + + if found { + // We can ignore this overlap. + continue + } + + newOverlaps = append(newOverlaps, overlap) + } + + return newOverlaps } func (db *DB) String() string { diff --git a/db_test.go b/db_test.go index 3eb783c616..7768a41302 100644 --- a/db_test.go +++ b/db_test.go @@ -21,6 +21,7 @@ import ( "sort" "testing" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" @@ -893,46 +894,47 @@ func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) { return result, ss.Err() } -func TestValidateBlockSequenceDetectsAllOverlaps(t *testing.T) { - var metas []BlockMeta - - // Create 10 blocks that does not overlap (0-10, 10-20, ..., 90-100) - for i := 0; i < 10; i++ { - metas = append(metas, BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10)}) +func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { + u := uint64(0) + newULID := func() ulid.ULID { + u++ + return ulid.MustNew(u, nil) } - overlappedBlocks := ValidateBlockSequence(metas) - testutil.Assert(t, len(overlappedBlocks) == 0, "we found unexpected overlaps") + // Create 10 blocks that does not overlap (0-10, 10-20, ..., 100-110) but in reverse order to ensure our algorithm + // will handle that. + var metas = make([]BlockMeta, 11) + for i := 10; i >= 0; i-- { + metas[i] = BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10), ULID: newULID()} + } - // Add overlaping blocks. + testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps") + + // Add overlapping blocks. // o1 overlaps with 10-20. - o1 := BlockMeta{MinTime: 15, MaxTime: 17} + o1 := BlockMeta{MinTime: 15, MaxTime: 17, ULID: newULID()} + testutil.Equals(t, [][]BlockMeta{{metas[1], o1}}, OverlappingBlocks(append(metas, o1))) - overlappedBlocks = ValidateBlockSequence(append(metas, o1)) - expectedOverlaps := [][]BlockMeta{ - {metas[1], o1}, - } - testutil.Equals(t, expectedOverlaps, overlappedBlocks) + // o2 overlaps with 20-30 and 30-40. + o2 := BlockMeta{MinTime: 21, MaxTime: 31, ULID: newULID()} + testutil.Equals(t, [][]BlockMeta{{metas[2], o2}, {o2, metas[3]}}, OverlappingBlocks(append(metas, o2))) - //// o2 overlaps with 20-30 and 30-40. - //o2 := BlockMeta{MinTime: 21, MaxTime: 31} - // - //// o3a and o3b overlaps with 30-40 and each other. - //o3a := BlockMeta{MinTime: 33, MaxTime: 39} - //o3b := BlockMeta{MinTime: 34, MaxTime: 36} - // - //// o4 is 1:1 overlap with 50-60 - //o4 := BlockMeta{MinTime: 50, MaxTime: 60} - // - //// o5 overlaps with 50-60, 60-70 and 70,80 - //o5 := BlockMeta{MinTime: 60, MaxTime: 80} - // + // o3a and o3b overlaps with 30-40 and each other. + o3a := BlockMeta{MinTime: 33, MaxTime: 39, ULID: newULID()} + o3b := BlockMeta{MinTime: 34, MaxTime: 36, ULID: newULID()} + testutil.Equals(t, [][]BlockMeta{{metas[3], o3a, o3b}}, OverlappingBlocks(append(metas, o3a, o3b))) - //expectedOverlaps := [][]block.Meta{ - // {metas[1], o1}, - // {metas[2], o2}, - // {metas[3], o2}, - // {metas[3], o3, o3b}, - //} -} \ No newline at end of file + // o4 is 1:1 overlap with 50-60. + o4 := BlockMeta{MinTime: 50, MaxTime: 60, ULID: newULID()} + testutil.Equals(t, [][]BlockMeta{{metas[5], o4}}, OverlappingBlocks(append(metas, o4))) + + // o5 overlaps with 60-70, 70-80 and 80-90. + o5 := BlockMeta{MinTime: 61, MaxTime: 85, ULID: newULID()} + testutil.Equals(t, [][]BlockMeta{{metas[6], o5}, {o5, metas[7]}, {o5, metas[8]}}, OverlappingBlocks(append(metas, o5))) + + // o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a. + o6a := BlockMeta{MinTime: 92, MaxTime: 105, ULID: newULID()} + o6b := BlockMeta{MinTime: 94, MaxTime: 99, ULID: newULID()} + testutil.Equals(t, [][]BlockMeta{{metas[9], o6a, o6b}, {o6a, metas[10]}}, OverlappingBlocks(append(metas, o6a, o6b))) +}