diff --git a/CHANGELOG.md b/CHANGELOG.md index 11699d053c..4134fb7eb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 2.3.2 / 2018-07-12 + +* [BUGFIX] Fix various tsdb bugs #4369 +* [BUGFIX] Reorder startup and shutdown to prevent panics. #4321 +* [BUGFIX] Exit with non-zero code on error #4296 +* [BUGFIX] discovery/kubernetes/ingress: fix scheme discovery #4329 +* [BUGFIX] Fix race in zookeeper sd #4355 +* [BUGFIX] Better timeout handling in promql #4291 #4300 +* [BUGFIX] Propogate errors when selecting series from the tsdb #4136 + ## 2.3.1 / 2018-06-19 * [BUGFIX] Avoid infinite loop on duplicate NaN values. #4275 diff --git a/VERSION b/VERSION index 2bf1c1ccf3..f90b1afc08 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.3.1 +2.3.2 diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 1188a7bbe5..fc9e707b9f 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -472,7 +472,9 @@ func main() { }, func(err error) { - close(cancel) + // Wait for any in-progress reloads to complete to avoid + // reloading things after they have been shutdown. + cancel <- struct{}{} }, ) } @@ -506,6 +508,23 @@ func main() { }, ) } + { + // Rule manager. + // TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel. + cancel := make(chan struct{}) + g.Add( + func() error { + <-reloadReady.C + ruleManager.Run() + <-cancel + return nil + }, + func(err error) { + ruleManager.Stop() + close(cancel) + }, + ) + } { // TSDB. cancel := make(chan struct{}) @@ -547,30 +566,10 @@ func main() { return nil }, func(err error) { - // Keep this interrupt before the ruleManager.Stop(). - // Shutting down the query engine before the rule manager will cause pending queries - // to be canceled and ensures a quick shutdown of the rule manager. cancelWeb() }, ) } - { - // Rule manager. - - // TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel. - cancel := make(chan struct{}) - g.Add( - func() error { - ruleManager.Run() - <-cancel - return nil - }, - func(err error) { - ruleManager.Stop() - close(cancel) - }, - ) - } { // Notifier. @@ -595,6 +594,7 @@ func main() { } if err := g.Run(); err != nil { level.Error(logger).Log("err", err) + os.Exit(1) } level.Info(logger).Log("msg", "See you next time!") } @@ -626,6 +626,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config if failed { return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%s)", filename) } + level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename) return nil } diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index ee805674a9..605ba816eb 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -20,6 +20,7 @@ import ( "os" "os/exec" "path/filepath" + "syscall" "testing" "time" @@ -155,3 +156,20 @@ func TestComputeExternalURL(t *testing.T) { } } } + +// Let's provide an invalid configuration file and verify the exit status indicates the error. +func TestFailedStartupExitCode(t *testing.T) { + fakeInputFile := "fake-input-file" + expectedExitStatus := 1 + + prom := exec.Command(promPath, "--config.file="+fakeInputFile) + err := prom.Run() + testutil.NotOk(t, err, "") + + if exitError, ok := err.(*exec.ExitError); ok { + status := exitError.Sys().(syscall.WaitStatus) + testutil.Equals(t, expectedExitStatus, status.ExitStatus()) + } else { + t.Errorf("unable to retrieve the exit status for prometheus: %v", err) + } +} diff --git a/discovery/kubernetes/ingress.go b/discovery/kubernetes/ingress.go index 592550212f..0ff3b0e0a8 100644 --- a/discovery/kubernetes/ingress.go +++ b/discovery/kubernetes/ingress.go @@ -176,13 +176,22 @@ func (s *Ingress) buildIngress(ingress *v1beta1.Ingress) *targetgroup.Group { } tg.Labels = ingressLabels(ingress) - schema := "http" - if ingress.Spec.TLS != nil { - schema = "https" + tlsHosts := make(map[string]struct{}) + for _, tls := range ingress.Spec.TLS { + for _, host := range tls.Hosts { + tlsHosts[host] = struct{}{} + } } + for _, rule := range ingress.Spec.Rules { paths := pathsFromIngressRule(&rule.IngressRuleValue) + schema := "http" + _, isTLS := tlsHosts[rule.Host] + if isTLS { + schema = "https" + } + for _, path := range paths { tg.Targets = append(tg.Targets, model.LabelSet{ model.AddressLabel: lv(rule.Host), diff --git a/discovery/kubernetes/ingress_test.go b/discovery/kubernetes/ingress_test.go index b3832ebce7..54ea8caf65 100644 --- a/discovery/kubernetes/ingress_test.go +++ b/discovery/kubernetes/ingress_test.go @@ -23,8 +23,16 @@ import ( "k8s.io/client-go/pkg/apis/extensions/v1beta1" ) -func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { - return &v1beta1.Ingress{ +type TLSMode int + +const ( + TLSNo TLSMode = iota + TLSYes + TLSMixed +) + +func makeIngress(tls TLSMode) *v1beta1.Ingress { + ret := &v1beta1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: "testingress", Namespace: "default", @@ -32,7 +40,7 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { Annotations: map[string]string{"testannotation": "testannotationvalue"}, }, Spec: v1beta1.IngressSpec{ - TLS: tls, + TLS: nil, Rules: []v1beta1.IngressRule{ { Host: "example.com", @@ -63,31 +71,47 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { }, }, } + + switch tls { + case TLSYes: + ret.Spec.TLS = []v1beta1.IngressTLS{{Hosts: []string{"example.com", "test.example.com"}}} + case TLSMixed: + ret.Spec.TLS = []v1beta1.IngressTLS{{Hosts: []string{"example.com"}}} + } + + return ret } -func expectedTargetGroups(ns string, tls bool) map[string]*targetgroup.Group { - scheme := "http" - if tls { - scheme = "https" +func expectedTargetGroups(ns string, tls TLSMode) map[string]*targetgroup.Group { + scheme1 := "http" + scheme2 := "http" + + switch tls { + case TLSYes: + scheme1 = "https" + scheme2 = "https" + case TLSMixed: + scheme1 = "https" } + key := fmt.Sprintf("ingress/%s/testingress", ns) return map[string]*targetgroup.Group{ key: { Targets: []model.LabelSet{ { - "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_scheme": lv(scheme1), "__meta_kubernetes_ingress_host": "example.com", "__meta_kubernetes_ingress_path": "/", "__address__": "example.com", }, { - "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_scheme": lv(scheme1), "__meta_kubernetes_ingress_host": "example.com", "__meta_kubernetes_ingress_path": "/foo", "__address__": "example.com", }, { - "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_scheme": lv(scheme2), "__meta_kubernetes_ingress_host": "test.example.com", "__address__": "test.example.com", "__meta_kubernetes_ingress_path": "/", @@ -110,12 +134,12 @@ func TestIngressDiscoveryAdd(t *testing.T) { k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makeIngress(nil) + obj := makeIngress(TLSNo) c.ExtensionsV1beta1().Ingresses("default").Create(obj) w.Ingresses().Add(obj) }, expectedMaxItems: 1, - expectedRes: expectedTargetGroups("default", false), + expectedRes: expectedTargetGroups("default", TLSNo), }.Run(t) } @@ -125,27 +149,42 @@ func TestIngressDiscoveryAddTLS(t *testing.T) { k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makeIngress([]v1beta1.IngressTLS{{}}) + obj := makeIngress(TLSYes) c.ExtensionsV1beta1().Ingresses("default").Create(obj) w.Ingresses().Add(obj) }, expectedMaxItems: 1, - expectedRes: expectedTargetGroups("default", true), + expectedRes: expectedTargetGroups("default", TLSYes), + }.Run(t) +} + +func TestIngressDiscoveryAddMixed(t *testing.T) { + n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"default"}}) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := makeIngress(TLSMixed) + c.ExtensionsV1beta1().Ingresses("default").Create(obj) + w.Ingresses().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: expectedTargetGroups("default", TLSMixed), }.Run(t) } func TestIngressDiscoveryNamespaces(t *testing.T) { n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"ns1", "ns2"}}) - expected := expectedTargetGroups("ns1", false) - for k, v := range expectedTargetGroups("ns2", false) { + expected := expectedTargetGroups("ns1", TLSNo) + for k, v := range expectedTargetGroups("ns2", TLSNo) { expected[k] = v } k8sDiscoveryTest{ discovery: n, afterStart: func() { for _, ns := range []string{"ns1", "ns2"} { - obj := makeIngress(nil) + obj := makeIngress(TLSNo) obj.Namespace = ns c.ExtensionsV1beta1().Ingresses(obj.Namespace).Create(obj) w.Ingresses().Add(obj) diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index 60a26e8605..4871214576 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -137,8 +137,11 @@ func NewDiscovery( logger = log.NewNopLogger() } - conn, _, err := zk.Connect(srvs, timeout) - conn.SetLogger(treecache.NewZookeeperLogger(logger)) + conn, _, err := zk.Connect( + srvs, timeout, + func(c *zk.Conn) { + c.SetLogger(treecache.NewZookeeperLogger(logger)) + }) if err != nil { return nil } diff --git a/promql/ast.go b/promql/ast.go index af3ed4189f..0c8a3ad2ab 100644 --- a/promql/ast.go +++ b/promql/ast.go @@ -237,57 +237,80 @@ type VectorMatching struct { // Visitor allows visiting a Node and its child nodes. The Visit method is // invoked for each node with the path leading to the node provided additionally. -// If the result visitor w is not nil, Walk visits each of the children +// If the result visitor w is not nil and no error, Walk visits each of the children // of node with the visitor w, followed by a call of w.Visit(nil, nil). type Visitor interface { - Visit(node Node, path []Node) (w Visitor) + Visit(node Node, path []Node) (w Visitor, err error) } // Walk traverses an AST in depth-first order: It starts by calling // v.Visit(node, path); node must not be nil. If the visitor w returned by -// v.Visit(node, path) is not nil, Walk is invoked recursively with visitor -// w for each of the non-nil children of node, followed by a call of -// w.Visit(nil). +// v.Visit(node, path) is not nil and the visitor returns no error, Walk is +// invoked recursively with visitor w for each of the non-nil children of node, +// followed by a call of w.Visit(nil), returning an error // As the tree is descended the path of previous nodes is provided. -func Walk(v Visitor, node Node, path []Node) { - if v = v.Visit(node, path); v == nil { - return +func Walk(v Visitor, node Node, path []Node) error { + var err error + if v, err = v.Visit(node, path); v == nil || err != nil { + return err } path = append(path, node) switch n := node.(type) { case Statements: for _, s := range n { - Walk(v, s, path) + if err := Walk(v, s, path); err != nil { + return err + } } case *AlertStmt: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *EvalStmt: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *RecordStmt: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case Expressions: for _, e := range n { - Walk(v, e, path) + if err := Walk(v, e, path); err != nil { + return err + } } case *AggregateExpr: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *BinaryExpr: - Walk(v, n.LHS, path) - Walk(v, n.RHS, path) + if err := Walk(v, n.LHS, path); err != nil { + return err + } + if err := Walk(v, n.RHS, path); err != nil { + return err + } case *Call: - Walk(v, n.Args, path) + if err := Walk(v, n.Args, path); err != nil { + return err + } case *ParenExpr: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *UnaryExpr: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector: // nothing to do @@ -296,21 +319,23 @@ func Walk(v Visitor, node Node, path []Node) { panic(fmt.Errorf("promql.Walk: unhandled node type %T", node)) } - v.Visit(nil, nil) + _, err = v.Visit(nil, nil) + return err } -type inspector func(Node, []Node) bool +type inspector func(Node, []Node) error -func (f inspector) Visit(node Node, path []Node) Visitor { - if f(node, path) { - return f +func (f inspector) Visit(node Node, path []Node) (Visitor, error) { + if err := f(node, path); err == nil { + return f, nil + } else { + return nil, err } - return nil } // Inspect traverses an AST in depth-first order: It starts by calling -// f(node, path); node must not be nil. If f returns true, Inspect invokes f +// f(node, path); node must not be nil. If f returns a nil error, Inspect invokes f // for all the non-nil children of node, recursively. -func Inspect(node Node, f func(Node, []Node) bool) { +func Inspect(node Node, f inspector) { Walk(inspector(f), node, nil) } diff --git a/promql/engine.go b/promql/engine.go index aeea2b5d34..5b19f50380 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -451,7 +451,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { var maxOffset time.Duration - Inspect(s.Expr, func(node Node, _ []Node) bool { + Inspect(s.Expr, func(node Node, _ []Node) error { switch n := node.(type) { case *VectorSelector: if maxOffset < LookbackDelta { @@ -468,7 +468,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev maxOffset = n.Offset + n.Range } } - return true + return nil }) mint := s.Start.Add(-maxOffset) @@ -478,7 +478,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev return nil, err } - Inspect(s.Expr, func(node Node, path []Node) bool { + Inspect(s.Expr, func(node Node, path []Node) error { var set storage.SeriesSet params := &storage.SelectParams{ Step: int64(s.Interval / time.Millisecond), @@ -491,13 +491,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev set, err = querier.Select(params, n.LabelMatchers...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) - return false + return err } - n.series, err = expandSeriesSet(set) + n.series, err = expandSeriesSet(ctx, set) if err != nil { // TODO(fabxc): use multi-error. level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) - return false + return err } case *MatrixSelector: @@ -506,15 +506,15 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev set, err = querier.Select(params, n.LabelMatchers...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) - return false + return err } - n.series, err = expandSeriesSet(set) + n.series, err = expandSeriesSet(ctx, set) if err != nil { level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) - return false + return err } } - return true + return nil }) return querier, err } @@ -538,8 +538,13 @@ func extractFuncFromPath(p []Node) string { return extractFuncFromPath(p[:len(p)-1]) } -func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) { +func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) { for it.Next() { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } res = append(res, it.At()) } return res, it.Err() @@ -1039,6 +1044,9 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { var it *storage.BufferedSeriesIterator for i, s := range node.series { + if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + ev.error(err) + } if it == nil { it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range)) } else { diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 9ae2adbde0..1a45fb9717 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -164,6 +164,13 @@ type BlockStats struct { NumTombstones uint64 `json:"numTombstones,omitempty"` } +// BlockDesc describes a block by ULID and time range. +type BlockDesc struct { + ULID ulid.ULID `json:"ulid"` + MinTime int64 `json:"minTime"` + MaxTime int64 `json:"maxTime"` +} + // BlockMetaCompaction holds information about compactions a block went through. type BlockMetaCompaction struct { // Maximum number of compaction cycles any source block has @@ -171,6 +178,9 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + // Short descriptions of the direct blocks that were used to create + // this block. + Parents []BlockDesc `json:"parents,omitempty"` Failed bool `json:"failed,omitempty"` } @@ -424,7 +434,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := memTombstones{} + stones := NewMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -437,10 +447,10 @@ Outer: } for _, chk := range chks { - if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { + if chk.OverlapsClosedInterval(mint, maxt) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) - stones[p.At()] = Intervals{{tmin, tmax}} + stones.addInterval(p.At(), Interval{tmin, tmax}) continue Outer } } @@ -452,7 +462,7 @@ Outer: err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { for _, iv := range ivs { - stones.add(id, iv) + stones.addInterval(id, iv) pb.meta.Stats.NumTombstones++ } return nil @@ -475,19 +485,17 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { pb.tombstones.Iter(func(id uint64, ivs Intervals) error { numStones += len(ivs) - return nil }) - if numStones == 0 { return nil, nil } - uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime) + meta := pb.Meta() + uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta) if err != nil { return nil, err } - return &uid, nil } @@ -531,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error { return nil } +// Returns true if the block overlaps [mint, maxt]. +func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool { + // The block itself is a half-open interval + // [pb.meta.MinTime, pb.meta.MaxTime). + return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime +} + func clampInterval(a, b, mint, maxt int64) (int64, int64) { if a < mint { a = mint diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunks.go b/vendor/github.com/prometheus/tsdb/chunks/chunks.go index 9c80767ff7..5eab23982d 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks/chunks.go @@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error { return nil } +// Returns true if the chunk overlaps [mint, maxt]. +func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool { + // The chunk itself is a closed interval [cm.MinTime, cm.MaxTime]. + return cm.MinTime <= maxt && mint <= cm.MaxTime +} + var ( errInvalidSize = fmt.Errorf("invalid size") errInvalidFlag = fmt.Errorf("invalid flag") @@ -296,7 +302,7 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err } // Verify magic number. if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { - return nil, fmt.Errorf("invalid magic number %x", m) + return nil, errors.Errorf("invalid magic number %x", m) } } return &cr, nil @@ -357,8 +363,8 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { r := b.Range(off, off+binary.MaxVarintLen32) l, n := binary.Uvarint(r) - if n < 0 { - return nil, fmt.Errorf("reading chunk length failed") + if n <= 0 { + return nil, errors.Errorf("reading chunk length failed with %d", n) } r = b.Range(off+n, off+n+int(l)) diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 16a3bd7471..1da130057a 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -55,7 +55,7 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. - Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) + Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). @@ -297,6 +297,11 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { for _, s := range b.Compaction.Sources { sources[s] = struct{}{} } + res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{ + ULID: b.ULID, + MinTime: b.MinTime, + MaxTime: b.MaxTime, + }) } res.Compaction.Level++ @@ -367,7 +372,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, return uid, merr } -func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { +func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) @@ -379,6 +384,12 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) ( meta.Compaction.Level = 1 meta.Compaction.Sources = []ulid.ULID{uid} + if parent != nil { + meta.Compaction.Parents = []BlockDesc{ + {ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime}, + } + } + err := c.write(dest, meta, b) if err != nil { return uid, err @@ -472,7 +483,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil { + if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -581,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, if len(dranges) > 0 { // Re-encode the chunk to not have deleted values. for i, chk := range chks { - if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) { + if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) { continue } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index 28cb14f331..fcfbeeeb29 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -75,7 +75,7 @@ type Appender interface { // Returned reference numbers are ephemeral and may be rejected in calls // to AddFast() at any point. Adding the sample via Add() returns a new // reference number. - // If the reference is the empty string it must not be used for caching. + // If the reference is 0 it must not be used for caching. Add(l labels.Labels, t int64, v float64) (uint64, error) // Add adds a sample pair for the referenced series. It is generally faster @@ -267,17 +267,9 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - _, err1 := db.retentionCutoff() - if err1 != nil { - level.Error(db.logger).Log("msg", "retention cutoff failed", "err", err1) - } - - _, err2 := db.compact() - if err2 != nil { - level.Error(db.logger).Log("msg", "compaction failed", "err", err2) - } - - if err1 != nil || err2 != nil { + _, err := db.compact() + if err != nil { + level.Error(db.logger).Log("msg", "compaction failed", "err", err) backoff = exponential(backoff, 1*time.Second, 1*time.Minute) } else { backoff = 0 @@ -289,19 +281,9 @@ func (db *DB) run() { } } -func (db *DB) retentionCutoff() (b bool, err error) { - defer func() { - if !b && err == nil { - // no data had to be cut off. - return - } - db.metrics.cutoffs.Inc() - if err != nil { - db.metrics.cutoffsFailed.Inc() - } - }() +func (db *DB) beyondRetention(meta *BlockMeta) bool { if db.opts.RetentionDuration == 0 { - return false, nil + return false } db.mtx.RLock() @@ -309,23 +291,13 @@ func (db *DB) retentionCutoff() (b bool, err error) { db.mtx.RUnlock() if len(blocks) == 0 { - return false, nil + return false } last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - dirs, err := retentionCutoffDirs(db.dir, mint) - if err != nil { - return false, err - } - // This will close the dirs and then delete the dirs. - if len(dirs) > 0 { - return true, db.reload(dirs...) - } - - return false, nil + return meta.MaxTime < mint } // Appender opens a new appender against the database. @@ -354,6 +326,13 @@ func (a dbAppender) Commit() error { return err } +// Compact data if possible. After successful compaction blocks are reloaded +// which will also trigger blocks to be deleted that fall out of the retention +// window. +// If no blocks are compacted, the retention window state doesn't change. Thus, +// this is sufficient to reliably delete old data. +// Old blocks are only deleted on reload based on the new block's parent information. +// See DB.reload documentation for further information. func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -381,9 +360,15 @@ func (db *DB) compact() (changes bool, err error) { head := &rangeHead{ head: db.head, mint: mint, - maxt: maxt, + // We remove 1 millisecond from maxt because block + // intervals are half-open: [b.MinTime, b.MaxTime). But + // chunk intervals are closed: [c.MinTime, c.MaxTime]; + // so in order to make sure that overlaps are evaluated + // consistently, we explicitly remove the last value + // from the block interval here. + maxt: maxt - 1, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil { + if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { return changes, errors.Wrap(err, "persist head block") } changes = true @@ -418,7 +403,7 @@ func (db *DB) compact() (changes bool, err error) { changes = true runtime.GC() - if err := db.reload(plan...); err != nil { + if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } runtime.GC() @@ -427,39 +412,6 @@ func (db *DB) compact() (changes bool, err error) { return changes, nil } -// retentionCutoffDirs returns all directories of blocks in dir that are strictly -// before mint. -func retentionCutoffDirs(dir string, mint int64) ([]string, error) { - df, err := fileutil.OpenDir(dir) - if err != nil { - return nil, errors.Wrapf(err, "open directory") - } - defer df.Close() - - dirs, err := blockDirs(dir) - if err != nil { - return nil, errors.Wrapf(err, "list block dirs %s", dir) - } - - delDirs := []string{} - - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - return nil, errors.Wrapf(err, "read block meta %s", dir) - } - // The first block we encounter marks that we crossed the boundary - // of deletable blocks. - if meta.MaxTime >= mint { - break - } - - delDirs = append(delDirs, dir) - } - - return delDirs, nil -} - func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { for _, b := range db.blocks { if b.Meta().ULID == id { @@ -469,18 +421,10 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -func stringsContain(set []string, elem string) bool { - for _, e := range set { - if elem == e { - return true - } - } - return false -} - // reload on-disk blocks and trigger head truncation if new blocks appeared. It takes // a list of block directories which should be deleted during reload. -func (db *DB) reload(deleteable ...string) (err error) { +// Blocks that are obsolete due to replacement or retention will be deleted. +func (db *DB) reload() (err error) { defer func() { if err != nil { db.metrics.reloadsFailed.Inc() @@ -492,21 +436,58 @@ func (db *DB) reload(deleteable ...string) (err error) { if err != nil { return errors.Wrap(err, "find blocks") } + // We delete old blocks that have been superseded by new ones by gathering all parents + // from existing blocks. Those parents all have newer replacements and can be safely deleted + // after we loaded the other blocks. + // This makes us resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. By creating + // blocks with their parents, we can pick up the deletion where it left off during a crash. var ( - blocks []*Block - exist = map[ulid.ULID]struct{}{} + blocks []*Block + corrupted = map[ulid.ULID]error{} + opened = map[ulid.ULID]struct{}{} + deleteable = map[ulid.ULID]struct{}{} ) + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + // The block was potentially in the middle of being deleted during a crash. + // Skip it since we may delete it properly further down again. + level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir) + ulid, err2 := ulid.Parse(filepath.Base(dir)) + if err2 != nil { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + continue + } + corrupted[ulid] = err + continue + } + if db.beyondRetention(meta) { + deleteable[meta.ULID] = struct{}{} + continue + } + for _, b := range meta.Compaction.Parents { + deleteable[b.ULID] = struct{}{} + } + } + // Blocks we failed to open should all be those we are want to delete anyway. + for c, err := range corrupted { + if _, ok := deleteable[c]; !ok { + return errors.Wrapf(err, "unexpected corrupted block %s", c) + } + } + // Load new blocks into memory. for _, dir := range dirs { meta, err := readMetaFile(dir) if err != nil { return errors.Wrapf(err, "read meta information %s", dir) } - // If the block is pending for deletion, don't add it to the new block set. - if stringsContain(deleteable, dir) { + // Don't load blocks that are scheduled for deletion. + if _, ok := deleteable[meta.ULID]; ok { continue } - + // See if we already have the block in memory or open it otherwise. b, ok := db.getBlock(meta.ULID) if !ok { b, err = OpenBlock(dir, db.chunkPool) @@ -514,9 +495,8 @@ func (db *DB) reload(deleteable ...string) (err error) { return errors.Wrapf(err, "open block %s", dir) } } - blocks = append(blocks, b) - exist[meta.ULID] = struct{}{} + opened[meta.ULID] = struct{}{} } sort.Slice(blocks, func(i, j int) bool { return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime @@ -533,15 +513,19 @@ func (db *DB) reload(deleteable ...string) (err error) { db.blocks = blocks db.mtx.Unlock() + // Drop old blocks from memory. for _, b := range oldBlocks { - if _, ok := exist[b.Meta().ULID]; ok { + if _, ok := opened[b.Meta().ULID]; ok { continue } if err := b.Close(); err != nil { level.Warn(db.logger).Log("msg", "closing block failed", "err", err) } - if err := os.RemoveAll(b.Dir()); err != nil { - level.Warn(db.logger).Log("msg", "deleting block failed", "err", err) + } + // Delete all obsolete blocks. None of them are opened any longer. + for ulid := range deleteable { + if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { + return errors.Wrapf(err, "delete obsolete block %s", ulid) } } @@ -765,7 +749,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { if !withHead { return nil } - _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) + _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil) return errors.Wrap(err, "snapshot head block") } @@ -778,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { defer db.mtx.RUnlock() for _, b := range db.blocks { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + if b.OverlapsClosedInterval(mint, maxt) { blocks = append(blocks, b) } } @@ -821,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { defer db.mtx.RUnlock() for _, b := range db.blocks { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + if b.OverlapsClosedInterval(mint, maxt) { g.Go(func(b *Block) func() error { return func() error { return b.Delete(mint, maxt, ms...) } }(b)) @@ -859,27 +841,15 @@ func (db *DB) CleanTombstones() (err error) { blocks := db.blocks[:] db.mtx.RUnlock() - deletable := []string{} for _, b := range blocks { if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil { err = errors.Wrapf(er, "clean tombstones: %s", b.Dir()) return err } else if uid != nil { // New block was created. - deletable = append(deletable, b.Dir()) newUIDs = append(newUIDs, *uid) } } - - if len(deletable) == 0 { - return nil - } - - return errors.Wrap(db.reload(deletable...), "reload blocks") -} - -func intervalOverlap(amin, amax, bmin, bmax int64) bool { - // Checks Overlap: http://stackoverflow.com/questions/3269434/ - return amin <= bmax && bmin <= amax + return errors.Wrap(db.reload(), "reload blocks") } func isBlockDir(fi os.FileInfo) bool { diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 2c7c7ec38a..372842865c 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -69,7 +69,7 @@ type Head struct { postings *index.MemPostings // postings lists for terms - tombstones memTombstones + tombstones *memTombstones } type headMetrics struct { @@ -189,7 +189,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: memTombstones{}, + tombstones: NewMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -300,7 +300,7 @@ func (h *Head) ReadWAL() error { if itv.Maxt < mint { continue } - h.tombstones.add(s.ref, itv) + h.tombstones.addInterval(s.ref, itv) } } } @@ -521,7 +521,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { } func (a *headAppender) Commit() error { - defer a.Rollback() + defer a.head.metrics.activeAppenders.Dec() + defer a.head.putAppendBuffer(a.samples) if err := a.head.wal.LogSeries(a.series); err != nil { return err @@ -565,7 +566,9 @@ func (a *headAppender) Rollback() error { a.head.metrics.activeAppenders.Dec() a.head.putAppendBuffer(a.samples) - return nil + // Series are created in the head memory regardless of rollback. Thus we have + // to log them to the WAL in any case. + return a.head.wal.LogSeries(a.series) } // Delete all samples in the range of [mint, maxt] for series that satisfy the given @@ -602,7 +605,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { return err } for _, s := range stones { - h.tombstones.add(s.ref, s.intervals[0]) + h.tombstones.addInterval(s.ref, s.intervals[0]) } return nil } @@ -732,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s.Lock() c := s.chunk(int(cid)) - // This means that the chunk has been garbage collected. - if c == nil { + // This means that the chunk has been garbage collected or is outside + // the specified range. + if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() return nil, ErrNotFound } - - mint, maxt := c.minTime, c.maxTime s.Unlock() - // Do not expose chunks that are outside of the specified range. - if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) { - return nil, ErrNotFound - } return &safeChunk{ Chunk: c.chunk, s: s, @@ -849,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks for i, c := range s.chunks { // Do not expose chunks that are outside of the specified range. - if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { + if !c.OverlapsClosedInterval(h.mint, h.maxt) { continue } *chks = append(*chks, chunks.Meta{ @@ -1288,6 +1286,11 @@ type memChunk struct { minTime, maxTime int64 } +// Returns true if the chunk overlaps [mint, maxt]. +func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { + return mc.minTime <= maxt && mint <= mc.maxTime +} + type memSafeIterator struct { chunkenc.Iterator diff --git a/vendor/github.com/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/tsdb/index/index.go index 72ca3835ae..c58ff6ea83 100644 --- a/vendor/github.com/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/tsdb/index/index.go @@ -740,8 +740,8 @@ func (r *Reader) decbufUvarintAt(off int) decbuf { b := r.b.Range(off, off+binary.MaxVarintLen32) l, n := binary.Uvarint(b) - if n > binary.MaxVarintLen32 { - return decbuf{e: errors.New("invalid uvarint")} + if n <= 0 || n > binary.MaxVarintLen32 { + return decbuf{e: errors.Errorf("invalid uvarint %d", n)} } if r.b.Len() < off+n+int(l)+4 { diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index b5b9ae050d..2e048e4952 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -478,7 +478,7 @@ type baseChunkSeries struct { // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = EmptyTombstoneReader() + tr = NewMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go index 8c760cdcec..d4a3d0ef1b 100644 --- a/vendor/github.com/prometheus/tsdb/tombstones.go +++ b/vendor/github.com/prometheus/tsdb/tombstones.go @@ -16,12 +16,12 @@ package tsdb import ( "encoding/binary" "fmt" + "github.com/pkg/errors" "io" "io/ioutil" "os" "path/filepath" - - "github.com/pkg/errors" + "sync" ) const tombstoneFilename = "tombstones" @@ -107,10 +107,10 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (memTombstones, error) { +func readTombstones(dir string) (*memTombstones, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return memTombstones{}, nil + return NewMemTombstones(), nil } else if err != nil { return nil, err } @@ -140,7 +140,7 @@ func readTombstones(dir string) (memTombstones, error) { return nil, errors.New("checksum did not match") } - stonesMap := memTombstones{} + stonesMap := NewMemTombstones() for d.len() > 0 { k := d.uvarint64() @@ -150,27 +150,31 @@ func readTombstones(dir string) (memTombstones, error) { return nil, d.err() } - stonesMap.add(k, Interval{mint, maxt}) + stonesMap.addInterval(k, Interval{mint, maxt}) } return stonesMap, nil } -type memTombstones map[uint64]Intervals - -var emptyTombstoneReader = memTombstones{} - -// EmptyTombstoneReader returns a TombstoneReader that is always empty. -func EmptyTombstoneReader() TombstoneReader { - return emptyTombstoneReader +type memTombstones struct { + intvlGroups map[uint64]Intervals + mtx sync.RWMutex } -func (t memTombstones) Get(ref uint64) (Intervals, error) { - return t[ref], nil +func NewMemTombstones() *memTombstones { + return &memTombstones{intvlGroups: make(map[uint64]Intervals)} } -func (t memTombstones) Iter(f func(uint64, Intervals) error) error { - for ref, ivs := range t { +func (t *memTombstones) Get(ref uint64) (Intervals, error) { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.intvlGroups[ref], nil +} + +func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { + t.mtx.RLock() + defer t.mtx.RUnlock() + for ref, ivs := range t.intvlGroups { if err := f(ref, ivs); err != nil { return err } @@ -178,8 +182,13 @@ func (t memTombstones) Iter(f func(uint64, Intervals) error) error { return nil } -func (t memTombstones) add(ref uint64, itv Interval) { - t[ref] = t[ref].add(itv) +// addInterval to an existing memTombstones +func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { + t.mtx.Lock() + defer t.mtx.Unlock() + for _, itv := range itvs { + t.intvlGroups[ref] = t.intvlGroups[ref].add(itv) + } } func (memTombstones) Close() error { @@ -208,7 +217,7 @@ func (tr Interval) isSubrange(dranges Intervals) bool { // Intervals represents a set of increasing and non-overlapping time-intervals. type Intervals []Interval -// This adds the new time-range to the existing ones. +// add the new time-range to the existing ones. // The existing ones must be sorted. func (itvs Intervals) add(n Interval) Intervals { for i, r := range itvs { diff --git a/vendor/vendor.json b/vendor/vendor.json index 1c5c47c515..4ae303726a 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -820,40 +820,40 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "eohOTRwnox/+qrSrgYmnxeJB2yM=", + "checksumSHA1": "gzvR+g1v/ILXxAt/NuxzIPWk1x0=", "path": "github.com/prometheus/tsdb", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { "checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=", "path": "github.com/prometheus/tsdb/chunkenc", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { - "checksumSHA1": "746Mjy2y6wdsGjY/FcGhc8tI4w8=", + "checksumSHA1": "+5bPifRe479zdFeTYhZ+CZRLMgw=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { "checksumSHA1": "dnyelqeik/xHDRCvCmKFv/Op9XQ=", "path": "github.com/prometheus/tsdb/fileutil", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { - "checksumSHA1": "A2uIFwIgeHmXGBzOpna95kM80RY=", + "checksumSHA1": "AZGFK4UtJe8/j8pHqGTNQ8wu27g=", "path": "github.com/prometheus/tsdb/index", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { "checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=", "path": "github.com/prometheus/tsdb/labels", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",