diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index b98ee5c10d..abf0bd94d3 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -608,10 +608,10 @@ func TestHeadAppenderV2_Delete_e2e(t *testing.T) { sexp := expSs.At() sres := ss.At() require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), newSample) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), newSample) require.Equal(t, errExp, errRes) - require.Equal(t, smplExp, smplRes) + requireEqualSamples(t, sexp.Labels().String(), smplExp, smplRes) } require.NoError(t, ss.Err()) require.Empty(t, ss.Warnings()) @@ -1333,22 +1333,32 @@ func TestDataMissingOnQueryDuringCompaction_AppenderV2(t *testing.T) { q, err := db.Querier(mint, maxt) require.NoError(t, err) - var wg sync.WaitGroup - wg.Go(func() { - // Compacting head while the querier spans the compaction time. - require.NoError(t, db.Compact(ctx)) - require.NotEmpty(t, db.Blocks()) - }) + truncationStarted := make(chan struct{}) + db.head.memTruncationCallBack = func() { + close(truncationStarted) + } - // Give enough time for compaction to finish. - // We expect it to be blocked until querier is closed. - <-time.After(3 * time.Second) + compactDone := make(chan error, 1) + go func() { + // Compacting head while the querier spans the compaction time. + compactDone <- db.Compact(ctx) + }() + + select { + case <-truncationStarted: + case err := <-compactDone: + require.NoError(t, err) + require.FailNow(t, "compaction finished before head truncation started") + case <-time.After(10 * time.Second): + require.FailNow(t, "timed out waiting for head truncation to start") + } // Querying the querier that was got before compaction. series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.Equal(t, map[string][]chunks.Sample{`{a="b"}`: expSamples}, series) - wg.Wait() + require.NoError(t, <-compactDone) + require.NotEmpty(t, db.Blocks()) } func TestIsQuerierCollidingWithTruncation_AppenderV2(t *testing.T) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index d4279185fe..aa2c39ffee 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2158,10 +2158,10 @@ func TestDelete_e2e(t *testing.T) { sexp := expSs.At() sres := ss.At() require.Equal(t, sexp.Labels(), sres.Labels()) - smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil) - smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil) + smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), newSample) + smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), newSample) require.Equal(t, errExp, errRes) - require.Equal(t, smplExp, smplRes) + requireEqualSamples(t, sexp.Labels().String(), smplExp, smplRes) } require.NoError(t, ss.Err()) require.Empty(t, ss.Warnings())