From 3362bf6d795101c1b75ca500884288f435ffe028 Mon Sep 17 00:00:00 2001 From: "Signed-off-by: Jesus Vazquez" Date: Tue, 11 Oct 2022 22:05:35 +0530 Subject: [PATCH] Fix merge conflicts Signed-off-by: Jesus Vazquez Signed-off-by: Ganesh Vernekar Co-authored-by: Ganesh Vernekar --- documentation/examples/remote_storage/go.mod | 2 +- documentation/examples/remote_storage/go.sum | 28 ++-- storage/merge.go | 3 +- storage/merge_test.go | 62 ++++---- tsdb/chunkenc/chunk.go | 12 +- tsdb/chunkenc/histogram.go | 8 +- tsdb/chunkenc/histogram_meta.go | 44 +++--- tsdb/compact_test.go | 6 +- tsdb/db.go | 1 + tsdb/db_test.go | 48 ++++-- tsdb/head.go | 65 ++------ tsdb/head_append.go | 151 ++++++------------- tsdb/head_read.go | 115 ++------------ tsdb/head_read_test.go | 72 ++++----- tsdb/head_test.go | 56 +++---- tsdb/head_wal.go | 6 +- tsdb/ooo_head.go | 4 +- tsdb/ooo_head_read_test.go | 4 +- tsdb/ooo_head_test.go | 2 +- tsdb/tsdbutil/chunks.go | 25 ++- 20 files changed, 284 insertions(+), 430 deletions(-) diff --git a/documentation/examples/remote_storage/go.mod b/documentation/examples/remote_storage/go.mod index d36ded403d..93d8ff7606 100644 --- a/documentation/examples/remote_storage/go.mod +++ b/documentation/examples/remote_storage/go.mod @@ -54,7 +54,7 @@ require ( ) require ( - github.com/prometheus/prometheus v0.38.0 + github.com/prometheus/prometheus v0.37.1-0.20221011120840-430bdc9dd099 golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect ) diff --git a/documentation/examples/remote_storage/go.sum b/documentation/examples/remote_storage/go.sum index 8ffcd4a531..ad06b1a773 100644 --- a/documentation/examples/remote_storage/go.sum +++ b/documentation/examples/remote_storage/go.sum @@ -19,7 +19,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= -github.com/armon/go-metrics v0.3.10 h1:FR+drcQStOe+32sYyJYyZ7FIdgoGGBnwLl+flodp8Uo= +github.com/armon/go-metrics v0.3.3 h1:a9F4rlj7EWWrbj7BYw8J8+x+ZZkJeqzNyRk8hdPF+ro= github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.44.72 h1:i7J5XT7pjBjtl1OrdIhiQHzsG89wkZCcM1HhyK++3DI= github.com/aws/aws-sdk-go v1.44.72/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= @@ -103,19 +103,19 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/gophercloud/gophercloud v0.25.0 h1:C3Oae7y0fUVQGSsBrb3zliAjdX+riCSEh4lNMejFNI4= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 h1:uirlL/j72L93RhV4+mkWhjv0cov2I0MIgPOG9rMDr1k= github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= -github.com/hashicorp/consul/api v1.14.0 h1:Y64GIJ8hYTu+tuGekwO4G4ardXoiCivX9wv1iP/kihk= +github.com/hashicorp/consul/api v1.13.1 h1:r5cPdVFUy+pFF7nt+0ArLD9hm+E39OewJkvNdjKXcL4= github.com/hashicorp/cronexpr v1.1.1 h1:NJZDd87hGXjoZBdvyCF9mX4DCq5Wy7+A/w+A7q0wn6c= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= -github.com/hashicorp/go-hclog v0.14.1 h1:nQcJDQwIAGnmoUWp8ubocEX40cCml/17YkF6csQLReU= -github.com/hashicorp/go-immutable-radix v1.3.0 h1:8exGP7ego3OmkfksihtSouGMZ+hQrhxx+FVELeXpVPE= +github.com/hashicorp/go-hclog v0.12.2 h1:F1fdYblUEsxKiailtkhCCG2g4bipEgaHiDc8vffNpD4= +github.com/hashicorp/go-immutable-radix v1.2.0 h1:l6UW37iCXwZkZoAbEYnptSHVE/cQ5bOTPYG5W3vf9+8= github.com/hashicorp/go-retryablehttp v0.7.1 h1:sUiuQAnLlbvmExtFQs72iFW/HXeUn8Z1aJLQ4LJJbTQ= github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= -github.com/hashicorp/nomad/api v0.0.0-20220809212729-939d643fec2c h1:lV5A4cLQr1Bh1xGSSQ2R0fDRK4GZnfXxYia4Q7aaTXc= -github.com/hashicorp/serf v0.9.7 h1:hkdgbqizGQHuU5IPqYM1JdSMV8nKfpuOnZYXssk9muY= +github.com/hashicorp/nomad/api v0.0.0-20220629141207-c2428e1673ec h1:jAF71e0KoaY2LJlRsRxxGz6MNQOG5gTBIc+rklxfNO0= +github.com/hashicorp/serf v0.9.6 h1:uuEX1kLR6aoda1TBttmJQKDLZE1Ob7KN0NPdE7EtCDc= github.com/hetznercloud/hcloud-go v1.35.2 h1:eEDtmDiI2plZ2UQmj4YpiYse5XbtpXOUBpAdIOLxzgE= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/influxdata/influxdb v1.10.0 h1:8xDpt8KO3lzrzf/ss+l8r42AGUZvoITu5824berK7SE= @@ -157,7 +157,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182aff github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -205,10 +205,10 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= -github.com/prometheus/prometheus v0.38.0 h1:YSiJ5gDZmXnOntPRyHn1wb/6I1Frasj9dw57XowIqeA= -github.com/prometheus/prometheus v0.38.0/go.mod h1:2zHO5FtRhM+iu995gwKIb99EXxjeZEuXpKUTIRq4YI0= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/prometheus/prometheus v0.37.1-0.20221011120840-430bdc9dd099 h1:ISpgxhFfSrMztQTw0Za6xDDC3Fwe4kciR8Pwv3Sz9yE= +github.com/prometheus/prometheus v0.37.1-0.20221011120840-430bdc9dd099/go.mod h1:dfkjkdCd3FhLE0BiBIKwwwkZiDQnTnDThE1Zex1UwbA= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.9 h1:0roa6gXKgyta64uqh52AQG3wzZXH21unn+ltzQSXML0= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -322,7 +322,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.13-0.20220908144252-ce397412b6a4 h1:glzimF7qHZuKVEiMbE7UqBu44MyTjt5u6j3Jz+rfMRM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -331,7 +331,7 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= -google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ= +google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78 h1:QntLWYqZeuBtJkth3m/6DLznnI0AHJr+AgJXvVh/izw= google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -351,7 +351,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= -gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= +gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/storage/merge.go b/storage/merge.go index 56ae2bcf1d..258e4e3120 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -20,12 +20,13 @@ import ( "math" "sync" + "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" - "golang.org/x/exp/slices" ) type mergeGenericQuerier struct { diff --git a/storage/merge_test.go b/storage/merge_test.go index 7a2aff24e4..726296dd5b 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -560,9 +560,9 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) { { name: "single series", input: []ChunkSeries{ - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}}), }, - expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}}), }, { name: "two empty series", @@ -575,70 +575,70 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) { { name: "two non overlapping", input: []ChunkSeries{ - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7, nil, nil}, sample{9, 9, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}}), }, - expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}, []tsdbutil.Sample{sample{7, 7, nil, nil}, sample{9, 9, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}}), }, { name: "two overlapping", input: []ChunkSeries{ - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}, sample{8, 8, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7, nil, nil}, sample{9, 9, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}}), }, expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), - []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}, - []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}, + []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}, sample{8, 8, nil, nil}}, + []tsdbutil.Sample{sample{7, 7, nil, nil}, sample{9, 9, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}}, ), }, { name: "two duplicated", input: []ChunkSeries{ - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}), }, expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), - []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}, - []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}, + []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}, + []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}, ), }, { name: "three overlapping", input: []ChunkSeries{ - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{6, 6, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{4, 4, nil, nil}}), }, expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), - []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}, - []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}, - []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}, + []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}, + []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{6, 6, nil, nil}}, + []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{4, 4, nil, nil}}, ), }, { name: "three in chained overlap", input: []ChunkSeries{ - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4, nil, nil}, sample{6, 66, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6, nil, nil}, sample{10, 10, nil, nil}}), }, expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), - []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}, - []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}, - []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}, + []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}, + []tsdbutil.Sample{sample{4, 4, nil, nil}, sample{6, 66, nil, nil}}, + []tsdbutil.Sample{sample{6, 6, nil, nil}, sample{10, 10, nil, nil}}, ), }, { name: "three in chained overlap complex", input: []ChunkSeries{ - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}), - NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{5, 5, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}, sample{15, 15, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{20, 20, nil, nil}}, []tsdbutil.Sample{sample{25, 25, nil, nil}, sample{30, 30, nil, nil}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18, nil, nil}, sample{26, 26, nil, nil}}, []tsdbutil.Sample{sample{31, 31, nil, nil}, sample{35, 35, nil, nil}}), }, expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), - []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}, - []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}, - []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}, + []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{5, 5, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}, sample{15, 15, nil, nil}}, + []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{20, 20, nil, nil}}, []tsdbutil.Sample{sample{25, 25, nil, nil}, sample{30, 30, nil, nil}}, + []tsdbutil.Sample{sample{18, 18, nil, nil}, sample{26, 26, nil, nil}}, []tsdbutil.Sample{sample{31, 31, nil, nil}, sample{35, 35, nil, nil}}, ), }, { diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 2958d37580..0b7117ce9e 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -44,15 +44,6 @@ func (e Encoding) String() string { return "" } -// IsValidEncoding returns true for supported encodings. -func IsValidEncoding(e Encoding) bool { - switch e { - case EncXOR, EncHistogram: - return true - } - return false -} - // Chunk encodings for out-of-order chunks. // These encodings must be only used by the Head block for its internal bookkeeping. const ( @@ -64,8 +55,9 @@ func IsOutOfOrderChunk(e Encoding) bool { return (e & OutOfOrderMask) != 0 } +// IsValidEncoding returns true for supported encodings. func IsValidEncoding(e Encoding) bool { - return e == EncXOR || e == EncOOOXOR + return e == EncXOR || e == EncOOOXOR || e == EncHistogram } // Chunk holds a sequence of sample pairs that can be iterated over and appended to. diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 232cf067a0..28a39e57d0 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -29,10 +29,10 @@ import ( // delta of the delta to the previous number, xor = what we do for regular // sample values): // -// field → ts count zeroCount sum []posbuckets []negbuckets -// sample 1 raw raw raw raw []raw []raw -// sample 2 delta delta delta xor []delta []delta -// sample >2 dod dod dod xor []dod []dod +// field → ts count zeroCount sum []posbuckets []negbuckets +// sample 1 raw raw raw raw []raw []raw +// sample 2 delta delta delta xor []delta []delta +// sample >2 dod dod dod xor []dod []dod type HistogramChunk struct { b bstream } diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index dbf2122413..7a4407305c 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -94,18 +94,18 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) { // // * If the threshold is 0, store a single zero byte. // -// * If the threshold is a power of 2 between (and including) 2^-243 and 2^10, -// take the exponent from the IEEE 754 representation of the threshold, which -// covers a range between (and including) -242 and 11. (2^-243 is 0.5*2^-242 -// in IEEE 754 representation, and 2^10 is 0.5*2^11.) Add 243 to the exponent -// and store the result (which will be between 1 and 254) as a single -// byte. Note that small powers of two are preferred values for the zero -// threshold. The default value for the zero threshold is 2^-128 (or -// 0.5*2^-127 in IEEE 754 representation) and will therefore be encoded as a -// single byte (with value 116). +// - If the threshold is a power of 2 between (and including) 2^-243 and 2^10, +// take the exponent from the IEEE 754 representation of the threshold, which +// covers a range between (and including) -242 and 11. (2^-243 is 0.5*2^-242 +// in IEEE 754 representation, and 2^10 is 0.5*2^11.) Add 243 to the exponent +// and store the result (which will be between 1 and 254) as a single +// byte. Note that small powers of two are preferred values for the zero +// threshold. The default value for the zero threshold is 2^-128 (or +// 0.5*2^-127 in IEEE 754 representation) and will therefore be encoded as a +// single byte (with value 116). // -// * In all other cases, store 255 as a single byte, followed by the 8 bytes of -// the threshold as a float64, i.e. taking 9 bytes in total. +// - In all other cases, store 255 as a single byte, followed by the 8 bytes of +// the threshold as a float64, i.e. taking 9 bytes in total. func putZeroThreshold(b *bstream, threshold float64) { if threshold == 0 { b.writeByte(0) @@ -199,11 +199,11 @@ type Interjection struct { // // Let's say the old buckets look like this: // -// span syntax: [offset, length] -// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1] -// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15] -// raw values 6 3 3 2 4 5 1 -// deltas 6 -3 0 -1 2 1 -4 +// span syntax: [offset, length] +// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1] +// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15] +// raw values 6 3 3 2 4 5 1 +// deltas 6 -3 0 -1 2 1 -4 // // But now we introduce a new bucket layout. (Carefully chosen example where we // have a span appended, one unchanged[*], one prepended, and two merge - in @@ -213,12 +213,12 @@ type Interjection struct { // that, their offset needs to change if "disrupted" by spans changing ahead of // them // -// \/ this one is "unchanged" -// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ] -// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15] -// raw values 6 3 0 3 0 0 2 4 5 0 1 -// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1 -// delta mods: / \ / \ / \ +// \/ this one is "unchanged" +// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ] +// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15] +// raw values 6 3 0 3 0 0 2 4 5 0 1 +// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1 +// delta mods: / \ / \ / \ // // Note that whenever any new buckets are introduced, the subsequent "old" // bucket needs to readjust its delta to the new base of 0. Thus, for the caller diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 7aeb9c5455..a77d070e8c 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1298,7 +1298,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { } func TestHeadCompactionWithHistograms(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, false) + head, _ := newTestHead(t, DefaultBlockDuration, false, false) require.NoError(t, head.Init(0)) t.Cleanup(func() { require.NoError(t, head.Close()) @@ -1462,11 +1462,11 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { c.numBuckets, ), func(t *testing.T) { - oldHead, _ := newTestHead(t, DefaultBlockDuration, false) + oldHead, _ := newTestHead(t, DefaultBlockDuration, false, false) t.Cleanup(func() { require.NoError(t, oldHead.Close()) }) - sparseHead, _ := newTestHead(t, DefaultBlockDuration, false) + sparseHead, _ := newTestHead(t, DefaultBlockDuration, false, false) t.Cleanup(func() { require.NoError(t, sparseHead.Close()) }) diff --git a/tsdb/db.go b/tsdb/db.go index b7967ebdaa..54ed6467ab 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -81,6 +81,7 @@ func DefaultOptions() *Options { StripeSize: DefaultStripeSize, HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, IsolationDisabled: defaultIsolationDisabled, + HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, OutOfOrderCapMax: DefaultOutOfOrderCapMax, } } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index f9d12bb5fb..2c87be5141 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4069,8 +4069,8 @@ func TestOOOCompaction(t *testing.T) { fromMins, toMins := r[0], r[1] for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts)}) - series2Samples = append(series2Samples, sample{ts, float64(2 * ts)}) + series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) + series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) } } expRes := map[string][]tsdbutil.Sample{ @@ -4137,8 +4137,8 @@ func TestOOOCompaction(t *testing.T) { series2Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts)}) - series2Samples = append(series2Samples, sample{ts, float64(2 * ts)}) + series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) + series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) } expRes := map[string][]tsdbutil.Sample{ series1.String(): series1Samples, @@ -4269,8 +4269,8 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { series2Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts)}) - series2Samples = append(series2Samples, sample{ts, float64(2 * ts)}) + series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) + series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) } expRes := map[string][]tsdbutil.Sample{ series1.String(): series1Samples, @@ -4457,7 +4457,7 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { var gotSamples []tsdbutil.Sample for _, chunk := range chks[series1.String()] { it := chunk.Chunk.Iterator(nil) - for it.Next() { + for it.Next() == chunkenc.ValFloat { ts, v := it.At() gotSamples = append(gotSamples, sample{t: ts, v: v}) } @@ -4643,7 +4643,7 @@ func TestOOODisabled(t *testing.T) { require.Equal(t, expSamples, seriesSet) require.Equal(t, float64(0), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch") require.Equal(t, float64(failedSamples), - prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples), + prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)), "number of ooo/oob samples mismatch") // Verifying that no OOO artifacts were generated. @@ -4723,7 +4723,7 @@ func TestWBLAndMmapReplay(t *testing.T) { chk, err := db.head.chunkDiskMapper.Chunk(mc.ref) require.NoError(t, err) it := chk.Iterator(nil) - for it.Next() { + for it.Next() == chunkenc.ValFloat { ts, val := it.At() s1MmapSamples = append(s1MmapSamples, sample{t: ts, v: val}) } @@ -4952,7 +4952,7 @@ func TestOOOCompactionFailure(t *testing.T) { series1Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts)}) + series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) } expRes := map[string][]tsdbutil.Sample{ series1.String(): series1Samples, @@ -5860,6 +5860,17 @@ func TestHistogramAppendAndQuery(t *testing.T) { }) t.Run("new buckets incoming", func(t *testing.T) { + // In the previous unit test, during the last histogram append, we + // changed the schema and that caused a new chunk creation. Because + // of the next append the layout of the last histogram will change + // because the chunk will be re-encoded. So this forces us to modify + // the last histogram in exp1 so when we query we get the expected + // results. + lh := exp1[len(exp1)-1].H().Copy() + lh.PositiveSpans[1].Length++ + lh.PositiveBuckets = append(lh.PositiveBuckets, -2) // -2 makes the last bucket 0. + exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh} + // This histogram with new bucket at the end causes the re-encoding of the previous histogram. // Hence the previous histogram is recoded into this new layout. // But the query returns the histogram from the in-memory buffer, hence we don't see the recode here yet. @@ -5869,6 +5880,21 @@ func TestHistogramAppendAndQuery(t *testing.T) { appendHistogram(series1, 104, h.Copy(), &exp1) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + // Because of the previous two histograms being on the active chunk, + // and the next append is only adding a new bucket, the active chunk + // will be re-encoded to the new layout. + lh = exp1[len(exp1)-2].H().Copy() + lh.PositiveSpans[0].Length++ + lh.PositiveSpans[1].Offset-- + lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} + exp1[len(exp1)-2] = sample{t: exp1[len(exp1)-2].T(), h: lh} + + lh = exp1[len(exp1)-1].H().Copy() + lh.PositiveSpans[0].Length++ + lh.PositiveSpans[1].Offset-- + lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} + exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh} + // Now we add the new buckets in between. Empty bucket is again not present for the old histogram. h.PositiveSpans[0].Length++ h.PositiveSpans[1].Offset-- @@ -5973,7 +5999,7 @@ func TestQueryHistogramFromBlocks(t *testing.T) { t.Helper() opts := DefaultOptions() - opts.AllowOverlappingBlocks = true + opts.AllowOverlappingCompaction = true // TODO(jesus.vazquez) This replaced AllowOverlappingBlocks, make sure that works db := openTestDB(t, opts, nil) t.Cleanup(func() { require.NoError(t, db.Close()) diff --git a/tsdb/head.go b/tsdb/head.go index c55dbe9811..dff04e6603 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -295,31 +295,6 @@ func (h *Head) resetInMemoryState() error { } type headMetrics struct { -<<<<<<< HEAD - activeAppenders prometheus.Gauge - series prometheus.GaugeFunc - seriesCreated prometheus.Counter - seriesRemoved prometheus.Counter - seriesNotFound prometheus.Counter - chunks prometheus.Gauge - chunksCreated prometheus.Counter - chunksRemoved prometheus.Counter - gcDuration prometheus.Summary - samplesAppended *prometheus.CounterVec - outOfBoundSamples *prometheus.CounterVec - outOfOrderSamples *prometheus.CounterVec - walTruncateDuration prometheus.Summary - walCorruptionsTotal prometheus.Counter - walTotalReplayDuration prometheus.Gauge - headTruncateFail prometheus.Counter - headTruncateTotal prometheus.Counter - checkpointDeleteFail prometheus.Counter - checkpointDeleteTotal prometheus.Counter - checkpointCreationFail prometheus.Counter - checkpointCreationTotal prometheus.Counter - mmapChunkCorruptionTotal prometheus.Counter - snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1. -======= activeAppenders prometheus.Gauge series prometheus.GaugeFunc seriesCreated prometheus.Counter @@ -329,11 +304,11 @@ type headMetrics struct { chunksCreated prometheus.Counter chunksRemoved prometheus.Counter gcDuration prometheus.Summary - samplesAppended prometheus.Counter + samplesAppended *prometheus.CounterVec outOfOrderSamplesAppended prometheus.Counter - outOfBoundSamples prometheus.Counter - outOfOrderSamples prometheus.Counter - tooOldSamples prometheus.Counter + outOfBoundSamples *prometheus.CounterVec + outOfOrderSamples *prometheus.CounterVec + tooOldSamples *prometheus.CounterVec walTruncateDuration prometheus.Summary walCorruptionsTotal prometheus.Counter dataTotalReplayDuration prometheus.Gauge @@ -346,7 +321,6 @@ type headMetrics struct { mmapChunkCorruptionTotal prometheus.Counter snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1. oooHistogram prometheus.Histogram ->>>>>>> main } const ( @@ -409,35 +383,23 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { samplesAppended: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", -<<<<<<< HEAD }, []string{"type"}), - outOfBoundSamples: prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "prometheus_tsdb_out_of_bound_samples_total", - Help: "Total number of out of bound samples ingestion failed attempts.", - }, []string{"type"}), - outOfOrderSamples: prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "prometheus_tsdb_out_of_order_samples_total", - Help: "Total number of out of order samples ingestion failed attempts.", - }, []string{"type"}), -======= - }), outOfOrderSamplesAppended: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_out_of_order_samples_appended_total", Help: "Total number of appended out of order samples.", }), - outOfBoundSamples: prometheus.NewCounter(prometheus.CounterOpts{ + outOfBoundSamples: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "prometheus_tsdb_out_of_bound_samples_total", Help: "Total number of out of bound samples ingestion failed attempts with out of order support disabled.", - }), - outOfOrderSamples: prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{"type"}), + outOfOrderSamples: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "prometheus_tsdb_out_of_order_samples_total", Help: "Total number of out of order samples ingestion failed attempts due to out of order being disabled.", - }), - tooOldSamples: prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{"type"}), + tooOldSamples: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "prometheus_tsdb_too_old_samples_total", Help: "Total number of out of order samples ingestion failed attempts with out of support enabled, but sample outside of time window.", - }), ->>>>>>> main + }, []string{"type"}), headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_truncations_failed_total", Help: "Total number of head truncations that failed.", @@ -1886,6 +1848,9 @@ type memSeries struct { // We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates. lastValue float64 + // We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates. + lastHistogramValue *histogram.Histogram + // Current appender for the head chunk. Set when a new head chunk is cut. // It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit // (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series). @@ -1894,13 +1859,11 @@ type memSeries struct { // txs is nil if isolation is disabled. txs *txRing -<<<<<<< HEAD // TODO(beorn7): The only reason we track this is to create a staleness // marker as either histogram or float sample. Perhaps there is a better way. isHistogramSeries bool -======= + pendingCommit bool // Whether there are samples waiting to be committed to this series. ->>>>>>> main } func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 54a0b3c244..a3b5027ea4 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -301,15 +301,10 @@ type headAppender struct { } func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { -<<<<<<< HEAD - if t < a.minValidTime { - a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc() -======= // For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append. // If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work. if a.oooTimeWindow == 0 && t < a.minValidTime { - a.head.metrics.outOfBoundSamples.Inc() ->>>>>>> main + a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc() return 0, storage.ErrOutOfBounds } @@ -344,12 +339,6 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 } s.Lock() -<<<<<<< HEAD - if err := s.appendable(t, v); err != nil { - s.Unlock() - if err == storage.ErrOutOfOrderSample { - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc() -======= // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -363,10 +352,9 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 if err != nil { switch err { case storage.ErrOutOfOrderSample: - a.head.metrics.outOfOrderSamples.Inc() + a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc() case storage.ErrTooOldSample: - a.head.metrics.tooOldSamples.Inc() ->>>>>>> main + a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Inc() } return 0, err } @@ -445,7 +433,7 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error { // We are allowing exact duplicates as we can encounter them in valid cases // like federation and erroring out at that time would be extremely noisy. - if !h.Equals(s.sampleBuf[3].h) { + if !h.Equals(s.lastHistogramValue) { return storage.ErrDuplicateSampleForTimestamp } return nil @@ -767,10 +755,6 @@ func (a *headAppender) Commit() (err error) { defer a.head.putHistogramBuffer(a.histograms) defer a.head.putMetadataBuffer(a.metadata) defer a.head.iso.closeAppend(a.appendID) -<<<<<<< HEAD - total := len(a.samples) - var series *memSeries -======= var ( samplesAppended = len(a.samples) @@ -827,35 +811,10 @@ func (a *headAppender) Commit() (err error) { wblSamples = nil oooMmapMarkers = nil } ->>>>>>> main for i, s := range a.samples { series = a.sampleSeries[i] series.Lock() -<<<<<<< HEAD - if !ok { - total-- - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc() - } - if chunkCreated { - a.head.metrics.chunks.Inc() - a.head.metrics.chunksCreated.Inc() - } - } - - histogramsTotal := len(a.histograms) - for i, s := range a.histograms { - series = a.histogramSeries[i] - series.Lock() - ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper) - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - - if !ok { - histogramsTotal-- - a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() -======= oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) switch err { case storage.ErrOutOfOrderSample: @@ -871,7 +830,6 @@ func (a *headAppender) Commit() (err error) { // Do nothing. default: samplesAppended-- ->>>>>>> main } var ok, chunkCreated bool @@ -939,6 +897,33 @@ func (a *headAppender) Commit() (err error) { series.Unlock() } + histogramsTotal := len(a.histograms) + histoOOORejected := 0 + for i, s := range a.histograms { + series = a.histogramSeries[i] + series.Lock() + ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange) + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + + if ok { + if s.T < inOrderMint { + inOrderMint = s.T + } + if s.T > inOrderMaxt { + inOrderMaxt = s.T + } + } else { + histogramsTotal-- + histoOOORejected++ + } + if chunkCreated { + a.head.metrics.chunks.Inc() + a.head.metrics.chunksCreated.Inc() + } + } + for i, m := range a.metadata { series = a.metadataSeries[i] series.Lock() @@ -946,19 +931,15 @@ func (a *headAppender) Commit() (err error) { series.Unlock() } -<<<<<<< HEAD - a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(total)) + a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooRejected)) + a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histoOOORejected)) + a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(oobRejected)) + a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(tooOldRejected)) + a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(samplesAppended)) a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsTotal)) - a.head.updateMinMaxTime(a.mint, a.maxt) -======= - a.head.metrics.outOfOrderSamples.Add(float64(oooRejected)) - a.head.metrics.outOfBoundSamples.Add(float64(oobRejected)) - a.head.metrics.tooOldSamples.Add(float64(tooOldRejected)) - a.head.metrics.samplesAppended.Add(float64(samplesAppended)) a.head.metrics.outOfOrderSamplesAppended.Add(float64(oooAccepted)) a.head.updateMinMaxTime(inOrderMint, inOrderMaxt) a.head.updateMinOOOMaxOOOTime(ooomint, ooomaxt) ->>>>>>> main collectOOORecords() if a.head.wbl != nil { @@ -998,9 +979,8 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -<<<<<<< HEAD -func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper) +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1009,10 +989,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper c.maxTime = t - s.sampleBuf[0] = s.sampleBuf[1] - s.sampleBuf[1] = s.sampleBuf[2] - s.sampleBuf[2] = s.sampleBuf[3] - s.sampleBuf[3] = sample{t: t, v: v} + s.lastValue = v if appendID > 0 { s.txs.add(appendID) @@ -1023,7 +1000,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards. We check for Appendable before // appendPreprocessor because in case it ends up creating a new chunk, @@ -1034,7 +1011,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui positiveInterjections, negativeInterjections []chunkenc.Interjection okToAppend, counterReset bool ) - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1050,7 +1027,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // recoding before we can append our histogram. // - okToAppend and no interjections → Chunk is ready to support our histogram. if !okToAppend || counterReset { - c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper) + c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) chunkCreated = true } else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 { // New buckets have appeared. We need to recode all @@ -1081,10 +1058,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui c.maxTime = t - s.sampleBuf[0] = s.sampleBuf[1] - s.sampleBuf[1] = s.sampleBuf[2] - s.sampleBuf[2] = s.sampleBuf[3] - s.sampleBuf[3] = sample{t: t, h: h} + s.lastHistogramValue = h if appendID > 0 { s.txs.add(appendID) @@ -1097,11 +1071,8 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. func (s *memSeries) appendPreprocessor( - t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, + t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, ) (c *memChunk, sampleInOrder, chunkCreated bool) { -======= -func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { ->>>>>>> main // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. @@ -1114,13 +1085,8 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it. return c, false, false } -<<<<<<< HEAD - // There is no chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, e, chunkDiskMapper) -======= // There is no head chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, chunkDiskMapper, chunkRange) ->>>>>>> main + c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) chunkCreated = true } @@ -1132,7 +1098,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding. - c = s.cutNewHeadChunk(t, e, chunkDiskMapper) + c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) chunkCreated = true } @@ -1157,26 +1123,11 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. if t >= s.nextAt || numSamples >= samplesPerChunk*2 { -<<<<<<< HEAD - c = s.cutNewHeadChunk(t, e, chunkDiskMapper) + c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) chunkCreated = true } + return c, true, chunkCreated -======= - c = s.cutNewHeadChunk(t, chunkDiskMapper, chunkRange) - chunkCreated = true - } - s.app.Append(t, v) - - c.maxTime = t - s.lastValue = v - - if appendID > 0 && s.txs != nil { - s.txs.add(appendID) - } - - return true, chunkCreated ->>>>>>> main } // computeChunkEndTime estimates the end timestamp based the beginning of a @@ -1192,13 +1143,9 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/n } -<<<<<<< HEAD func (s *memSeries) cutNewHeadChunk( - mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, + mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, ) *memChunk { -======= -func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) *memChunk { ->>>>>>> main s.mmapCurrentHeadChunk(chunkDiskMapper) s.headChunk = &memChunk{ diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 0e341e2977..6a273a0fd8 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -23,7 +23,6 @@ import ( "github.com/pkg/errors" "golang.org/x/exp/slices" - "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -487,7 +486,7 @@ func (o mergedOOOChunks) Bytes() []byte { panic(err) } it := o.Iterator(nil) - for it.Next() { + for it.Next() == chunkenc.ValFloat { t, v := it.At() app.Append(t, v) } @@ -536,7 +535,7 @@ func (b boundedChunk) Bytes() []byte { xor := chunkenc.NewXORChunk() a, _ := xor.Appender() it := b.Iterator(nil) - for it.Next() { + for it.Next() == chunkenc.ValFloat { t, v := it.At() a.Append(t, v) } @@ -565,33 +564,35 @@ type boundedIterator struct { // until its able to find a sample within the bounds minT and maxT. // If there are samples within bounds it will advance one by one amongst them. // If there are no samples within bounds it will return false. -func (b boundedIterator) Next() bool { - for b.Iterator.Next() { +func (b boundedIterator) Next() chunkenc.ValueType { + for b.Iterator.Next() == chunkenc.ValFloat { t, _ := b.Iterator.At() if t < b.minT { continue } else if t > b.maxT { - return false + return chunkenc.ValNone } - return true + return chunkenc.ValFloat } - return false + return chunkenc.ValNone } -func (b boundedIterator) Seek(t int64) bool { +func (b boundedIterator) Seek(t int64) chunkenc.ValueType { if t < b.minT { // We must seek at least up to b.minT if it is asked for something before that. - ok := b.Iterator.Seek(b.minT) - if !ok { - return false + val := b.Iterator.Seek(b.minT) + if !(val == chunkenc.ValFloat) { + return chunkenc.ValNone } t, _ := b.Iterator.At() - return t <= b.maxT + if t <= b.maxT { + return chunkenc.ValFloat + } } if t > b.maxT { // We seek anyway so that the subsequent Next() calls will also return false. b.Iterator.Seek(t) - return false + return chunkenc.ValNone } return b.Iterator.Seek(t) } @@ -685,92 +686,6 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch return makeStopIterator(c.chunk, it, stopAfter) } -// memSafeIterator returns values from the wrapped stopIterator -// except the last 4, which come from buf. -type memSafeIterator struct { - stopIterator - - total int - buf [4]sample -} - -func (it *memSafeIterator) Seek(t int64) chunkenc.ValueType { - if it.Err() != nil { - return chunkenc.ValNone - } - - var valueType chunkenc.ValueType - var ts int64 = math.MinInt64 - - if it.i > -1 { - ts = it.AtT() - } - - if t <= ts { - // We are already at the right sample, but we have to find out - // its ValueType. - if it.total-it.i > 4 { - return it.Iterator.Seek(ts) - } - return it.buf[4-(it.total-it.i)].Type() - } - - for t > ts || it.i == -1 { - if valueType = it.Next(); valueType == chunkenc.ValNone { - return chunkenc.ValNone - } - ts = it.AtT() - } - - return valueType -} - -func (it *memSafeIterator) Next() chunkenc.ValueType { - if it.i+1 >= it.stopAfter { - return chunkenc.ValNone - } - it.i++ - if it.total-it.i > 4 { - return it.Iterator.Next() - } - return it.buf[4-(it.total-it.i)].Type() -} - -func (it *memSafeIterator) At() (int64, float64) { - if it.total-it.i > 4 { - return it.Iterator.At() - } - s := it.buf[4-(it.total-it.i)] - return s.t, s.v -} - -func (it *memSafeIterator) AtHistogram() (int64, *histogram.Histogram) { - if it.total-it.i > 4 { - return it.Iterator.AtHistogram() - } - s := it.buf[4-(it.total-it.i)] - return s.t, s.h -} - -func (it *memSafeIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - if it.total-it.i > 4 { - return it.Iterator.AtFloatHistogram() - } - s := it.buf[4-(it.total-it.i)] - if s.fh != nil { - return s.t, s.fh - } - return s.t, s.h.ToFloat() -} - -func (it *memSafeIterator) AtT() int64 { - if it.total-it.i > 4 { - return it.Iterator.AtT() - } - s := it.buf[4-(it.total-it.i)] - return s.t -} - // stopIterator wraps an Iterator, but only returns the first // stopAfter values, if initialized with i=-1. type stopIterator struct { diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index 4c3ba885bb..2712bcd1a1 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -41,7 +41,7 @@ func TestBoundedChunk(t *testing.T) { name: "bounds represent a single sample", inputChunk: newTestChunk(10), expSamples: []sample{ - {0, 0}, + {0, 0, nil, nil}, }, }, { @@ -50,14 +50,14 @@ func TestBoundedChunk(t *testing.T) { inputMinT: 1, inputMaxT: 8, expSamples: []sample{ - {1, 1}, - {2, 2}, - {3, 3}, - {4, 4}, - {5, 5}, - {6, 6}, - {7, 7}, - {8, 8}, + {1, 1, nil, nil}, + {2, 2, nil, nil}, + {3, 3, nil, nil}, + {4, 4, nil, nil}, + {5, 5, nil, nil}, + {6, 6, nil, nil}, + {7, 7, nil, nil}, + {8, 8, nil, nil}, }, }, { @@ -66,12 +66,12 @@ func TestBoundedChunk(t *testing.T) { inputMinT: 0, inputMaxT: 5, expSamples: []sample{ - {0, 0}, - {1, 1}, - {2, 2}, - {3, 3}, - {4, 4}, - {5, 5}, + {0, 0, nil, nil}, + {1, 1, nil, nil}, + {2, 2, nil, nil}, + {3, 3, nil, nil}, + {4, 4, nil, nil}, + {5, 5, nil, nil}, }, }, { @@ -80,11 +80,11 @@ func TestBoundedChunk(t *testing.T) { inputMinT: 5, inputMaxT: 9, expSamples: []sample{ - {5, 5}, - {6, 6}, - {7, 7}, - {8, 8}, - {9, 9}, + {5, 5, nil, nil}, + {6, 6, nil, nil}, + {7, 7, nil, nil}, + {8, 8, nil, nil}, + {9, 9, nil, nil}, }, }, { @@ -95,11 +95,11 @@ func TestBoundedChunk(t *testing.T) { initialSeek: 1, seekIsASuccess: true, expSamples: []sample{ - {3, 3}, - {4, 4}, - {5, 5}, - {6, 6}, - {7, 7}, + {3, 3, nil, nil}, + {4, 4, nil, nil}, + {5, 5, nil, nil}, + {6, 6, nil, nil}, + {7, 7, nil, nil}, }, }, { @@ -110,9 +110,9 @@ func TestBoundedChunk(t *testing.T) { initialSeek: 5, seekIsASuccess: true, expSamples: []sample{ - {5, 5}, - {6, 6}, - {7, 7}, + {5, 5, nil, nil}, + {6, 6, nil, nil}, + {7, 7, nil, nil}, }, }, { @@ -144,23 +144,23 @@ func TestBoundedChunk(t *testing.T) { if tc.initialSeek != 0 { // Testing Seek() - ok := it.Seek(tc.initialSeek) - require.Equal(t, tc.seekIsASuccess, ok) - if ok { + val := it.Seek(tc.initialSeek) + require.Equal(t, tc.seekIsASuccess, val == chunkenc.ValFloat) + if val == chunkenc.ValFloat { t, v := it.At() - samples = append(samples, sample{t, v}) + samples = append(samples, sample{t, v, nil, nil}) } } // Testing Next() - for it.Next() { + for it.Next() == chunkenc.ValFloat { t, v := it.At() - samples = append(samples, sample{t, v}) + samples = append(samples, sample{t, v, nil, nil}) } - // it.Next() should keep returning false. + // it.Next() should keep returning no value. for i := 0; i < 10; i++ { - require.False(t, it.Next()) + require.True(t, it.Next() == chunkenc.ValNone) } require.Equal(t, tc.expSamples, samples) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 8f1dd30455..b4e12446e1 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -61,13 +61,10 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) ( opts.ChunkDirRoot = dir opts.EnableExemplarStorage = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) -<<<<<<< HEAD opts.EnableNativeHistograms.Store(true) -======= if oooEnabled { opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) } ->>>>>>> main h, err := NewHead(nil, nil, wlog, nil, opts, nil) require.NoError(t, err) @@ -526,19 +523,11 @@ func TestHead_ReadWAL(t *testing.T) { require.NoError(t, c.Err()) return x } -<<<<<<< HEAD - require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil))) - require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil))) + require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil))) + require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil))) // The samples before the new series record should be discarded since a duplicate record // is only possible when old samples were compacted. - require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil))) -======= - require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil))) - require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil))) - // The samples before the new series record should be discarded since a duplicate record - // is only possible when old samples were compacted. - require.Equal(t, []sample{{101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil))) ->>>>>>> main + require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil))) q, err := head.ExemplarQuerier(context.Background()) require.NoError(t, err) @@ -1335,8 +1324,9 @@ func TestMemSeries_appendHistogram(t *testing.T) { defer func() { require.NoError(t, chunkDiskMapper.Close()) }() + chunkRange := int64(1000) - s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) histograms := GenerateTestHistograms(4) histogramWithOneMoreBucket := histograms[3].Copy() @@ -1348,19 +1338,19 @@ func TestMemSeries_appendHistogram(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper) + ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper) + ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper) + ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper) + ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -1370,7 +1360,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range") - ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper) + ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk") @@ -2564,12 +2554,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil) // First point. -<<<<<<< HEAD require.Equal(t, chunkenc.ValFloat, it.Seek(0)) -======= - ok := it.Seek(0) - require.True(t, ok) ->>>>>>> main ts, val := it.At() require.Equal(t, int64(0), ts) require.Equal(t, float64(0), val) @@ -2824,7 +2809,7 @@ func TestAppendHistogram(t *testing.T) { l := labels.Labels{{Name: "a", Value: "b"}} for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} { t.Run(fmt.Sprintf("%d", numHistograms), func(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -2869,7 +2854,7 @@ func TestAppendHistogram(t *testing.T) { } func TestHistogramInWALAndMmapChunk(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -2940,7 +2925,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { require.NoError(t, head.Close()) w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) require.NoError(t, err) - head, err = NewHead(nil, nil, w, head.opts, nil) + head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) require.NoError(t, head.Init(0)) @@ -3259,7 +3244,7 @@ func TestSnapshotError(t *testing.T) { } func TestHistogramMetrics(t *testing.T) { - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3284,7 +3269,7 @@ func TestHistogramMetrics(t *testing.T) { require.NoError(t, head.Close()) w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) require.NoError(t, err) - head, err = NewHead(nil, nil, w, head.opts, nil) + head, err = NewHead(nil, nil, w, nil, head.opts, nil) require.NoError(t, err) require.NoError(t, head.Init(0)) @@ -3294,7 +3279,7 @@ func TestHistogramMetrics(t *testing.T) { func TestHistogramStaleSample(t *testing.T) { l := labels.Labels{{Name: "a", Value: "b"}} numHistograms := 20 - head, _ := newTestHead(t, 100000, false) + head, _ := newTestHead(t, 100000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3388,7 +3373,7 @@ func TestHistogramStaleSample(t *testing.T) { func TestHistogramCounterResetHeader(t *testing.T) { l := labels.Labels{{Name: "a", Value: "b"}} - head, _ := newTestHead(t, 1000, false) + head, _ := newTestHead(t, 1000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -3799,7 +3784,7 @@ func TestOOOWalReplay(t *testing.T) { it := xor.Iterator(nil) actOOOSamples := make([]sample, 0, len(expOOOSamples)) - for it.Next() { + for it.Next() == chunkenc.ValFloat { ts, v := it.At() actOOOSamples = append(actOOOSamples, sample{t: ts, v: v}) } @@ -4108,7 +4093,6 @@ func TestReplayAfterMmapReplayError(t *testing.T) { require.NoError(t, h.Close()) } -<<<<<<< HEAD func TestHistogramValidation(t *testing.T) { tests := map[string]struct { h *histogram.Histogram @@ -4240,7 +4224,8 @@ func generateBigTestHistograms(n int) []*histogram.Histogram { histograms = append(histograms, h) } return histograms -======= +} + func TestOOOAppendWithNoSeries(t *testing.T) { dir := t.TempDir() wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) @@ -4367,5 +4352,4 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) { // So the lowest among them, 295, is set as minOOOTime. require.NoError(t, h.truncateOOO(0, 2)) require.Equal(t, 295*time.Minute.Milliseconds(), h.MinOOOTime()) ->>>>>>> main } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index b5c12cc094..e92d020851 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -314,6 +314,7 @@ Outer: exemplarsPool.Put(v) case []record.RefHistogramSample: samples := v + minValidTime := h.minValidTime.Load() // We split up the samples into chunks of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise // cause thousands of very large in flight buffers occupying large amounts @@ -329,6 +330,9 @@ Outer: } } for _, sam := range samples[:m] { + if sam.T < minValidTime { + continue // Before minValidTime: discard. + } if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } @@ -336,7 +340,7 @@ Outer: histogramShards[mod] = append(histogramShards[mod], sam) } for i := 0; i < n; i++ { - if len(shards[i]) > 0 { + if len(histogramShards[i]) > 0 { processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]} histogramShards[i] = nil } diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 3af6039912..c246ff2e55 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -41,7 +41,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool { if i >= len(o.samples) { // none found. append it at the end - o.samples = append(o.samples, sample{t, v}) + o.samples = append(o.samples, sample{t, v, nil, nil}) return true } @@ -52,7 +52,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool { // Expand length by 1 to make room. use a zero sample, we will overwrite it anyway. o.samples = append(o.samples, sample{}) copy(o.samples[i+1:], o.samples[i:]) - o.samples[i] = sample{t, v} + o.samples[i] = sample{t, v, nil, nil} return true } diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 486ca31f3f..8dca1ea59b 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -860,7 +860,7 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { var resultSamples tsdbutil.SampleSlice it := c.Iterator(nil) - for it.Next() { + for it.Next() == chunkenc.ValFloat { t, v := it.At() resultSamples = append(resultSamples, sample{t: t, v: v}) } @@ -1031,7 +1031,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( var resultSamples tsdbutil.SampleSlice it := c.Iterator(nil) - for it.Next() { + for it.Next() == chunkenc.ValFloat { ts, v := it.At() resultSamples = append(resultSamples, sample{t: ts, v: v}) } diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index de078b94c4..dcab28b61f 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -25,7 +25,7 @@ const testMaxSize int = 32 func valEven(pos int) int { return pos*2 + 2 } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 - Predictable pre-existing values func valOdd(pos int) int { return pos*2 + 1 } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 - New values will interject at chosen position because they sort before the pre-existing vals. -func samplify(v int) sample { return sample{int64(v), float64(v)} } +func samplify(v int) sample { return sample{int64(v), float64(v), nil, nil} } func makeEvenSampleSlice(n int) []sample { s := make([]sample, n) diff --git a/tsdb/tsdbutil/chunks.go b/tsdb/tsdbutil/chunks.go index d39fe13755..87cc345dd0 100644 --- a/tsdb/tsdbutil/chunks.go +++ b/tsdb/tsdbutil/chunks.go @@ -84,8 +84,10 @@ func ChunkFromSamplesGeneric(s Samples) chunks.Meta { } type sample struct { - t int64 - v float64 + t int64 + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram } func (s sample) T() int64 { @@ -96,6 +98,25 @@ func (s sample) V() float64 { return s.v } +func (s sample) H() *histogram.Histogram { + return s.h +} + +func (s sample) FH() *histogram.FloatHistogram { + return s.fh +} + +func (s sample) Type() chunkenc.ValueType { + switch { + case s.h != nil: + return chunkenc.ValHistogram + case s.fh != nil: + return chunkenc.ValFloatHistogram + default: + return chunkenc.ValFloat + } +} + // PopulatedChunk creates a chunk populated with samples every second starting at minTime func PopulatedChunk(numSamples int, minTime int64) chunks.Meta { samples := make([]Sample, numSamples)