diff --git a/.travis.yml b/.travis.yml index bdb820c0f6..594894877e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ language: go go: - 1.8.x +- 1.x go_import_path: github.com/prometheus/prometheus diff --git a/CHANGELOG.md b/CHANGELOG.md index d5ee23bec4..49a3affef8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,13 +1,17 @@ -## v2.0.0-beta.2 / 2017-08-18 +## v2.0.0-beta.4 / 2017-09-14 This release includes numerous changes to the new storage layer. The main changes are: -* [CHANGES] Deterministic block boundaries -* [ENHANCEMENTS] Avoid memory usage spikes during compactions +* [CHANGES] Single, compacted write ahead log +* [CHANGES] Single in-memory block with garbage collection +* [ENHANCEMENTS] Cache series dropped via `metric_relabel_configs` +* [ENHANCEMENTS] Pool byte buffers for scraping -It's generally advised to start with a clean storage directory. As a best effort, -running `sed -i .bkp 's/generation/level/g' */meta.json` from within the directory -should be sufficient to migrate data written by v2.0.0-beta.0. +Overall the changes achieve a baseline reduction in memory consumption and reduce +peak memory usage by 30-40% compared to the 2.0.0-beta.2 release. + +This release requires a clean storage directory and is not compatible with files +created by previous beta releases. ## 1.7.1 / 2017-06-12 diff --git a/VERSION b/VERSION index e2d1981035..20c4e2607d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.0.0-beta.2 +2.0.0-beta.4 diff --git a/config/config.go b/config/config.go index 1633072e21..f83b0146a1 100644 --- a/config/config.go +++ b/config/config.go @@ -1010,10 +1010,11 @@ type KubernetesRole string // The valid options for KubernetesRole. const ( - KubernetesRoleNode = "node" - KubernetesRolePod = "pod" - KubernetesRoleService = "service" - KubernetesRoleEndpoint = "endpoints" + KubernetesRoleNode KubernetesRole = "node" + KubernetesRolePod KubernetesRole = "pod" + KubernetesRoleService KubernetesRole = "service" + KubernetesRoleEndpoint KubernetesRole = "endpoints" + KubernetesRoleIngress KubernetesRole = "ingress" ) // UnmarshalYAML implements the yaml.Unmarshaler interface. 
@@ -1022,7 +1023,7 @@ func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error return err } switch *c { - case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleService, KubernetesRoleEndpoint: + case KubernetesRoleNode, KubernetesRolePod, KubernetesRoleService, KubernetesRoleEndpoint, KubernetesRoleIngress: return nil default: return fmt.Errorf("Unknown Kubernetes SD role %q", *c) @@ -1141,6 +1142,7 @@ type EC2SDConfig struct { AccessKey string `yaml:"access_key,omitempty"` SecretKey Secret `yaml:"secret_key,omitempty"` Profile string `yaml:"profile,omitempty"` + RoleARN string `yaml:"role_arn,omitempty"` RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` Port int `yaml:"port"` diff --git a/discovery/ec2/ec2.go b/discovery/ec2/ec2.go index b0fd46912f..0923c8fd1f 100644 --- a/discovery/ec2/ec2.go +++ b/discovery/ec2/ec2.go @@ -21,6 +21,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/session" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -72,6 +73,7 @@ type Discovery struct { aws *aws.Config interval time.Duration profile string + roleARN string port int logger log.Logger } @@ -91,6 +93,7 @@ func NewDiscovery(conf *config.EC2SDConfig, logger log.Logger) *Discovery { Credentials: creds, }, profile: conf.Profile, + roleARN: conf.RoleARN, interval: time.Duration(conf.RefreshInterval), port: conf.Port, logger: logger, @@ -151,7 +154,13 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { return nil, fmt.Errorf("could not create aws session: %s", err) } - ec2s := ec2.New(sess) + var ec2s *ec2.EC2 + if d.roleARN != "" { + creds := stscreds.NewCredentials(sess, d.roleARN) + ec2s = ec2.New(sess, &aws.Config{Credentials: creds}) + } else { + ec2s = ec2.New(sess) + } tg = &config.TargetGroup{ Source: *d.aws.Region, } diff --git a/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go index 50bd7eb419..b8e702fa1b 100644 --- a/discovery/kubernetes/endpoints.go +++ b/discovery/kubernetes/endpoints.go @@ -158,18 +158,19 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } func convertToEndpoints(o interface{}) (*apiv1.Endpoints, error) { - endpoints, isEndpoints := o.(*apiv1.Endpoints) - if !isEndpoints { - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - endpoints, ok = deletedState.Obj.(*apiv1.Endpoints) - if !ok { - return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj) - } + endpoints, ok := o.(*apiv1.Endpoints) + if ok { + return endpoints, nil } + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + endpoints, ok = deletedState.Obj.(*apiv1.Endpoints) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj) + } return endpoints, nil } diff --git a/discovery/kubernetes/ingress.go b/discovery/kubernetes/ingress.go new file mode 100644 index 0000000000..33a4250937 --- /dev/null +++ b/discovery/kubernetes/ingress.go @@ -0,0 +1,185 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+	"fmt"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/util/strutil"
+	"golang.org/x/net/context"
+	"k8s.io/client-go/pkg/apis/extensions/v1beta1"
+	"k8s.io/client-go/tools/cache"
+)
+
+// Ingress implements discovery of Kubernetes ingresses.
+type Ingress struct {
+	logger   log.Logger
+	informer cache.SharedInformer
+	store    cache.Store
+}
+
+// NewIngress returns a new ingress discovery.
+func NewIngress(l log.Logger, inf cache.SharedInformer) *Ingress {
+	return &Ingress{logger: l, informer: inf, store: inf.GetStore()}
+}
+
+// Run implements the TargetProvider interface.
+func (s *Ingress) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
+	// Send full initial set of ingress targets.
+	var initial []*config.TargetGroup
+	for _, o := range s.store.List() {
+		tg := s.buildIngress(o.(*v1beta1.Ingress))
+		initial = append(initial, tg)
+	}
+	select {
+	case <-ctx.Done():
+		return
+	case ch <- initial:
+	}
+
+	// Send target groups for ingress updates.
+	send := func(tg *config.TargetGroup) {
+		select {
+		case <-ctx.Done():
+		case ch <- []*config.TargetGroup{tg}:
+		}
+	}
+	s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(o interface{}) {
+			eventCount.WithLabelValues("ingress", "add").Inc()
+
+			ingress, err := convertToIngress(o)
+			if err != nil {
+				level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
+				return
+			}
+			send(s.buildIngress(ingress))
+		},
+		DeleteFunc: func(o interface{}) {
+			eventCount.WithLabelValues("ingress", "delete").Inc()
+
+			ingress, err := convertToIngress(o)
+			if err != nil {
+				level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
+				return
+			}
+			send(&config.TargetGroup{Source: ingressSource(ingress)})
+		},
+		UpdateFunc: func(_, o interface{}) {
+			eventCount.WithLabelValues("ingress", "update").Inc()
+
+			ingress, err := convertToIngress(o)
+			if err != nil {
+				level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error())
+				return
+			}
+			send(s.buildIngress(ingress))
+		},
+	})
+
+	// Block until the target provider is explicitly canceled.
+ <-ctx.Done() +} + +func convertToIngress(o interface{}) (*v1beta1.Ingress, error) { + ingress, ok := o.(*v1beta1.Ingress) + if ok { + return ingress, nil + } + + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + ingress, ok = deletedState.Obj.(*v1beta1.Ingress) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Ingress object: %v", deletedState.Obj) + } + return ingress, nil +} + +func ingressSource(s *v1beta1.Ingress) string { + return "ingress/" + s.Namespace + "/" + s.Name +} + +const ( + ingressNameLabel = metaLabelPrefix + "ingress_name" + ingressLabelPrefix = metaLabelPrefix + "ingress_label_" + ingressAnnotationPrefix = metaLabelPrefix + "ingress_annotation_" + ingressSchemeLabel = metaLabelPrefix + "ingress_scheme" + ingressHostLabel = metaLabelPrefix + "ingress_host" + ingressPathLabel = metaLabelPrefix + "ingress_path" +) + +func ingressLabels(ingress *v1beta1.Ingress) model.LabelSet { + ls := make(model.LabelSet, len(ingress.Labels)+len(ingress.Annotations)+2) + ls[ingressNameLabel] = lv(ingress.Name) + ls[namespaceLabel] = lv(ingress.Namespace) + + for k, v := range ingress.Labels { + ln := strutil.SanitizeLabelName(ingressLabelPrefix + k) + ls[model.LabelName(ln)] = lv(v) + } + + for k, v := range ingress.Annotations { + ln := strutil.SanitizeLabelName(ingressAnnotationPrefix + k) + ls[model.LabelName(ln)] = lv(v) + } + return ls +} + +func pathsFromIngressRule(rv *v1beta1.IngressRuleValue) []string { + if rv.HTTP == nil { + return []string{"/"} + } + paths := make([]string, len(rv.HTTP.Paths)) + for n, p := range rv.HTTP.Paths { + path := p.Path + if path == "" { + path = "/" + } + paths[n] = path + } + return paths +} + +func (s *Ingress) buildIngress(ingress *v1beta1.Ingress) *config.TargetGroup { + tg := &config.TargetGroup{ + Source: ingressSource(ingress), + } + tg.Labels = ingressLabels(ingress) + + schema := "http" + if ingress.Spec.TLS != nil { + schema = "https" + } + for _, rule := range ingress.Spec.Rules { + paths := pathsFromIngressRule(&rule.IngressRuleValue) + + for _, path := range paths { + tg.Targets = append(tg.Targets, model.LabelSet{ + model.AddressLabel: lv(rule.Host), + ingressSchemeLabel: lv(schema), + ingressHostLabel: lv(rule.Host), + ingressPathLabel: lv(path), + }) + } + } + + return tg +} diff --git a/discovery/kubernetes/ingress_test.go b/discovery/kubernetes/ingress_test.go new file mode 100644 index 0000000000..6c8bf7f14b --- /dev/null +++ b/discovery/kubernetes/ingress_test.go @@ -0,0 +1,136 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package kubernetes + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/pkg/apis/extensions/v1beta1" +) + +func ingressStoreKeyFunc(obj interface{}) (string, error) { + return obj.(*v1beta1.Ingress).ObjectMeta.Name, nil +} + +func newFakeIngressInformer() *fakeInformer { + return newFakeInformer(ingressStoreKeyFunc) +} + +func makeTestIngressDiscovery() (*Ingress, *fakeInformer) { + i := newFakeIngressInformer() + return NewIngress(nil, i), i +} + +func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { + return &v1beta1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testingress", + Namespace: "default", + Labels: map[string]string{"testlabel": "testvalue"}, + Annotations: map[string]string{"testannotation": "testannotationvalue"}, + }, + Spec: v1beta1.IngressSpec{ + TLS: tls, + Rules: []v1beta1.IngressRule{ + { + Host: "example.com", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + {Path: "/"}, + {Path: "/foo"}, + }, + }, + }, + }, + { + // No backend config, ignored + Host: "nobackend.example.com", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{}, + }, + }, + { + Host: "test.example.com", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{{}}, + }, + }, + }, + }, + }, + } +} + +func expectedTargetGroups(tls bool) []*config.TargetGroup { + scheme := "http" + if tls { + scheme = "https" + } + return []*config.TargetGroup{ + { + Targets: []model.LabelSet{ + { + "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_host": "example.com", + "__meta_kubernetes_ingress_path": "/", + "__address__": "example.com", + }, + { + "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_host": "example.com", + "__meta_kubernetes_ingress_path": "/foo", + "__address__": "example.com", + }, + { + "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_host": "test.example.com", + "__address__": "test.example.com", + "__meta_kubernetes_ingress_path": "/", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_ingress_name": "testingress", + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_ingress_label_testlabel": "testvalue", + "__meta_kubernetes_ingress_annotation_testannotation": "testannotationvalue", + }, + Source: "ingress/default/testingress", + }, + } +} + +func TestIngressDiscoveryInitial(t *testing.T) { + n, i := makeTestIngressDiscovery() + i.GetStore().Add(makeIngress(nil)) + + k8sDiscoveryTest{ + discovery: n, + expectedInitial: expectedTargetGroups(false), + }.Run(t) +} + +func TestIngressDiscoveryInitialTLS(t *testing.T) { + n, i := makeTestIngressDiscovery() + i.GetStore().Add(makeIngress([]v1beta1.IngressTLS{{}})) + + k8sDiscoveryTest{ + discovery: n, + expectedInitial: expectedTargetGroups(true), + }.Run(t) +} diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 1226a8edcf..91d78ec198 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api" apiv1 "k8s.io/client-go/pkg/api/v1" + extensionsv1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" ) @@ -155,6 +156,7 @@ const resyncPeriod = 10 * 
time.Minute // Run implements the TargetProvider interface. func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { rclient := d.client.Core().RESTClient() + reclient := d.client.Extensions().RESTClient() namespaces := d.getNamespaces() @@ -197,7 +199,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { for _, namespace := range namespaces { plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil) pod := NewPod( - log.With(d.logger, "k8s_sd", "pod"), + log.With(d.logger, "role", "pod"), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), ) go pod.informer.Run(ctx.Done()) @@ -217,7 +219,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { for _, namespace := range namespaces { slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil) svc := NewService( - log.With(d.logger, "k8s_sd", "service"), + log.With(d.logger, "role", "service"), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), ) go svc.informer.Run(ctx.Done()) @@ -232,10 +234,30 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { }() } wg.Wait() + case "ingress": + var wg sync.WaitGroup + for _, namespace := range namespaces { + ilw := cache.NewListWatchFromClient(reclient, "ingresses", namespace, nil) + ingress := NewIngress( + log.With(d.logger, "role", "ingress"), + cache.NewSharedInformer(ilw, &extensionsv1beta1.Ingress{}, resyncPeriod), + ) + go ingress.informer.Run(ctx.Done()) + + for !ingress.informer.HasSynced() { + time.Sleep(100 * time.Millisecond) + } + wg.Add(1) + go func() { + defer wg.Done() + ingress.Run(ctx, ch) + }() + } + wg.Wait() case "node": nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil) node := NewNode( - log.With(d.logger, "k8s_sd", "node"), + log.With(d.logger, "role", "node"), cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod), ) go node.informer.Run(ctx.Done()) diff --git a/discovery/kubernetes/node.go b/discovery/kubernetes/node.go index 4b6a644b97..e3162d0f2f 100644 --- a/discovery/kubernetes/node.go +++ b/discovery/kubernetes/node.go @@ -103,18 +103,19 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } func convertToNode(o interface{}) (*apiv1.Node, error) { - node, isNode := o.(*apiv1.Node) - if !isNode { - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - node, ok = deletedState.Obj.(*apiv1.Node) - if !ok { - return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) - } + node, ok := o.(*apiv1.Node) + if ok { + return node, nil } + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + node, ok = deletedState.Obj.(*apiv1.Node) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) + } return node, nil } @@ -130,7 +131,7 @@ const ( ) func nodeLabels(n *apiv1.Node) model.LabelSet { - ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+2) + ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+1) ls[nodeNameLabel] = lv(n.Name) diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go index b29c7e125e..325efeece3 100644 --- a/discovery/kubernetes/pod.go +++ b/discovery/kubernetes/pod.go @@ -111,18 +111,19 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } func 
convertToPod(o interface{}) (*apiv1.Pod, error) { - pod, isPod := o.(*apiv1.Pod) - if !isPod { - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - pod, ok = deletedState.Obj.(*apiv1.Pod) - if !ok { - return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj) - } + pod, ok := o.(*apiv1.Pod) + if ok { + return pod, nil } + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + pod, ok = deletedState.Obj.(*apiv1.Pod) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj) + } return pod, nil } diff --git a/discovery/kubernetes/service.go b/discovery/kubernetes/service.go index 774eddb1b3..8e00698b60 100644 --- a/discovery/kubernetes/service.go +++ b/discovery/kubernetes/service.go @@ -102,18 +102,18 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } func convertToService(o interface{}) (*apiv1.Service, error) { - service, isService := o.(*apiv1.Service) - if !isService { - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - service, ok = deletedState.Obj.(*apiv1.Service) - if !ok { - return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj) - } + service, ok := o.(*apiv1.Service) + if ok { + return service, nil + } + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + service, ok = deletedState.Obj.(*apiv1.Service) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj) } - return service, nil } @@ -133,6 +133,7 @@ func serviceLabels(svc *apiv1.Service) model.LabelSet { ls := make(model.LabelSet, len(svc.Labels)+len(svc.Annotations)+2) ls[serviceNameLabel] = lv(svc.Name) + ls[namespaceLabel] = lv(svc.Namespace) for k, v := range svc.Labels { ln := strutil.SanitizeLabelName(serviceLabelPrefix + k) @@ -151,7 +152,6 @@ func (s *Service) buildService(svc *apiv1.Service) *config.TargetGroup { Source: serviceSource(svc), } tg.Labels = serviceLabels(svc) - tg.Labels[namespaceLabel] = lv(svc.Namespace) for _, port := range svc.Spec.Ports { addr := net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", strconv.FormatInt(int64(port.Port), 10)) diff --git a/documentation/examples/prometheus-kubernetes.yml b/documentation/examples/prometheus-kubernetes.yml index a6b9bb0fbc..3768ee8018 100644 --- a/documentation/examples/prometheus-kubernetes.yml +++ b/documentation/examples/prometheus-kubernetes.yml @@ -192,7 +192,7 @@ scrape_configs: - source_labels: [__address__] target_label: __param_target - target_label: __address__ - replacement: blackbox + replacement: blackbox-exporter.example.com:9115 - source_labels: [__param_target] target_label: instance - action: labelmap @@ -202,6 +202,40 @@ scrape_configs: - source_labels: [__meta_kubernetes_service_name] target_label: kubernetes_name +# Example scrape config for probing ingresses via the Blackbox Exporter. 
+#
+# The relabeling allows the actual ingress scrape endpoint to be configured
+# via the following annotations:
+#
+# * `prometheus.io/probe`: Only probe ingresses that have a value of `true`
+- job_name: 'kubernetes-ingresses'
+
+  metrics_path: /probe
+  params:
+    module: [http_2xx]
+
+  kubernetes_sd_configs:
+  - role: ingress
+
+  relabel_configs:
+  - source_labels: [__meta_kubernetes_ingress_annotation_prometheus_io_probe]
+    action: keep
+    regex: true
+  - source_labels: [__meta_kubernetes_ingress_scheme,__address__,__meta_kubernetes_ingress_path]
+    regex: (.+);(.+);(.+)
+    replacement: ${1}://${2}${3}
+    target_label: __param_target
+  - target_label: __address__
+    replacement: blackbox-exporter.example.com:9115
+  - source_labels: [__param_target]
+    target_label: instance
+  - action: labelmap
+    regex: __meta_kubernetes_ingress_label_(.+)
+  - source_labels: [__meta_kubernetes_namespace]
+    target_label: kubernetes_namespace
+  - source_labels: [__meta_kubernetes_ingress_name]
+    target_label: kubernetes_name
+
 # Example scrape config for pods
 #
 # The relabeling allows the actual pod scrape endpoint to be configured via the
diff --git a/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go b/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go
index d43ee0c7b9..86108d9504 100644
--- a/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go
+++ b/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go
@@ -48,6 +48,10 @@ func NewClient(logger log.Logger, conf influx.HTTPConfig, db string, rp string)
 		os.Exit(1)
 	}
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+
 	return &Client{
 		logger: logger,
 		client: c,
diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go
new file mode 100644
index 0000000000..7cfa78f42c
--- /dev/null
+++ b/pkg/pool/pool.go
@@ -0,0 +1,75 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pool
+
+import "sync"
+
+// BytesPool is a bucketed pool for variably sized byte slices.
+type BytesPool struct {
+	buckets []sync.Pool
+	sizes   []int
+}
+
+// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize
+// increasing by the given factor.
+func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool {
+	if minSize < 1 {
+		panic("invalid minimum pool size")
+	}
+	if maxSize < 1 {
+		panic("invalid maximum pool size")
+	}
+	if factor < 1 {
+		panic("invalid factor")
+	}
+
+	var sizes []int
+
+	for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
+		sizes = append(sizes, s)
+	}
+
+	p := &BytesPool{
+		buckets: make([]sync.Pool, len(sizes)),
+		sizes:   sizes,
+	}
+
+	return p
+}
+
+// Get returns a new byte slice that fits the given size.
+func (p *BytesPool) Get(sz int) []byte { + for i, bktSize := range p.sizes { + if sz > bktSize { + continue + } + b, ok := p.buckets[i].Get().([]byte) + if !ok { + b = make([]byte, 0, bktSize) + } + return b + } + return make([]byte, 0, sz) +} + +// Put returns a byte slice to the right bucket in the pool. +func (p *BytesPool) Put(b []byte) { + for i, bktSize := range p.sizes { + if cap(b) > bktSize { + continue + } + p.buckets[i].Put(b[:0]) + return + } +} diff --git a/retrieval/scrape.go b/retrieval/scrape.go index c737326c93..947701da9d 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -28,6 +28,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/common/version" "golang.org/x/net/context" @@ -35,6 +36,8 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/pool" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" @@ -121,8 +124,8 @@ func init() { // scrapePool manages scrapes for sets of targets. type scrapePool struct { appendable Appendable - - ctx context.Context + logger log.Logger + ctx context.Context mtx sync.RWMutex config *config.ScrapeConfig @@ -133,39 +136,46 @@ type scrapePool struct { loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. - newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop - - logger log.Logger - maxAheadTime time.Duration + newLoop func(*Target, scraper) loop } +const maxAheadTime = 10 * time.Minute + +type labelsMutator func(labels.Labels) labels.Labels + func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { + if logger == nil { + logger = log.NewNopLogger() + } + client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig) if err != nil { // Any errors that could occur here should be caught during config validation. level.Error(logger).Log("msg", "Error creating HTTP client", "err", err) } - newLoop := func( - ctx context.Context, - s scraper, - app, reportApp func() storage.Appender, - l log.Logger, - ) loop { - return newScrapeLoop(ctx, s, app, reportApp, l) + buffers := pool.NewBytesPool(163, 100e6, 3) + + sp := &scrapePool{ + appendable: app, + config: cfg, + ctx: ctx, + client: client, + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, + logger: logger, + } + sp.newLoop = func(t *Target, s scraper) loop { + return newScrapeLoop(sp.ctx, s, + log.With(logger, "target", t), + buffers, + func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) }, + func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) }, + sp.appender, + ) } - return &scrapePool{ - appendable: app, - config: cfg, - ctx: ctx, - client: client, - targets: map[uint64]*Target{}, - loops: map[uint64]loop{}, - newLoop: newLoop, - logger: logger, - maxAheadTime: 10 * time.Minute, - } + return sp } // stop terminates all scrape loops and returns after they all terminated. 
@@ -217,15 +227,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { var ( t = sp.targets[fp] s = &targetScraper{Target: t, client: sp.client, timeout: timeout} - newLoop = sp.newLoop(sp.ctx, s, - func() storage.Appender { - return sp.sampleAppender(t) - }, - func() storage.Appender { - return sp.reportAppender(t) - }, - log.With(sp.logger, "target", t.labels), - ) + newLoop = sp.newLoop(t, s) ) wg.Add(1) @@ -287,15 +289,7 @@ func (sp *scrapePool) sync(targets []*Target) { if _, ok := sp.targets[hash]; !ok { s := &targetScraper{Target: t, client: sp.client, timeout: timeout} - l := sp.newLoop(sp.ctx, s, - func() storage.Appender { - return sp.sampleAppender(t) - }, - func() storage.Appender { - return sp.reportAppender(t) - }, - log.With(sp.logger, "target", t.labels), - ) + l := sp.newLoop(t, s) sp.targets[hash] = t sp.loops[hash] = l @@ -327,18 +321,58 @@ func (sp *scrapePool) sync(targets []*Target) { wg.Wait() } -// sampleAppender returns an appender for ingested samples from the target. -func (sp *scrapePool) sampleAppender(target *Target) storage.Appender { +func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels { + lb := labels.NewBuilder(lset) + + if sp.config.HonorLabels { + for _, l := range target.Labels() { + if lv := lset.Get(l.Name); lv == "" { + lb.Set(l.Name, l.Value) + } + } + } else { + for _, l := range target.Labels() { + lv := lset.Get(l.Name) + if lv != "" { + lb.Set(model.ExportedLabelPrefix+l.Name, lv) + } + lb.Set(l.Name, l.Value) + } + } + + res := lb.Labels() + + if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { + res = relabel.Process(res, mrc...) + } + + return res +} + +func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels { + lb := labels.NewBuilder(lset) + + for _, l := range target.Labels() { + lv := lset.Get(l.Name) + if lv != "" { + lb.Set(model.ExportedLabelPrefix+l.Name, lv) + } + lb.Set(l.Name, l.Value) + } + + return lb.Labels() +} + +// appender returns an appender for ingested samples from the target. +func (sp *scrapePool) appender() storage.Appender { app, err := sp.appendable.Appender() if err != nil { panic(err) } - if sp.maxAheadTime > 0 { - app = &timeLimitAppender{ - Appender: app, - maxTime: timestamp.FromTime(time.Now().Add(sp.maxAheadTime)), - } + app = &timeLimitAppender{ + Appender: app, + maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } // The limit is applied after metrics are potentially dropped via relabeling. @@ -348,42 +382,9 @@ func (sp *scrapePool) sampleAppender(target *Target) storage.Appender { limit: int(sp.config.SampleLimit), } } - - // The relabelAppender has to be inside the label-modifying appenders - // so the relabeling rules are applied to the correct label set. - if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { - app = relabelAppender{ - Appender: app, - relabelings: mrc, - } - } - - if sp.config.HonorLabels { - app = honorLabelsAppender{ - Appender: app, - labels: target.Labels(), - } - } else { - app = ruleLabelsAppender{ - Appender: app, - labels: target.Labels(), - } - } return app } -// reportAppender returns an appender for reporting samples for the target. -func (sp *scrapePool) reportAppender(target *Target) storage.Appender { - app, err := sp.appendable.Appender() - if err != nil { - panic(err) - } - return ruleLabelsAppender{ - Appender: app, - labels: target.Labels(), - } -} - // A scraper retrieves samples and accepts a status report at the end. 
type scraper interface { scrape(ctx context.Context, w io.Writer) error @@ -471,12 +472,15 @@ type refEntry struct { } type scrapeLoop struct { - scraper scraper - l log.Logger - cache *scrapeCache + scraper scraper + l log.Logger + cache *scrapeCache + lastScrapeSize int + buffers *pool.BytesPool - appender func() storage.Appender - reportAppender func() storage.Appender + appender func() storage.Appender + sampleMutator labelsMutator + reportSampleMutator labelsMutator ctx context.Context scrapeCtx context.Context @@ -493,6 +497,11 @@ type scrapeCache struct { refs map[string]*refEntry // Parsed string to ref. lsets map[uint64]*lsetCacheEntry // Ref to labelset and string. + // Cache of dropped metric strings and their iteration. The iteration must + // be a pointer so we can update it without setting a new entry with an unsafe + // string in addDropped(). + dropped map[string]*uint64 + // seriesCur and seriesPrev store the labels of series that were seen // in the current and previous scrape. // We hold two maps and swap them out to save allocations. @@ -504,6 +513,7 @@ func newScrapeCache() *scrapeCache { return &scrapeCache{ refs: map[string]*refEntry{}, lsets: map[uint64]*lsetCacheEntry{}, + dropped: map[string]*uint64{}, seriesCur: map[uint64]labels.Labels{}, seriesPrev: map[uint64]labels.Labels{}, } @@ -519,6 +529,11 @@ func (c *scrapeCache) iterDone() { delete(c.lsets, e.ref) } } + for s, iter := range c.dropped { + if *iter < c.iter { + delete(c.dropped, s) + } + } // Swap current and previous series. c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev @@ -556,6 +571,19 @@ func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash ui c.lsets[ref] = &lsetCacheEntry{metric: met, lset: lset, hash: hash} } +func (c *scrapeCache) addDropped(met string) { + iter := c.iter + c.dropped[met] = &iter +} + +func (c *scrapeCache) getDropped(met string) bool { + iterp, ok := c.dropped[met] + if ok { + *iterp = c.iter + } + return ok +} + func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) { c.seriesCur[hash] = lset } @@ -573,20 +601,28 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { func newScrapeLoop( ctx context.Context, sc scraper, - app, reportApp func() storage.Appender, l log.Logger, + buffers *pool.BytesPool, + sampleMutator labelsMutator, + reportSampleMutator labelsMutator, + appender func() storage.Appender, ) *scrapeLoop { if l == nil { l = log.NewNopLogger() } + if buffers == nil { + buffers = pool.NewBytesPool(1e3, 1e6, 3) + } sl := &scrapeLoop{ - scraper: sc, - appender: app, - cache: newScrapeCache(), - reportAppender: reportApp, - stopped: make(chan struct{}), - ctx: ctx, - l: l, + scraper: sc, + buffers: buffers, + cache: newScrapeCache(), + appender: appender, + sampleMutator: sampleMutator, + reportSampleMutator: reportSampleMutator, + stopped: make(chan struct{}), + ctx: ctx, + l: l, } sl.scrapeCtx, sl.cancel = context.WithCancel(ctx) @@ -632,12 +668,20 @@ mainLoop: time.Since(last).Seconds(), ) } + b := sl.buffers.Get(sl.lastScrapeSize) + buf := bytes.NewBuffer(b) scrapeErr := sl.scraper.scrape(scrapeCtx, buf) cancel() - var b []byte + if scrapeErr == nil { b = buf.Bytes() + // NOTE: There were issues with misbehaving clients in the past + // that occasionally returned empty results. We don't want those + // to falsely reset our buffer size. 
+ if len(b) > 0 { + sl.lastScrapeSize = len(b) + } } else { level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error()) if errc != nil { @@ -657,6 +701,8 @@ mainLoop: } } + sl.buffers.Put(b) + if scrapeErr == nil { scrapeErr = appErr } @@ -776,6 +822,9 @@ loop: t = *tp } + if sl.cache.getDropped(yoloString(met)) { + continue + } ref, ok := sl.cache.getRef(yoloString(met)) if ok { lset := sl.cache.lsets[ref].lset @@ -787,9 +836,6 @@ loop: } case storage.ErrNotFound: ok = false - case errSeriesDropped: - err = nil - continue case storage.ErrOutOfOrderSample: numOutOfOrder++ level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met)) @@ -828,6 +874,16 @@ loop: } else { mets = p.Metric(&lset) hash = lset.Hash() + + // Hash label set as it is seen local to the target. Then add target labels + // and relabeling and store the final label set. + lset = sl.sampleMutator(lset) + + // The label set may be set to nil to indicate dropping. + if lset == nil { + sl.cache.addDropped(mets) + continue + } } var ref uint64 @@ -835,9 +891,6 @@ loop: // TODO(fabxc): also add a dropped-cache? switch err { case nil: - case errSeriesDropped: - err = nil - continue case storage.ErrOutOfOrderSample: err = nil numOutOfOrder++ @@ -892,8 +945,6 @@ loop: // Series no longer exposed, mark it stale. _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) switch err { - case errSeriesDropped: - err = nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if a target // goes away and comes back again with a new scrape loop. @@ -928,8 +979,7 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a if err == nil { health = 1 } - - app := sl.reportAppender() + app := sl.appender() if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { app.Rollback() @@ -952,7 +1002,8 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a func (sl *scrapeLoop) reportStale(start time.Time) error { ts := timestamp.FromTime(start) - app := sl.reportAppender() + app := sl.appender() + stale := math.Float64frombits(value.StaleNaN) if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { @@ -999,10 +1050,14 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v lset := labels.Labels{ labels.Label{Name: labels.MetricName, Value: s}, } + + hash := lset.Hash() + lset = sl.reportSampleMutator(lset) + ref, err := app.Add(lset, t, v) switch err { case nil: - sl.cache.addRef(s2, ref, lset, lset.Hash()) + sl.cache.addRef(s2, ref, lset, hash) return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: return nil diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 058e2fcb3b..12e366ed33 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -28,7 +28,6 @@ import ( "testing" "time" - "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "golang.org/x/net/context" @@ -145,7 +144,7 @@ func TestScrapePoolReload(t *testing.T) { } // On starting to run, new loops created on reload check whether their preceding // equivalents have been stopped. 
- newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender, _ log.Logger) loop { + newLoop := func(_ *Target, s scraper) loop { l := &testLoop{} l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { if interval != 3*time.Second { @@ -167,7 +166,7 @@ func TestScrapePoolReload(t *testing.T) { targets: map[uint64]*Target{}, loops: map[uint64]loop{}, newLoop: newLoop, - logger: log.NewNopLogger(), + logger: nil, } // Reloading a scrape pool with a new scrape configuration must stop all scrape @@ -228,92 +227,48 @@ func TestScrapePoolReload(t *testing.T) { } } -func TestScrapePoolReportAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) +func TestScrapePoolAppender(t *testing.T) { + cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp := newScrapePool(context.Background(), cfg, app, nil) - cfg.HonorLabels = false - wrapped := sp.reportAppender(target) + wrapped := sp.appender() - rl, ok := wrapped.(ruleLabelsAppender) + tl, ok := wrapped.(*timeLimitAppender) if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + t.Fatalf("Expected timeLimitAppender but got %T", wrapped) } - if _, ok := rl.Appender.(nopAppender); !ok { - t.Fatalf("Expected base appender but got %T", rl.Appender) + if _, ok := tl.Appender.(nopAppender); !ok { + t.Fatalf("Expected base appender but got %T", tl.Appender) } - cfg.HonorLabels = true - wrapped = sp.reportAppender(target) - - hl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if _, ok := rl.Appender.(nopAppender); !ok { - t.Fatalf("Expected base appender but got %T", hl.Appender) - } -} - -func TestScrapePoolSampleAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - app := &nopAppendable{} - - sp := newScrapePool(context.Background(), cfg, app, nil) - sp.maxAheadTime = 0 - - cfg.HonorLabels = false - wrapped := sp.sampleAppender(target) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - re, ok := rl.Appender.(relabelAppender) - if !ok { - t.Fatalf("Expected relabelAppender but got %T", rl.Appender) - } - if _, ok := re.Appender.(nopAppender); !ok { - t.Fatalf("Expected base appender but got %T", re.Appender) - } - - cfg.HonorLabels = true cfg.SampleLimit = 100 - wrapped = sp.sampleAppender(target) - hl, ok := wrapped.(honorLabelsAppender) + wrapped = sp.appender() + + sl, ok := wrapped.(*limitAppender) if !ok { - t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) + t.Fatalf("Expected limitAppender but got %T", wrapped) } - re, ok = hl.Appender.(relabelAppender) + tl, ok = sl.Appender.(*timeLimitAppender) if !ok { - t.Fatalf("Expected relabelAppender but got %T", hl.Appender) + t.Fatalf("Expected limitAppender but got %T", sl.Appender) } - lm, ok := re.Appender.(*limitAppender) - if !ok { - t.Fatalf("Expected limitAppender but got %T", lm.Appender) - } - if _, ok := lm.Appender.(nopAppender); !ok { - t.Fatalf("Expected base appender but got %T", re.Appender) + if _, ok := tl.Appender.(nopAppender); !ok { + t.Fatalf("Expected base appender but got %T", tl.Appender) } } func TestScrapeLoopStopBeforeRun(t *testing.T) { scraper := &testScraper{} - sl := 
newScrapeLoop(context.Background(), scraper, nil, nil, nil) + + sl := newScrapeLoop(context.Background(), + scraper, + nil, nil, + nopMutator, + nopMutator, + nil, + ) // The scrape pool synchronizes on stopping scrape loops. However, new scrape // loops are started asynchronously. Thus it's possible, that a loop is stopped @@ -358,22 +313,28 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { } } -func TestScrapeLoopStop(t *testing.T) { - appender := &collectResultAppender{} - reportAppender := &collectResultAppender{} - var ( - signal = make(chan struct{}) +func nopMutator(l labels.Labels) labels.Labels { return l } - scraper = &testScraper{} - app = func() storage.Appender { return appender } - reportApp = func() storage.Appender { return reportAppender } - numScrapes = 0 +func TestScrapeLoopStop(t *testing.T) { + var ( + signal = make(chan struct{}) + appender = &collectResultAppender{} + scraper = &testScraper{} + app = func() storage.Appender { return appender } ) defer close(signal) - sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil) + sl := newScrapeLoop(context.Background(), + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) + + // Terminate loop after 2 scrapes. + numScrapes := 0 - // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { numScrapes++ if numScrapes == 2 { @@ -394,25 +355,25 @@ func TestScrapeLoopStop(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - if len(appender.result) < 2 { - t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result)) + // We expected 1 actual sample for each scrape plus 4 for report samples. + // At least 2 scrapes were made, plus the final stale markers. + if len(appender.result) < 5*3 || len(appender.result)%5 != 0 { + t.Fatalf("Expected at least 3 scrapes with 4 samples each, got %d samples", len(appender.result)) } - if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) { - t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v)) + // All samples in a scrape must have the same timestmap. + var ts int64 + for i, s := range appender.result { + if i%5 == 0 { + ts = s.t + } else if s.t != ts { + t.Fatalf("Unexpected multiple timestamps within single scrape") + } } - - if len(reportAppender.result) < 8 { - t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 8, len(reportAppender.result)) - } - if len(reportAppender.result)%4 != 0 { - t.Fatalf("Appended samples not as expected. Wanted: samples mod 4 == 0 Got: %d samples", len(reportAppender.result)) - } - if !value.IsStaleNaN(reportAppender.result[len(reportAppender.result)-1].v) { - t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(reportAppender.result[len(reportAppender.result)].v)) - } - - if reportAppender.result[len(reportAppender.result)-1].t != appender.result[len(appender.result)-1].t { - t.Fatalf("Expected last append and report sample to have same timestamp. Append: stale NaN Report: %x", appender.result[len(appender.result)-1].t, reportAppender.result[len(reportAppender.result)-1].t) + // All samples from the last scrape must be stale markers. + for _, s := range appender.result[len(appender.result)-5:] { + if !value.IsStaleNaN(s.v) { + t.Fatalf("Appended last sample not as expected. 
Wanted: stale NaN Got: %x", math.Float64bits(s.v)) + } } } @@ -421,14 +382,19 @@ func TestScrapeLoopRun(t *testing.T) { signal = make(chan struct{}) errc = make(chan error) - scraper = &testScraper{} - app = func() storage.Appender { return &nopAppender{} } - reportApp = func() storage.Appender { return &nopAppender{} } + scraper = &testScraper{} + app = func() storage.Appender { return &nopAppender{} } ) defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) // The loop must terminate during the initial offset if the context // is canceled. @@ -466,7 +432,13 @@ func TestScrapeLoopRun(t *testing.T) { } ctx, cancel = context.WithCancel(context.Background()) - sl = newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl = newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) go func() { sl.run(time.Second, 100*time.Millisecond, errc) @@ -501,19 +473,23 @@ func TestScrapeLoopRun(t *testing.T) { func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { appender := &collectResultAppender{} var ( - signal = make(chan struct{}) - - scraper = &testScraper{} - app = func() storage.Appender { return appender } - reportApp = func() storage.Appender { return &nopAppender{} } - numScrapes = 0 + signal = make(chan struct{}) + scraper = &testScraper{} + app = func() storage.Appender { return appender } ) defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) - + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) // Succeed once, several failures, then stop. + numScrapes := 0 + scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { numScrapes++ @@ -523,7 +499,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { } else if numScrapes == 5 { cancel() } - return fmt.Errorf("Scrape failed.") + return fmt.Errorf("scrape failed") } go func() { @@ -537,31 +513,37 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - if len(appender.result) != 2 { - t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 2, len(appender.result)) + // 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for + // each scrape successful or not. + if len(appender.result) != 22 { + t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result)) } if appender.result[0].v != 42.0 { - t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0], 42) + t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0) } - if !value.IsStaleNaN(appender.result[1].v) { - t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[1].v)) + if !value.IsStaleNaN(appender.result[5].v) { + t.Fatalf("Appended second sample not as expected. 
Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v)) } } func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { appender := &collectResultAppender{} var ( - signal = make(chan struct{}) - + signal = make(chan struct{}) scraper = &testScraper{} app = func() storage.Appender { return appender } - reportApp = func() storage.Appender { return &nopAppender{} } numScrapes = 0 ) defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp, nil) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -576,7 +558,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { } else if numScrapes == 3 { cancel() } - return fmt.Errorf("Scrape failed.") + return fmt.Errorf("scrape failed") } go func() { @@ -590,25 +572,29 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { t.Fatalf("Scrape wasn't stopped.") } - if len(appender.result) != 2 { - t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 2, len(appender.result)) + // 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for + // each scrape successful or not. + if len(appender.result) != 14 { + t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result)) } if appender.result[0].v != 42.0 { - t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0], 42) + t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0].v, 42.0) } - if !value.IsStaleNaN(appender.result[1].v) { - t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[1].v)) + if !value.IsStaleNaN(appender.result[5].v) { + t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v)) } } func TestScrapeLoopAppend(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), nil, + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return app }, - func() storage.Appender { return nopAppender{} }, - nil, ) + now := time.Now() _, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now) if err != nil { @@ -641,10 +627,12 @@ func TestScrapeLoopAppend(t *testing.T) { func TestScrapeLoopAppendStaleness(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), nil, + + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return app }, - func() storage.Appender { return nopAppender{} }, - nil, ) now := time.Now() @@ -684,10 +672,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), nil, + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return app }, - func() storage.Appender { return nopAppender{} }, - nil, ) now := time.Now() @@ -710,128 +699,23 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { if !reflect.DeepEqual(want, app.result) { t.Fatalf("Appended samples not as expected. 
Wanted: %+v Got: %+v", want, app.result) } - -} - -func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) { - - cases := []struct { - appender func() storage.Appender - up float64 - scrapeSamplesScraped float64 - scrapeSamplesScrapedPostMetricRelabelling float64 - }{ - { - appender: func() storage.Appender { return nopAppender{} }, - up: 1, - scrapeSamplesScraped: 3, - scrapeSamplesScrapedPostMetricRelabelling: 3, - }, - { - appender: func() storage.Appender { - return &limitAppender{Appender: nopAppender{}, limit: 3} - }, - up: 1, - scrapeSamplesScraped: 3, - scrapeSamplesScrapedPostMetricRelabelling: 3, - }, - { - appender: func() storage.Appender { - return &limitAppender{Appender: nopAppender{}, limit: 2} - }, - up: 0, - scrapeSamplesScraped: 3, - scrapeSamplesScrapedPostMetricRelabelling: 3, - }, - { - appender: func() storage.Appender { - return &relabelAppender{ - Appender: &limitAppender{Appender: nopAppender{}, limit: 2}, - relabelings: []*config.RelabelConfig{ - &config.RelabelConfig{ - SourceLabels: model.LabelNames{"__name__"}, - Regex: config.MustNewRegexp("a"), - Action: config.RelabelDrop, - }, - }, - } - }, - up: 1, - scrapeSamplesScraped: 3, - scrapeSamplesScrapedPostMetricRelabelling: 2, - }, - } - - for i, c := range cases { - reportAppender := &collectResultAppender{} - var ( - signal = make(chan struct{}) - scraper = &testScraper{} - numScrapes = 0 - reportApp = func() storage.Appender { - // Get result of the 2nd scrape. - if numScrapes == 2 { - return reportAppender - } else { - return nopAppender{} - } - } - ) - defer close(signal) - - ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil) - - // Setup a series to be stale, then 3 samples, then stop. - scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { - numScrapes += 1 - if numScrapes == 1 { - w.Write([]byte("stale 0\n")) - return nil - } else if numScrapes == 2 { - w.Write([]byte("a 0\nb 0\nc 0 \n")) - return nil - } else if numScrapes == 3 { - cancel() - } - return fmt.Errorf("Scrape failed.") - } - - go func() { - sl.run(10*time.Millisecond, time.Hour, nil) - signal <- struct{}{} - }() - - select { - case <-signal: - case <-time.After(5 * time.Second): - t.Fatalf("Scrape wasn't stopped.") - } - - if len(reportAppender.result) != 4 { - t.Fatalf("Case %d appended report samples not as expected. Wanted: %d samples Got: %d", i, 4, len(reportAppender.result)) - } - if reportAppender.result[0].v != c.up { - t.Fatalf("Case %d appended up sample not as expected. Wanted: %f Got: %+v", i, c.up, reportAppender.result[0]) - } - if reportAppender.result[2].v != c.scrapeSamplesScraped { - t.Fatalf("Case %d appended scrape_samples_scraped sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScraped, reportAppender.result[2]) - } - if reportAppender.result[3].v != c.scrapeSamplesScrapedPostMetricRelabelling { - t.Fatalf("Case %d appended scrape_samples_scraped_post_metric_relabeling sample not as expected. 
Wanted: %f Got: %+v", i, c.scrapeSamplesScrapedPostMetricRelabelling, reportAppender.result[3]) - } - } } func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { var ( - scraper = &testScraper{} - reportAppender = &collectResultAppender{} - reportApp = func() storage.Appender { return reportAppender } + scraper = &testScraper{} + appender = &collectResultAppender{} + app = func() storage.Appender { return appender } ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { cancel() @@ -840,31 +724,37 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { sl.run(10*time.Millisecond, time.Hour, nil) - if reportAppender.result[0].v != 0 { - t.Fatalf("bad 'up' value; want 0, got %v", reportAppender.result[0].v) + if appender.result[0].v != 0 { + t.Fatalf("bad 'up' value; want 0, got %v", appender.result[0].v) } } func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { var ( - scraper = &testScraper{} - reportAppender = &collectResultAppender{} - reportApp = func() storage.Appender { return reportAppender } + scraper = &testScraper{} + appender = &collectResultAppender{} + app = func() storage.Appender { return appender } ) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + ) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { cancel() - w.Write([]byte("a{l=\"\xff\"} 0\n")) + w.Write([]byte("a{l=\"\xff\"} 1\n")) return nil } sl.run(10*time.Millisecond, time.Hour, nil) - if reportAppender.result[0].v != 0 { - t.Fatalf("bad 'up' value; want 0, got %v", reportAppender.result[0].v) + if appender.result[0].v != 0 { + t.Fatalf("bad 'up' value; want 0, got %v", appender.result[0].v) } } @@ -891,10 +781,13 @@ func (app *errorAppender) AddFast(lset labels.Labels, ref uint64, t int64, v flo func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { app := &errorAppender{} - sl := newScrapeLoop(context.Background(), nil, - func() storage.Appender { return app }, - func() storage.Appender { return nopAppender{} }, + + sl := newScrapeLoop(context.Background(), nil, + nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return app }, ) now := time.Unix(1, 0) @@ -916,15 +809,17 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { app := &collectResultAppender{} - sl := newScrapeLoop(context.Background(), nil, + sl := newScrapeLoop(context.Background(), + nil, + nil, nil, + nopMutator, + nopMutator, func() storage.Appender { return &timeLimitAppender{ Appender: app, maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)), } }, - func() storage.Appender { return nopAppender{} }, - nil, ) now := time.Now().Add(20 * time.Minute) diff --git a/retrieval/target.go b/retrieval/target.go index b305b5abc7..862911ebaa 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -253,63 +253,6 @@ func (app *timeLimitAppender) AddFast(lset labels.Labels, ref uint64, t int64, v return nil } -// Merges the ingested sample's metric with the label set. 
On a collision the -// value of the ingested label is stored in a label prefixed with 'exported_'. -type ruleLabelsAppender struct { - storage.Appender - labels labels.Labels -} - -func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - lb := labels.NewBuilder(lset) - - for _, l := range app.labels { - lv := lset.Get(l.Name) - if lv != "" { - lb.Set(model.ExportedLabelPrefix+l.Name, lv) - } - lb.Set(l.Name, l.Value) - } - - return app.Appender.Add(lb.Labels(), t, v) -} - -type honorLabelsAppender struct { - storage.Appender - labels labels.Labels -} - -// Merges the sample's metric with the given labels if the label is not -// already present in the metric. -// This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - lb := labels.NewBuilder(lset) - - for _, l := range app.labels { - if lv := lset.Get(l.Name); lv == "" { - lb.Set(l.Name, l.Value) - } - } - return app.Appender.Add(lb.Labels(), t, v) -} - -// Applies a set of relabel configurations to the sample's metric -// before actually appending it. -type relabelAppender struct { - storage.Appender - relabelings []*config.RelabelConfig -} - -var errSeriesDropped = errors.New("series dropped") - -func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - lset = relabel.Process(lset, app.relabelings...) - if lset == nil { - return 0, errSeriesDropped - } - return app.Appender.Add(lset, t, v) -} - // populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns a nil label set if the target is dropped during relabeling. diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 7d8174f0d3..a3bc7d17a3 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -17,7 +17,6 @@ import ( "math/rand" "os" "path/filepath" - "runtime" "sort" "time" @@ -365,10 +364,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) - - // We might have done quite a few allocs. Enforce a GC so they do not accumulate - // with subsequent compactions or head GCs. - runtime.GC() }(time.Now()) dir := filepath.Join(dest, meta.ULID.String()) @@ -570,14 +565,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "write postings") } } - // Write a postings list containing all series. - all := make([]uint64, i) - for i := range all { - all[i] = uint64(i) - } - if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { - return errors.Wrap(err, "write 'all' postings") - } return nil } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index ad034d8b02..c9745cfc6f 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "os" "path/filepath" + "runtime" "sort" "strconv" "sync" @@ -349,9 +350,12 @@ func (db *DB) compact() (changes bool, err error) { } changes = true + runtime.GC() + if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } + runtime.GC() } // Check for compactions of multiple blocks. 
@@ -380,10 +384,12 @@ func (db *DB) compact() (changes bool, err error) { return changes, errors.Wrap(err, "delete compacted block") } } + runtime.GC() if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } + runtime.GC() } return changes, nil diff --git a/vendor/github.com/prometheus/tsdb/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/encoding_helpers.go index 9aa4ba4097..17c3ff0811 100644 --- a/vendor/github.com/prometheus/tsdb/encoding_helpers.go +++ b/vendor/github.com/prometheus/tsdb/encoding_helpers.go @@ -77,6 +77,22 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be64int64() int64 { return int64(d.be64()) } +// uvarintTempStr decodes like uvarintStr but the returned string is +// not safe to use if the underlying buffer changes. +func (d *decbuf) uvarintTempStr() string { + l := d.uvarint64() + if d.e != nil { + return "" + } + if len(d.b) < int(l) { + d.e = errInvalidSize + return "" + } + s := yoloString(d.b[:l]) + d.b = d.b[l:] + return s +} + func (d *decbuf) uvarintStr() string { l := d.uvarint64() if d.e != nil { diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index a74552bcaf..ea7b63f8a6 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -15,7 +15,6 @@ package tsdb import ( "math" - "runtime" "sort" "sync" "sync/atomic" @@ -402,10 +401,13 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if s == nil { return errors.Wrap(ErrNotFound, "unknown series") } - if err := s.appendable(t, v); err != nil { + s.Lock() + err := s.appendable(t, v) + s.Unlock() + + if err != nil { return err } - if t < a.mint { return ErrOutOfBounds } @@ -435,7 +437,10 @@ func (a *headAppender) Commit() error { total := len(a.samples) for _, s := range a.samples { + s.series.Lock() ok, chunkCreated := s.series.append(s.T, s.V) + s.series.Unlock() + if !ok { total-- } @@ -509,8 +514,6 @@ Outer: // gc removes data before the minimum timestamp from the head. func (h *Head) gc() { - defer runtime.GC() - // Only data strictly lower than this timestamp must be deleted. mint := h.MinTime() @@ -672,9 +675,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { s := h.head.series.getByID(sid) - s.mtx.RLock() + s.Lock() c := s.chunk(int(cid)) - s.mtx.RUnlock() + s.Unlock() // Do not expose chunks that are outside of the specified range. if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { @@ -694,9 +697,10 @@ type safeChunk struct { } func (c *safeChunk) Iterator() chunks.Iterator { - c.s.mtx.RLock() - defer c.s.mtx.RUnlock() - return c.s.iterator(c.cid) + c.s.Lock() + it := c.s.iterator(c.cid) + c.s.Unlock() + return it } // func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } @@ -803,8 +807,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM } *lbls = append((*lbls)[:0], s.lset...)
- s.mtx.RLock() - defer s.mtx.RUnlock() + s.Lock() + defer s.Unlock() *chks = (*chks)[:0] @@ -956,11 +960,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { for hash, all := range s.hashes[i] { for _, series := range all { - series.mtx.Lock() + series.Lock() rmChunks += series.truncateChunksBefore(mint) if len(series.chunks) > 0 { - series.mtx.Unlock() + series.Unlock() continue } @@ -983,7 +987,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { s.locks[j].Unlock() } - series.mtx.Unlock() + series.Unlock() } } @@ -1040,8 +1044,10 @@ type sample struct { v float64 } +// memSeries is the in-memory representation of a series. None of its methods +// are goroutine safe and it's the caller's responsibility to lock it. type memSeries struct { - mtx sync.RWMutex + sync.Mutex ref uint64 lset labels.Labels @@ -1143,8 +1149,6 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { const samplesPerChunk = 120 - s.mtx.Lock() - c := s.head() if c == nil { @@ -1152,7 +1156,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { chunkCreated = true } if c.maxTime >= t { - s.mtx.Unlock() return false, chunkCreated } if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { @@ -1175,8 +1178,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} - s.mtx.Unlock() - return true, chunkCreated } diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go index ddc2c4f52a..fd9b251623 100644 --- a/vendor/github.com/prometheus/tsdb/index.go +++ b/vendor/github.com/prometheus/tsdb/index.go @@ -292,10 +292,22 @@ func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkM w.buf2.putUvarint(len(chunks)) - for _, c := range chunks { + if len(chunks) > 0 { + c := chunks[0] w.buf2.putVarint64(c.MinTime) - w.buf2.putVarint64(c.MaxTime) + w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime)) w.buf2.putUvarint64(c.Ref) + t0 := c.MaxTime + ref0 := int64(c.Ref) + + for _, c := range chunks[1:] { + w.buf2.putUvarint64(uint64(c.MinTime - t0)) + w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime)) + t0 = c.MaxTime + + w.buf2.putVarint64(int64(c.Ref) - ref0) + ref0 = int64(c.Ref) + } } w.buf1.reset() @@ -335,10 +347,6 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { for _, s := range symbols { w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len()) - - // NOTE: len(s) gives the number of runes, not the number of bytes. - // Therefore the read-back length for strings with unicode characters will - // be off when not using putUvarintStr.
w.buf2.putUvarintStr(s) } @@ -636,7 +644,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.uvarintStr()) + keys = append(keys, d2.uvarintTempStr()) } res[strings.Join(keys, sep)] = uint32(d2.uvarint()) @@ -673,7 +681,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { func (r *indexReader) lookupSymbol(o uint32) (string, error) { d := r.decbufAt(int(o)) - s := d.uvarintStr() + s := d.uvarintTempStr() if d.err() != nil { return "", errors.Wrapf(d.err(), "read symbol at %d", o) } @@ -688,7 +696,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) { sym := make(map[string]struct{}, count) for ; count > 0; count-- { - s := d2.uvarintStr() + s := d2.uvarintTempStr() sym[s] = struct{}{} } @@ -775,17 +783,34 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) // Read the chunks meta data. k = int(d2.uvarint()) - for i := 0; i < k; i++ { - mint := d2.varint64() - maxt := d2.varint64() - off := d2.uvarint64() + if k == 0 { + return nil + } + + t0 := d2.varint64() + maxt := int64(d2.uvarint64()) + t0 + ref0 := int64(d2.uvarint64()) + + *chks = append(*chks, ChunkMeta{ + Ref: uint64(ref0), + MinTime: t0, + MaxTime: maxt, + }) + t0 = maxt + + for i := 1; i < k; i++ { + mint := int64(d2.uvarint64()) + t0 + maxt := int64(d2.uvarint64()) + mint + + ref0 += d2.varint64() + t0 = maxt if d2.err() != nil { return errors.Wrapf(d2.err(), "read meta for chunk %d", i) } *chks = append(*chks, ChunkMeta{ - Ref: off, + Ref: uint64(ref0), MinTime: mint, MaxTime: maxt, }) diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index 1dadc8f2c3..9af9a18536 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -52,13 +52,16 @@ const ( WALEntryDeletes WALEntryType = 4 ) -// SamplesCB is the callback after reading samples. +// SamplesCB is the callback after reading samples. The passed slice +// is only valid until the call returns. type SamplesCB func([]RefSample) error -// SeriesCB is the callback after reading series. +// SeriesCB is the callback after reading series. The passed slice +// is only valid until the call returns. type SeriesCB func([]RefSeries) error -// DeletesCB is the callback after reading deletes. +// DeletesCB is the callback after reading deletes. The passed slice +// is only valid until the call returns. type DeletesCB func([]Stone) error // WAL is a write ahead log that can log new series labels and samples. 
@@ -395,6 +398,10 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { buf := w.getBuffer() flag := w.encodeSeries(buf, series) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntrySeries, flag, buf.get()) w.putBuffer(buf) @@ -410,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { tf.minSeries = s.Ref } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -422,6 +425,10 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { buf := w.getBuffer() flag := w.encodeSamples(buf, samples) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntrySamples, flag, buf.get()) w.putBuffer(buf) @@ -436,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { tf.maxTime = s.T } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -448,6 +451,10 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { buf := w.getBuffer() flag := w.encodeDeletes(buf, stones) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntryDeletes, flag, buf.get()) w.putBuffer(buf) @@ -464,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { } } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -522,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { func (w *SegmentWAL) cut() error { // Sync current head to disk and close. if hf := w.head(); hf != nil { - if err := w.sync(); err != nil { - return err - } - off, err := hf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := hf.Truncate(off); err != nil { - return err - } - if err := hf.Close(); err != nil { + if err := w.flush(); err != nil { return err } + // Finish last segment asynchronously to not block the WAL moving along + // in the new segment. + go func() { + off, err := hf.Seek(0, os.SEEK_CUR) + if err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Truncate(off); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Sync(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Close(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + }() } p, _, err := nextSequenceFile(w.dirFile.Name()) @@ -546,9 +556,11 @@ func (w *SegmentWAL) cut() error { return err } - if err = w.dirFile.Sync(); err != nil { - return err - } + go func() { + if err = w.dirFile.Sync(); err != nil { + w.logger.Log("msg", "sync WAL directory", "err", err) + } + }() w.files = append(w.files, newSegmentFile(f)) @@ -594,6 +606,9 @@ func (w *SegmentWAL) sync() error { if err := w.flush(); err != nil { return err } + if w.head() == nil { + return nil + } return fileutil.Fdatasync(w.head().File) } @@ -655,8 +670,6 @@ const ( ) func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { - w.mtx.Lock() - defer w.mtx.Unlock() // Cut to the next segment if the entry exceeds the file size unless it would also // exceed the size of a new segment. // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize. 
@@ -769,6 +782,10 @@ type walReader struct { curBuf []byte lastOffset int64 // offset after last successfully read entry + seriesBuf []RefSeries + sampleBuf []RefSample + tombstoneBuf []Stone + err error } @@ -996,7 +1013,8 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { } func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { - series := []RefSeries{} + r.seriesBuf = r.seriesBuf[:0] + dec := decbuf{b: b} for len(dec.b) > 0 && dec.err() == nil { @@ -1010,7 +1028,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { } sort.Sort(lset) - series = append(series, RefSeries{ + r.seriesBuf = append(r.seriesBuf, RefSeries{ Ref: ref, Labels: lset, }) @@ -1019,16 +1037,16 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { return nil, dec.err() } if len(dec.b) > 0 { - return series, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return series, nil + return r.seriesBuf, nil } func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { if len(b) == 0 { return nil, nil } - samples := []RefSample{} + r.sampleBuf = r.sampleBuf[:0] dec := decbuf{b: b} var ( @@ -1041,7 +1059,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { dtime := dec.varint64() val := dec.be64() - samples = append(samples, RefSample{ + r.sampleBuf = append(r.sampleBuf, RefSample{ Ref: uint64(int64(baseRef) + dref), T: baseTime + dtime, V: math.Float64frombits(val), @@ -1049,20 +1067,20 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { } if dec.err() != nil { - return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples)) + return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf)) } if len(dec.b) > 0 { - return samples, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return samples, nil + return r.sampleBuf, nil } func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { dec := &decbuf{b: b} - var stones []Stone + r.tombstoneBuf = r.tombstoneBuf[:0] for dec.len() > 0 && dec.err() == nil { - stones = append(stones, Stone{ + r.tombstoneBuf = append(r.tombstoneBuf, Stone{ ref: dec.be64(), intervals: Intervals{ {Mint: dec.varint64(), Maxt: dec.varint64()}, @@ -1073,7 +1091,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { return nil, dec.err() } if len(dec.b) > 0 { - return stones, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return stones, nil + return r.tombstoneBuf, nil } diff --git a/vendor/vendor.json b/vendor/vendor.json index 8ce00acf86..907fb7e060 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -871,22 +871,22 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "AoNkGFKIyLNi4a/QcO8p5D7xIXs=", + "checksumSHA1": "mDKxPAubVLTWW/Gar13m7YDHSek=", "path": "github.com/prometheus/tsdb", - "revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02", - "revisionTime": "2017-09-07T11:04:02Z" + "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563", + "revisionTime": "2017-09-11T08:41:33Z" }, { "checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=", "path": "github.com/prometheus/tsdb/chunks", - "revision": 
"0db4c227b72145418ad4c1fbda8fdb87bfe77a02", - "revisionTime": "2017-09-07T11:04:02Z" + "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563", + "revisionTime": "2017-09-11T08:41:33Z" }, { "checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=", "path": "github.com/prometheus/tsdb/labels", - "revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02", - "revisionTime": "2017-09-07T11:04:02Z" + "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563", + "revisionTime": "2017-09-11T08:41:33Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=", diff --git a/web/web.go b/web/web.go index 3bd7cb7f11..39bf80a5c3 100644 --- a/web/web.go +++ b/web/web.go @@ -284,19 +284,23 @@ func serveDebug(w http.ResponseWriter, req *http.Request) { ctx := req.Context() subpath := route.Param(ctx, "subpath") - // Based off paths from init() in golang.org/src/net/http/pprof/pprof.go - if subpath == "/pprof/" { - pprof.Index(w, req) - } else if subpath == "/pprof/cmdline" { - pprof.Cmdline(w, req) - } else if subpath == "/pprof/profile" { - pprof.Profile(w, req) - } else if subpath == "/pprof/symbol" { - pprof.Symbol(w, req) - } else if subpath == "/pprof/trace" { - pprof.Trace(w, req) - } else { + if !strings.HasPrefix(subpath, "/pprof/") { http.NotFound(w, req) + return + } + subpath = strings.TrimPrefix(subpath, "/pprof/") + + switch subpath { + case "cmdline": + pprof.Cmdline(w, req) + case "profile": + pprof.Profile(w, req) + case "symbol": + pprof.Symbol(w, req) + case "trace": + pprof.Trace(w, req) + default: + pprof.Index(w, req) } }