diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 106b8e51bc..bf0e41b66b 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -781,6 +781,8 @@ func (*Encoder) MmapMarkers(markers []RefMmapMarker, b []byte) []byte { return buf.Get() } +// HistogramSamples encodes exponential histograms while returning all the excluded custom bucket histograms. +// Callers can encode the returned custom bucket histograms via CustomBucketsHistogramSamples. func (*Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]byte, []RefHistogramSample) { buf := encoding.Encbuf{B: b} buf.PutByte(byte(HistogramSamples)) @@ -815,6 +817,7 @@ func (*Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) ([]b return buf.Get(), customBucketHistograms } +// CustomBucketsHistogramSamples encodes given histograms as custom bucket histograms. func (*Encoder) CustomBucketsHistogramSamples(histograms []RefHistogramSample, b []byte) []byte { buf := encoding.Encbuf{B: b} buf.PutByte(byte(CustomBucketsHistogramSamples)) @@ -880,6 +883,8 @@ func EncodeHistogram(buf *encoding.Encbuf, h *histogram.Histogram) { } } +// FloatHistogramSamples encodes exponential float histograms while returning all the excluded custom bucket float histograms. +// Callers can encode the returned custom bucket float histograms via CustomBucketsFloatHistogramSamples. func (*Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) ([]byte, []RefFloatHistogramSample) { buf := encoding.Encbuf{B: b} buf.PutByte(byte(FloatHistogramSamples)) @@ -915,6 +920,7 @@ func (*Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b [] return buf.Get(), customBucketsFloatHistograms } +// CustomBucketsFloatHistogramSamples encodes given float histograms as custom bucket float histograms. 
func (*Encoder) CustomBucketsFloatHistogramSamples(histograms []RefFloatHistogramSample, b []byte) []byte { buf := encoding.Encbuf{B: b} buf.PutByte(byte(CustomBucketsFloatHistogramSamples)) diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index b9a6504298..51185f2215 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -29,9 +29,12 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/util/compression" + "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testwal" ) var ( @@ -41,6 +44,7 @@ var ( ) // retry executes f() n times at each interval until it returns true. +// TODO(bwplotka): Replace with require.Eventually. func retry(t *testing.T, interval time.Duration, n int, f func() bool) { t.Helper() ticker := time.NewTicker(interval) @@ -62,51 +66,93 @@ func overwriteReadTimeout(t *testing.T, val time.Duration) { } type writeToMock struct { - samplesAppended int - exemplarsAppended int - histogramsAppended int - floatHistogramsAppended int - seriesLock sync.Mutex - seriesSegmentIndexes map[chunks.HeadSeriesRef]int + mu sync.Mutex + + seriesStored []record.RefSeries + metadataStored []record.RefMetadata + samplesAppended []record.RefSample + exemplarsAppended []record.RefExemplar + histogramsAppended []record.RefHistogramSample + floatHistogramsAppended []record.RefFloatHistogramSample + + seriesStores int + metadataStores int + sampleAppends int + exemplarAppends int + histogramAppends int + floatHistogramsAppends int + + seriesSegmentIndexes map[chunks.HeadSeriesRef]int // If nonzero, delay reads with a short sleep. 
delay time.Duration } func (wtm *writeToMock) Append(s []record.RefSample) bool { + wtm.mu.Lock() + defer wtm.mu.Unlock() + + wtm.sampleAppends++ + wtm.samplesAppended = append(wtm.samplesAppended, s...) time.Sleep(wtm.delay) - wtm.samplesAppended += len(s) return true } func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool { + wtm.mu.Lock() + defer wtm.mu.Unlock() + time.Sleep(wtm.delay) - wtm.exemplarsAppended += len(e) + wtm.exemplarAppends++ + wtm.exemplarsAppended = append(wtm.exemplarsAppended, e...) return true } func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool { + wtm.mu.Lock() + defer wtm.mu.Unlock() + time.Sleep(wtm.delay) - wtm.histogramsAppended += len(h) + wtm.histogramAppends++ + wtm.histogramsAppended = append(wtm.histogramsAppended, h...) return true } func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool { + wtm.mu.Lock() + defer wtm.mu.Unlock() + time.Sleep(wtm.delay) - wtm.floatHistogramsAppended += len(fh) + wtm.floatHistogramsAppends++ + wtm.floatHistogramsAppended = append(wtm.floatHistogramsAppended, fh...) return true } func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { + wtm.mu.Lock() + defer wtm.mu.Unlock() + + wtm.seriesStores++ + wtm.seriesStored = append(wtm.seriesStored, series...) + for _, s := range series { + wtm.seriesSegmentIndexes[s.Ref] = index + } time.Sleep(wtm.delay) - wtm.UpdateSeriesSegment(series, index) } -func (*writeToMock) StoreMetadata([]record.RefMetadata) { /* no-op */ } +func (wtm *writeToMock) StoreMetadata(meta []record.RefMetadata) { + wtm.mu.Lock() + defer wtm.mu.Unlock() + + wtm.metadataStores++ + wtm.metadataStored = append(wtm.metadataStored, meta...) 
+ time.Sleep(wtm.delay) +} func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) { - wtm.seriesLock.Lock() - defer wtm.seriesLock.Unlock() + wtm.mu.Lock() + defer wtm.mu.Unlock() + for _, s := range series { wtm.seriesSegmentIndexes[s.Ref] = index } @@ -115,8 +161,9 @@ func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int func (wtm *writeToMock) SeriesReset(index int) { // Check for series that are in segments older than the checkpoint // that were not also present in the checkpoint. - wtm.seriesLock.Lock() - defer wtm.seriesLock.Unlock() + wtm.mu.Lock() + defer wtm.mu.Unlock() + for k, v := range wtm.seriesSegmentIndexes { if v < index { delete(wtm.seriesSegmentIndexes, k) @@ -125,8 +172,9 @@ func (wtm *writeToMock) SeriesReset(index int) { } func (wtm *writeToMock) checkNumSeries() int { - wtm.seriesLock.Lock() - defer wtm.seriesLock.Unlock() + wtm.mu.Lock() + defer wtm.mu.Unlock() + return len(wtm.seriesSegmentIndexes) } @@ -137,151 +185,149 @@ func newWriteToMock(delay time.Duration) *writeToMock { } } -func TestTailSamples(t *testing.T) { - pageSize := 32 * 1024 - const seriesCount = 10 - const samplesCount = 250 - const exemplarsCount = 25 - const histogramsCount = 50 +func TestWatcher_Tail(t *testing.T) { + const ( + pageSize = 32 * 1024 + batches = 3 + seriesPerBatch = 100 + exemplarsPerSeries = 2 + ) + for _, compress := range compression.Types() { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { - now := time.Now() + var ( + now = time.Now() + dir = t.TempDir() + wdir = path.Join(dir, "wal") + enc record.Encoder + ) + require.NoError(t, os.Mkdir(wdir, 0o777)) - dir := t.TempDir() + // Generate test records that represent batches of records data. + // "batch" simulates a single scrape or RW/OTLP receive message. + // Watcher does not inspect the data other than watching start timestamp, so records + do not need any certain shape. 
+ records := make([]testwal.Records, batches) + cbHistogramRecords := make([]testwal.Records, batches) + for i := range records { + tsFn := func(_, _ int) int64 { + return timestamp.FromTime(now.Add(1 * time.Second)) + } + records[i] = testwal.GenerateRecords(testwal.RecordsCase{ + RefPadding: i * seriesPerBatch, + TsFn: tsFn, - wdir := path.Join(dir, "wal") - err := os.Mkdir(wdir, 0o777) - require.NoError(t, err) + Series: seriesPerBatch, + SamplesPerSeries: 10, + HistogramsPerSeries: 5, + FloatHistogramsPerSeries: 5, + ExemplarsPerSeries: exemplarsPerSeries, + }) + cbHistogramRecords[i] = testwal.GenerateRecords(testwal.RecordsCase{ + RefPadding: i * seriesPerBatch, + TsFn: tsFn, - enc := record.Encoder{} + Series: seriesPerBatch, + HistogramsPerSeries: 5, + FloatHistogramsPerSeries: 5, + HistogramFn: func(ref int) *histogram.Histogram { + return &histogram.Histogram{ + Schema: -53, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 2, + Sum: 0, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + CustomValues: []float64{float64(ref) + 2}, + } + }, + }) + } + + // Create WAL for writing. w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) require.NoError(t, err) - defer func() { + t.Cleanup(func() { require.NoError(t, w.Close()) - }() - - // Write to the initial segment then checkpoint. 
- for i := range seriesCount { - ref := i + 100 - series := enc.Series([]record.RefSeries{ - { - Ref: chunks.HeadSeriesRef(ref), - Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), - }, - }, nil) - require.NoError(t, w.Log(series)) - - for range samplesCount { - inner := rand.Intn(ref + 1) - sample := enc.Samples([]record.RefSample{ - { - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - V: float64(i), - }, - }, nil) - require.NoError(t, w.Log(sample)) - } - - for range exemplarsCount { - inner := rand.Intn(ref + 1) - exemplar := enc.Exemplars([]record.RefExemplar{ - { - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - V: float64(i), - Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", inner)), - }, - }, nil) - require.NoError(t, w.Log(exemplar)) - } - - for range histogramsCount { - inner := rand.Intn(ref + 1) - hist := &histogram.Histogram{ - Schema: 2, - ZeroThreshold: 1e-128, - ZeroCount: 0, - Count: 2, - Sum: 0, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, - PositiveBuckets: []int64{int64(i) + 1}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, - NegativeBuckets: []int64{int64(-i) - 1}, - } - - histograms, _ := enc.HistogramSamples([]record.RefHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - H: hist, - }}, nil) - require.NoError(t, w.Log(histograms)) - - customBucketHist := &histogram.Histogram{ - Schema: -53, - ZeroThreshold: 1e-128, - ZeroCount: 0, - Count: 2, - Sum: 0, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, - CustomValues: []float64{float64(i) + 2}, - } - - customBucketHistograms := enc.CustomBucketsHistogramSamples([]record.RefHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - H: customBucketHist, - }}, nil) - require.NoError(t, w.Log(customBucketHistograms)) - - floatHistograms, _ := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: 
now.UnixNano() + 1, - FH: hist.ToFloat(nil), - }}, nil) - require.NoError(t, w.Log(floatHistograms)) - - customBucketFloatHistograms := enc.CustomBucketsFloatHistogramSamples([]record.RefFloatHistogramSample{{ - Ref: chunks.HeadSeriesRef(inner), - T: now.UnixNano() + 1, - FH: customBucketHist.ToFloat(nil), - }}, nil) - require.NoError(t, w.Log(customBucketFloatHistograms)) - } - } - - // Start read after checkpoint, no more data written. - first, last, err := Segments(w.Dir()) - require.NoError(t, err) - - wt := newWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true, true) - watcher.SetStartTime(now) - - // Set the Watcher's metrics so they're not nil pointers. - watcher.SetMetrics() - for i := first; i <= last; i++ { - segment, err := OpenReadSegment(SegmentName(watcher.walDir, i)) - require.NoError(t, err) - - reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment) - // Use tail true so we can ensure we got the right number of samples. - watcher.readSegment(reader, i, true) - require.NoError(t, segment.Close()) - } - - expectedSeries := seriesCount - expectedSamples := seriesCount * samplesCount - expectedExemplars := seriesCount * exemplarsCount - expectedHistograms := seriesCount * histogramsCount * 2 - retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() >= expectedSeries }) - require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series") - require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples") - require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars") - require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms") - require.Equal(t, expectedHistograms, wt.floatHistogramsAppended, "did not receive the expected number of float histograms") + + // Start watcher to that reads into a mock. 
+ wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true) + // Update the time because we just created samples around "now" time and watcher + // only starts watching after that time. + watcher.SetStartTime(now) + // Start spins up watcher loop in a goroutine. + watcher.Start() + t.Cleanup(watcher.Stop) + + // Write to WAL like append commit would do, while watcher is tailing. + + // Write first a few samples before the start time, we don't expect those to be appended. + require.NoError(t, w.Log(enc.Samples([]record.RefSample{ + {Ref: 1, T: timestamp.FromTime(now), V: 123}, + {Ref: 2, T: timestamp.FromTime(now), V: 123.1}, + }, nil))) + + for i := range records { + // Similar order as tsdb/head_appender.go.headAppenderBase.log + // https://github.com/prometheus/prometheus/blob/1751685dd4f6430757ba3078a96cffeffcb2bb47/tsdb/head_append.go#L1053 + require.NoError(t, w.Log(enc.Series(records[i].Series, nil))) + require.NoError(t, w.Log(enc.Metadata(records[i].Metadata, nil))) + require.NoError(t, w.Log(enc.Samples(records[i].Samples, nil))) + + hs, cbHs := enc.HistogramSamples(records[i].Histograms, nil) + require.Empty(t, cbHs) + require.NoError(t, w.Log(hs)) + fhs, cbFhs := enc.FloatHistogramSamples(records[i].FloatHistograms, nil) + require.Empty(t, cbFhs) + require.NoError(t, w.Log(fhs)) + require.NoError(t, w.Log(enc.CustomBucketsHistogramSamples(cbHistogramRecords[i].Histograms, nil))) + require.NoError(t, w.Log(enc.CustomBucketsFloatHistogramSamples(cbHistogramRecords[i].FloatHistograms, nil))) + + require.NoError(t, w.Log(enc.Exemplars(records[i].Exemplars, nil))) + + // Ping watcher for faster test. Watcher is checking for segment changes or 15s timeout. + watcher.Notify() + } + + // Wait for watcher to read all. + require.Eventually(t, func() bool { + wt.mu.Lock() + defer wt.mu.Unlock() + + // Exemplars are logged as the last one, so assert on those. 
+ return wt.exemplarAppends >= batches + }, 2*time.Minute, 1*time.Second) + + wt.mu.Lock() + defer wt.mu.Unlock() + + require.Equal(t, batches, wt.seriesStores) + require.Equal(t, batches, wt.metadataStores) + require.Equal(t, batches, wt.sampleAppends) + require.Equal(t, 2*batches, wt.histogramAppends) + require.Equal(t, 2*batches, wt.floatHistogramsAppends) + require.Equal(t, batches, wt.exemplarAppends) + + for i := range batches { + sector := len(records[i].Series) + testutil.RequireEqual(t, records[i].Series, wt.seriesStored[i*sector:(i+1)*sector], i) + sector = len(records[i].Metadata) + require.Equal(t, records[i].Metadata, wt.metadataStored[i*sector:(i+1)*sector], i) + sector = len(records[i].Samples) + require.Equal(t, records[i].Samples, wt.samplesAppended[i*sector:(i+1)*sector], i) + + sector = len(records[i].Histograms) + len(cbHistogramRecords[i].Histograms) + require.Equal(t, records[i].Histograms, wt.histogramsAppended[i*sector:i*sector+len(records[i].Histograms)], i) + require.Equal(t, cbHistogramRecords[i].Histograms, wt.histogramsAppended[i*sector+len(records[i].Histograms):(i+1)*sector]) + sector = len(records[i].FloatHistograms) + len(cbHistogramRecords[i].FloatHistograms) + require.Equal(t, records[i].FloatHistograms, wt.floatHistogramsAppended[i*sector:i*sector+len(records[i].FloatHistograms)]) + require.Equal(t, cbHistogramRecords[i].FloatHistograms, wt.floatHistogramsAppended[i*sector+len(records[i].FloatHistograms):(i+1)*sector]) + + sector = len(records[i].Exemplars) + testutil.RequireEqual(t, records[i].Exemplars, wt.exemplarsAppended[i*sector:(i+1)*sector]) + } }) } } @@ -830,7 +876,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { // All series and samples were read. require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read. 
- require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended) + require.Len(t, wt.samplesAppended, segmentsToRead*seriesCount*samplesCount) require.NoError(t, w.Close()) }) } diff --git a/util/testwal/records.go b/util/testwal/records.go index 9a75efb235..5f85e42c3c 100644 --- a/util/testwal/records.go +++ b/util/testwal/records.go @@ -25,6 +25,8 @@ import ( ) // RecordsCase represents record generation option in a form of a test case. +// +// Generated Series will have refs that are monotonic and deterministic, in range of [RefPadding, RefPadding+Series). type RecordsCase struct { Name string @@ -36,8 +38,16 @@ type RecordsCase struct { ExtraLabels []labels.Label - LabelsFn func(lb *labels.ScratchBuilder, i int) labels.Labels - TsFn func(i, j int) int64 + // RefPadding represents a padding to add to Series refs. + RefPadding int + // LabelsFn allows injecting custom labels, by default it's a test_metric_%d with ExtraLabels. + LabelsFn func(lb *labels.ScratchBuilder, ref int) labels.Labels + // TsFn allows injecting custom sample timestamps. j represents the sample index within the series. + // By default, it injects j. + TsFn func(ref, j int) int64 + // HistogramFn provides the source histogram for histogram and float histogram records. + // By default, newTestHist is used (exponential bucketing). + HistogramFn func(ref int) *histogram.Histogram } // Records represents batches of generated WAL records. 
@@ -90,47 +100,51 @@ func GenerateRecords(c RecordsCase) (ret Records) { if c.TsFn == nil { c.TsFn = func(_, j int) int64 { return int64(j) } } + if c.HistogramFn == nil { + c.HistogramFn = newTestHist + } lb := labels.NewScratchBuilder(1 + len(c.ExtraLabels)) for i := range ret.Series { + ref := c.RefPadding + i ret.Series[i] = record.RefSeries{ - Ref: chunks.HeadSeriesRef(i), - Labels: c.LabelsFn(&lb, i), + Ref: chunks.HeadSeriesRef(ref), + Labels: c.LabelsFn(&lb, ref), } ret.Metadata[i] = record.RefMetadata{ - Ref: chunks.HeadSeriesRef(i), + Ref: chunks.HeadSeriesRef(ref), Type: uint8(record.Counter), Unit: "unit text", - Help: fmt.Sprintf("help text for %d", i), + Help: fmt.Sprintf("help text for %d", ref), } for j := range c.SamplesPerSeries { ret.Samples[i*c.SamplesPerSeries+j] = record.RefSample{ - Ref: chunks.HeadSeriesRef(i), - T: c.TsFn(i, j), - V: float64(i), + Ref: chunks.HeadSeriesRef(ref), + T: c.TsFn(ref, j), + V: float64(ref), } } - h := newTestHist(i) + h := c.HistogramFn(ref) for j := range c.HistogramsPerSeries { ret.Histograms[i*c.HistogramsPerSeries+j] = record.RefHistogramSample{ - Ref: chunks.HeadSeriesRef(i), - T: c.TsFn(i, j), + Ref: chunks.HeadSeriesRef(ref), + T: c.TsFn(ref, j), H: h, } } for j := range c.FloatHistogramsPerSeries { ret.FloatHistograms[i*c.FloatHistogramsPerSeries+j] = record.RefFloatHistogramSample{ - Ref: chunks.HeadSeriesRef(i), - T: c.TsFn(i, j), + Ref: chunks.HeadSeriesRef(ref), + T: c.TsFn(ref, j), FH: h.ToFloat(nil), } } for j := range c.ExemplarsPerSeries { ret.Exemplars[i*c.ExemplarsPerSeries+j] = record.RefExemplar{ - Ref: chunks.HeadSeriesRef(i), - T: c.TsFn(i, j), - V: float64(i), - Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)), + Ref: chunks.HeadSeriesRef(ref), + T: c.TsFn(ref, j), + V: float64(ref), + Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", ref)), } } }