From 8ceb8f2ae8ad57766949b6569242db523571d637 Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Tue, 10 Apr 2018 00:35:14 +0800 Subject: [PATCH] Refactor Kubernetes Discovery Part 2: Refactoring - Doing the initial listing and syncing to the scrape manager and then registering event handlers may lose events that happen during the listing and syncing (if it lasted a long time). We should register event handlers at the very beginning, and before processing just wait until the informers have synced (sync in an informer will list all objects and call the OnUpdate event handler). - Use a queue, then we don't block event callbacks and an object will be processed only once if added multiple times before it is processed. - Fix bug in `serviceUpdate` in endpoints.go: we should build endpoints when `exists && err == nil`. Add `^TestEndpointsDiscoveryWithService` tests to test this feature. Testing: - Use `k8s.io/client-go` testing framework and fake implementations which are more robust and reliable for testing. - `Test\w+DiscoveryBeforeRun` are used to test objects created before discoverer runs - `Test\w+DiscoveryAdd\w+` are used to test adding objects - `Test\w+DiscoveryDelete\w+` are used to test deleting objects - `Test\w+DiscoveryUpdate\w+` are used to test updating objects - `TestEndpointsDiscoveryWithService\w+` are used to test endpoints events triggered by services - `cache.DeletedFinalStateUnknown` related stuff is removed, because we don't care about deleted objects in the store; we only need the object's name to send a special `targetgroup.Group` to the scrape manager Signed-off-by: Yecheng Fu --- discovery/kubernetes/endpoints.go | 153 ++++++----- discovery/kubernetes/endpoints_test.go | 342 +++++++++++++++--------- discovery/kubernetes/ingress.go | 121 +++++---- discovery/kubernetes/ingress_test.go | 49 ++-- discovery/kubernetes/kubernetes.go | 176 +++++++----- discovery/kubernetes/kubernetes_test.go | 186 +++++++++++++ discovery/kubernetes/node.go | 127 +++++---- discovery/kubernetes/node_test.go | 280 ++++---------
discovery/kubernetes/pod.go | 121 +++++---- discovery/kubernetes/pod_test.go | 206 ++++---------- discovery/kubernetes/service.go | 120 +++++---- discovery/kubernetes/service_test.go | 150 ++--------- 12 files changed, 1045 insertions(+), 986 deletions(-) create mode 100644 discovery/kubernetes/kubernetes_test.go diff --git a/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go index b700399ffc..a318455413 100644 --- a/discovery/kubernetes/endpoints.go +++ b/discovery/kubernetes/endpoints.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" apiv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" ) // Endpoints discovers new endpoint targets. @@ -38,6 +39,8 @@ type Endpoints struct { podStore cache.Store endpointsStore cache.Store serviceStore cache.Store + + queue *workqueue.Type } // NewEndpoints returns a new endpoints discovery. @@ -45,7 +48,7 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints { if l == nil { l = log.NewNopLogger() } - ep := &Endpoints{ + e := &Endpoints{ logger: l, endpointsInf: eps, endpointsStore: eps.GetStore(), @@ -53,67 +56,21 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints { serviceStore: svc.GetStore(), podInf: pod, podStore: pod.GetStore(), - } - - return ep -} - -// Run implements the Discoverer interface. -func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - // Send full initial set of endpoint targets. - var initial []*targetgroup.Group - - for _, o := range e.endpointsStore.List() { - tg := e.buildEndpoints(o.(*apiv1.Endpoints)) - initial = append(initial, tg) - } - select { - case <-ctx.Done(): - return - case ch <- initial: - } - // Send target groups for pod updates. 
- send := func(tg *targetgroup.Group) { - if tg == nil { - return - } - level.Debug(e.logger).Log("msg", "endpoints update", "tg", fmt.Sprintf("%#v", tg)) - select { - case <-ctx.Done(): - case ch <- []*targetgroup.Group{tg}: - } + queue: workqueue.NewNamed("endpoints"), } e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { eventCount.WithLabelValues("endpoints", "add").Inc() - - eps, err := convertToEndpoints(o) - if err != nil { - level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err) - return - } - send(e.buildEndpoints(eps)) + e.enqueue(o) }, UpdateFunc: func(_, o interface{}) { eventCount.WithLabelValues("endpoints", "update").Inc() - - eps, err := convertToEndpoints(o) - if err != nil { - level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err) - return - } - send(e.buildEndpoints(eps)) + e.enqueue(o) }, DeleteFunc: func(o interface{}) { eventCount.WithLabelValues("endpoints", "delete").Inc() - - eps, err := convertToEndpoints(o) - if err != nil { - level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err) - return - } - send(&targetgroup.Group{Source: endpointsSource(eps)}) + e.enqueue(o) }, }) @@ -128,9 +85,10 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { ep.Namespace = svc.Namespace ep.Name = svc.Name obj, exists, err := e.endpointsStore.Get(ep) - if exists && err != nil { - send(e.buildEndpoints(obj.(*apiv1.Endpoints))) + if exists && err == nil { + e.enqueue(obj.(*apiv1.Endpoints)) } + if err != nil { level.Error(e.logger).Log("msg", "retrieving endpoints failed", "err", err) } @@ -152,31 +110,102 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { }, }) + return e +} + +func (e *Endpoints) enqueue(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + + e.queue.Add(key) +} + +// Run implements the 
Discoverer interface. +func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + defer e.queue.ShutDown() + + cacheSyncs := []cache.InformerSynced{ + e.endpointsInf.HasSynced, + e.serviceInf.HasSynced, + e.podInf.HasSynced, + } + if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { + level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache") + return + } + + // Send target groups for pod updates. + send := func(tg *targetgroup.Group) { + if tg == nil { + return + } + level.Debug(e.logger).Log("msg", "endpoints update", "tg", fmt.Sprintf("%#v", tg)) + select { + case <-ctx.Done(): + case ch <- []*targetgroup.Group{tg}: + } + } + + go func() { + for e.process(send) { + } + }() + // Block until the target provider is explicitly canceled. <-ctx.Done() } +func (e *Endpoints) process(send func(tg *targetgroup.Group)) bool { + keyObj, quit := e.queue.Get() + if quit { + return false + } + defer e.queue.Done(keyObj) + key := keyObj.(string) + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + level.Error(e.logger).Log("msg", "spliting key failed", "key", key) + return true + } + + o, exists, err := e.endpointsStore.GetByKey(key) + if err != nil { + level.Error(e.logger).Log("msg", "getting object from store failed", "key", key) + return true + } + if !exists { + send(&targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)}) + return true + } + eps, err := convertToEndpoints(o) + if err != nil { + level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err) + return true + } + send(e.buildEndpoints(eps)) + return true +} + func convertToEndpoints(o interface{}) (*apiv1.Endpoints, error) { endpoints, ok := o.(*apiv1.Endpoints) if ok { return endpoints, nil } - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - endpoints, ok = deletedState.Obj.(*apiv1.Endpoints) - if !ok { - 
return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj) - } - return endpoints, nil + return nil, fmt.Errorf("Received unexpected object: %v", o) } func endpointsSource(ep *apiv1.Endpoints) string { return "endpoints/" + ep.ObjectMeta.Namespace + "/" + ep.ObjectMeta.Name } +func endpointsSourceFromNamespaceAndName(namespace, name string) string { + return "endpoints/" + namespace + "/" + name +} + const ( endpointsNameLabel = metaLabelPrefix + "endpoints_name" endpointReadyLabel = metaLabelPrefix + "endpoint_ready" diff --git a/discovery/kubernetes/endpoints_test.go b/discovery/kubernetes/endpoints_test.go index f7fe99f9d6..bd8c67bcfa 100644 --- a/discovery/kubernetes/endpoints_test.go +++ b/discovery/kubernetes/endpoints_test.go @@ -21,24 +21,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/cache" ) -func endpointsStoreKeyFunc(obj interface{}) (string, error) { - return obj.(*v1.Endpoints).ObjectMeta.Name, nil -} - -func newFakeEndpointsInformer() *fakeInformer { - return newFakeInformer(endpointsStoreKeyFunc) -} - -func makeTestEndpointsDiscovery() (*Endpoints, *fakeInformer, *fakeInformer, *fakeInformer) { - svc := newFakeServiceInformer() - eps := newFakeEndpointsInformer() - pod := newFakePodInformer() - return NewEndpoints(nil, svc, eps, pod), svc, eps, pod -} - func makeEndpoints() *v1.Endpoints { return &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -83,14 +67,19 @@ func makeEndpoints() *v1.Endpoints { } } -func TestEndpointsDiscoveryInitial(t *testing.T) { - n, _, eps, _ := makeTestEndpointsDiscovery() - eps.GetStore().Add(makeEndpoints()) +func TestEndpointsDiscoveryBeforeRun(t *testing.T) { + n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}) k8sDiscoveryTest{ discovery: n, - expectedInitial: []*targetgroup.Group{ - { + beforeRun: func() { + obj := makeEndpoints() + 
c.CoreV1().Endpoints(obj.Namespace).Create(obj) + w.Endpoints().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { Targets: []model.LabelSet{ { "__address__": "1.2.3.4:9000", @@ -122,8 +111,7 @@ func TestEndpointsDiscoveryInitial(t *testing.T) { } func TestEndpointsDiscoveryAdd(t *testing.T) { - n, _, eps, pods := makeTestEndpointsDiscovery() - pods.GetStore().Add(&v1.Pod{ + obj := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "testpod", Namespace: "default", @@ -158,45 +146,45 @@ func TestEndpointsDiscoveryAdd(t *testing.T) { HostIP: "2.3.4.5", PodIP: "1.2.3.4", }, - }) + } + n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, obj) k8sDiscoveryTest{ discovery: n, afterStart: func() { - go func() { - eps.Add( - &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testendpoints", - Namespace: "default", - }, - Subsets: []v1.EndpointSubset{ + obj := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ { - Addresses: []v1.EndpointAddress{ - { - IP: "4.3.2.1", - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Name: "testpod", - Namespace: "default", - }, - }, - }, - Ports: []v1.EndpointPort{ - { - Name: "testport", - Port: 9000, - Protocol: v1.ProtocolTCP, - }, + IP: "4.3.2.1", + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "testpod", + Namespace: "default", }, }, }, + Ports: []v1.EndpointPort{ + { + Name: "testport", + Port: 9000, + Protocol: v1.ProtocolTCP, + }, + }, }, - ) - }() + }, + } + c.CoreV1().Endpoints(obj.Namespace).Create(obj) + w.Endpoints().Add(obj) }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { Targets: []model.LabelSet{ { "__address__": "4.3.2.1:9000", @@ -239,29 +227,18 @@ func TestEndpointsDiscoveryAdd(t *testing.T) { } func 
TestEndpointsDiscoveryDelete(t *testing.T) { - n, _, eps, _ := makeTestEndpointsDiscovery() - eps.GetStore().Add(makeEndpoints()) + n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { eps.Delete(makeEndpoints()) }() }, - expectedRes: []*targetgroup.Group{ - { - Source: "endpoints/default/testendpoints", - }, + discovery: n, + afterStart: func() { + obj := makeEndpoints() + c.CoreV1().Endpoints(obj.Namespace).Delete(obj.Name, &metav1.DeleteOptions{}) + w.Endpoints().Delete(obj) }, - }.Run(t) -} - -func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) { - n, _, eps, _ := makeTestEndpointsDiscovery() - eps.GetStore().Add(makeEndpoints()) - - k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { eps.Delete(cache.DeletedFinalStateUnknown{Obj: makeEndpoints()}) }() }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { Source: "endpoints/default/testendpoints", }, }, @@ -269,53 +246,53 @@ func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) { } func TestEndpointsDiscoveryUpdate(t *testing.T) { - n, _, eps, _ := makeTestEndpointsDiscovery() - eps.GetStore().Add(makeEndpoints()) + n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) k8sDiscoveryTest{ discovery: n, afterStart: func() { - go func() { - eps.Update(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testendpoints", - Namespace: "default", - }, - Subsets: []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{ - { - IP: "1.2.3.4", - }, - }, - Ports: []v1.EndpointPort{ - { - Name: "testport", - Port: 9000, - Protocol: v1.ProtocolTCP, - }, + obj := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + { + IP: "1.2.3.4", }, }, - { - 
Addresses: []v1.EndpointAddress{ - { - IP: "2.3.4.5", - }, - }, - Ports: []v1.EndpointPort{ - { - Name: "testport", - Port: 9001, - Protocol: v1.ProtocolTCP, - }, + Ports: []v1.EndpointPort{ + { + Name: "testport", + Port: 9000, + Protocol: v1.ProtocolTCP, }, }, }, - }) - }() + { + Addresses: []v1.EndpointAddress{ + { + IP: "2.3.4.5", + }, + }, + Ports: []v1.EndpointPort{ + { + Name: "testport", + Port: 9001, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + } + c.CoreV1().Endpoints(obj.Namespace).Update(obj) + w.Endpoints().Modify(obj) }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { Targets: []model.LabelSet{ { "__address__": "1.2.3.4:9000", @@ -341,24 +318,24 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) { } func TestEndpointsDiscoveryEmptySubsets(t *testing.T) { - n, _, eps, _ := makeTestEndpointsDiscovery() - eps.GetStore().Add(makeEndpoints()) + n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) k8sDiscoveryTest{ discovery: n, afterStart: func() { - go func() { - eps.Update(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testendpoints", - Namespace: "default", - }, - Subsets: []v1.EndpointSubset{}, - }) - }() + obj := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + }, + Subsets: []v1.EndpointSubset{}, + } + c.CoreV1().Endpoints(obj.Namespace).Update(obj) + w.Endpoints().Modify(obj) }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { Labels: model.LabelSet{ "__meta_kubernetes_namespace": "default", "__meta_kubernetes_endpoints_name": "testendpoints", @@ -368,3 +345,124 @@ func TestEndpointsDiscoveryEmptySubsets(t *testing.T) { }, }.Run(t) } + +func TestEndpointsDiscoveryWithService(t *testing.T) { + n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, 
makeEndpoints()) + + k8sDiscoveryTest{ + discovery: n, + beforeRun: func() { + obj := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app": "test", + }, + }, + } + c.CoreV1().Services(obj.Namespace).Create(obj) + w.Services().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "false", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_endpoints_name": "testendpoints", + "__meta_kubernetes_service_label_app": "test", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: "endpoints/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) { + n, c, w := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) + + k8sDiscoveryTest{ + discovery: n, + beforeRun: func() { + obj := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app": "test", + }, + }, + } + c.CoreV1().Services(obj.Namespace).Create(obj) + w.Services().Add(obj) + }, + afterStart: func() { + obj := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app": "svc", + "component": "testing", + }, + 
}, + } + c.CoreV1().Services(obj.Namespace).Update(obj) + w.Services().Modify(obj) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "false", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_endpoints_name": "testendpoints", + "__meta_kubernetes_service_label_app": "svc", + "__meta_kubernetes_service_name": "testendpoints", + "__meta_kubernetes_service_label_component": "testing", + }, + Source: "endpoints/default/testendpoints", + }, + }, + }.Run(t) +} diff --git a/discovery/kubernetes/ingress.go b/discovery/kubernetes/ingress.go index 07b5eb143a..f15fd4eb85 100644 --- a/discovery/kubernetes/ingress.go +++ b/discovery/kubernetes/ingress.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/util/strutil" "k8s.io/client-go/pkg/apis/extensions/v1beta1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" ) // Ingress implements discovery of Kubernetes ingresss. @@ -31,25 +32,45 @@ type Ingress struct { logger log.Logger informer cache.SharedInformer store cache.Store + queue *workqueue.Type } // NewIngress returns a new ingress discovery. 
func NewIngress(l log.Logger, inf cache.SharedInformer) *Ingress { - return &Ingress{logger: l, informer: inf, store: inf.GetStore()} + s := &Ingress{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("ingress")} + s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + eventCount.WithLabelValues("ingress", "add").Inc() + s.enqueue(o) + }, + DeleteFunc: func(o interface{}) { + eventCount.WithLabelValues("ingress", "delete").Inc() + s.enqueue(o) + }, + UpdateFunc: func(_, o interface{}) { + eventCount.WithLabelValues("ingress", "update").Inc() + s.enqueue(o) + }, + }) + return s +} + +func (e *Ingress) enqueue(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + + e.queue.Add(key) } // Run implements the Discoverer interface. func (s *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - // Send full initial set of pod targets. - var initial []*targetgroup.Group - for _, o := range s.store.List() { - tg := s.buildIngress(o.(*v1beta1.Ingress)) - initial = append(initial, tg) - } - select { - case <-ctx.Done(): + defer s.queue.ShutDown() + + if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) { + level.Error(s.logger).Log("msg", "ingress informer unable to sync cache") return - case ch <- initial: } // Send target groups for ingress updates. 
@@ -59,64 +80,64 @@ func (s *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { case ch <- []*targetgroup.Group{tg}: } } - s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(o interface{}) { - eventCount.WithLabelValues("ingress", "add").Inc() - ingress, err := convertToIngress(o) - if err != nil { - level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error()) - return - } - send(s.buildIngress(ingress)) - }, - DeleteFunc: func(o interface{}) { - eventCount.WithLabelValues("ingress", "delete").Inc() - - ingress, err := convertToIngress(o) - if err != nil { - level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error()) - return - } - send(&targetgroup.Group{Source: ingressSource(ingress)}) - }, - UpdateFunc: func(_, o interface{}) { - eventCount.WithLabelValues("ingress", "update").Inc() - - ingress, err := convertToIngress(o) - if err != nil { - level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err.Error()) - return - } - send(s.buildIngress(ingress)) - }, - }) + go func() { + for s.process(send) { + } + }() // Block until the target provider is explicitly canceled. 
<-ctx.Done() } +func (s *Ingress) process(send func(tg *targetgroup.Group)) bool { + + keyObj, quit := s.queue.Get() + if quit { + return false + } + defer s.queue.Done(keyObj) + key := keyObj.(string) + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return true + } + + o, exists, err := s.store.GetByKey(key) + if err != nil { + return true + } + if !exists { + send(&targetgroup.Group{Source: ingressSourceFromNamespaceAndName(namespace, name)}) + return true + } + eps, err := convertToIngress(o) + if err != nil { + level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err) + return true + } + send(s.buildIngress(eps)) + return true +} + func convertToIngress(o interface{}) (*v1beta1.Ingress, error) { ingress, ok := o.(*v1beta1.Ingress) if ok { return ingress, nil } - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - ingress, ok = deletedState.Obj.(*v1beta1.Ingress) - if !ok { - return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Ingress object: %v", deletedState.Obj) - } - return ingress, nil + return nil, fmt.Errorf("Received unexpected object: %v", o) } func ingressSource(s *v1beta1.Ingress) string { return "ingress/" + s.Namespace + "/" + s.Name } +func ingressSourceFromNamespaceAndName(namespace, name string) string { + return "ingress/" + namespace + "/" + name +} + const ( ingressNameLabel = metaLabelPrefix + "ingress_name" ingressLabelPrefix = metaLabelPrefix + "ingress_label_" diff --git a/discovery/kubernetes/ingress_test.go b/discovery/kubernetes/ingress_test.go index 1b250c8d34..c6cd1c9b8c 100644 --- a/discovery/kubernetes/ingress_test.go +++ b/discovery/kubernetes/ingress_test.go @@ -22,19 +22,6 @@ import ( "k8s.io/client-go/pkg/apis/extensions/v1beta1" ) -func ingressStoreKeyFunc(obj interface{}) (string, error) { - return obj.(*v1beta1.Ingress).ObjectMeta.Name, nil -} - -func newFakeIngressInformer() 
*fakeInformer { - return newFakeInformer(ingressStoreKeyFunc) -} - -func makeTestIngressDiscovery() (*Ingress, *fakeInformer) { - i := newFakeIngressInformer() - return NewIngress(nil, i), i -} - func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { return &v1beta1.Ingress{ ObjectMeta: metav1.ObjectMeta{ @@ -77,13 +64,13 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { } } -func expectedTargetGroups(tls bool) []*targetgroup.Group { +func expectedTargetGroups(tls bool) map[string]*targetgroup.Group { scheme := "http" if tls { scheme = "https" } - return []*targetgroup.Group{ - { + return map[string]*targetgroup.Group{ + "ingress/default/testingress": { Targets: []model.LabelSet{ { "__meta_kubernetes_ingress_scheme": lv(scheme), @@ -115,22 +102,32 @@ func expectedTargetGroups(tls bool) []*targetgroup.Group { } } -func TestIngressDiscoveryInitial(t *testing.T) { - n, i := makeTestIngressDiscovery() - i.GetStore().Add(makeIngress(nil)) +func TestIngressDiscoveryAdd(t *testing.T) { + n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"default"}}) k8sDiscoveryTest{ - discovery: n, - expectedInitial: expectedTargetGroups(false), + discovery: n, + afterStart: func() { + obj := makeIngress(nil) + c.ExtensionsV1beta1().Ingresses("default").Create(obj) + w.Ingresses().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: expectedTargetGroups(false), }.Run(t) } -func TestIngressDiscoveryInitialTLS(t *testing.T) { - n, i := makeTestIngressDiscovery() - i.GetStore().Add(makeIngress([]v1beta1.IngressTLS{{}})) +func TestIngressDiscoveryAddTLS(t *testing.T) { + n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"default"}}) k8sDiscoveryTest{ - discovery: n, - expectedInitial: expectedTargetGroups(true), + discovery: n, + afterStart: func() { + obj := makeIngress([]v1beta1.IngressTLS{{}}) + c.ExtensionsV1beta1().Ingresses("default").Create(obj) + w.Ingresses().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: 
expectedTargetGroups(true), }.Run(t) } diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 6ccc40f748..ac03eff539 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -28,6 +28,9 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" yaml_util "github.com/prometheus/prometheus/util/yaml" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api" apiv1 "k8s.io/client-go/pkg/api/v1" @@ -152,13 +155,21 @@ func init() { } } -// Discovery implements the Discoverer interface for discovering +// Copy of discovery.Discoverer to avoid import cycle. +// This is only for internal use. +type discoverer interface { + Run(ctx context.Context, up chan<- []*targetgroup.Group) +} + +// Discovery implements the discoverer interface for discovering // targets from Kubernetes. type Discovery struct { + sync.RWMutex client kubernetes.Interface role Role logger log.Logger namespaceDiscovery *NamespaceDiscovery + discoverers []discoverer } func (d *Discovery) getNamespaces() []string { @@ -239,129 +250,156 @@ func New(l log.Logger, conf *SDConfig) (*Discovery, error) { logger: l, role: conf.Role, namespaceDiscovery: &conf.NamespaceDiscovery, + discoverers: make([]discoverer, 0), }, nil } const resyncPeriod = 10 * time.Minute -// Run implements the Discoverer interface. -func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - rclient := d.client.Core().RESTClient() - reclient := d.client.Extensions().RESTClient() +type hasSynced interface { + // hasSynced returns true if all informers' store has synced. + // This is only used in testing to determine when the cache stores have synced. 
+ hasSynced() bool +} +var _ hasSynced = &Discovery{} + +func (d *Discovery) hasSynced() bool { + d.RLock() + defer d.RUnlock() + for _, discoverer := range d.discoverers { + if hasSynceddiscoverer, ok := discoverer.(hasSynced); ok { + if !hasSynceddiscoverer.hasSynced() { + return false + } + } + } + return true +} + +// Run implements the discoverer interface. +func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + d.Lock() namespaces := d.getNamespaces() switch d.role { case "endpoints": - var wg sync.WaitGroup - for _, namespace := range namespaces { - elw := cache.NewListWatchFromClient(rclient, "endpoints", namespace, nil) - slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil) - plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil) + elw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.client.CoreV1().Endpoints(namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.client.CoreV1().Endpoints(namespace).Watch(options) + }, + } + slw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.client.CoreV1().Services(namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.client.CoreV1().Services(namespace).Watch(options) + }, + } + plw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.client.CoreV1().Pods(namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.client.CoreV1().Pods(namespace).Watch(options) + }, + } eps := NewEndpoints( log.With(d.logger, "role", "endpoint"), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), ) + d.discoverers = 
append(d.discoverers, eps) go eps.endpointsInf.Run(ctx.Done()) go eps.serviceInf.Run(ctx.Done()) go eps.podInf.Run(ctx.Done()) - - for !eps.serviceInf.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - for !eps.endpointsInf.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - for !eps.podInf.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - wg.Add(1) - go func() { - defer wg.Done() - eps.Run(ctx, ch) - }() } - wg.Wait() case "pod": - var wg sync.WaitGroup for _, namespace := range namespaces { - plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil) + plw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.client.CoreV1().Pods(namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.client.CoreV1().Pods(namespace).Watch(options) + }, + } pod := NewPod( log.With(d.logger, "role", "pod"), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), ) + d.discoverers = append(d.discoverers, pod) go pod.informer.Run(ctx.Done()) - - for !pod.informer.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - wg.Add(1) - go func() { - defer wg.Done() - pod.Run(ctx, ch) - }() } - wg.Wait() case "service": - var wg sync.WaitGroup for _, namespace := range namespaces { - slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil) + slw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.client.CoreV1().Services(namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.client.CoreV1().Services(namespace).Watch(options) + }, + } svc := NewService( log.With(d.logger, "role", "service"), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), ) + d.discoverers = append(d.discoverers, svc) go svc.informer.Run(ctx.Done()) - - for !svc.informer.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - wg.Add(1) - go func() { - 
defer wg.Done() - svc.Run(ctx, ch) - }() } - wg.Wait() case "ingress": - var wg sync.WaitGroup for _, namespace := range namespaces { - ilw := cache.NewListWatchFromClient(reclient, "ingresses", namespace, nil) + ilw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.client.ExtensionsV1beta1().Ingresses(namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.client.ExtensionsV1beta1().Ingresses(namespace).Watch(options) + }, + } ingress := NewIngress( log.With(d.logger, "role", "ingress"), cache.NewSharedInformer(ilw, &extensionsv1beta1.Ingress{}, resyncPeriod), ) + d.discoverers = append(d.discoverers, ingress) go ingress.informer.Run(ctx.Done()) - - for !ingress.informer.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - wg.Add(1) - go func() { - defer wg.Done() - ingress.Run(ctx, ch) - }() } - wg.Wait() case "node": - nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil) + nlw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.client.CoreV1().Nodes().List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.client.CoreV1().Nodes().Watch(options) + }, + } node := NewNode( log.With(d.logger, "role", "node"), cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod), ) + d.discoverers = append(d.discoverers, node) go node.informer.Run(ctx.Done()) - - for !node.informer.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - node.Run(ctx, ch) - default: level.Error(d.logger).Log("msg", "unknown Kubernetes discovery kind", "role", d.role) } + var wg sync.WaitGroup + for _, dd := range d.discoverers { + wg.Add(1) + go func(d discoverer) { + defer wg.Done() + d.Run(ctx, ch) + }(dd) + } + + d.Unlock() <-ctx.Done() } diff --git a/discovery/kubernetes/kubernetes_test.go b/discovery/kubernetes/kubernetes_test.go new file mode 100644 index 
0000000000..1b713c4169 --- /dev/null +++ b/discovery/kubernetes/kubernetes_test.go @@ -0,0 +1,186 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "encoding/json" + "sync" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" +) + +type watcherFactory struct { + sync.RWMutex + watchers map[schema.GroupVersionResource]*watch.FakeWatcher +} + +func (wf *watcherFactory) watchFor(gvr schema.GroupVersionResource) *watch.FakeWatcher { + wf.Lock() + defer wf.Unlock() + + var fakewatch *watch.FakeWatcher + fakewatch, ok := wf.watchers[gvr] + if !ok { + fakewatch = watch.NewFakeWithChanSize(128, true) + wf.watchers[gvr] = fakewatch + } + return fakewatch +} + +func (wf *watcherFactory) Nodes() *watch.FakeWatcher { + return wf.watchFor(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}) +} + +func (wf *watcherFactory) Ingresses() *watch.FakeWatcher { + return wf.watchFor(schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "ingresses"}) +} + +func (wf *watcherFactory) Endpoints() 
*watch.FakeWatcher { + return wf.watchFor(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}) } + +func (wf *watcherFactory) Services() *watch.FakeWatcher { + return wf.watchFor(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}) } + +func (wf *watcherFactory) Pods() *watch.FakeWatcher { + return wf.watchFor(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}) } + +// makeDiscovery creates a kubernetes.Discovery instance for testing. +func makeDiscovery(role Role, nsDiscovery NamespaceDiscovery, objects ...runtime.Object) (*Discovery, kubernetes.Interface, *watcherFactory) { + clientset := fake.NewSimpleClientset(objects...) + // Current client-go we are using does not support push event on + // Add/Update/Create, so we need to emit events manually. + // See https://github.com/kubernetes/kubernetes/issues/54075. + // TODO update client-go and related packages to kubernetes-1.10.0+ + wf := &watcherFactory{ + watchers: make(map[schema.GroupVersionResource]*watch.FakeWatcher), + } + clientset.PrependWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { + gvr := action.GetResource() + return true, wf.watchFor(gvr), nil + }) + return &Discovery{ + client: clientset, + logger: log.NewNopLogger(), + role: role, + namespaceDiscovery: &nsDiscovery, + }, clientset, wf +} + +type k8sDiscoveryTest struct { + // discovery is an instance of discovery.Discoverer + discovery discoverer + // beforeRun runs before the discoverer runs + beforeRun func() + // afterStart runs after the discoverer has synced + afterStart func() + // expectedMaxItems is the maximum number of items we may get from the channel + expectedMaxItems int + // expectedRes is the expected final result + expectedRes map[string]*targetgroup.Group +} + +func (d k8sDiscoveryTest) Run(t *testing.T) { + ch := make(chan []*targetgroup.Group) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer
cancel() + + if d.beforeRun != nil { + d.beforeRun() + } + + // Run discoverer and start a goroutine to read results. + go d.discovery.Run(ctx, ch) + resChan := make(chan map[string]*targetgroup.Group) + go readResultWithoutTimeout(t, ch, d.expectedMaxItems, time.Second, resChan) + + if dd, ok := d.discovery.(hasSynced); ok { + if !cache.WaitForCacheSync(ctx.Done(), dd.hasSynced) { + t.Errorf("discoverer failed to sync: %v", dd) + return + } + } + + if d.afterStart != nil { + d.afterStart() + } + + if d.expectedRes != nil { + res := <-resChan + requireTargetGroups(t, d.expectedRes, res) + } +} + +// readResultWithoutTimeout reads all target groups from the channel with a timeout. +// It merges target groups by source and sends the result to the result channel. +func readResultWithoutTimeout(t *testing.T, ch <-chan []*targetgroup.Group, max int, timeout time.Duration, resChan chan<- map[string]*targetgroup.Group) { + allTgs := make([][]*targetgroup.Group, 0) + +Loop: + for { + select { + case tgs := <-ch: + allTgs = append(allTgs, tgs) + if len(allTgs) == max { + // Reached max target groups we may get, break fast. + break Loop + } + case <-time.After(timeout): + // Because we use a queue, an object that is created then + // deleted or updated may be processed only once. + // So possibly we may skip events and time out here. + t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(allTgs), max) + break Loop + } + } + + // Merge by source and send it to the channel.
+ res := make(map[string]*targetgroup.Group) + for _, tgs := range allTgs { + for _, tg := range tgs { + if tg == nil { + continue + } + res[tg.Source] = tg + } + } + resChan <- res +} + +func requireTargetGroups(t *testing.T, expected, res map[string]*targetgroup.Group) { + b1, err := json.Marshal(expected) + if err != nil { + panic(err) + } + b2, err := json.Marshal(res) + if err != nil { + panic(err) + } + + require.JSONEq(t, string(b1), string(b2)) +} diff --git a/discovery/kubernetes/node.go b/discovery/kubernetes/node.go index f6d16aefca..f49466d2cd 100644 --- a/discovery/kubernetes/node.go +++ b/discovery/kubernetes/node.go @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/pkg/api" apiv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" ) // Node discovers Kubernetes nodes. @@ -34,28 +35,55 @@ type Node struct { logger log.Logger informer cache.SharedInformer store cache.Store + queue *workqueue.Type } +var _ discoverer = &Node{} +var _ hasSynced = &Node{} + // NewNode returns a new node discovery. 
func NewNode(l log.Logger, inf cache.SharedInformer) *Node { if l == nil { l = log.NewNopLogger() } - return &Node{logger: l, informer: inf, store: inf.GetStore()} + n := &Node{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("node")} + n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + eventCount.WithLabelValues("node", "add").Inc() + n.enqueue(o) + }, + DeleteFunc: func(o interface{}) { + eventCount.WithLabelValues("node", "delete").Inc() + n.enqueue(o) + }, + UpdateFunc: func(_, o interface{}) { + eventCount.WithLabelValues("node", "update").Inc() + n.enqueue(o) + }, + }) + return n +} + +func (e *Node) enqueue(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + + e.queue.Add(key) +} + +func (n *Node) hasSynced() bool { + return n.informer.HasSynced() } // Run implements the Discoverer interface. func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - // Send full initial set of pod targets. - var initial []*targetgroup.Group - for _, o := range n.store.List() { - tg := n.buildNode(o.(*apiv1.Node)) - initial = append(initial, tg) - } - select { - case <-ctx.Done(): + defer n.queue.ShutDown() + + if !cache.WaitForCacheSync(ctx.Done(), n.informer.HasSynced) { + level.Error(n.logger).Log("msg", "node informer unable to sync cache") return - case ch <- initial: } // Send target groups for service updates. 
@@ -68,64 +96,63 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { case ch <- []*targetgroup.Group{tg}: } } - n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(o interface{}) { - eventCount.WithLabelValues("node", "add").Inc() - node, err := convertToNode(o) - if err != nil { - level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err) - return - } - send(n.buildNode(node)) - }, - DeleteFunc: func(o interface{}) { - eventCount.WithLabelValues("node", "delete").Inc() - - node, err := convertToNode(o) - if err != nil { - level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err) - return - } - send(&targetgroup.Group{Source: nodeSource(node)}) - }, - UpdateFunc: func(_, o interface{}) { - eventCount.WithLabelValues("node", "update").Inc() - - node, err := convertToNode(o) - if err != nil { - level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err) - return - } - send(n.buildNode(node)) - }, - }) + go func() { + for n.process(send) { + } + }() // Block until the target provider is explicitly canceled. 
<-ctx.Done() } +func (n *Node) process(send func(tg *targetgroup.Group)) bool { + keyObj, quit := n.queue.Get() + if quit { + return false + } + defer n.queue.Done(keyObj) + key := keyObj.(string) + + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return true + } + + o, exists, err := n.store.GetByKey(key) + if err != nil { + return true + } + if !exists { + send(&targetgroup.Group{Source: nodeSourceFromName(name)}) + return true + } + node, err := convertToNode(o) + if err != nil { + level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err) + return true + } + send(n.buildNode(node)) + return true +} + func convertToNode(o interface{}) (*apiv1.Node, error) { node, ok := o.(*apiv1.Node) if ok { return node, nil } - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - node, ok = deletedState.Obj.(*apiv1.Node) - if !ok { - return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) - } - return node, nil + return nil, fmt.Errorf("Received unexpected object: %v", o) } func nodeSource(n *apiv1.Node) string { return "node/" + n.Name } +func nodeSourceFromName(name string) string { + return "node/" + name +} + const ( nodeNameLabel = metaLabelPrefix + "node_name" nodeLabelPrefix = metaLabelPrefix + "node_label_" diff --git a/discovery/kubernetes/node_test.go b/discovery/kubernetes/node_test.go index 3266841b7e..8a021757d7 100644 --- a/discovery/kubernetes/node_test.go +++ b/discovery/kubernetes/node_test.go @@ -14,152 +14,15 @@ package kubernetes import ( - "context" - "encoding/json" "fmt" - "sync" "testing" - "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" - "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/cache" ) -type fakeInformer struct { - store cache.Store - 
handlers []cache.ResourceEventHandler - - blockDeltas sync.Mutex -} - -func newFakeInformer(f func(obj interface{}) (string, error)) *fakeInformer { - i := &fakeInformer{ - store: cache.NewStore(f), - } - // We want to make sure that all delta events (Add/Update/Delete) are blocked - // until our handlers to test have been added. - i.blockDeltas.Lock() - return i -} - -func (i *fakeInformer) AddEventHandler(h cache.ResourceEventHandler) { - i.handlers = append(i.handlers, h) - // Only now that there is a registered handler, we are able to handle deltas. - i.blockDeltas.Unlock() -} - -func (i *fakeInformer) AddEventHandlerWithResyncPeriod(h cache.ResourceEventHandler, _ time.Duration) { - i.AddEventHandler(h) -} - -func (i *fakeInformer) GetStore() cache.Store { - return i.store -} - -func (i *fakeInformer) GetController() cache.Controller { - return nil -} - -func (i *fakeInformer) Run(stopCh <-chan struct{}) { -} - -func (i *fakeInformer) HasSynced() bool { - return true -} - -func (i *fakeInformer) LastSyncResourceVersion() string { - return "0" -} - -func (i *fakeInformer) Add(obj interface{}) { - i.blockDeltas.Lock() - defer i.blockDeltas.Unlock() - - for _, h := range i.handlers { - h.OnAdd(obj) - } -} - -func (i *fakeInformer) Delete(obj interface{}) { - i.blockDeltas.Lock() - defer i.blockDeltas.Unlock() - - for _, h := range i.handlers { - h.OnDelete(obj) - } -} - -func (i *fakeInformer) Update(obj interface{}) { - i.blockDeltas.Lock() - defer i.blockDeltas.Unlock() - - for _, h := range i.handlers { - h.OnUpdate(nil, obj) - } -} - -type discoverer interface { - Run(ctx context.Context, up chan<- []*targetgroup.Group) -} - -type k8sDiscoveryTest struct { - discovery discoverer - afterStart func() - expectedInitial []*targetgroup.Group - expectedRes []*targetgroup.Group -} - -func (d k8sDiscoveryTest) Run(t *testing.T) { - ch := make(chan []*targetgroup.Group) - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) - defer cancel() - 
go func() { - d.discovery.Run(ctx, ch) - }() - - initialRes := <-ch - if d.expectedInitial != nil { - requireTargetGroups(t, d.expectedInitial, initialRes) - } - - if d.afterStart != nil && d.expectedRes != nil { - d.afterStart() - res := <-ch - - requireTargetGroups(t, d.expectedRes, res) - } -} - -func requireTargetGroups(t *testing.T, expected, res []*targetgroup.Group) { - b1, err := json.Marshal(expected) - if err != nil { - panic(err) - } - b2, err := json.Marshal(res) - if err != nil { - panic(err) - } - - require.JSONEq(t, string(b1), string(b2)) -} - -func nodeStoreKeyFunc(obj interface{}) (string, error) { - return obj.(*v1.Node).ObjectMeta.Name, nil -} - -func newFakeNodeInformer() *fakeInformer { - return newFakeInformer(nodeStoreKeyFunc) -} - -func makeTestNodeDiscovery() (*Node, *fakeInformer) { - i := newFakeNodeInformer() - return NewNode(nil, i), i -} - func makeNode(name, address string, labels map[string]string, annotations map[string]string) *v1.Node { return &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -187,19 +50,24 @@ func makeEnumeratedNode(i int) *v1.Node { return makeNode(fmt.Sprintf("test%d", i), "1.2.3.4", map[string]string{}, map[string]string{}) } -func TestNodeDiscoveryInitial(t *testing.T) { - n, i := makeTestNodeDiscovery() - i.GetStore().Add(makeNode( - "test", - "1.2.3.4", - map[string]string{"testlabel": "testvalue"}, - map[string]string{"testannotation": "testannotationvalue"}, - )) +func TestNodeDiscoveryBeforeStart(t *testing.T) { + n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{}) k8sDiscoveryTest{ discovery: n, - expectedInitial: []*targetgroup.Group{ - { + beforeRun: func() { + obj := makeNode( + "test", + "1.2.3.4", + map[string]string{"testlabel": "testvalue"}, + map[string]string{"testannotation": "testannotationvalue"}, + ) + c.CoreV1().Nodes().Create(obj) + w.Nodes().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "node/test": { Targets: []model.LabelSet{ { "__address__": 
"1.2.3.4:10250", @@ -219,13 +87,18 @@ func TestNodeDiscoveryInitial(t *testing.T) { } func TestNodeDiscoveryAdd(t *testing.T) { - n, i := makeTestNodeDiscovery() + n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{}) k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Add(makeEnumeratedNode(1)) }() }, - expectedRes: []*targetgroup.Group{ - { + discovery: n, + afterStart: func() { + obj := makeEnumeratedNode(1) + c.CoreV1().Nodes().Create(obj) + w.Nodes().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "node/test1": { Targets: []model.LabelSet{ { "__address__": "1.2.3.4:10250", @@ -243,59 +116,18 @@ func TestNodeDiscoveryAdd(t *testing.T) { } func TestNodeDiscoveryDelete(t *testing.T) { - n, i := makeTestNodeDiscovery() - i.GetStore().Add(makeEnumeratedNode(0)) + obj := makeEnumeratedNode(0) + n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{}, obj) k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Delete(makeEnumeratedNode(0)) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__address__": "1.2.3.4:10250", - "instance": "test0", - "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_node_name": "test0", - }, - Source: "node/test0", - }, + discovery: n, + afterStart: func() { + c.CoreV1().Nodes().Delete(obj.Name, &metav1.DeleteOptions{}) + w.Nodes().Delete(obj) }, - expectedRes: []*targetgroup.Group{ - { - Source: "node/test0", - }, - }, - }.Run(t) -} - -func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) { - n, i := makeTestNodeDiscovery() - i.GetStore().Add(makeEnumeratedNode(0)) - - k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeEnumeratedNode(0)}) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__address__": "1.2.3.4:10250", - "instance": "test0", - 
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_node_name": "test0", - }, - Source: "node/test0", - }, - }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "node/test0": { Source: "node/test0", }, }, @@ -303,40 +135,26 @@ func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) { } func TestNodeDiscoveryUpdate(t *testing.T) { - n, i := makeTestNodeDiscovery() - i.GetStore().Add(makeEnumeratedNode(0)) + n, c, w := makeDiscovery(RoleNode, NamespaceDiscovery{}) k8sDiscoveryTest{ discovery: n, afterStart: func() { - go func() { - i.Update( - makeNode( - "test0", - "1.2.3.4", - map[string]string{"Unschedulable": "true"}, - map[string]string{}, - ), - ) - }() + obj1 := makeEnumeratedNode(0) + c.CoreV1().Nodes().Create(obj1) + w.Nodes().Add(obj1) + obj2 := makeNode( + "test0", + "1.2.3.4", + map[string]string{"Unschedulable": "true"}, + map[string]string{}, + ) + c.CoreV1().Nodes().Update(obj2) + w.Nodes().Modify(obj2) }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__address__": "1.2.3.4:10250", - "instance": "test0", - "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_node_name": "test0", - }, - Source: "node/test0", - }, - }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "node/test0": { Targets: []model.LabelSet{ { "__address__": "1.2.3.4:10250", diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go index 2c307a667d..0967634255 100644 --- a/discovery/kubernetes/pod.go +++ b/discovery/kubernetes/pod.go @@ -26,6 +26,7 @@ import ( "k8s.io/client-go/pkg/api" apiv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/strutil" @@ 
-36,6 +37,7 @@ type Pod struct { informer cache.SharedInformer store cache.Store logger log.Logger + queue *workqueue.Type } // NewPod creates a new pod discovery. @@ -43,27 +45,45 @@ func NewPod(l log.Logger, pods cache.SharedInformer) *Pod { if l == nil { l = log.NewNopLogger() } - return &Pod{ + p := &Pod{ informer: pods, store: pods.GetStore(), logger: l, + queue: workqueue.NewNamed("pod"), } + p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + eventCount.WithLabelValues("pod", "add").Inc() + p.enqueue(o) + }, + DeleteFunc: func(o interface{}) { + eventCount.WithLabelValues("pod", "delete").Inc() + p.enqueue(o) + }, + UpdateFunc: func(_, o interface{}) { + eventCount.WithLabelValues("pod", "update").Inc() + p.enqueue(o) + }, + }) + return p +} + +func (e *Pod) enqueue(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + + e.queue.Add(key) } // Run implements the Discoverer interface. func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - // Send full initial set of pod targets. - var initial []*targetgroup.Group - for _, o := range p.store.List() { - tg := p.buildPod(o.(*apiv1.Pod)) - initial = append(initial, tg) + defer p.queue.ShutDown() - level.Debug(p.logger).Log("msg", "initial pod", "tg", fmt.Sprintf("%#v", tg)) - } - select { - case <-ctx.Done(): + if !cache.WaitForCacheSync(ctx.Done(), p.informer.HasSynced) { + level.Error(p.logger).Log("msg", "pod informer unable to sync cache") return - case ch <- initial: } // Send target groups for pod updates. 
@@ -77,58 +97,53 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { case ch <- []*targetgroup.Group{tg}: } } - p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(o interface{}) { - eventCount.WithLabelValues("pod", "add").Inc() - pod, err := convertToPod(o) - if err != nil { - level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err) - return - } - send(p.buildPod(pod)) - }, - DeleteFunc: func(o interface{}) { - eventCount.WithLabelValues("pod", "delete").Inc() - - pod, err := convertToPod(o) - if err != nil { - level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err) - return - } - send(&targetgroup.Group{Source: podSource(pod)}) - }, - UpdateFunc: func(_, o interface{}) { - eventCount.WithLabelValues("pod", "update").Inc() - - pod, err := convertToPod(o) - if err != nil { - level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err) - return - } - send(p.buildPod(pod)) - }, - }) + go func() { + for p.process(send) { + } + }() // Block until the target provider is explicitly canceled. 
<-ctx.Done() } +func (p *Pod) process(send func(tg *targetgroup.Group)) bool { + keyObj, quit := p.queue.Get() + if quit { + return false + } + defer p.queue.Done(keyObj) + key := keyObj.(string) + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return true + } + + o, exists, err := p.store.GetByKey(key) + if err != nil { + return true + } + if !exists { + send(&targetgroup.Group{Source: podSourceFromNamespaceAndName(namespace, name)}) + return true + } + eps, err := convertToPod(o) + if err != nil { + level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err) + return true + } + send(p.buildPod(eps)) + return true +} + func convertToPod(o interface{}) (*apiv1.Pod, error) { pod, ok := o.(*apiv1.Pod) if ok { return pod, nil } - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - pod, ok = deletedState.Obj.(*apiv1.Pod) - if !ok { - return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj) - } - return pod, nil + return nil, fmt.Errorf("Received unexpected object: %v", o) } const ( @@ -215,6 +230,10 @@ func podSource(pod *apiv1.Pod) string { return "pod/" + pod.Namespace + "/" + pod.Name } +func podSourceFromNamespaceAndName(namespace, name string) string { + return "pod/" + namespace + "/" + name +} + func podReady(pod *apiv1.Pod) model.LabelValue { for _, cond := range pod.Status.Conditions { if cond.Type == apiv1.PodReady { diff --git a/discovery/kubernetes/pod_test.go b/discovery/kubernetes/pod_test.go index 43c2d9adc1..3023db4eca 100644 --- a/discovery/kubernetes/pod_test.go +++ b/discovery/kubernetes/pod_test.go @@ -21,23 +21,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/cache" ) -func podStoreKeyFunc(obj interface{}) (string, error) { - return obj.(*v1.Pod).ObjectMeta.Name, nil -} - -func 
newFakePodInformer() *fakeInformer { - return newFakeInformer(podStoreKeyFunc) -} - -func makeTestPodDiscovery() (*Pod, *fakeInformer) { - i := newFakePodInformer() - return NewPod(nil, i), i -} - -func makeMultiPortPod() *v1.Pod { +func makeMultiPortPods() *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "testpod", @@ -82,7 +68,7 @@ func makeMultiPortPod() *v1.Pod { } } -func makePod() *v1.Pod { +func makePods() *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "testpod", @@ -117,14 +103,19 @@ func makePod() *v1.Pod { } } -func TestPodDiscoveryInitial(t *testing.T) { - n, i := makeTestPodDiscovery() - i.GetStore().Add(makeMultiPortPod()) +func TestPodDiscoveryBeforeRun(t *testing.T) { + n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{}) k8sDiscoveryTest{ discovery: n, - expectedInitial: []*targetgroup.Group{ - { + beforeRun: func() { + obj := makeMultiPortPods() + c.CoreV1().Pods(obj.Namespace).Create(obj) + w.Pods().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "pod/default/testpod": { Targets: []model.LabelSet{ { "__address__": "1.2.3.4:9000", @@ -163,13 +154,17 @@ func TestPodDiscoveryInitial(t *testing.T) { } func TestPodDiscoveryAdd(t *testing.T) { - n, i := makeTestPodDiscovery() + n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{}) k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Add(makePod()) }() }, - expectedRes: []*targetgroup.Group{ - { + discovery: n, + afterStart: func() { + obj := makePods() + c.CoreV1().Pods(obj.Namespace).Create(obj) + w.Pods().Add(obj) + }, + expectedRes: map[string]*targetgroup.Group{ + "pod/default/testpod": { Targets: []model.LabelSet{ { "__address__": "1.2.3.4:9000", @@ -195,75 +190,18 @@ func TestPodDiscoveryAdd(t *testing.T) { } func TestPodDiscoveryDelete(t *testing.T) { - n, i := makeTestPodDiscovery() - i.GetStore().Add(makePod()) + obj := makePods() + n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{}, obj) 
k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Delete(makePod()) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__address__": "1.2.3.4:9000", - "__meta_kubernetes_pod_container_name": "testcontainer", - "__meta_kubernetes_pod_container_port_name": "testport", - "__meta_kubernetes_pod_container_port_number": "9000", - "__meta_kubernetes_pod_container_port_protocol": "TCP", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_pod_name": "testpod", - "__meta_kubernetes_namespace": "default", - "__meta_kubernetes_pod_node_name": "testnode", - "__meta_kubernetes_pod_ip": "1.2.3.4", - "__meta_kubernetes_pod_host_ip": "2.3.4.5", - "__meta_kubernetes_pod_ready": "true", - "__meta_kubernetes_pod_uid": "abc123", - }, - Source: "pod/default/testpod", - }, + discovery: n, + afterStart: func() { + obj := makePods() + c.CoreV1().Pods(obj.Namespace).Delete(obj.Name, &metav1.DeleteOptions{}) + w.Pods().Delete(obj) }, - expectedRes: []*targetgroup.Group{ - { - Source: "pod/default/testpod", - }, - }, - }.Run(t) -} - -func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) { - n, i := makeTestPodDiscovery() - i.GetStore().Add(makePod()) - - k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makePod()}) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__address__": "1.2.3.4:9000", - "__meta_kubernetes_pod_container_name": "testcontainer", - "__meta_kubernetes_pod_container_port_name": "testport", - "__meta_kubernetes_pod_container_port_number": "9000", - "__meta_kubernetes_pod_container_port_protocol": "TCP", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_pod_name": "testpod", - "__meta_kubernetes_namespace": "default", - "__meta_kubernetes_pod_node_name": "testnode", - "__meta_kubernetes_pod_ip": "1.2.3.4", - "__meta_kubernetes_pod_host_ip": "2.3.4.5", - "__meta_kubernetes_pod_ready": "true", - 
"__meta_kubernetes_pod_uid": "abc123", - }, - Source: "pod/default/testpod", - }, - }, - expectedRes: []*targetgroup.Group{ - { + expectedRes: map[string]*targetgroup.Group{ + "pod/default/testpod": { Source: "pod/default/testpod", }, }, @@ -271,8 +209,7 @@ func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) { } func TestPodDiscoveryUpdate(t *testing.T) { - n, i := makeTestPodDiscovery() - i.GetStore().Add(&v1.Pod{ + obj := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "testpod", Namespace: "default", @@ -297,36 +234,18 @@ func TestPodDiscoveryUpdate(t *testing.T) { PodIP: "1.2.3.4", HostIP: "2.3.4.5", }, - }) + } + n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{}, obj) k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Update(makePod()) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__address__": "1.2.3.4:9000", - "__meta_kubernetes_pod_container_name": "testcontainer", - "__meta_kubernetes_pod_container_port_name": "testport", - "__meta_kubernetes_pod_container_port_number": "9000", - "__meta_kubernetes_pod_container_port_protocol": "TCP", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_pod_name": "testpod", - "__meta_kubernetes_namespace": "default", - "__meta_kubernetes_pod_node_name": "testnode", - "__meta_kubernetes_pod_ip": "1.2.3.4", - "__meta_kubernetes_pod_host_ip": "2.3.4.5", - "__meta_kubernetes_pod_ready": "unknown", - "__meta_kubernetes_pod_uid": "xyz321", - }, - Source: "pod/default/testpod", - }, + discovery: n, + afterStart: func() { + obj := makePods() + c.CoreV1().Pods(obj.Namespace).Create(obj) + w.Pods().Modify(obj) }, - expectedRes: []*targetgroup.Group{ - { + expectedRes: map[string]*targetgroup.Group{ + "pod/default/testpod": { Targets: []model.LabelSet{ { "__address__": "1.2.3.4:9000", @@ -352,42 +271,25 @@ func TestPodDiscoveryUpdate(t *testing.T) { } func TestPodDiscoveryUpdateEmptyPodIP(t *testing.T) { - n, i := makeTestPodDiscovery() - initialPod := 
makePod() + n, c, w := makeDiscovery(RolePod, NamespaceDiscovery{}) + initialPod := makePods() - updatedPod := makePod() + updatedPod := makePods() updatedPod.Status.PodIP = "" - i.GetStore().Add(initialPod) - k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Update(updatedPod) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__address__": "1.2.3.4:9000", - "__meta_kubernetes_pod_container_name": "testcontainer", - "__meta_kubernetes_pod_container_port_name": "testport", - "__meta_kubernetes_pod_container_port_number": "9000", - "__meta_kubernetes_pod_container_port_protocol": "TCP", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_pod_name": "testpod", - "__meta_kubernetes_namespace": "default", - "__meta_kubernetes_pod_node_name": "testnode", - "__meta_kubernetes_pod_ip": "1.2.3.4", - "__meta_kubernetes_pod_host_ip": "2.3.4.5", - "__meta_kubernetes_pod_ready": "true", - "__meta_kubernetes_pod_uid": "abc123", - }, - Source: "pod/default/testpod", - }, + discovery: n, + beforeRun: func() { + c.CoreV1().Pods(initialPod.Namespace).Create(initialPod) + w.Pods().Add(initialPod) }, - expectedRes: []*targetgroup.Group{ - { + afterStart: func() { + c.CoreV1().Pods(updatedPod.Namespace).Create(updatedPod) + w.Pods().Modify(updatedPod) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "pod/default/testpod": { Source: "pod/default/testpod", }, }, diff --git a/discovery/kubernetes/service.go b/discovery/kubernetes/service.go index 09856ed06b..625f2343c0 100644 --- a/discovery/kubernetes/service.go +++ b/discovery/kubernetes/service.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/common/model" apiv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/strutil" @@ -34,6 +35,7 @@ type Service struct { logger log.Logger informer 
cache.SharedInformer store cache.Store + queue *workqueue.Type } // NewService returns a new service discovery. @@ -41,21 +43,40 @@ func NewService(l log.Logger, inf cache.SharedInformer) *Service { if l == nil { l = log.NewNopLogger() } - return &Service{logger: l, informer: inf, store: inf.GetStore()} + s := &Service{logger: l, informer: inf, store: inf.GetStore(), queue: workqueue.NewNamed("service")} + s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + eventCount.WithLabelValues("service", "add").Inc() + s.enqueue(o) + }, + DeleteFunc: func(o interface{}) { + eventCount.WithLabelValues("service", "delete").Inc() + s.enqueue(o) + }, + UpdateFunc: func(_, o interface{}) { + eventCount.WithLabelValues("service", "update").Inc() + s.enqueue(o) + }, + }) + return s +} + +func (e *Service) enqueue(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + + e.queue.Add(key) } // Run implements the Discoverer interface. func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - // Send full initial set of pod targets. - var initial []*targetgroup.Group - for _, o := range s.store.List() { - tg := s.buildService(o.(*apiv1.Service)) - initial = append(initial, tg) - } - select { - case <-ctx.Done(): + defer s.queue.ShutDown() + + if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) { + level.Error(s.logger).Log("msg", "service informer unable to sync cache") return - case ch <- initial: } // Send target groups for service updates.
@@ -65,63 +86,62 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { case ch <- []*targetgroup.Group{tg}: } } - s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(o interface{}) { - eventCount.WithLabelValues("service", "add").Inc() - svc, err := convertToService(o) - if err != nil { - level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err) - return - } - send(s.buildService(svc)) - }, - DeleteFunc: func(o interface{}) { - eventCount.WithLabelValues("service", "delete").Inc() - - svc, err := convertToService(o) - if err != nil { - level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err) - return - } - send(&targetgroup.Group{Source: serviceSource(svc)}) - }, - UpdateFunc: func(_, o interface{}) { - eventCount.WithLabelValues("service", "update").Inc() - - svc, err := convertToService(o) - if err != nil { - level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err) - return - } - send(s.buildService(svc)) - }, - }) + go func() { + for s.process(send) { + } + }() // Block until the target provider is explicitly canceled. 
<-ctx.Done() } +func (s *Service) process(send func(tg *targetgroup.Group)) bool { + keyObj, quit := s.queue.Get() + if quit { + return false + } + defer s.queue.Done(keyObj) + key := keyObj.(string) + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return true + } + + o, exists, err := s.store.GetByKey(key) + if err != nil { + return true + } + if !exists { + send(&targetgroup.Group{Source: serviceSourceFromNamespaceAndName(namespace, name)}) + return true + } + svc, err := convertToService(o) + if err != nil { + level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err) + return true + } + send(s.buildService(svc)) + return true +} + func convertToService(o interface{}) (*apiv1.Service, error) { service, ok := o.(*apiv1.Service) if ok { return service, nil } - deletedState, ok := o.(cache.DeletedFinalStateUnknown) - if !ok { - return nil, fmt.Errorf("Received unexpected object: %v", o) - } - service, ok = deletedState.Obj.(*apiv1.Service) - if !ok { - return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj) - } - return service, nil + return nil, fmt.Errorf("Received unexpected object: %v", o) } func serviceSource(s *apiv1.Service) string { return "svc/" + s.Namespace + "/" + s.Name } +func serviceSourceFromNamespaceAndName(namespace, name string) string { + return "svc/" + namespace + "/" + name +} + const ( serviceNameLabel = metaLabelPrefix + "service_name" serviceLabelPrefix = metaLabelPrefix + "service_label_" diff --git a/discovery/kubernetes/service_test.go b/discovery/kubernetes/service_test.go index f979dba8ac..db5feab726 100644 --- a/discovery/kubernetes/service_test.go +++ b/discovery/kubernetes/service_test.go @@ -21,22 +21,8 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/cache" ) -func serviceStoreKeyFunc(obj interface{}) 
(string, error) { - return obj.(*v1.Service).ObjectMeta.Name, nil -} - -func newFakeServiceInformer() *fakeInformer { - return newFakeInformer(serviceStoreKeyFunc) -} - -func makeTestServiceDiscovery() (*Service, *fakeInformer) { - i := newFakeServiceInformer() - return NewService(nil, i), i -} - func makeMultiPortService() *v1.Service { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -84,46 +70,19 @@ func makeService() *v1.Service { return makeSuffixedService("") } -func TestServiceDiscoveryInitial(t *testing.T) { - n, i := makeTestServiceDiscovery() - i.GetStore().Add(makeMultiPortService()) +func TestServiceDiscoveryAdd(t *testing.T) { + n, c, w := makeDiscovery(RoleService, NamespaceDiscovery{}) k8sDiscoveryTest{ discovery: n, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__meta_kubernetes_service_port_protocol": "TCP", - "__address__": "testservice.default.svc:30900", - "__meta_kubernetes_service_port_name": "testport0", - }, - { - "__meta_kubernetes_service_port_protocol": "UDP", - "__address__": "testservice.default.svc:30901", - "__meta_kubernetes_service_port_name": "testport1", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_service_name": "testservice", - "__meta_kubernetes_namespace": "default", - "__meta_kubernetes_service_label_testlabel": "testvalue", - "__meta_kubernetes_service_annotation_testannotation": "testannotationvalue", - }, - Source: "svc/default/testservice", - }, + afterStart: func() { + obj := makeService() + c.CoreV1().Services(obj.Namespace).Create(obj) + w.Services().Add(obj) }, - }.Run(t) -} - -func TestServiceDiscoveryAdd(t *testing.T) { - n, i := makeTestServiceDiscovery() - - k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Add(makeService()) }() }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "svc/default/testservice": { Targets: []model.LabelSet{ { "__meta_kubernetes_service_port_protocol": 
"TCP", @@ -142,61 +101,18 @@ func TestServiceDiscoveryAdd(t *testing.T) { } func TestServiceDiscoveryDelete(t *testing.T) { - n, i := makeTestServiceDiscovery() - i.GetStore().Add(makeService()) + n, c, w := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService()) k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Delete(makeService()) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__meta_kubernetes_service_port_protocol": "TCP", - "__address__": "testservice.default.svc:30900", - "__meta_kubernetes_service_port_name": "testport", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_service_name": "testservice", - "__meta_kubernetes_namespace": "default", - }, - Source: "svc/default/testservice", - }, + discovery: n, + afterStart: func() { + obj := makeService() + c.CoreV1().Services(obj.Namespace).Delete(obj.Name, &metav1.DeleteOptions{}) + w.Services().Delete(obj) }, - expectedRes: []*targetgroup.Group{ - { - Source: "svc/default/testservice", - }, - }, - }.Run(t) -} - -func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) { - n, i := makeTestServiceDiscovery() - i.GetStore().Add(makeService()) - - k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeService()}) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__meta_kubernetes_service_port_protocol": "TCP", - "__address__": "testservice.default.svc:30900", - "__meta_kubernetes_service_port_name": "testport", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_service_name": "testservice", - "__meta_kubernetes_namespace": "default", - }, - Source: "svc/default/testservice", - }, - }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "svc/default/testservice": { Source: "svc/default/testservice", }, }, @@ -204,30 +120,18 @@ func 
TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) { } func TestServiceDiscoveryUpdate(t *testing.T) { - n, i := makeTestServiceDiscovery() - i.GetStore().Add(makeService()) + n, c, w := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService()) k8sDiscoveryTest{ - discovery: n, - afterStart: func() { go func() { i.Update(makeMultiPortService()) }() }, - expectedInitial: []*targetgroup.Group{ - { - Targets: []model.LabelSet{ - { - "__meta_kubernetes_service_port_protocol": "TCP", - "__address__": "testservice.default.svc:30900", - "__meta_kubernetes_service_port_name": "testport", - }, - }, - Labels: model.LabelSet{ - "__meta_kubernetes_service_name": "testservice", - "__meta_kubernetes_namespace": "default", - }, - Source: "svc/default/testservice", - }, + discovery: n, + afterStart: func() { + obj := makeMultiPortService() + c.CoreV1().Services(obj.Namespace).Update(obj) + w.Services().Modify(obj) }, - expectedRes: []*targetgroup.Group{ - { + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "svc/default/testservice": { Targets: []model.LabelSet{ { "__meta_kubernetes_service_port_protocol": "TCP",