mirror of
https://github.com/prometheus/prometheus.git
synced 2025-09-20 21:31:02 +02:00
Merge pull request #17071 from prometheus/beorn7/tsdb
tsdb: Fix commit order for mixed-typed series
This commit is contained in:
commit
d5cc5e2738
@ -1677,3 +1677,18 @@ eval instant at 1m histogram_count(histogram unless histogram_quantile(0.5, hist
|
||||
eval instant at 1m histogram_quantile(0.5, histogram unless histogram_count(histogram) == 0)
|
||||
{} 3.1748021039363987
|
||||
|
||||
clear
|
||||
|
||||
# Regression test for:
|
||||
# https://github.com/prometheus/prometheus/issues/14172
|
||||
# https://github.com/prometheus/prometheus/issues/15177
|
||||
load 1m
|
||||
mixed_metric1 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}} 4 5 {{schema:0 sum:18 count:10 buckets:[3 4 3]}}
|
||||
mixed_metric2 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}}
|
||||
|
||||
# The order of the float vs native histograms is preserved.
|
||||
eval range from 0 to 8m step 1m mixed_metric1
|
||||
mixed_metric1{} 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} 4 5 {{schema:0 sum:18 count:10 buckets:[3 4 3]}} {{schema:0 sum:18 count:10 buckets:[3 4 3]}}
|
||||
|
||||
eval range from 0 to 5m step 1m mixed_metric2
|
||||
mixed_metric2 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} {{count:6 sum:8 buckets:[1 4 1]}}
|
||||
|
104
tsdb/db_test.go
104
tsdb/db_test.go
@ -300,21 +300,89 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
|
||||
}()
|
||||
|
||||
app := db.Appender(context.Background())
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
|
||||
_, err := app.Append(0, labels.FromStrings("type", "float"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = app.AppendHistogram(
|
||||
0, labels.FromStrings("type", "histogram"), 0,
|
||||
&histogram.Histogram{Count: 42, Sum: math.NaN()}, nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = app.AppendHistogram(
|
||||
0, labels.FromStrings("type", "floathistogram"), 0,
|
||||
nil, &histogram.FloatHistogram{Count: 42, Sum: math.NaN()},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = app.Rollback()
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, typ := range []string{"float", "histogram", "floathistogram"} {
|
||||
querier, err := db.Querier(0, 1)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "type", typ))
|
||||
require.Equal(t, map[string][]chunks.Sample{}, seriesSet)
|
||||
}
|
||||
|
||||
sr, err := wlog.NewSegmentsReader(db.head.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, sr.Close())
|
||||
}()
|
||||
|
||||
// Read records from WAL and check for expected count of series and samples.
|
||||
var (
|
||||
r = wlog.NewReader(sr)
|
||||
dec = record.NewDecoder(labels.NewSymbolTable())
|
||||
|
||||
walSeriesCount, walSamplesCount, walHistogramCount, walFloatHistogramCount, walExemplarsCount int
|
||||
)
|
||||
for r.Next() {
|
||||
rec := r.Record()
|
||||
switch dec.Type(rec) {
|
||||
case record.Series:
|
||||
var series []record.RefSeries
|
||||
series, err = dec.Series(rec, series)
|
||||
require.NoError(t, err)
|
||||
walSeriesCount += len(series)
|
||||
|
||||
case record.Samples:
|
||||
var samples []record.RefSample
|
||||
samples, err = dec.Samples(rec, samples)
|
||||
require.NoError(t, err)
|
||||
walSamplesCount += len(samples)
|
||||
|
||||
case record.Exemplars:
|
||||
var exemplars []record.RefExemplar
|
||||
exemplars, err = dec.Exemplars(rec, exemplars)
|
||||
require.NoError(t, err)
|
||||
walExemplarsCount += len(exemplars)
|
||||
|
||||
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
|
||||
var histograms []record.RefHistogramSample
|
||||
histograms, err = dec.HistogramSamples(rec, histograms)
|
||||
require.NoError(t, err)
|
||||
walHistogramCount += len(histograms)
|
||||
|
||||
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
|
||||
var floatHistograms []record.RefFloatHistogramSample
|
||||
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
|
||||
require.NoError(t, err)
|
||||
walFloatHistogramCount += len(floatHistograms)
|
||||
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Check that only series get stored after calling Rollback.
|
||||
require.Equal(t, 3, walSeriesCount, "series should have been written to WAL")
|
||||
require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL")
|
||||
require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL")
|
||||
require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL")
|
||||
require.Equal(t, 0, walFloatHistogramCount, "float histograms should not have been written to WAL")
|
||||
}
|
||||
|
||||
func TestDBAppenderAddRef(t *testing.T) {
|
||||
db := openTestDB(t, nil, nil)
|
||||
defer func() {
|
||||
@ -4856,10 +4924,7 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
|
||||
}
|
||||
|
||||
// TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the
|
||||
// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This
|
||||
// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed.
|
||||
// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a
|
||||
// histogram in a single write request.
|
||||
// same series when there are multiple encodings. With issue #15177 fixed, this now all works as expected.
|
||||
func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderCapMax = 30
|
||||
@ -4933,26 +4998,19 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloat)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
|
||||
// same batch.
|
||||
for i := 110; i < 120; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
|
||||
// same batch.
|
||||
for i := 120; i < 130; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as in-order as their timestamps are greater than the max timestamp for float
|
||||
// samples in the same batch.
|
||||
for i := 140; i < 150; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150
|
||||
// because float samples are processed first and these samples are in-order wrt to the float samples in the batch.
|
||||
// These samples will be marked as out-of-order.
|
||||
for i := 130; i < 135; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloat)
|
||||
expSamples = append(expSamples, s)
|
||||
@ -4964,8 +5022,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
||||
return expSamples[i].T() < expSamples[j].T()
|
||||
})
|
||||
|
||||
// oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO.
|
||||
verifySamples(100, 150, expSamples, 20)
|
||||
// oooCount = 5 for the samples 130 to 134.
|
||||
verifySamples(100, 150, expSamples, 5)
|
||||
|
||||
// Append and commit some in-order histograms by themselves.
|
||||
app = db.Appender(context.Background())
|
||||
@ -4975,8 +5033,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// oooCount remains at 20 as no new OOO samples have been added.
|
||||
verifySamples(100, 160, expSamples, 20)
|
||||
// oooCount remains at 5.
|
||||
verifySamples(100, 160, expSamples, 5)
|
||||
|
||||
// Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples
|
||||
// with newer timestamps have already been committed.
|
||||
@ -5004,8 +5062,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
||||
return expSamples[i].T() < expSamples[j].T()
|
||||
})
|
||||
|
||||
// oooCount = 50 as we've added 30 more OOO samples.
|
||||
verifySamples(50, 160, expSamples, 50)
|
||||
// oooCount = 35 as we've added 30 more OOO samples.
|
||||
verifySamples(50, 160, expSamples, 35)
|
||||
}
|
||||
|
||||
// TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start
|
||||
|
@ -86,7 +86,8 @@ type Head struct {
|
||||
exemplarMetrics *ExemplarMetrics
|
||||
exemplars ExemplarStorage
|
||||
logger *slog.Logger
|
||||
appendPool zeropool.Pool[[]record.RefSample]
|
||||
refSeriesPool zeropool.Pool[[]record.RefSeries]
|
||||
floatsPool zeropool.Pool[[]record.RefSample]
|
||||
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
||||
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
||||
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||||
|
@ -164,13 +164,6 @@ func (h *Head) Appender(context.Context) storage.Appender {
|
||||
func (h *Head) appender() *headAppender {
|
||||
minValidTime := h.appendableMinValidTime()
|
||||
appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.
|
||||
|
||||
// Allocate the exemplars buffer only if exemplars are enabled.
|
||||
var exemplarsBuf []exemplarWithSeriesRef
|
||||
if h.opts.EnableExemplarStorage {
|
||||
exemplarsBuf = h.getExemplarBuffer()
|
||||
}
|
||||
|
||||
return &headAppender{
|
||||
head: h,
|
||||
minValidTime: minValidTime,
|
||||
@ -178,12 +171,9 @@ func (h *Head) appender() *headAppender {
|
||||
maxt: math.MinInt64,
|
||||
headMaxt: h.MaxTime(),
|
||||
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
|
||||
samples: h.getAppendBuffer(),
|
||||
sampleSeries: h.getSeriesBuffer(),
|
||||
exemplars: exemplarsBuf,
|
||||
histograms: h.getHistogramBuffer(),
|
||||
floatHistograms: h.getFloatHistogramBuffer(),
|
||||
metadata: h.getMetadataBuffer(),
|
||||
seriesRefs: h.getRefSeriesBuffer(),
|
||||
series: h.getSeriesBuffer(),
|
||||
typesInBatch: map[chunks.HeadSeriesRef]sampleType{},
|
||||
appendID: appendID,
|
||||
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
|
||||
}
|
||||
@ -213,16 +203,28 @@ func (h *Head) AppendableMinValidTime() (int64, bool) {
|
||||
return h.appendableMinValidTime(), true
|
||||
}
|
||||
|
||||
func (h *Head) getAppendBuffer() []record.RefSample {
|
||||
b := h.appendPool.Get()
|
||||
func (h *Head) getRefSeriesBuffer() []record.RefSeries {
|
||||
b := h.refSeriesPool.Get()
|
||||
if b == nil {
|
||||
return make([]record.RefSeries, 0, 512)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (h *Head) putRefSeriesBuffer(b []record.RefSeries) {
|
||||
h.refSeriesPool.Put(b[:0])
|
||||
}
|
||||
|
||||
func (h *Head) getFloatBuffer() []record.RefSample {
|
||||
b := h.floatsPool.Get()
|
||||
if b == nil {
|
||||
return make([]record.RefSample, 0, 512)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (h *Head) putAppendBuffer(b []record.RefSample) {
|
||||
h.appendPool.Put(b[:0])
|
||||
func (h *Head) putFloatBuffer(b []record.RefSample) {
|
||||
h.floatsPool.Put(b[:0])
|
||||
}
|
||||
|
||||
func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef {
|
||||
@ -312,6 +314,61 @@ type exemplarWithSeriesRef struct {
|
||||
exemplar exemplar.Exemplar
|
||||
}
|
||||
|
||||
// sampleType describes sample types we need to distinguish for append batching.
|
||||
// We need separate types for everything that goes into a different WAL record
|
||||
// type or into a different chunk encoding.
|
||||
type sampleType byte
|
||||
|
||||
const (
|
||||
stNone sampleType = iota // To mark that the sample type does not matter.
|
||||
stFloat // All simple floats (counters, gauges, untyped). Goes to `floats`.
|
||||
stHistogram // Native integer histograms with a standard exponential schema. Goes to `histograms`.
|
||||
stCustomBucketHistogram // Native integer histograms with custom bucket boundaries. Goes to `histograms`.
|
||||
stFloatHistogram // Native float histograms. Goes to `floatHistograms`.
|
||||
stCustomBucketFloatHistogram // Native float histograms with custom bucket boundaries. Goes to `floatHistograms`.
|
||||
)
|
||||
|
||||
// appendBatch is used to partition all the appended data into batches that are
|
||||
// "type clean", i.e. every series receives only samples of one type within the
|
||||
// batch. Types in this regard are defined by the sampleType enum above.
|
||||
// TODO(beorn7): The same concept could be extended to make sure every series in
|
||||
// the batch has at most one metadata record. This is currently not implemented
|
||||
// because it is unclear if it is needed at all. (Maybe we will remove metadata
|
||||
// records altogether, see issue #15911.)
|
||||
type appendBatch struct {
|
||||
floats []record.RefSample // New float samples held by this appender.
|
||||
floatSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
histograms []record.RefHistogramSample // New histogram samples held by this appender.
|
||||
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
|
||||
floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
metadata []record.RefMetadata // New metadata held by this appender.
|
||||
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
|
||||
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
|
||||
}
|
||||
|
||||
// close returns all the slices to the pools in Head and nil's them.
|
||||
func (b *appendBatch) close(h *Head) {
|
||||
h.putFloatBuffer(b.floats)
|
||||
b.floats = nil
|
||||
h.putSeriesBuffer(b.floatSeries)
|
||||
b.floatSeries = nil
|
||||
h.putHistogramBuffer(b.histograms)
|
||||
b.histograms = nil
|
||||
h.putSeriesBuffer(b.histogramSeries)
|
||||
b.histogramSeries = nil
|
||||
h.putFloatHistogramBuffer(b.floatHistograms)
|
||||
b.floatHistograms = nil
|
||||
h.putSeriesBuffer(b.floatHistogramSeries)
|
||||
b.floatHistogramSeries = nil
|
||||
h.putMetadataBuffer(b.metadata)
|
||||
b.metadata = nil
|
||||
h.putSeriesBuffer(b.metadataSeries)
|
||||
b.metadataSeries = nil
|
||||
h.putExemplarBuffer(b.exemplars)
|
||||
b.exemplars = nil
|
||||
}
|
||||
|
||||
type headAppender struct {
|
||||
head *Head
|
||||
minValidTime int64 // No samples below this timestamp are allowed.
|
||||
@ -321,15 +378,9 @@ type headAppender struct {
|
||||
|
||||
seriesRefs []record.RefSeries // New series records held by this appender.
|
||||
series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs)
|
||||
samples []record.RefSample // New float samples held by this appender.
|
||||
sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
histograms []record.RefHistogramSample // New histogram samples held by this appender.
|
||||
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
|
||||
floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
metadata []record.RefMetadata // New metadata held by this appender.
|
||||
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
|
||||
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
|
||||
batches []*appendBatch // Holds all the other data to append. (In regular cases, there should be only one of these.)
|
||||
|
||||
typesInBatch map[chunks.HeadSeriesRef]sampleType // Which (one) sample type each series holds in the most recent batch.
|
||||
|
||||
appendID, cleanupAppendIDsBelow uint64
|
||||
closed bool
|
||||
@ -357,21 +408,27 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
||||
}
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
if value.IsStaleNaN(v) {
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we could do this conversion
|
||||
// in commit. This code should move into Commit().
|
||||
switch {
|
||||
case s.lastHistogramValue != nil:
|
||||
s.Unlock()
|
||||
// If we have added a sample before with this same appender, we
|
||||
// can check the previously used type and turn a stale float
|
||||
// sample into a stale histogram sample or stale float histogram
|
||||
// sample as appropriate. This prevents an unnecessary creation
|
||||
// of a new batch. However, since other appenders might append
|
||||
// to the same series concurrently, this is not perfect but just
|
||||
// an optimization for the more likely case.
|
||||
switch a.typesInBatch[s.ref] {
|
||||
case stHistogram, stCustomBucketHistogram:
|
||||
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil)
|
||||
case s.lastFloatHistogramValue != nil:
|
||||
s.Unlock()
|
||||
case stFloatHistogram, stCustomBucketFloatHistogram:
|
||||
return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v})
|
||||
}
|
||||
// Note that a series reference not yet in the map will come out
|
||||
// as stNone, but since we do not handle that case separately,
|
||||
// we do not need to check for the difference between "unknown
|
||||
// series" and "known series with stNone".
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||
// to skip that sample from the WAL and write only in the WBL.
|
||||
@ -403,12 +460,13 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
||||
a.maxt = t
|
||||
}
|
||||
|
||||
a.samples = append(a.samples, record.RefSample{
|
||||
b := a.getCurrentBatch(stFloat, s.ref)
|
||||
b.floats = append(b.floats, record.RefSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
V: v,
|
||||
})
|
||||
a.sampleSeries = append(a.sampleSeries, s)
|
||||
b.floatSeries = append(b.floatSeries, s)
|
||||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
|
||||
@ -448,8 +506,9 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
|
||||
if ct > a.maxt {
|
||||
a.maxt = ct
|
||||
}
|
||||
a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
|
||||
a.sampleSeries = append(a.sampleSeries, s)
|
||||
b := a.getCurrentBatch(stFloat, s.ref)
|
||||
b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
|
||||
b.floatSeries = append(b.floatSeries, s)
|
||||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
|
||||
@ -476,6 +535,65 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo
|
||||
return s, created, nil
|
||||
}
|
||||
|
||||
// getCurrentBatch returns the current batch if it fits the provided sampleType
|
||||
// for the provided series. Otherwise, it adds a new batch and returns it.
|
||||
func (a *headAppender) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch {
|
||||
h := a.head
|
||||
|
||||
newBatch := func() *appendBatch {
|
||||
b := appendBatch{
|
||||
floats: h.getFloatBuffer(),
|
||||
floatSeries: h.getSeriesBuffer(),
|
||||
histograms: h.getHistogramBuffer(),
|
||||
histogramSeries: h.getSeriesBuffer(),
|
||||
floatHistograms: h.getFloatHistogramBuffer(),
|
||||
floatHistogramSeries: h.getSeriesBuffer(),
|
||||
metadata: h.getMetadataBuffer(),
|
||||
metadataSeries: h.getSeriesBuffer(),
|
||||
}
|
||||
|
||||
// Allocate the exemplars buffer only if exemplars are enabled.
|
||||
if h.opts.EnableExemplarStorage {
|
||||
b.exemplars = h.getExemplarBuffer()
|
||||
}
|
||||
clear(a.typesInBatch)
|
||||
if st != stNone {
|
||||
a.typesInBatch[s] = st
|
||||
}
|
||||
a.batches = append(a.batches, &b)
|
||||
return &b
|
||||
}
|
||||
|
||||
// First batch ever. Create it.
|
||||
if len(a.batches) == 0 {
|
||||
return newBatch()
|
||||
}
|
||||
|
||||
// TODO(beorn7): If we ever see that the a.typesInBatch map grows so
|
||||
// large that it matters for total memory consumption, we could limit
|
||||
// the batch size here, i.e. cut a new batch even without a type change.
|
||||
// Something like:
|
||||
// if len(a.typesInBatch > limit) {
|
||||
// return newBatch()
|
||||
// }
|
||||
|
||||
lastBatch := a.batches[len(a.batches)-1]
|
||||
if st == stNone {
|
||||
// Type doesn't matter, last batch will always do.
|
||||
return lastBatch
|
||||
}
|
||||
prevST, ok := a.typesInBatch[s]
|
||||
switch {
|
||||
case !ok: // New series. Add it to map and return current batch.
|
||||
a.typesInBatch[s] = st
|
||||
return lastBatch
|
||||
case prevST == st: // Old series, same type. Just return batch.
|
||||
return lastBatch
|
||||
}
|
||||
// An old series got a new type. Start new batch.
|
||||
return newBatch()
|
||||
}
|
||||
|
||||
// appendable checks whether the given sample is valid for appending to the series.
|
||||
// If the sample is valid and in-order, it returns false with no error.
|
||||
// If the sample belongs to the out-of-order chunk, it returns true with no error.
|
||||
@ -638,7 +756,8 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
|
||||
return 0, err
|
||||
}
|
||||
|
||||
a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e})
|
||||
b := a.getCurrentBatch(stNone, chunks.HeadSeriesRef(ref))
|
||||
b.exemplars = append(b.exemplars, exemplarWithSeriesRef{ref, e})
|
||||
|
||||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
@ -667,11 +786,10 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
||||
}
|
||||
}
|
||||
|
||||
var created bool
|
||||
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
if s == nil {
|
||||
var err error
|
||||
s, created, err = a.getOrCreate(lset)
|
||||
s, _, err = a.getOrCreate(lset)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -680,14 +798,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
||||
switch {
|
||||
case h != nil:
|
||||
s.Lock()
|
||||
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we would not need to do this.
|
||||
// This whole "if" should be removed.
|
||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||
s.lastHistogramValue = &histogram.Histogram{}
|
||||
}
|
||||
|
||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||
// to skip that sample from the WAL and write only in the WBL.
|
||||
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
@ -707,22 +817,19 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
a.histograms = append(a.histograms, record.RefHistogramSample{
|
||||
st := stHistogram
|
||||
if h.UsesCustomBuckets() {
|
||||
st = stCustomBucketHistogram
|
||||
}
|
||||
b := a.getCurrentBatch(st, s.ref)
|
||||
b.histograms = append(b.histograms, record.RefHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
H: h,
|
||||
})
|
||||
a.histogramSeries = append(a.histogramSeries, s)
|
||||
b.histogramSeries = append(b.histogramSeries, s)
|
||||
case fh != nil:
|
||||
s.Lock()
|
||||
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we would not need to do this.
|
||||
// This whole "if" should be removed.
|
||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
||||
}
|
||||
|
||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||
// to skip that sample from the WAL and write only in the WBL.
|
||||
_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
@ -742,12 +849,17 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
|
||||
st := stFloatHistogram
|
||||
if fh.UsesCustomBuckets() {
|
||||
st = stCustomBucketFloatHistogram
|
||||
}
|
||||
b := a.getCurrentBatch(st, s.ref)
|
||||
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
FH: fh,
|
||||
})
|
||||
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
|
||||
b.floatHistogramSeries = append(b.floatHistogramSeries, s)
|
||||
}
|
||||
|
||||
if t < a.mint {
|
||||
@ -769,11 +881,10 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
||||
return 0, storage.ErrCTNewerThanSample
|
||||
}
|
||||
|
||||
var created bool
|
||||
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
if s == nil {
|
||||
var err error
|
||||
s, created, err = a.getOrCreate(lset)
|
||||
s, _, err = a.getOrCreate(lset)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -784,16 +895,12 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
||||
zeroHistogram := &histogram.Histogram{
|
||||
// The CTZeroSample represents a counter reset by definition.
|
||||
CounterResetHint: histogram.CounterReset,
|
||||
// Replicate other fields to avoid needless chunk creation.
|
||||
Schema: h.Schema,
|
||||
ZeroThreshold: h.ZeroThreshold,
|
||||
CustomValues: h.CustomValues,
|
||||
}
|
||||
s.Lock()
|
||||
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we would not need to do this.
|
||||
// This whole "if" should be removed.
|
||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||
s.lastHistogramValue = zeroHistogram
|
||||
}
|
||||
|
||||
// For CTZeroSamples OOO is not allowed.
|
||||
// We set it to true to make this implementation as close as possible to the float implementation.
|
||||
isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
@ -815,26 +922,27 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
||||
|
||||
s.pendingCommit = true
|
||||
s.Unlock()
|
||||
a.histograms = append(a.histograms, record.RefHistogramSample{
|
||||
st := stHistogram
|
||||
if h.UsesCustomBuckets() {
|
||||
st = stCustomBucketHistogram
|
||||
}
|
||||
b := a.getCurrentBatch(st, s.ref)
|
||||
b.histograms = append(b.histograms, record.RefHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: ct,
|
||||
H: zeroHistogram,
|
||||
})
|
||||
a.histogramSeries = append(a.histogramSeries, s)
|
||||
b.histogramSeries = append(b.histogramSeries, s)
|
||||
case fh != nil:
|
||||
zeroFloatHistogram := &histogram.FloatHistogram{
|
||||
// The CTZeroSample represents a counter reset by definition.
|
||||
CounterResetHint: histogram.CounterReset,
|
||||
// Replicate other fields to avoid needless chunk creation.
|
||||
Schema: fh.Schema,
|
||||
ZeroThreshold: fh.ZeroThreshold,
|
||||
CustomValues: fh.CustomValues,
|
||||
}
|
||||
s.Lock()
|
||||
|
||||
// TODO(krajorama): reorganize Commit() to handle samples in append order
|
||||
// not floats first and then histograms. Then we would not need to do this.
|
||||
// This whole "if" should be removed.
|
||||
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
|
||||
s.lastFloatHistogramValue = zeroFloatHistogram
|
||||
}
|
||||
|
||||
// We set it to true to make this implementation as close as possible to the float implementation.
|
||||
isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) // OOO is not allowed for CTZeroSamples.
|
||||
if err != nil {
|
||||
@ -855,12 +963,17 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
||||
|
||||
s.pendingCommit = true
|
||||
s.Unlock()
|
||||
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
|
||||
st := stFloatHistogram
|
||||
if fh.UsesCustomBuckets() {
|
||||
st = stCustomBucketFloatHistogram
|
||||
}
|
||||
b := a.getCurrentBatch(st, s.ref)
|
||||
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: ct,
|
||||
FH: zeroFloatHistogram,
|
||||
})
|
||||
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
|
||||
b.floatHistogramSeries = append(b.floatHistogramSeries, s)
|
||||
}
|
||||
|
||||
if ct > a.maxt {
|
||||
@ -889,13 +1002,14 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels,
|
||||
s.Unlock()
|
||||
|
||||
if hasNewMetadata {
|
||||
a.metadata = append(a.metadata, record.RefMetadata{
|
||||
b := a.getCurrentBatch(stNone, s.ref)
|
||||
b.metadata = append(b.metadata, record.RefMetadata{
|
||||
Ref: s.ref,
|
||||
Type: record.GetMetricType(meta.Type),
|
||||
Unit: meta.Unit,
|
||||
Help: meta.Help,
|
||||
})
|
||||
a.metadataSeries = append(a.metadataSeries, s)
|
||||
b.metadataSeries = append(b.metadataSeries, s)
|
||||
}
|
||||
|
||||
return ref, nil
|
||||
@ -932,25 +1046,26 @@ func (a *headAppender) log() error {
|
||||
return fmt.Errorf("log series: %w", err)
|
||||
}
|
||||
}
|
||||
if len(a.metadata) > 0 {
|
||||
rec = enc.Metadata(a.metadata, buf)
|
||||
for _, b := range a.batches {
|
||||
if len(b.metadata) > 0 {
|
||||
rec = enc.Metadata(b.metadata, buf)
|
||||
buf = rec[:0]
|
||||
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log metadata: %w", err)
|
||||
}
|
||||
}
|
||||
if len(a.samples) > 0 {
|
||||
rec = enc.Samples(a.samples, buf)
|
||||
if len(b.floats) > 0 {
|
||||
rec = enc.Samples(b.floats, buf)
|
||||
buf = rec[:0]
|
||||
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log samples: %w", err)
|
||||
}
|
||||
}
|
||||
if len(a.histograms) > 0 {
|
||||
if len(b.histograms) > 0 {
|
||||
var customBucketsHistograms []record.RefHistogramSample
|
||||
rec, customBucketsHistograms = enc.HistogramSamples(a.histograms, buf)
|
||||
rec, customBucketsHistograms = enc.HistogramSamples(b.histograms, buf)
|
||||
buf = rec[:0]
|
||||
if len(rec) > 0 {
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
@ -965,9 +1080,9 @@ func (a *headAppender) log() error {
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(a.floatHistograms) > 0 {
|
||||
if len(b.floatHistograms) > 0 {
|
||||
var customBucketsFloatHistograms []record.RefFloatHistogramSample
|
||||
rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(a.floatHistograms, buf)
|
||||
rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(b.floatHistograms, buf)
|
||||
buf = rec[:0]
|
||||
if len(rec) > 0 {
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
@ -986,14 +1101,15 @@ func (a *headAppender) log() error {
|
||||
// otherwise it might happen that we send the exemplars in a remote write
|
||||
// batch before the samples, which in turn means the exemplar is rejected
|
||||
// for missing series, since series are created due to samples.
|
||||
if len(a.exemplars) > 0 {
|
||||
rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf)
|
||||
if len(b.exemplars) > 0 {
|
||||
rec = enc.Exemplars(exemplarsForEncoding(b.exemplars), buf)
|
||||
buf = rec[:0]
|
||||
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log exemplars: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1040,10 +1156,10 @@ type appenderCommitContext struct {
|
||||
enc record.Encoder
|
||||
}
|
||||
|
||||
// commitExemplars adds all exemplars from headAppender to the head's exemplar storage.
|
||||
func (a *headAppender) commitExemplars() {
|
||||
// commitExemplars adds all exemplars from the provided batch to the head's exemplar storage.
|
||||
func (a *headAppender) commitExemplars(b *appendBatch) {
|
||||
// No errors logging to WAL, so pass the exemplars along to the in memory storage.
|
||||
for _, e := range a.exemplars {
|
||||
for _, e := range b.exemplars {
|
||||
s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
|
||||
if s == nil {
|
||||
// This is very unlikely to happen, but we have seen it in the wild.
|
||||
@ -1147,9 +1263,9 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld
|
||||
}
|
||||
}
|
||||
|
||||
// commitSamples processes and commits the samples in the headAppender to the series.
|
||||
// It handles both in-order and out-of-order samples, updating the appenderCommitContext
|
||||
// with the results of the append operations.
|
||||
// commitFloats processes and commits the samples in the provided batch to the
|
||||
// series. It handles both in-order and out-of-order samples, updating the
|
||||
// appenderCommitContext with the results of the append operations.
|
||||
//
|
||||
// The function iterates over the samples in the headAppender and attempts to append each sample
|
||||
// to its corresponding series. It handles various error cases such as out-of-order samples,
|
||||
@ -1166,14 +1282,68 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld
|
||||
// operations on the series after appending the samples.
|
||||
//
|
||||
// There are also specific functions to commit histograms and float histograms.
|
||||
func (a *headAppender) commitSamples(acc *appenderCommitContext) {
|
||||
func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) {
|
||||
var ok, chunkCreated bool
|
||||
var series *memSeries
|
||||
|
||||
for i, s := range a.samples {
|
||||
series = a.sampleSeries[i]
|
||||
for i, s := range b.floats {
|
||||
series = b.floatSeries[i]
|
||||
series.Lock()
|
||||
|
||||
if value.IsStaleNaN(s.V) {
|
||||
// If a float staleness marker had been appended for a
|
||||
// series that got a histogram or float histogram
|
||||
// appended before via this same appender, it would not
|
||||
// show up here because we had already converted it. We
|
||||
// end up here for two reasons: (1) This is the very
|
||||
// first sample for this series appended via this
|
||||
// appender. (2) A float sample was appended to this
|
||||
// series before via this same appender.
|
||||
//
|
||||
// In either case, we need to check the previous sample
|
||||
// in the memSeries to append the appropriately typed
|
||||
// staleness marker. This is obviously so in case (1).
|
||||
// In case (2), we would usually expect a float sample
|
||||
// as the previous sample, but there might be concurrent
|
||||
// appends that have added a histogram sample in the
|
||||
// meantime. (This will probably lead to OOO shenanigans
|
||||
// anyway, but that's a different story.)
|
||||
//
|
||||
// If the last sample in the memSeries is indeed a
|
||||
// float, we don't have to do anything special here and
|
||||
// just go on with the normal commit for a float sample.
|
||||
// However, if the last sample in the memSeries is a
|
||||
// histogram or float histogram, we have to convert the
|
||||
// staleness marker to a histogram (or float histogram,
|
||||
// respectively), and just add it at the end of the
|
||||
// histograms (or float histograms) in the same batch,
|
||||
// to be committed later in commitHistograms (or
|
||||
// commitFloatHistograms). The latter is fine because we
|
||||
// know there is no other histogram (or float histogram)
|
||||
// sample for this same series in this same batch
|
||||
// (because any such sample would have triggered a new
|
||||
// batch).
|
||||
switch {
|
||||
case series.lastHistogramValue != nil:
|
||||
b.histograms = append(b.histograms, record.RefHistogramSample{
|
||||
Ref: series.ref,
|
||||
T: s.T,
|
||||
H: &histogram.Histogram{Sum: s.V},
|
||||
})
|
||||
b.histogramSeries = append(b.histogramSeries, series)
|
||||
series.Unlock()
|
||||
continue
|
||||
case series.lastFloatHistogramValue != nil:
|
||||
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
|
||||
Ref: series.ref,
|
||||
T: s.T,
|
||||
FH: &histogram.FloatHistogram{Sum: s.V},
|
||||
})
|
||||
b.floatHistogramSeries = append(b.floatHistogramSeries, series)
|
||||
series.Unlock()
|
||||
continue
|
||||
}
|
||||
}
|
||||
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
if err != nil {
|
||||
handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected)
|
||||
@ -1261,15 +1431,24 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
|
||||
}
|
||||
}
|
||||
|
||||
// For details on the commitHistograms function, see the commitSamples docs.
|
||||
func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
|
||||
// For details on the commitHistograms function, see the commitFloats docs.
|
||||
func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitContext) {
|
||||
var ok, chunkCreated bool
|
||||
var series *memSeries
|
||||
|
||||
for i, s := range a.histograms {
|
||||
series = a.histogramSeries[i]
|
||||
for i, s := range b.histograms {
|
||||
series = b.histogramSeries[i]
|
||||
series.Lock()
|
||||
|
||||
// At this point, we could encounter a histogram staleness
|
||||
// marker that should better be a float staleness marker or a
|
||||
// float histogram staleness marker. This can only happen with
|
||||
// concurrent appenders appending to the same series _and_ doing
|
||||
// so in a mixed-type scenario. This case is expected to be very
|
||||
// rare, so we do not bother here to convert the staleness
|
||||
// marker. The worst case is that we need to cut a new chunk
|
||||
// just for the staleness marker.
|
||||
|
||||
oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
if err != nil {
|
||||
handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
|
||||
@ -1361,15 +1540,24 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
|
||||
}
|
||||
}
|
||||
|
||||
// For details on the commitFloatHistograms function, see the commitSamples docs.
|
||||
func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
|
||||
// For details on the commitFloatHistograms function, see the commitFloats docs.
|
||||
func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) {
|
||||
var ok, chunkCreated bool
|
||||
var series *memSeries
|
||||
|
||||
for i, s := range a.floatHistograms {
|
||||
series = a.floatHistogramSeries[i]
|
||||
for i, s := range b.floatHistograms {
|
||||
series = b.floatHistogramSeries[i]
|
||||
series.Lock()
|
||||
|
||||
// At this point, we could encounter a float histogram staleness
|
||||
// marker that should better be a float staleness marker or an
|
||||
// integer histogram staleness marker. This can only happen with
|
||||
// concurrent appenders appending to the same series _and_ doing
|
||||
// so in a mixed-type scenario. This case is expected to be very
|
||||
// rare, so we do not bother here to convert the staleness
|
||||
// marker. The worst case is that we need to cut a new chunk
|
||||
// just for the staleness marker.
|
||||
|
||||
oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
if err != nil {
|
||||
handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
|
||||
@ -1461,14 +1649,14 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
|
||||
}
|
||||
}
|
||||
|
||||
// commitMetadata commits the metadata for each series in the headAppender.
|
||||
// commitMetadata commits the metadata for each series in the provided batch.
|
||||
// It iterates over the metadata slice and updates the corresponding series
|
||||
// with the new metadata information. The series is locked during the update
|
||||
// to ensure thread safety.
|
||||
func (a *headAppender) commitMetadata() {
|
||||
func commitMetadata(b *appendBatch) {
|
||||
var series *memSeries
|
||||
for i, m := range a.metadata {
|
||||
series = a.metadataSeries[i]
|
||||
for i, m := range b.metadata {
|
||||
series = b.metadataSeries[i]
|
||||
series.Lock()
|
||||
series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help}
|
||||
series.Unlock()
|
||||
@ -1489,75 +1677,82 @@ func (a *headAppender) Commit() (err error) {
|
||||
if a.closed {
|
||||
return ErrAppenderClosed
|
||||
}
|
||||
defer func() { a.closed = true }()
|
||||
|
||||
h := a.head
|
||||
|
||||
defer func() {
|
||||
h.putRefSeriesBuffer(a.seriesRefs)
|
||||
h.putSeriesBuffer(a.series)
|
||||
a.closed = true
|
||||
}()
|
||||
|
||||
if err := a.log(); err != nil {
|
||||
_ = a.Rollback() // Most likely the same error will happen again.
|
||||
return fmt.Errorf("write to WAL: %w", err)
|
||||
}
|
||||
|
||||
if a.head.writeNotified != nil {
|
||||
a.head.writeNotified.Notify()
|
||||
if h.writeNotified != nil {
|
||||
h.writeNotified.Notify()
|
||||
}
|
||||
|
||||
a.commitExemplars()
|
||||
|
||||
defer a.head.metrics.activeAppenders.Dec()
|
||||
defer a.head.putAppendBuffer(a.samples)
|
||||
defer a.head.putSeriesBuffer(a.sampleSeries)
|
||||
defer a.head.putExemplarBuffer(a.exemplars)
|
||||
defer a.head.putHistogramBuffer(a.histograms)
|
||||
defer a.head.putFloatHistogramBuffer(a.floatHistograms)
|
||||
defer a.head.putMetadataBuffer(a.metadata)
|
||||
defer a.head.iso.closeAppend(a.appendID)
|
||||
|
||||
acc := &appenderCommitContext{
|
||||
floatsAppended: len(a.samples),
|
||||
histogramsAppended: len(a.histograms) + len(a.floatHistograms),
|
||||
inOrderMint: math.MaxInt64,
|
||||
inOrderMaxt: math.MinInt64,
|
||||
oooMinT: math.MaxInt64,
|
||||
oooMaxT: math.MinInt64,
|
||||
oooCapMax: a.head.opts.OutOfOrderCapMax.Load(),
|
||||
oooCapMax: h.opts.OutOfOrderCapMax.Load(),
|
||||
appendChunkOpts: chunkOpts{
|
||||
chunkDiskMapper: a.head.chunkDiskMapper,
|
||||
chunkRange: a.head.chunkRange.Load(),
|
||||
samplesPerChunk: a.head.opts.SamplesPerChunk,
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: h.chunkRange.Load(),
|
||||
samplesPerChunk: h.opts.SamplesPerChunk,
|
||||
},
|
||||
}
|
||||
|
||||
for _, b := range a.batches {
|
||||
acc.floatsAppended += len(b.floats)
|
||||
acc.histogramsAppended += len(b.histograms) + len(b.floatHistograms)
|
||||
a.commitExemplars(b)
|
||||
defer b.close(h)
|
||||
}
|
||||
defer h.metrics.activeAppenders.Dec()
|
||||
defer h.iso.closeAppend(a.appendID)
|
||||
|
||||
defer func() {
|
||||
for i := range acc.oooRecords {
|
||||
a.head.putBytesBuffer(acc.oooRecords[i][:0])
|
||||
h.putBytesBuffer(acc.oooRecords[i][:0])
|
||||
}
|
||||
}()
|
||||
|
||||
a.commitSamples(acc)
|
||||
a.commitHistograms(acc)
|
||||
a.commitFloatHistograms(acc)
|
||||
a.commitMetadata()
|
||||
for _, b := range a.batches {
|
||||
// Do not change the order of these calls. The staleness marker
|
||||
// handling depends on it.
|
||||
a.commitFloats(b, acc)
|
||||
a.commitHistograms(b, acc)
|
||||
a.commitFloatHistograms(b, acc)
|
||||
commitMetadata(b)
|
||||
}
|
||||
// Unmark all series as pending commit after all samples have been committed.
|
||||
a.unmarkCreatedSeriesAsPendingCommit()
|
||||
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
|
||||
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
|
||||
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
|
||||
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
|
||||
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
|
||||
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
|
||||
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
|
||||
a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
|
||||
a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)
|
||||
h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
|
||||
h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
|
||||
h.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
|
||||
h.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
|
||||
h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
|
||||
h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
|
||||
h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
|
||||
h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
|
||||
h.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
|
||||
h.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)
|
||||
|
||||
acc.collectOOORecords(a)
|
||||
if a.head.wbl != nil {
|
||||
if err := a.head.wbl.Log(acc.oooRecords...); err != nil {
|
||||
if h.wbl != nil {
|
||||
if err := h.wbl.Log(acc.oooRecords...); err != nil {
|
||||
// TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging
|
||||
// until we have found what samples become OOO. We can try having a metric for this failure.
|
||||
// Returning the error here is not correct because we have already put the samples into the memory,
|
||||
// hence the append/insert was a success.
|
||||
a.head.logger.Error("Failed to log out of order samples into the WAL", "err", err)
|
||||
h.logger.Error("Failed to log out of order samples into the WAL", "err", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -2007,37 +2202,43 @@ func (a *headAppender) Rollback() (err error) {
|
||||
if a.closed {
|
||||
return ErrAppenderClosed
|
||||
}
|
||||
defer func() { a.closed = true }()
|
||||
defer a.head.metrics.activeAppenders.Dec()
|
||||
defer a.head.iso.closeAppend(a.appendID)
|
||||
defer a.head.putSeriesBuffer(a.sampleSeries)
|
||||
defer a.unmarkCreatedSeriesAsPendingCommit()
|
||||
h := a.head
|
||||
defer func() {
|
||||
a.unmarkCreatedSeriesAsPendingCommit()
|
||||
h.iso.closeAppend(a.appendID)
|
||||
h.metrics.activeAppenders.Dec()
|
||||
a.closed = true
|
||||
h.putRefSeriesBuffer(a.seriesRefs)
|
||||
h.putSeriesBuffer(a.series)
|
||||
}()
|
||||
|
||||
var series *memSeries
|
||||
for i := range a.samples {
|
||||
series = a.sampleSeries[i]
|
||||
fmt.Println("ROLLBACK")
|
||||
for _, b := range a.batches {
|
||||
for i := range b.floats {
|
||||
series = b.floatSeries[i]
|
||||
series.Lock()
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
}
|
||||
for i := range a.histograms {
|
||||
series = a.histogramSeries[i]
|
||||
for i := range b.histograms {
|
||||
series = b.histogramSeries[i]
|
||||
series.Lock()
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
}
|
||||
a.head.putAppendBuffer(a.samples)
|
||||
a.head.putExemplarBuffer(a.exemplars)
|
||||
a.head.putHistogramBuffer(a.histograms)
|
||||
a.head.putFloatHistogramBuffer(a.floatHistograms)
|
||||
a.head.putMetadataBuffer(a.metadata)
|
||||
a.samples = nil
|
||||
a.exemplars = nil
|
||||
a.histograms = nil
|
||||
a.metadata = nil
|
||||
|
||||
for i := range b.floatHistograms {
|
||||
series = b.floatHistogramSeries[i]
|
||||
series.Lock()
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
}
|
||||
b.close(h)
|
||||
}
|
||||
a.batches = a.batches[:0]
|
||||
// Series are created in the head memory regardless of rollback. Thus we have
|
||||
// to log them to the WAL in any case.
|
||||
return a.log()
|
||||
|
@ -5336,8 +5336,6 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
||||
samples []chunks.Sample
|
||||
expChunks int
|
||||
err error
|
||||
// If this is empty, samples above will be taken instead of this.
|
||||
addToExp []chunks.Sample
|
||||
}{
|
||||
// Histograms that end up in the expected samples are copied here so that we
|
||||
// can independently set the CounterResetHint later.
|
||||
@ -5377,43 +5375,29 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
||||
samples: []chunks.Sample{sample{t: 100, fh: floatHists[4].Copy()}},
|
||||
err: storage.ErrOutOfOrderSample,
|
||||
},
|
||||
// The three next tests all failed before #15177 was fixed.
|
||||
{
|
||||
// Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also
|
||||
// verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 400, f: 4},
|
||||
sample{t: 500, h: hists[5]}, // This won't be committed.
|
||||
sample{t: 500, h: hists[5]},
|
||||
sample{t: 600, f: 6},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 400, f: 4},
|
||||
sample{t: 600, f: 6},
|
||||
},
|
||||
expChunks: 7, // Only 1 new chunk for float64.
|
||||
expChunks: 9, // Each of the three samples above creates a new chunk because the type changes.
|
||||
},
|
||||
{
|
||||
// Here the histogram is appended at the end, hence the first histogram is out of order.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first.
|
||||
sample{t: 700, h: hists[7]},
|
||||
sample{t: 800, f: 8},
|
||||
sample{t: 900, h: hists[9]},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 800, f: 8},
|
||||
sample{t: 900, h: hists[9].Copy()},
|
||||
},
|
||||
expChunks: 8, // float64 added to old chunk, only 1 new for histograms.
|
||||
expChunks: 12, // Again each sample creates a new chunk.
|
||||
},
|
||||
{
|
||||
// Float histogram is appended at the end.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram.
|
||||
sample{t: 1000, fh: floatHists[7]},
|
||||
sample{t: 1100, h: hists[9]},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 1100, h: hists[9].Copy()},
|
||||
},
|
||||
expChunks: 8,
|
||||
expChunks: 14, // Even changes between float and integer histogram create new chunks.
|
||||
},
|
||||
}
|
||||
|
||||
@ -5431,11 +5415,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
||||
|
||||
if a.err == nil {
|
||||
require.NoError(t, app.Commit())
|
||||
if len(a.addToExp) > 0 {
|
||||
expResult = append(expResult, a.addToExp...)
|
||||
} else {
|
||||
expResult = append(expResult, a.samples...)
|
||||
}
|
||||
checkExpChunks(a.expChunks)
|
||||
} else {
|
||||
require.NoError(t, app.Rollback())
|
||||
@ -6751,7 +6731,27 @@ func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing
|
||||
|
||||
func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
testHistogram := tsdbutil.GenerateTestHistogram(1)
|
||||
testHistogram.CounterResetHint = histogram.NotCounterReset
|
||||
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
|
||||
testFloatHistogram.CounterResetHint = histogram.NotCounterReset
|
||||
// TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the
|
||||
// following two zero histograms should be histogram.CounterReset.
|
||||
testZeroHistogram := &histogram.Histogram{
|
||||
Schema: testHistogram.Schema,
|
||||
ZeroThreshold: testHistogram.ZeroThreshold,
|
||||
PositiveSpans: testHistogram.PositiveSpans,
|
||||
NegativeSpans: testHistogram.NegativeSpans,
|
||||
PositiveBuckets: []int64{0, 0, 0, 0},
|
||||
NegativeBuckets: []int64{0, 0, 0, 0},
|
||||
}
|
||||
testZeroFloatHistogram := &histogram.FloatHistogram{
|
||||
Schema: testFloatHistogram.Schema,
|
||||
ZeroThreshold: testFloatHistogram.ZeroThreshold,
|
||||
PositiveSpans: testFloatHistogram.PositiveSpans,
|
||||
NegativeSpans: testFloatHistogram.NegativeSpans,
|
||||
PositiveBuckets: []float64{0, 0, 0, 0},
|
||||
NegativeBuckets: []float64{0, 0, 0, 0},
|
||||
}
|
||||
type appendableSamples struct {
|
||||
ts int64
|
||||
fSample float64
|
||||
@ -6783,12 +6783,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
{ts: 101, h: testHistogram, ct: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
hNoCounterReset := *testHistogram
|
||||
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: &histogram.Histogram{}},
|
||||
sample{t: 1, h: testZeroHistogram},
|
||||
sample{t: 100, h: testHistogram},
|
||||
sample{t: 101, h: &hNoCounterReset},
|
||||
sample{t: 101, h: testHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
@ -6799,12 +6797,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
{ts: 101, fh: testFloatHistogram, ct: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
fhNoCounterReset := *testFloatHistogram
|
||||
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||
sample{t: 1, fh: testZeroFloatHistogram},
|
||||
sample{t: 100, fh: testFloatHistogram},
|
||||
sample{t: 101, fh: &fhNoCounterReset},
|
||||
sample{t: 101, fh: testFloatHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
@ -6827,12 +6823,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
{ts: 101, h: testHistogram, ct: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
hNoCounterReset := *testHistogram
|
||||
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: &histogram.Histogram{}},
|
||||
sample{t: 1, h: testZeroHistogram},
|
||||
sample{t: 100, h: testHistogram},
|
||||
sample{t: 101, h: &hNoCounterReset},
|
||||
sample{t: 101, h: testHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
@ -6843,12 +6837,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
{ts: 101, fh: testFloatHistogram, ct: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
fhNoCounterReset := *testFloatHistogram
|
||||
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||
sample{t: 1, fh: testZeroFloatHistogram},
|
||||
sample{t: 100, fh: testFloatHistogram},
|
||||
sample{t: 101, fh: &fhNoCounterReset},
|
||||
sample{t: 101, fh: testFloatHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
@ -6872,9 +6864,9 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
{ts: 102, h: testHistogram, ct: 101},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
sample{t: 1, h: &histogram.Histogram{}},
|
||||
sample{t: 1, h: testZeroHistogram},
|
||||
sample{t: 100, h: testHistogram},
|
||||
sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.UnknownCounterReset}},
|
||||
sample{t: 101, h: testZeroHistogram},
|
||||
sample{t: 102, h: testHistogram},
|
||||
},
|
||||
},
|
||||
@ -6885,9 +6877,9 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
{ts: 102, fh: testFloatHistogram, ct: 101},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||
sample{t: 1, fh: testZeroFloatHistogram},
|
||||
sample{t: 100, fh: testFloatHistogram},
|
||||
sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.UnknownCounterReset}},
|
||||
sample{t: 101, fh: testZeroFloatHistogram},
|
||||
sample{t: 102, fh: testFloatHistogram},
|
||||
},
|
||||
},
|
||||
@ -6910,12 +6902,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
{ts: 101, h: testHistogram, ct: 100},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
hNoCounterReset := *testHistogram
|
||||
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: &histogram.Histogram{}},
|
||||
sample{t: 1, h: testZeroHistogram},
|
||||
sample{t: 100, h: testHistogram},
|
||||
sample{t: 101, h: &hNoCounterReset},
|
||||
sample{t: 101, h: testHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
@ -6926,12 +6916,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
{ts: 101, fh: testFloatHistogram, ct: 100},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
fhNoCounterReset := *testFloatHistogram
|
||||
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||
sample{t: 1, fh: testZeroFloatHistogram},
|
||||
sample{t: 100, fh: testFloatHistogram},
|
||||
sample{t: 101, fh: &fhNoCounterReset},
|
||||
sample{t: 101, fh: testFloatHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
|
Loading…
x
Reference in New Issue
Block a user