diff --git a/promql/promqltest/testdata/native_histograms.test b/promql/promqltest/testdata/native_histograms.test index f0c510bd50..0aac43e0ad 100644 --- a/promql/promqltest/testdata/native_histograms.test +++ b/promql/promqltest/testdata/native_histograms.test @@ -1677,3 +1677,18 @@ eval instant at 1m histogram_count(histogram unless histogram_quantile(0.5, hist eval instant at 1m histogram_quantile(0.5, histogram unless histogram_count(histogram) == 0) {} 3.1748021039363987 +clear + +# Regression test for: +# https://github.com/prometheus/prometheus/issues/14172 +# https://github.com/prometheus/prometheus/issues/15177 +load 1m + mixed_metric1 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}} 4 5 {{schema:0 sum:18 count:10 buckets:[3 4 3]}} + mixed_metric2 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}} + +# The order of the float vs native histograms is preserved. +eval range from 0 to 8m step 1m mixed_metric1 + mixed_metric1{} 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} 4 5 {{schema:0 sum:18 count:10 buckets:[3 4 3]}} {{schema:0 sum:18 count:10 buckets:[3 4 3]}} + +eval range from 0 to 5m step 1m mixed_metric2 + mixed_metric2 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} {{count:6 sum:8 buckets:[1 4 1]}} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 5224b2800f..8e649982fc 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -300,19 +300,87 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { }() app := db.Appender(context.Background()) - _, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) + _, err := app.Append(0, labels.FromStrings("type", "float"), 0, 0) + require.NoError(t, err) + + _, err = app.AppendHistogram( + 0, labels.FromStrings("type", "histogram"), 0, + &histogram.Histogram{Count: 42, Sum: math.NaN()}, nil, + ) + require.NoError(t, err) + + _, err = app.AppendHistogram( + 0, labels.FromStrings("type", "floathistogram"), 0, + nil, &histogram.FloatHistogram{Count: 42, Sum: math.NaN()}, + ) require.NoError(t, err) err = app.Rollback() require.NoError(t, err) - querier, err := db.Querier(0, 1) + for _, typ := range []string{"float", "histogram", "floathistogram"} { + querier, err := db.Querier(0, 1) + require.NoError(t, err) + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "type", typ)) + require.Equal(t, map[string][]chunks.Sample{}, seriesSet) + } + + sr, err := wlog.NewSegmentsReader(db.head.wal.Dir()) require.NoError(t, err) - defer querier.Close() + defer func() { + require.NoError(t, sr.Close()) + }() - seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + // Read records from WAL and check for expected count of series and samples. + var ( + r = wlog.NewReader(sr) + dec = record.NewDecoder(labels.NewSymbolTable()) - require.Equal(t, map[string][]chunks.Sample{}, seriesSet) + walSeriesCount, walSamplesCount, walHistogramCount, walFloatHistogramCount, walExemplarsCount int + ) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + var series []record.RefSeries + series, err = dec.Series(rec, series) + require.NoError(t, err) + walSeriesCount += len(series) + + case record.Samples: + var samples []record.RefSample + samples, err = dec.Samples(rec, samples) + require.NoError(t, err) + walSamplesCount += len(samples) + + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + + case record.HistogramSamples, record.CustomBucketsHistogramSamples: + var histograms []record.RefHistogramSample + histograms, err = dec.HistogramSamples(rec, histograms) + require.NoError(t, err) + walHistogramCount += len(histograms) + + case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: + var floatHistograms []record.RefFloatHistogramSample + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) + require.NoError(t, err) + walFloatHistogramCount += len(floatHistograms) + + default: + } + } + + // Check that only series get stored after calling Rollback. + require.Equal(t, 3, walSeriesCount, "series should have been written to WAL") + require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL") + require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL") + require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL") + require.Equal(t, 0, walFloatHistogramCount, "float histograms should not have been written to WAL") } func TestDBAppenderAddRef(t *testing.T) { @@ -4856,10 +4924,7 @@ func TestMetadataAssertInMemoryData(t *testing.T) { } // TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the -// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This -// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed. -// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a -// histogram in a single write request. +// same series when there are multiple encodings. With issue #15177 fixed, this now all works as expected. func TestMultipleEncodingsCommitOrder(t *testing.T) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -4933,26 +4998,19 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { s := addSample(app, int64(i), chunkenc.ValFloat) expSamples = append(expSamples, s) } - // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the - // same batch. for i := 110; i < 120; i++ { s := addSample(app, int64(i), chunkenc.ValHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the - // same batch. for i := 120; i < 130; i++ { s := addSample(app, int64(i), chunkenc.ValFloatHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as in-order as their timestamps are greater than the max timestamp for float - // samples in the same batch. for i := 140; i < 150; i++ { s := addSample(app, int64(i), chunkenc.ValFloatHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150 - // because float samples are processed first and these samples are in-order wrt to the float samples in the batch. + // These samples will be marked as out-of-order. for i := 130; i < 135; i++ { s := addSample(app, int64(i), chunkenc.ValFloat) expSamples = append(expSamples, s) @@ -4964,8 +5022,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { return expSamples[i].T() < expSamples[j].T() }) - // oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO. - verifySamples(100, 150, expSamples, 20) + // oooCount = 5 for the samples 130 to 134. + verifySamples(100, 150, expSamples, 5) // Append and commit some in-order histograms by themselves. app = db.Appender(context.Background()) @@ -4975,8 +5033,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { } require.NoError(t, app.Commit()) - // oooCount remains at 20 as no new OOO samples have been added. - verifySamples(100, 160, expSamples, 20) + // oooCount remains at 5. + verifySamples(100, 160, expSamples, 5) // Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples // with newer timestamps have already been committed. @@ -5004,8 +5062,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { return expSamples[i].T() < expSamples[j].T() }) - // oooCount = 50 as we've added 30 more OOO samples. - verifySamples(50, 160, expSamples, 50) + // oooCount = 35 as we've added 30 more OOO samples. + verifySamples(50, 160, expSamples, 35) } // TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start diff --git a/tsdb/head.go b/tsdb/head.go index 957cad52e4..45f425cea9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -86,7 +86,8 @@ type Head struct { exemplarMetrics *ExemplarMetrics exemplars ExemplarStorage logger *slog.Logger - appendPool zeropool.Pool[[]record.RefSample] + refSeriesPool zeropool.Pool[[]record.RefSeries] + floatsPool zeropool.Pool[[]record.RefSample] exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef] histogramsPool zeropool.Pool[[]record.RefHistogramSample] floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] diff --git a/tsdb/head_append.go b/tsdb/head_append.go index a5d6e9127a..80357fdf13 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -164,13 +164,6 @@ func (h *Head) Appender(context.Context) storage.Appender { func (h *Head) appender() *headAppender { minValidTime := h.appendableMinValidTime() appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback. - - // Allocate the exemplars buffer only if exemplars are enabled. - var exemplarsBuf []exemplarWithSeriesRef - if h.opts.EnableExemplarStorage { - exemplarsBuf = h.getExemplarBuffer() - } - return &headAppender{ head: h, minValidTime: minValidTime, @@ -178,12 +171,9 @@ func (h *Head) appender() *headAppender { maxt: math.MinInt64, headMaxt: h.MaxTime(), oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(), - samples: h.getAppendBuffer(), - sampleSeries: h.getSeriesBuffer(), - exemplars: exemplarsBuf, - histograms: h.getHistogramBuffer(), - floatHistograms: h.getFloatHistogramBuffer(), - metadata: h.getMetadataBuffer(), + seriesRefs: h.getRefSeriesBuffer(), + series: h.getSeriesBuffer(), + typesInBatch: map[chunks.HeadSeriesRef]sampleType{}, appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, } @@ -213,16 +203,28 @@ func (h *Head) AppendableMinValidTime() (int64, bool) { return h.appendableMinValidTime(), true } -func (h *Head) getAppendBuffer() []record.RefSample { - b := h.appendPool.Get() +func (h *Head) getRefSeriesBuffer() []record.RefSeries { + b := h.refSeriesPool.Get() + if b == nil { + return make([]record.RefSeries, 0, 512) + } + return b +} + +func (h *Head) putRefSeriesBuffer(b []record.RefSeries) { + h.refSeriesPool.Put(b[:0]) +} + +func (h *Head) getFloatBuffer() []record.RefSample { + b := h.floatsPool.Get() if b == nil { return make([]record.RefSample, 0, 512) } return b } -func (h *Head) putAppendBuffer(b []record.RefSample) { - h.appendPool.Put(b[:0]) +func (h *Head) putFloatBuffer(b []record.RefSample) { + h.floatsPool.Put(b[:0]) } func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef { @@ -312,17 +314,30 @@ type exemplarWithSeriesRef struct { exemplar exemplar.Exemplar } -type headAppender struct { - head *Head - minValidTime int64 // No samples below this timestamp are allowed. - mint, maxt int64 - headMaxt int64 // We track it here to not take the lock for every sample appended. - oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample. +// sampleType describes sample types we need to distinguish for append batching. +// We need separate types for everything that goes into a different WAL record +// type or into a different chunk encoding. +type sampleType byte - seriesRefs []record.RefSeries // New series records held by this appender. - series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs) - samples []record.RefSample // New float samples held by this appender. - sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). +const ( + stNone sampleType = iota // To mark that the sample type does not matter. + stFloat // All simple floats (counters, gauges, untyped). Goes to `floats`. + stHistogram // Native integer histograms with a standard exponential schema. Goes to `histograms`. + stCustomBucketHistogram // Native integer histograms with custom bucket boundaries. Goes to `histograms`. + stFloatHistogram // Native float histograms. Goes to `floatHistograms`. + stCustomBucketFloatHistogram // Native float histograms with custom bucket boundaries. Goes to `floatHistograms`. +) + +// appendBatch is used to partition all the appended data into batches that are +// "type clean", i.e. every series receives only samples of one type within the +// batch. Types in this regard are defined by the sampleType enum above. +// TODO(beorn7): The same concept could be extended to make sure every series in +// the batch has at most one metadata record. This is currently not implemented +// because it is unclear if it is needed at all. (Maybe we will remove metadata +// records altogether, see issue #15911.) +type appendBatch struct { + floats []record.RefSample // New float samples held by this appender. + floatSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). histograms []record.RefHistogramSample // New histogram samples held by this appender. histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender. @@ -330,6 +345,42 @@ type headAppender struct { metadata []record.RefMetadata // New metadata held by this appender. metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. exemplars []exemplarWithSeriesRef // New exemplars held by this appender. +} + +// close returns all the slices to the pools in Head and nil's them. +func (b *appendBatch) close(h *Head) { + h.putFloatBuffer(b.floats) + b.floats = nil + h.putSeriesBuffer(b.floatSeries) + b.floatSeries = nil + h.putHistogramBuffer(b.histograms) + b.histograms = nil + h.putSeriesBuffer(b.histogramSeries) + b.histogramSeries = nil + h.putFloatHistogramBuffer(b.floatHistograms) + b.floatHistograms = nil + h.putSeriesBuffer(b.floatHistogramSeries) + b.floatHistogramSeries = nil + h.putMetadataBuffer(b.metadata) + b.metadata = nil + h.putSeriesBuffer(b.metadataSeries) + b.metadataSeries = nil + h.putExemplarBuffer(b.exemplars) + b.exemplars = nil +} + +type headAppender struct { + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 + headMaxt int64 // We track it here to not take the lock for every sample appended. + oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample. + + seriesRefs []record.RefSeries // New series records held by this appender. + series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs) + batches []*appendBatch // Holds all the other data to append. (In regular cases, there should be only one of these.) + + typesInBatch map[chunks.HeadSeriesRef]sampleType // Which (one) sample type each series holds in the most recent batch. appendID, cleanupAppendIDsBelow uint64 closed bool @@ -357,21 +408,27 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 } } - s.Lock() if value.IsStaleNaN(v) { - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we could do this conversion - // in commit. This code should move into Commit(). - switch { - case s.lastHistogramValue != nil: - s.Unlock() + // If we have added a sample before with this same appender, we + // can check the previously used type and turn a stale float + // sample into a stale histogram sample or stale float histogram + // sample as appropriate. This prevents an unnecessary creation + // of a new batch. However, since other appenders might append + // to the same series concurrently, this is not perfect but just + // an optimization for the more likely case. + switch a.typesInBatch[s.ref] { + case stHistogram, stCustomBucketHistogram: return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) - case s.lastFloatHistogramValue != nil: - s.Unlock() + case stFloatHistogram, stCustomBucketFloatHistogram: return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v}) } + // Note that a series reference not yet in the map will come out + // as stNone, but since we do not handle that case separately, + // we do not need to check for the difference between "unknown + // series" and "known series with stNone". } + s.Lock() defer s.Unlock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -403,12 +460,13 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 a.maxt = t } - a.samples = append(a.samples, record.RefSample{ + b := a.getCurrentBatch(stFloat, s.ref) + b.floats = append(b.floats, record.RefSample{ Ref: s.ref, T: t, V: v, }) - a.sampleSeries = append(a.sampleSeries, s) + b.floatSeries = append(b.floatSeries, s) return storage.SeriesRef(s.ref), nil } @@ -448,8 +506,9 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab if ct > a.maxt { a.maxt = ct } - a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0}) - a.sampleSeries = append(a.sampleSeries, s) + b := a.getCurrentBatch(stFloat, s.ref) + b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: ct, V: 0.0}) + b.floatSeries = append(b.floatSeries, s) return storage.SeriesRef(s.ref), nil } @@ -476,6 +535,65 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo return s, created, nil } +// getCurrentBatch returns the current batch if it fits the provided sampleType +// for the provided series. Otherwise, it adds a new batch and returns it. +func (a *headAppender) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch { + h := a.head + + newBatch := func() *appendBatch { + b := appendBatch{ + floats: h.getFloatBuffer(), + floatSeries: h.getSeriesBuffer(), + histograms: h.getHistogramBuffer(), + histogramSeries: h.getSeriesBuffer(), + floatHistograms: h.getFloatHistogramBuffer(), + floatHistogramSeries: h.getSeriesBuffer(), + metadata: h.getMetadataBuffer(), + metadataSeries: h.getSeriesBuffer(), + } + + // Allocate the exemplars buffer only if exemplars are enabled. + if h.opts.EnableExemplarStorage { + b.exemplars = h.getExemplarBuffer() + } + clear(a.typesInBatch) + if st != stNone { + a.typesInBatch[s] = st + } + a.batches = append(a.batches, &b) + return &b + } + + // First batch ever. Create it. + if len(a.batches) == 0 { + return newBatch() + } + + // TODO(beorn7): If we ever see that the a.typesInBatch map grows so + // large that it matters for total memory consumption, we could limit + // the batch size here, i.e. cut a new batch even without a type change. + // Something like: + // if len(a.typesInBatch > limit) { + // return newBatch() + // } + + lastBatch := a.batches[len(a.batches)-1] + if st == stNone { + // Type doesn't matter, last batch will always do. + return lastBatch + } + prevST, ok := a.typesInBatch[s] + switch { + case !ok: // New series. Add it to map and return current batch. + a.typesInBatch[s] = st + return lastBatch + case prevST == st: // Old series, same type. Just return batch. + return lastBatch + } + // An old series got a new type. Start new batch. + return newBatch() +} + // appendable checks whether the given sample is valid for appending to the series. // If the sample is valid and in-order, it returns false with no error. // If the sample belongs to the out-of-order chunk, it returns true with no error. @@ -638,7 +756,8 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, return 0, err } - a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e}) + b := a.getCurrentBatch(stNone, chunks.HeadSeriesRef(ref)) + b.exemplars = append(b.exemplars, exemplarWithSeriesRef{ref, e}) return storage.SeriesRef(s.ref), nil } @@ -667,11 +786,10 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } } - var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, created, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } @@ -680,14 +798,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels switch { case h != nil: s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastHistogramValue = &histogram.Histogram{} - } - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -707,22 +817,19 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } return 0, err } - a.histograms = append(a.histograms, record.RefHistogramSample{ + st := stHistogram + if h.UsesCustomBuckets() { + st = stCustomBucketHistogram + } + b := a.getCurrentBatch(st, s.ref) + b.histograms = append(b.histograms, record.RefHistogramSample{ Ref: s.ref, T: t, H: h, }) - a.histogramSeries = append(a.histogramSeries, s) + b.histogramSeries = append(b.histogramSeries, s) case fh != nil: s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastFloatHistogramValue = &histogram.FloatHistogram{} - } - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -742,12 +849,17 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } return 0, err } - a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ + st := stFloatHistogram + if fh.UsesCustomBuckets() { + st = stCustomBucketFloatHistogram + } + b := a.getCurrentBatch(st, s.ref) + b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{ Ref: s.ref, T: t, FH: fh, }) - a.floatHistogramSeries = append(a.floatHistogramSeries, s) + b.floatHistogramSeries = append(b.floatHistogramSeries, s) } if t < a.mint { @@ -769,11 +881,10 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l return 0, storage.ErrCTNewerThanSample } - var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, created, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } @@ -784,16 +895,12 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l zeroHistogram := &histogram.Histogram{ // The CTZeroSample represents a counter reset by definition. CounterResetHint: histogram.CounterReset, + // Replicate other fields to avoid needless chunk creation. + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + CustomValues: h.CustomValues, } s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastHistogramValue = zeroHistogram - } - // For CTZeroSamples OOO is not allowed. // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -815,26 +922,27 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l s.pendingCommit = true s.Unlock() - a.histograms = append(a.histograms, record.RefHistogramSample{ + st := stHistogram + if h.UsesCustomBuckets() { + st = stCustomBucketHistogram + } + b := a.getCurrentBatch(st, s.ref) + b.histograms = append(b.histograms, record.RefHistogramSample{ Ref: s.ref, T: ct, H: zeroHistogram, }) - a.histogramSeries = append(a.histogramSeries, s) + b.histogramSeries = append(b.histogramSeries, s) case fh != nil: zeroFloatHistogram := &histogram.FloatHistogram{ // The CTZeroSample represents a counter reset by definition. CounterResetHint: histogram.CounterReset, + // Replicate other fields to avoid needless chunk creation. + Schema: fh.Schema, + ZeroThreshold: fh.ZeroThreshold, + CustomValues: fh.CustomValues, } s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastFloatHistogramValue = zeroFloatHistogram - } - // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) // OOO is not allowed for CTZeroSamples. if err != nil { @@ -855,12 +963,17 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l s.pendingCommit = true s.Unlock() - a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ + st := stFloatHistogram + if fh.UsesCustomBuckets() { + st = stCustomBucketFloatHistogram + } + b := a.getCurrentBatch(st, s.ref) + b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{ Ref: s.ref, T: ct, FH: zeroFloatHistogram, }) - a.floatHistogramSeries = append(a.floatHistogramSeries, s) + b.floatHistogramSeries = append(b.floatHistogramSeries, s) } if ct > a.maxt { @@ -889,13 +1002,14 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, s.Unlock() if hasNewMetadata { - a.metadata = append(a.metadata, record.RefMetadata{ + b := a.getCurrentBatch(stNone, s.ref) + b.metadata = append(b.metadata, record.RefMetadata{ Ref: s.ref, Type: record.GetMetricType(meta.Type), Unit: meta.Unit, Help: meta.Help, }) - a.metadataSeries = append(a.metadataSeries, s) + b.metadataSeries = append(b.metadataSeries, s) } return ref, nil @@ -932,66 +1046,68 @@ func (a *headAppender) log() error { return fmt.Errorf("log series: %w", err) } } - if len(a.metadata) > 0 { - rec = enc.Metadata(a.metadata, buf) - buf = rec[:0] + for _, b := range a.batches { + if len(b.metadata) > 0 { + rec = enc.Metadata(b.metadata, buf) + buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log metadata: %w", err) - } - } - if len(a.samples) > 0 { - rec = enc.Samples(a.samples, buf) - buf = rec[:0] - - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log samples: %w", err) - } - } - if len(a.histograms) > 0 { - var customBucketsHistograms []record.RefHistogramSample - rec, customBucketsHistograms = enc.HistogramSamples(a.histograms, buf) - buf = rec[:0] - if len(rec) > 0 { if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log histograms: %w", err) + return fmt.Errorf("log metadata: %w", err) } } + if len(b.floats) > 0 { + rec = enc.Samples(b.floats, buf) + buf = rec[:0] - if len(customBucketsHistograms) > 0 { - rec = enc.CustomBucketsHistogramSamples(customBucketsHistograms, buf) if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log custom buckets histograms: %w", err) + return fmt.Errorf("log samples: %w", err) } } - } - if len(a.floatHistograms) > 0 { - var customBucketsFloatHistograms []record.RefFloatHistogramSample - rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(a.floatHistograms, buf) - buf = rec[:0] - if len(rec) > 0 { - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log float histograms: %w", err) + if len(b.histograms) > 0 { + var customBucketsHistograms []record.RefHistogramSample + rec, customBucketsHistograms = enc.HistogramSamples(b.histograms, buf) + buf = rec[:0] + if len(rec) > 0 { + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log histograms: %w", err) + } + } + + if len(customBucketsHistograms) > 0 { + rec = enc.CustomBucketsHistogramSamples(customBucketsHistograms, buf) + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log custom buckets histograms: %w", err) + } } } + if len(b.floatHistograms) > 0 { + var customBucketsFloatHistograms []record.RefFloatHistogramSample + rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(b.floatHistograms, buf) + buf = rec[:0] + if len(rec) > 0 { + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log float histograms: %w", err) + } + } - if len(customBucketsFloatHistograms) > 0 { - rec = enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, buf) - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log custom buckets float histograms: %w", err) + if len(customBucketsFloatHistograms) > 0 { + rec = enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, buf) + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log custom buckets float histograms: %w", err) + } } } - } - // Exemplars should be logged after samples (float/native histogram/etc), - // otherwise it might happen that we send the exemplars in a remote write - // batch before the samples, which in turn means the exemplar is rejected - // for missing series, since series are created due to samples. - if len(a.exemplars) > 0 { - rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf) - buf = rec[:0] + // Exemplars should be logged after samples (float/native histogram/etc), + // otherwise it might happen that we send the exemplars in a remote write + // batch before the samples, which in turn means the exemplar is rejected + // for missing series, since series are created due to samples. + if len(b.exemplars) > 0 { + rec = enc.Exemplars(exemplarsForEncoding(b.exemplars), buf) + buf = rec[:0] - if err := a.head.wal.Log(rec); err != nil { - return fmt.Errorf("log exemplars: %w", err) + if err := a.head.wal.Log(rec); err != nil { + return fmt.Errorf("log exemplars: %w", err) + } } } return nil @@ -1040,10 +1156,10 @@ type appenderCommitContext struct { enc record.Encoder } -// commitExemplars adds all exemplars from headAppender to the head's exemplar storage. -func (a *headAppender) commitExemplars() { +// commitExemplars adds all exemplars from the provided batch to the head's exemplar storage. +func (a *headAppender) commitExemplars(b *appendBatch) { // No errors logging to WAL, so pass the exemplars along to the in memory storage. - for _, e := range a.exemplars { + for _, e := range b.exemplars { s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref)) if s == nil { // This is very unlikely to happen, but we have seen it in the wild. @@ -1147,9 +1263,9 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld } } -// commitSamples processes and commits the samples in the headAppender to the series. -// It handles both in-order and out-of-order samples, updating the appenderCommitContext -// with the results of the append operations. +// commitFloats processes and commits the samples in the provided batch to the +// series. It handles both in-order and out-of-order samples, updating the +// appenderCommitContext with the results of the append operations. // // The function iterates over the samples in the headAppender and attempts to append each sample // to its corresponding series. It handles various error cases such as out-of-order samples, @@ -1166,14 +1282,68 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld // operations on the series after appending the samples. // // There are also specific functions to commit histograms and float histograms. -func (a *headAppender) commitSamples(acc *appenderCommitContext) { +func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) { var ok, chunkCreated bool var series *memSeries - for i, s := range a.samples { - series = a.sampleSeries[i] + for i, s := range b.floats { + series = b.floatSeries[i] series.Lock() + if value.IsStaleNaN(s.V) { + // If a float staleness marker had been appended for a + // series that got a histogram or float histogram + // appended before via this same appender, it would not + // show up here because we had already converted it. We + // end up here for two reasons: (1) This is the very + // first sample for this series appended via this + // appender. (2) A float sample was appended to this + // series before via this same appender. + // + // In either case, we need to check the previous sample + // in the memSeries to append the appropriately typed + // staleness marker. This is obviously so in case (1). + // In case (2), we would usually expect a float sample + // as the previous sample, but there might be concurrent + // appends that have added a histogram sample in the + // meantime. (This will probably lead to OOO shenanigans + // anyway, but that's a different story.) + // + // If the last sample in the memSeries is indeed a + // float, we don't have to do anything special here and + // just go on with the normal commit for a float sample. + // However, if the last sample in the memSeries is a + // histogram or float histogram, we have to convert the + // staleness marker to a histogram (or float histogram, + // respectively), and just add it at the end of the + // histograms (or float histograms) in the same batch, + // to be committed later in commitHistograms (or + // commitFloatHistograms). The latter is fine because we + // know there is no other histogram (or float histogram) + // sample for this same series in this same batch + // (because any such sample would have triggered a new + // batch). + switch { + case series.lastHistogramValue != nil: + b.histograms = append(b.histograms, record.RefHistogramSample{ + Ref: series.ref, + T: s.T, + H: &histogram.Histogram{Sum: s.V}, + }) + b.histogramSeries = append(b.histogramSeries, series) + series.Unlock() + continue + case series.lastFloatHistogramValue != nil: + b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{ + Ref: series.ref, + T: s.T, + FH: &histogram.FloatHistogram{Sum: s.V}, + }) + b.floatHistogramSeries = append(b.floatHistogramSeries, series) + series.Unlock() + continue + } + } oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected) @@ -1261,15 +1431,24 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { } } -// For details on the commitHistograms function, see the commitSamples docs. -func (a *headAppender) commitHistograms(acc *appenderCommitContext) { +// For details on the commitHistograms function, see the commitFloats docs. +func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitContext) { var ok, chunkCreated bool var series *memSeries - for i, s := range a.histograms { - series = a.histogramSeries[i] + for i, s := range b.histograms { + series = b.histogramSeries[i] series.Lock() + // At this point, we could encounter a histogram staleness + // marker that should better be a float staleness marker or a + // float histogram staleness marker. This can only happen with + // concurrent appenders appending to the same series _and_ doing + // so in a mixed-type scenario. This case is expected to be very + // rare, so we do not bother here to convert the staleness + // marker. The worst case is that we need to cut a new chunk + // just for the staleness marker. + oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) @@ -1361,15 +1540,24 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) { } } -// For details on the commitFloatHistograms function, see the commitSamples docs. -func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { +// For details on the commitFloatHistograms function, see the commitFloats docs. +func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) { var ok, chunkCreated bool var series *memSeries - for i, s := range a.floatHistograms { - series = a.floatHistogramSeries[i] + for i, s := range b.floatHistograms { + series = b.floatHistogramSeries[i] series.Lock() + // At this point, we could encounter a float histogram staleness + // marker that should better be a float staleness marker or an + // integer histogram staleness marker. This can only happen with + // concurrent appenders appending to the same series _and_ doing + // so in a mixed-type scenario. This case is expected to be very + // rare, so we do not bother here to convert the staleness + // marker. The worst case is that we need to cut a new chunk + // just for the staleness marker. + oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err != nil { handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) @@ -1461,14 +1649,14 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { } } -// commitMetadata commits the metadata for each series in the headAppender. +// commitMetadata commits the metadata for each series in the provided batch. // It iterates over the metadata slice and updates the corresponding series // with the new metadata information. The series is locked during the update // to ensure thread safety. -func (a *headAppender) commitMetadata() { +func commitMetadata(b *appendBatch) { var series *memSeries - for i, m := range a.metadata { - series = a.metadataSeries[i] + for i, m := range b.metadata { + series = b.metadataSeries[i] series.Lock() series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help} series.Unlock() @@ -1489,75 +1677,82 @@ func (a *headAppender) Commit() (err error) { if a.closed { return ErrAppenderClosed } - defer func() { a.closed = true }() + + h := a.head + + defer func() { + h.putRefSeriesBuffer(a.seriesRefs) + h.putSeriesBuffer(a.series) + a.closed = true + }() if err := a.log(); err != nil { _ = a.Rollback() // Most likely the same error will happen again. return fmt.Errorf("write to WAL: %w", err) } - if a.head.writeNotified != nil { - a.head.writeNotified.Notify() + if h.writeNotified != nil { + h.writeNotified.Notify() } - a.commitExemplars() - - defer a.head.metrics.activeAppenders.Dec() - defer a.head.putAppendBuffer(a.samples) - defer a.head.putSeriesBuffer(a.sampleSeries) - defer a.head.putExemplarBuffer(a.exemplars) - defer a.head.putHistogramBuffer(a.histograms) - defer a.head.putFloatHistogramBuffer(a.floatHistograms) - defer a.head.putMetadataBuffer(a.metadata) - defer a.head.iso.closeAppend(a.appendID) - acc := &appenderCommitContext{ - floatsAppended: len(a.samples), - histogramsAppended: len(a.histograms) + len(a.floatHistograms), - inOrderMint: math.MaxInt64, - inOrderMaxt: math.MinInt64, - oooMinT: math.MaxInt64, - oooMaxT: math.MinInt64, - oooCapMax: a.head.opts.OutOfOrderCapMax.Load(), + inOrderMint: math.MaxInt64, + inOrderMaxt: math.MinInt64, + oooMinT: math.MaxInt64, + oooMaxT: math.MinInt64, + oooCapMax: h.opts.OutOfOrderCapMax.Load(), appendChunkOpts: chunkOpts{ - chunkDiskMapper: a.head.chunkDiskMapper, - chunkRange: a.head.chunkRange.Load(), - samplesPerChunk: a.head.opts.SamplesPerChunk, + chunkDiskMapper: h.chunkDiskMapper, + chunkRange: h.chunkRange.Load(), + samplesPerChunk: h.opts.SamplesPerChunk, }, } + for _, b := range a.batches { + acc.floatsAppended += len(b.floats) + acc.histogramsAppended += len(b.histograms) + len(b.floatHistograms) + a.commitExemplars(b) + defer b.close(h) + } + defer h.metrics.activeAppenders.Dec() + defer h.iso.closeAppend(a.appendID) + defer func() { for i := range acc.oooRecords { - a.head.putBytesBuffer(acc.oooRecords[i][:0]) + h.putBytesBuffer(acc.oooRecords[i][:0]) } }() - a.commitSamples(acc) - a.commitHistograms(acc) - a.commitFloatHistograms(acc) - a.commitMetadata() + for _, b := range a.batches { + // Do not change the order of these calls. The staleness marker + // handling depends on it. + a.commitFloats(b, acc) + a.commitHistograms(b, acc) + a.commitFloatHistograms(b, acc) + commitMetadata(b) + } // Unmark all series as pending commit after all samples have been committed. a.unmarkCreatedSeriesAsPendingCommit() - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected)) - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected)) - a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected)) - a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected)) - a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended)) - a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended)) - a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted)) - a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted)) - a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt) - a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT) + h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected)) + h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected)) + h.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected)) + h.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected)) + h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended)) + h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended)) + h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted)) + h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted)) + h.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt) + h.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT) acc.collectOOORecords(a) - if a.head.wbl != nil { - if err := a.head.wbl.Log(acc.oooRecords...); err != nil { + if h.wbl != nil { + if err := h.wbl.Log(acc.oooRecords...); err != nil { // TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging // until we have found what samples become OOO. We can try having a metric for this failure. // Returning the error here is not correct because we have already put the samples into the memory, // hence the append/insert was a success. - a.head.logger.Error("Failed to log out of order samples into the WAL", "err", err) + h.logger.Error("Failed to log out of order samples into the WAL", "err", err) } } return nil @@ -2007,37 +2202,43 @@ func (a *headAppender) Rollback() (err error) { if a.closed { return ErrAppenderClosed } - defer func() { a.closed = true }() - defer a.head.metrics.activeAppenders.Dec() - defer a.head.iso.closeAppend(a.appendID) - defer a.head.putSeriesBuffer(a.sampleSeries) - defer a.unmarkCreatedSeriesAsPendingCommit() + h := a.head + defer func() { + a.unmarkCreatedSeriesAsPendingCommit() + h.iso.closeAppend(a.appendID) + h.metrics.activeAppenders.Dec() + a.closed = true + h.putRefSeriesBuffer(a.seriesRefs) + h.putSeriesBuffer(a.series) + }() var series *memSeries - for i := range a.samples { - series = a.sampleSeries[i] - series.Lock() - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() + fmt.Println("ROLLBACK") + for _, b := range a.batches { + for i := range b.floats { + series = b.floatSeries[i] + series.Lock() + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } + for i := range b.histograms { + series = b.histogramSeries[i] + series.Lock() + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } + for i := range b.floatHistograms { + series = b.floatHistogramSeries[i] + series.Lock() + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + } + b.close(h) } - for i := range a.histograms { - series = a.histogramSeries[i] - series.Lock() - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } - a.head.putAppendBuffer(a.samples) - a.head.putExemplarBuffer(a.exemplars) - a.head.putHistogramBuffer(a.histograms) - a.head.putFloatHistogramBuffer(a.floatHistograms) - a.head.putMetadataBuffer(a.metadata) - a.samples = nil - a.exemplars = nil - a.histograms = nil - a.metadata = nil - + a.batches = a.batches[:0] // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. return a.log() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 8fe14c35f9..37aef96a66 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5336,8 +5336,6 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { samples []chunks.Sample expChunks int err error - // If this is empty, samples above will be taken instead of this. - addToExp []chunks.Sample }{ // Histograms that end up in the expected samples are copied here so that we // can independently set the CounterResetHint later. @@ -5377,43 +5375,29 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { samples: []chunks.Sample{sample{t: 100, fh: floatHists[4].Copy()}}, err: storage.ErrOutOfOrderSample, }, + // The three next tests all failed before #15177 was fixed. { - // Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also - // verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order. samples: []chunks.Sample{ sample{t: 400, f: 4}, - sample{t: 500, h: hists[5]}, // This won't be committed. + sample{t: 500, h: hists[5]}, sample{t: 600, f: 6}, }, - addToExp: []chunks.Sample{ - sample{t: 400, f: 4}, - sample{t: 600, f: 6}, - }, - expChunks: 7, // Only 1 new chunk for float64. + expChunks: 9, // Each of the three samples above creates a new chunk because the type changes. }, { - // Here the histogram is appended at the end, hence the first histogram is out of order. samples: []chunks.Sample{ - sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first. + sample{t: 700, h: hists[7]}, sample{t: 800, f: 8}, sample{t: 900, h: hists[9]}, }, - addToExp: []chunks.Sample{ - sample{t: 800, f: 8}, - sample{t: 900, h: hists[9].Copy()}, - }, - expChunks: 8, // float64 added to old chunk, only 1 new for histograms. + expChunks: 12, // Again each sample creates a new chunk. }, { - // Float histogram is appended at the end. samples: []chunks.Sample{ - sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram. + sample{t: 1000, fh: floatHists[7]}, sample{t: 1100, h: hists[9]}, }, - addToExp: []chunks.Sample{ - sample{t: 1100, h: hists[9].Copy()}, - }, - expChunks: 8, + expChunks: 14, // Even changes between float and integer histogram create new chunks. }, } @@ -5431,11 +5415,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { if a.err == nil { require.NoError(t, app.Commit()) - if len(a.addToExp) > 0 { - expResult = append(expResult, a.addToExp...) - } else { - expResult = append(expResult, a.samples...) - } + expResult = append(expResult, a.samples...) checkExpChunks(a.expChunks) } else { require.NoError(t, app.Rollback()) @@ -6751,7 +6731,27 @@ func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing func TestHeadAppender_AppendCT(t *testing.T) { testHistogram := tsdbutil.GenerateTestHistogram(1) + testHistogram.CounterResetHint = histogram.NotCounterReset testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1) + testFloatHistogram.CounterResetHint = histogram.NotCounterReset + // TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the + // following two zero histograms should be histogram.CounterReset. + testZeroHistogram := &histogram.Histogram{ + Schema: testHistogram.Schema, + ZeroThreshold: testHistogram.ZeroThreshold, + PositiveSpans: testHistogram.PositiveSpans, + NegativeSpans: testHistogram.NegativeSpans, + PositiveBuckets: []int64{0, 0, 0, 0}, + NegativeBuckets: []int64{0, 0, 0, 0}, + } + testZeroFloatHistogram := &histogram.FloatHistogram{ + Schema: testFloatHistogram.Schema, + ZeroThreshold: testFloatHistogram.ZeroThreshold, + PositiveSpans: testFloatHistogram.PositiveSpans, + NegativeSpans: testFloatHistogram.NegativeSpans, + PositiveBuckets: []float64{0, 0, 0, 0}, + NegativeBuckets: []float64{0, 0, 0, 0}, + } type appendableSamples struct { ts int64 fSample float64 @@ -6783,12 +6783,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, h: testHistogram, ct: 1}, }, expectedSamples: func() []chunks.Sample { - hNoCounterReset := *testHistogram - hNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 1, h: testZeroHistogram}, sample{t: 100, h: testHistogram}, - sample{t: 101, h: &hNoCounterReset}, + sample{t: 101, h: testHistogram}, } }(), }, @@ -6799,12 +6797,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, fh: testFloatHistogram, ct: 1}, }, expectedSamples: func() []chunks.Sample { - fhNoCounterReset := *testFloatHistogram - fhNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 1, fh: testZeroFloatHistogram}, sample{t: 100, fh: testFloatHistogram}, - sample{t: 101, fh: &fhNoCounterReset}, + sample{t: 101, fh: testFloatHistogram}, } }(), }, @@ -6827,12 +6823,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, h: testHistogram, ct: 1}, }, expectedSamples: func() []chunks.Sample { - hNoCounterReset := *testHistogram - hNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 1, h: testZeroHistogram}, sample{t: 100, h: testHistogram}, - sample{t: 101, h: &hNoCounterReset}, + sample{t: 101, h: testHistogram}, } }(), }, @@ -6843,12 +6837,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, fh: testFloatHistogram, ct: 1}, }, expectedSamples: func() []chunks.Sample { - fhNoCounterReset := *testFloatHistogram - fhNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 1, fh: testZeroFloatHistogram}, sample{t: 100, fh: testFloatHistogram}, - sample{t: 101, fh: &fhNoCounterReset}, + sample{t: 101, fh: testFloatHistogram}, } }(), }, @@ -6872,9 +6864,9 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 102, h: testHistogram, ct: 101}, }, expectedSamples: []chunks.Sample{ - sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 1, h: testZeroHistogram}, sample{t: 100, h: testHistogram}, - sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.UnknownCounterReset}}, + sample{t: 101, h: testZeroHistogram}, sample{t: 102, h: testHistogram}, }, }, @@ -6885,9 +6877,9 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 102, fh: testFloatHistogram, ct: 101}, }, expectedSamples: []chunks.Sample{ - sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 1, fh: testZeroFloatHistogram}, sample{t: 100, fh: testFloatHistogram}, - sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.UnknownCounterReset}}, + sample{t: 101, fh: testZeroFloatHistogram}, sample{t: 102, fh: testFloatHistogram}, }, }, @@ -6910,12 +6902,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, h: testHistogram, ct: 100}, }, expectedSamples: func() []chunks.Sample { - hNoCounterReset := *testHistogram - hNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 1, h: testZeroHistogram}, sample{t: 100, h: testHistogram}, - sample{t: 101, h: &hNoCounterReset}, + sample{t: 101, h: testHistogram}, } }(), }, @@ -6926,12 +6916,10 @@ func TestHeadAppender_AppendCT(t *testing.T) { {ts: 101, fh: testFloatHistogram, ct: 100}, }, expectedSamples: func() []chunks.Sample { - fhNoCounterReset := *testFloatHistogram - fhNoCounterReset.CounterResetHint = histogram.NotCounterReset return []chunks.Sample{ - sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 1, fh: testZeroFloatHistogram}, sample{t: 100, fh: testFloatHistogram}, - sample{t: 101, fh: &fhNoCounterReset}, + sample{t: 101, fh: testFloatHistogram}, } }(), },