diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 8e6177d432..b11f975d49 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -126,7 +126,7 @@ func (c *HistoChunk) Appender() (Appender, error) { return nil, err } - a := &histoAppender{ + a := &HistoAppender{ b: &c.b, schema: it.schema, @@ -192,7 +192,7 @@ func (c *HistoChunk) Iterator(it Iterator) Iterator { return c.iterator(it) } -type histoAppender struct { +type HistoAppender struct { b *bstream // Metadata: @@ -230,12 +230,36 @@ func putUvarint(b *bstream, buf []byte, x uint64) { } } -func (a *histoAppender) Append(int64, float64) {} +func (a *HistoAppender) Append(int64, float64) {} + +// Appendable returns whether the chunk can be appended to, and if so +// whether any recoding needs to happen using the provided interjections +// (in case of any new buckets, positive or negative range, respectively). +// The chunk is not appendable if: +// * the schema has changed +// * the zerobucket threshold has changed (TODO: not checked yet) +// * any buckets disappeared +func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]interjection, []interjection, bool) { + // TODO zerothreshold + if h.Schema != a.schema { + return nil, nil, false + } + posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans) + if !ok { + return nil, nil, false + } + negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans) + if !ok { + return nil, nil, false + } + return posInterjections, negInterjections, ok +} // AppendHistogram appends a SparseHistogram to the chunk. We assume the // histogram is properly structured. E.g. that the number of pos/neg buckets // used corresponds to the number conveyed by the pos/neg span structures. -func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { +// Callers must call Appendable() first and act accordingly!
+func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { var tDelta, cntDelta, zcntDelta int64 num := binary.BigEndian.Uint16(a.b.bytes()) @@ -265,19 +289,6 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { putVarint(a.b, a.buf64, buck) } case 1: - // TODO if zerobucket thresh or schema is different, we should create a new chunk - posInterjections, _ := compareSpans(a.posSpans, h.PositiveSpans) - //if !ok { - // TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead - //} - negInterjections, _ := compareSpans(a.negSpans, h.NegativeSpans) - //if !ok { - // TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead - //} - if len(posInterjections) > 0 || len(negInterjections) > 0 { - // new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one. - a.recode(posInterjections, negInterjections, h.PositiveSpans, h.NegativeSpans) - } tDelta = t - a.t cntDelta = int64(h.Count) - int64(a.cnt) @@ -300,19 +311,6 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { a.negbucketsDelta[i] = delta } default: - // TODO if zerobucket thresh or schema is different, we should create a new chunk - posInterjections, _ := compareSpans(a.posSpans, h.PositiveSpans) - //if !ok { - // TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead - //} - negInterjections, _ := compareSpans(a.negSpans, h.NegativeSpans) - //if !ok { - // TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead - //} - if len(posInterjections) > 0 || len(negInterjections) > 0 { - // new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one. 
- a.recode(posInterjections, negInterjections, h.PositiveSpans, h.NegativeSpans) - } tDelta = t - a.t cntDelta = int64(h.Count) - int64(a.cnt) zcntDelta = int64(h.ZeroCount) - int64(a.zcnt) @@ -357,13 +355,14 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { } -// recode converts the current chunk to accommodate an expansion of the set of +// Recode converts the current chunk to accommodate an expansion of the set of // (positive and/or negative) buckets used, according to the provided interjections, resulting in // the honoring of the provided new posSpans and negSpans // note: the decode-recode can probably be done more efficiently, but that's for a future optimization -func (a *histoAppender) recode(posInterjections, negInterjections []interjection, posSpans, negSpans []histogram.Span) { +func (a *HistoAppender) Recode(posInterjections, negInterjections []interjection, posSpans, negSpans []histogram.Span) (Chunk, Appender) { it := newHistoIterator(a.b.bytes()) - app, err := NewHistoChunk().Appender() + hc := NewHistoChunk() + app, err := hc.Appender() if err != nil { panic(err) } @@ -381,20 +380,13 @@ func (a *histoAppender) recode(posInterjections, negInterjections []interjection if len(negInterjections) > 0 { hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negbuckets, negInterjections) } - // there is no risk of infinite recursion here as all histograms get appended with the same schema (number of buckets) app.AppendHistogram(tOld, hOld) } - // adopt the new appender into ourselves - // we skip porting some fields like schema, t, cnt and zcnt, sum because they didn't change between our old chunk and the recoded one - app2 := app.(*histoAppender) - a.b = app2.b - a.posSpans, a.negSpans = posSpans, negSpans - a.posbuckets, a.negbuckets = app2.posbuckets, app2.negbuckets - a.posbucketsDelta, a.negbucketsDelta = app2.posbucketsDelta, app2.negbucketsDelta + return hc, app } -func (a *histoAppender) writeSumDelta(v float64) { 
+func (a *HistoAppender) writeSumDelta(v float64) { vDelta := math.Float64bits(v) ^ math.Float64bits(a.sum) if vDelta == 0 { diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index 42af9aba81..dc75f7a3e2 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -191,7 +191,7 @@ func TestHistoChunkBucketChanges(t *testing.T) { // TODO is this okay? // the appender can rewrite its own bytes slice but it is not able to update the HistoChunk, so our histochunk is outdated until we update it manually - c.b = *(app.(*histoAppender).b) + c.b = *(app.(*HistoAppender).b) require.Equal(t, 2, c.NumSamples()) // because the 2nd histogram has expanded buckets, we should expect all histograms (in particular the first) diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index c1c56c7e61..56eb9bf71f 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -150,7 +150,7 @@ type xorAppender struct { } func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { - panic("cannot call xorAppender.AppendHistogram().") + //panic("cannot call xorAppender.AppendHistogram().") } func (a *xorAppender) Append(t int64, v float64) { diff --git a/tsdb/head.go b/tsdb/head.go index dbffe67d45..06a7721968 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2624,6 +2624,24 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen return sampleInOrder, chunkCreated } + if !chunkCreated { + // Head controls the execution of recoding, so that we own the proper chunk reference afterwards + app, _ := s.app.(*chunkenc.HistoAppender) + posInterjections, negInterjections, ok := app.Appendable(sh) + // we have 3 cases here + // !ok -> we need to cut a new chunk + // ok but we have interjections -> existing chunk needs recoding before we can append our histogram + // ok and no interjections -> chunk is ready to support our histogram + if !ok { + c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper) + chunkCreated 
= true + + } else if len(posInterjections) > 0 || len(negInterjections) > 0 { + // new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one. + s.headChunk.chunk, s.app = app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans) + } + } + s.app.AppendHistogram(t, sh) c.maxTime = t