From f0688c21d6aacbc673ca5a7d8ea2829f9a9f2a49 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Wed, 11 Aug 2021 17:38:48 +0530 Subject: [PATCH] Log sparse histograms into WAL and replay from it (#9191) Signed-off-by: Ganesh Vernekar --- pkg/histogram/sparse_histogram.go | 5 +- tsdb/encoding/encoding.go | 7 +- tsdb/head.go | 13 --- tsdb/head_append.go | 8 +- tsdb/head_test.go | 64 +++++++++++++- tsdb/head_wal.go | 78 +++++++++++++++-- tsdb/record/record.go | 137 +++++++++++++++++++++++++++++- 7 files changed, 281 insertions(+), 31 deletions(-) diff --git a/pkg/histogram/sparse_histogram.go b/pkg/histogram/sparse_histogram.go index 0f07321764..0e94e9adc5 100644 --- a/pkg/histogram/sparse_histogram.go +++ b/pkg/histogram/sparse_histogram.go @@ -30,9 +30,10 @@ import ( // neg bucket idx 3 2 1 0 -1 ... // actively used bucket indices themselves are represented by the spans type SparseHistogram struct { - Count, ZeroCount uint64 - Sum, ZeroThreshold float64 Schema int32 + ZeroThreshold float64 + ZeroCount, Count uint64 + Sum float64 PositiveSpans, NegativeSpans []Span PositiveBuckets, NegativeBuckets []int64 } diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index 8a94ff7bac..b326ab4753 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -179,9 +179,10 @@ func NewDecbufRaw(bs ByteSlice, length int) Decbuf { return Decbuf{B: bs.Range(0, length)} } -func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) } -func (d *Decbuf) Be32int() int { return int(d.Be32()) } -func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) } +func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) } +func (d *Decbuf) Uvarint32() uint32 { return uint32(d.Uvarint64()) } +func (d *Decbuf) Be32int() int { return int(d.Be32()) } +func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) } // Crc32 returns a CRC32 checksum over the remaining bytes. 
func (d *Decbuf) Crc32(castagnoliTable *crc32.Table) uint32 { diff --git a/tsdb/head.go b/tsdb/head.go index 869bb6c8e2..0de066bda9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -63,9 +63,6 @@ type Head struct { lastWALTruncationTime atomic.Int64 lastMemoryTruncationTime atomic.Int64 lastSeriesID atomic.Uint64 - // hasHistograms this is used to m-map all chunks in case there are histograms. - // A hack to avoid updating all the failing tests. - hasHistograms atomic.Bool metrics *headMetrics opts *HeadOptions @@ -1158,16 +1155,6 @@ func (h *Head) Close() error { defer h.closedMtx.Unlock() h.closed = true - // M-map all in-memory chunks. - // A hack for the histogram till it is stored in WAL and replayed. - if h.hasHistograms.Load() { - for _, m := range h.series.series { - for _, s := range m { - s.mmapCurrentHeadChunk(h.chunkDiskMapper) - } - } - } - errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close()) if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown { errs.Add(h.performChunkSnapshot()) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 7396e4caf5..e98728012d 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -470,6 +470,13 @@ func (a *headAppender) log() error { return errors.Wrap(err, "log exemplars") } } + if len(a.histograms) > 0 { + rec = enc.Histograms(a.histograms, buf) + buf = rec[:0] + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log histograms") + } + } return nil } @@ -539,7 +546,6 @@ func (a *headAppender) Commit() (err error) { for i, s := range a.histograms { series = a.histogramSeries[i] series.Lock() - a.head.hasHistograms.Store(true) ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper) series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false diff --git a/tsdb/head_test.go b/tsdb/head_test.go index e04348ad5e..f215cef845 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2537,13 +2537,69 @@ func TestAppendHistogram(t 
*testing.T) { } } +func TestHistogramInWAL(t *testing.T) { + l := labels.Labels{{Name: "a", Value: "b"}} + numHistograms := 10 + head, _ := newTestHead(t, 1000, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + + require.NoError(t, head.Init(0)) + app := head.Appender(context.Background()) + + type timedHist struct { + t int64 + h histogram.SparseHistogram + } + expHists := make([]timedHist, 0, numHistograms) + for i, h := range generateHistograms(numHistograms) { + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, l, int64(i), h) + require.NoError(t, err) + expHists = append(expHists, timedHist{int64(i), h}) + } + require.NoError(t, app.Commit()) + + // Restart head. + require.NoError(t, head.Close()) + w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + + q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, q.Close()) + }) + + ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) + + it := s.Iterator() + actHists := make([]timedHist, 0, len(expHists)) + for it.Next() { + t, h := it.AtHistogram() + actHists = append(actHists, timedHist{t, h.Copy()}) + } + + require.Equal(t, expHists, actHists) +} + func generateHistograms(n int) (r []histogram.SparseHistogram) { for i := 0; i < n; i++ { r = append(r, histogram.SparseHistogram{ - Count: 5 + uint64(i*4), - ZeroCount: 2 + uint64(i), - Sum: 18.4 * float64(i+1), - Schema: 1, + Count: 5 + uint64(i*4), + ZeroCount: 2 + uint64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: 1, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 2}, {Offset: 1, Length: 2}, diff --git a/tsdb/head_wal.go 
b/tsdb/head_wal.go index 506a64de97..65a3482d14 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -46,16 +46,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks // for error reporting. var unknownRefs atomic.Uint64 var unknownExemplarRefs atomic.Uint64 + var unknownHistogramRefs atomic.Uint64 // Start workers that each process samples for a partition of the series ID space. // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. var ( - wg sync.WaitGroup - n = runtime.GOMAXPROCS(0) - inputs = make([]chan []record.RefSample, n) - outputs = make([]chan []record.RefSample, n) - exemplarsInput chan record.RefExemplar + wg sync.WaitGroup + n = runtime.GOMAXPROCS(0) + inputs = make([]chan []record.RefSample, n) + outputs = make([]chan []record.RefSample, n) + exemplarsInput chan record.RefExemplar + histogramsInput chan record.RefHistogram dec record.Decoder shards = make([][]record.RefSample, n) @@ -82,6 +84,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks return []record.RefExemplar{} }, } + histogramsPool = sync.Pool{ + New: func() interface{} { + return []record.RefHistogram{} + }, + } ) defer func() { @@ -94,6 +101,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks } } close(exemplarsInput) + close(histogramsInput) wg.Wait() } }() @@ -133,6 +141,42 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks } }(exemplarsInput) + wg.Add(1) + histogramsInput = make(chan record.RefHistogram, 300) + go func(input <-chan record.RefHistogram) { + defer wg.Done() + + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) + for rh := range input { + ms := h.series.getByID(rh.Ref) + if ms == nil { + unknownHistogramRefs.Inc() + continue + } + + if rh.T < h.minValidTime.Load() { + continue + } + + // At the moment the only possible error here is out of order 
histograms, which we shouldn't see when + // replaying the WAL, so the first return value is ignored and nothing is logged (NOTE(review): confirm it only signals out-of-order). + _, chunkCreated := ms.appendHistogram(rh.T, rh.H, 0, h.chunkDiskMapper) + if chunkCreated { + h.metrics.chunksCreated.Inc() + h.metrics.chunks.Inc() + } + + if rh.T > maxt { + maxt = rh.T + } + if rh.T < mint { + mint = rh.T + } + } + + h.updateMinMaxTime(mint, maxt) + }(histogramsInput) + go func() { defer close(decoded) for r.Next() { @@ -186,6 +230,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks return } decoded <- exemplars + case record.Histograms: + hists := histogramsPool.Get().([]record.RefHistogram)[:0] + hists, err = dec.Histograms(rec, hists) + if err != nil { + decodeErr = &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode histograms"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- hists default: // Noop. } @@ -319,6 +375,13 @@ Outer: } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. exemplarsPool.Put(v) + case []record.RefHistogram: + // TODO: split into multiple slices and have multiple workers processing the histograms like we do for samples. + for _, rh := range v { + histogramsInput <- rh + } + //nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
+ histogramsPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } @@ -341,14 +404,15 @@ Outer: } } close(exemplarsInput) + close(histogramsInput) wg.Wait() if r.Err() != nil { return errors.Wrap(r.Err(), "read records") } - if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 { - level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load()) + if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 || unknownHistogramRefs.Load() > 0 { + level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load(), "histograms", unknownHistogramRefs.Load()) } return nil } diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 1d5873f04e..2e98fee5ed 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -40,6 +40,8 @@ const ( Tombstones Type = 3 // Exemplars is used to match WAL records of type Exemplars. Exemplars Type = 4 + // Histograms is used to match WAL records of type Histograms. 
+ Histograms Type = 5 ) var ( @@ -87,7 +89,7 @@ func (d *Decoder) Type(rec []byte) Type { return Unknown } switch t := Type(rec[0]); t { - case Series, Samples, Tombstones, Exemplars: + case Series, Samples, Tombstones, Exemplars, Histograms: return t } return Unknown @@ -226,6 +228,88 @@ func (d *Decoder) Exemplars(rec []byte, exemplars []RefExemplar) ([]RefExemplar, return exemplars, nil } +func (d *Decoder) Histograms(rec []byte, hists []RefHistogram) ([]RefHistogram, error) { + dec := encoding.Decbuf{B: rec} + t := Type(dec.Byte()) + if t != Histograms { + return nil, errors.New("invalid record type") + } + if dec.Len() == 0 { + return hists, nil + } + var ( + baseRef = dec.Be64() + baseTime = dec.Be64int64() + ) + for len(dec.B) > 0 && dec.Err() == nil { + dref := dec.Varint64() + dtime := dec.Varint64() + + rh := RefHistogram{ + Ref: baseRef + uint64(dref), + T: baseTime + dtime, + H: histogram.SparseHistogram{ + Schema: 0, + ZeroThreshold: 0, + ZeroCount: 0, + Count: 0, + Sum: 0, + }, + } + + rh.H.Schema = int32(dec.Varint64()) + rh.H.ZeroThreshold = math.Float64frombits(dec.Be64()) + + rh.H.ZeroCount = dec.Uvarint64() + rh.H.Count = dec.Uvarint64() + rh.H.Sum = math.Float64frombits(dec.Be64()) + + l := dec.Uvarint() + if l > 0 { + rh.H.PositiveSpans = make([]histogram.Span, l) + } + for i := range rh.H.PositiveSpans { + rh.H.PositiveSpans[i].Offset = int32(dec.Varint64()) + rh.H.PositiveSpans[i].Length = dec.Uvarint32() + } + + l = dec.Uvarint() + if l > 0 { + rh.H.NegativeSpans = make([]histogram.Span, l) + } + for i := range rh.H.NegativeSpans { + rh.H.NegativeSpans[i].Offset = int32(dec.Varint64()) + rh.H.NegativeSpans[i].Length = dec.Uvarint32() + } + + l = dec.Uvarint() + if l > 0 { + rh.H.PositiveBuckets = make([]int64, l) + } + for i := range rh.H.PositiveBuckets { + rh.H.PositiveBuckets[i] = dec.Varint64() + } + + l = dec.Uvarint() + if l > 0 { + rh.H.NegativeBuckets = make([]int64, l) + } + for i := range rh.H.NegativeBuckets { + 
rh.H.NegativeBuckets[i] = dec.Varint64() + } + + hists = append(hists, rh) + } + + if dec.Err() != nil { + return nil, errors.Wrapf(dec.Err(), "decode error after %d histograms", len(hists)) + } + if len(dec.B) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) + } + return hists, nil +} + // Encoder encodes series, sample, and tombstones records. // The zero value is ready to use. type Encoder struct { @@ -316,3 +400,54 @@ func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte { return buf.Get() } + +func (e *Encoder) Histograms(hists []RefHistogram, b []byte) []byte { + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(Histograms)) + + if len(hists) == 0 { + return buf.Get() + } + + // Store base timestamp and base reference number of first histogram. + // All histograms encode their timestamp and ref as delta to those. + first := hists[0] + buf.PutBE64(first.Ref) + buf.PutBE64int64(first.T) + + for _, h := range hists { + buf.PutVarint64(int64(h.Ref) - int64(first.Ref)) + buf.PutVarint64(h.T - first.T) + + buf.PutVarint64(int64(h.H.Schema)) + buf.PutBE64(math.Float64bits(h.H.ZeroThreshold)) + + buf.PutUvarint64(h.H.ZeroCount) + buf.PutUvarint64(h.H.Count) + buf.PutBE64(math.Float64bits(h.H.Sum)) + + buf.PutUvarint(len(h.H.PositiveSpans)) + for _, s := range h.H.PositiveSpans { + buf.PutVarint64(int64(s.Offset)) + buf.PutUvarint32(s.Length) + } + + buf.PutUvarint(len(h.H.NegativeSpans)) + for _, s := range h.H.NegativeSpans { + buf.PutVarint64(int64(s.Offset)) + buf.PutUvarint32(s.Length) + } + + buf.PutUvarint(len(h.H.PositiveBuckets)) + for _, b := range h.H.PositiveBuckets { + buf.PutVarint64(b) + } + + buf.PutUvarint(len(h.H.NegativeBuckets)) + for _, b := range h.H.NegativeBuckets { + buf.PutVarint64(b) + } + } + + return buf.Get() +}