From c2d6e528e4832e98978fe29d9cae9fbe4ce30494 Mon Sep 17 00:00:00 2001 From: machine424 Date: Thu, 3 Jul 2025 17:06:44 +0200 Subject: [PATCH] feat(discovery/kubernetes): allow attaching namespace metadata to endpointslice, endpoints and pod roles after injecting the labels for endpointslice, claude-4-sonnet helped transpose the code and tests to endpoints and pod roles fixes https://github.com/prometheus/prometheus/issues/9510 supersedes https://github.com/prometheus/prometheus/pull/13798 Signed-off-by: machine424 Co-authored-by: Paul BARRIE --- discovery/kubernetes/endpoints.go | 85 +++++-- discovery/kubernetes/endpoints_test.go | 170 +++++++++++-- discovery/kubernetes/endpointslice.go | 68 ++++-- discovery/kubernetes/endpointslice_test.go | 265 +++++++++++++++++++-- discovery/kubernetes/kubernetes.go | 157 +++++++----- discovery/kubernetes/pod.go | 61 ++++- discovery/kubernetes/pod_test.go | 135 ++++++++++- docs/configuration/configuration.md | 3 + 8 files changed, 801 insertions(+), 143 deletions(-) diff --git a/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go index c179779277..edc427ba08 100644 --- a/discovery/kubernetes/endpoints.go +++ b/discovery/kubernetes/endpoints.go @@ -35,11 +35,13 @@ import ( type Endpoints struct { logger *slog.Logger - endpointsInf cache.SharedIndexInformer - serviceInf cache.SharedInformer - podInf cache.SharedInformer - nodeInf cache.SharedInformer - withNodeMetadata bool + endpointsInf cache.SharedIndexInformer + serviceInf cache.SharedInformer + podInf cache.SharedInformer + nodeInf cache.SharedInformer + withNodeMetadata bool + namespaceInf cache.SharedInformer + withNamespaceMetadata bool podStore cache.Store endpointsStore cache.Store @@ -50,7 +52,7 @@ type Endpoints struct { // NewEndpoints returns a new endpoints discovery. // Endpoints API is deprecated in k8s v1.33+, but we should still support it. -func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *Endpoints { +func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Endpoints { if l == nil { l = promslog.NewNopLogger() } @@ -66,16 +68,18 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node podUpdateCount := eventCount.WithLabelValues(RolePod.String(), MetricLabelRoleUpdate) e := &Endpoints{ - logger: l, - endpointsInf: eps, - endpointsStore: eps.GetStore(), - serviceInf: svc, - serviceStore: svc.GetStore(), - podInf: pod, - podStore: pod.GetStore(), - nodeInf: node, - withNodeMetadata: node != nil, - queue: workqueue.NewNamed(RoleEndpoint.String()), + logger: l, + endpointsInf: eps, + endpointsStore: eps.GetStore(), + serviceInf: svc, + serviceStore: svc.GetStore(), + podInf: pod, + podStore: pod.GetStore(), + nodeInf: node, + withNodeMetadata: node != nil, + namespaceInf: namespace, + withNamespaceMetadata: namespace != nil, + queue: workqueue.NewNamed(RoleEndpoint.String()), } _, err := e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -177,6 +181,19 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node } } + if e.withNamespaceMetadata { + _, err = e.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + // Create and Delete should be covered by the other handlers. + UpdateFunc: func(_, o interface{}) { + namespace := o.(*apiv1.Namespace) + e.enqueueNamespace(namespace.Name) + }, + }) + if err != nil { + l.Error("Error adding namespaces event handler.", "err", err) + } + } + return e } @@ -192,6 +209,18 @@ func (e *Endpoints) enqueueNode(nodeName string) { } } +func (e *Endpoints) enqueueNamespace(namespace string) { + endpoints, err := e.endpointsInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace) + if err != nil { + e.logger.Error("Error getting endpoints in namespace", "namespace", namespace, "err", err) + return + } + + for _, endpoint := range endpoints { + e.enqueue(endpoint) + } +} + func (e *Endpoints) enqueuePod(podNamespacedName string) { endpoints, err := e.endpointsInf.GetIndexer().ByIndex(podIndex, podNamespacedName) if err != nil { @@ -221,6 +250,9 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { if e.withNodeMetadata { cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced) } + if e.withNamespaceMetadata { + cacheSyncs = append(cacheSyncs, e.namespaceInf.HasSynced) + } if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { if !errors.Is(ctx.Err(), context.Canceled) { @@ -308,6 +340,10 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group { // Add endpoints labels metadata. addObjectMetaLabels(tg.Labels, eps.ObjectMeta, RoleEndpoint) + if e.withNamespaceMetadata { + tg.Labels = addNamespaceLabels(tg.Labels, e.namespaceInf, e.logger, eps.Namespace) + } + type podEntry struct { pod *apiv1.Pod servicePorts []apiv1.EndpointPort @@ -502,3 +538,20 @@ func addNodeLabels(tg model.LabelSet, nodeInf cache.SharedInformer, logger *slog addObjectMetaLabels(nodeLabelset, node.ObjectMeta, RoleNode) return tg.Merge(nodeLabelset) } + +func addNamespaceLabels(tg model.LabelSet, namespaceInf cache.SharedInformer, logger *slog.Logger, namespace string) model.LabelSet { + obj, exists, err := namespaceInf.GetStore().GetByKey(namespace) + if err != nil { + logger.Error("Error getting namespace", "namespace", namespace, "err", err) + return tg + } + + if !exists { + return tg + } + + n := obj.(*apiv1.Namespace) + namespaceLabelset := make(model.LabelSet) + addNamespaceMetaLabels(namespaceLabelset, n.ObjectMeta) + return tg.Merge(namespaceLabelset) +} diff --git a/discovery/kubernetes/endpoints_test.go b/discovery/kubernetes/endpoints_test.go index 28ad5697bc..a10ffe6dde 100644 --- a/discovery/kubernetes/endpoints_test.go +++ b/discovery/kubernetes/endpoints_test.go @@ -15,6 +15,7 @@ package kubernetes import ( "context" + "fmt" "testing" "github.com/prometheus/common/model" @@ -28,12 +29,12 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" ) -func makeEndpoints() *v1.Endpoints { +func makeEndpoints(namespace string) *v1.Endpoints { nodeName := "foobar" return &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "testendpoints", - Namespace: "default", + Namespace: namespace, Annotations: map[string]string{ "test.annotation": "test", }, @@ -103,7 +104,7 @@ func TestEndpointsDiscoveryBeforeRun(t *testing.T) { k8sDiscoveryTest{ discovery: n, beforeRun: func() { - obj := makeEndpoints() + obj := makeEndpoints("default") c.CoreV1().Endpoints(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) }, expectedMaxItems: 1, @@ -279,12 +280,12 @@ func TestEndpointsDiscoveryAdd(t *testing.T) { func TestEndpointsDiscoveryDelete(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) + n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default")) k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makeEndpoints() + obj := makeEndpoints("default") c.CoreV1().Endpoints(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{}) }, expectedMaxItems: 2, @@ -298,7 +299,7 @@ func TestEndpointsDiscoveryDelete(t *testing.T) { func TestEndpointsDiscoveryUpdate(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) + n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default")) k8sDiscoveryTest{ discovery: n, @@ -370,7 +371,7 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) { func TestEndpointsDiscoveryEmptySubsets(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) + n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default")) k8sDiscoveryTest{ discovery: n, @@ -399,7 +400,7 @@ func TestEndpointsDiscoveryEmptySubsets(t *testing.T) { func TestEndpointsDiscoveryWithService(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) + n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default")) k8sDiscoveryTest{ discovery: n, @@ -465,7 +466,7 @@ func TestEndpointsDiscoveryWithService(t *testing.T) { func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints()) + n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default")) k8sDiscoveryTest{ discovery: n, @@ -560,7 +561,7 @@ func TestEndpointsDiscoveryWithNodeMetadata(t *testing.T) { }, }, } - n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), svc, node1, node2) + n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints("default"), svc, node1, node2) k8sDiscoveryTest{ discovery: n, @@ -634,7 +635,7 @@ func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) { }, }, } - n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), node1, node2, svc) + n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints("default"), node1, node2, svc) k8sDiscoveryTest{ discovery: n, @@ -698,7 +699,7 @@ func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) { func TestEndpointsDiscoveryNamespaces(t *testing.T) { t.Parallel() - epOne := makeEndpoints() + epOne := makeEndpoints("default") epOne.Namespace = "ns1" objs := []runtime.Object{ epOne, @@ -850,10 +851,10 @@ func TestEndpointsDiscoveryNamespaces(t *testing.T) { func TestEndpointsDiscoveryOwnNamespace(t *testing.T) { t.Parallel() - epOne := makeEndpoints() + epOne := makeEndpoints("default") epOne.Namespace = "own-ns" - epTwo := makeEndpoints() + epTwo := makeEndpoints("default") epTwo.Namespace = "non-own-ns" podOne := &v1.Pod{ @@ -945,7 +946,7 @@ func TestEndpointsDiscoveryOwnNamespace(t *testing.T) { func TestEndpointsDiscoveryEmptyPodStatus(t *testing.T) { t.Parallel() - ep := makeEndpoints() + ep := makeEndpoints("default") ep.Namespace = "ns" pod := &v1.Pod{ @@ -1274,6 +1275,145 @@ func TestEndpointsDiscoverySidecarContainer(t *testing.T) { }.Run(t) } +func TestEndpointsDiscoveryWithNamespaceMetadata(t *testing.T) { + t.Parallel() + + ns := "test-ns" + nsLabels := map[string]string{"environment": "production", "team": "backend"} + nsAnnotations := map[string]string{"owner": "platform", "version": "v1.2.3"} + + n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makeEndpoints(ns)) + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + fmt.Sprintf("endpoints/%s/testendpoints", ns): { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpoint_node_name": "foobar", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "false", + }, + { + "__address__": "6.7.8.9:9002", + "__meta_kubernetes_endpoint_address_target_kind": "Node", + "__meta_kubernetes_endpoint_address_target_name": "barbaz", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": model.LabelValue(ns), + "__meta_kubernetes_namespace_annotation_owner": "platform", + "__meta_kubernetes_namespace_annotationpresent_owner": "true", + "__meta_kubernetes_namespace_annotation_version": "v1.2.3", + "__meta_kubernetes_namespace_annotationpresent_version": "true", + "__meta_kubernetes_namespace_label_environment": "production", + "__meta_kubernetes_namespace_labelpresent_environment": "true", + "__meta_kubernetes_namespace_label_team": "backend", + "__meta_kubernetes_namespace_labelpresent_team": "true", + "__meta_kubernetes_endpoints_name": "testendpoints", + "__meta_kubernetes_endpoints_annotation_test_annotation": "test", + "__meta_kubernetes_endpoints_annotationpresent_test_annotation": "true", + }, + Source: fmt.Sprintf("endpoints/%s/testendpoints", ns), + }, + }, + }.Run(t) +} + +func TestEndpointsDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) { + t.Parallel() + + ns := "test-ns" + nsLabels := map[string]string{"environment": "development", "team": "frontend"} + nsAnnotations := map[string]string{"owner": "devops", "version": "v2.1.0"} + + namespace := makeNamespace(ns, nsLabels, nsAnnotations) + n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makeEndpoints(ns)) + + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 2, + afterStart: func() { + namespace.Labels["environment"] = "staging" + namespace.Labels["region"] = "us-west" + namespace.Annotations["owner"] = "sre" + namespace.Annotations["cost-center"] = "engineering" + c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{}) + }, + expectedRes: map[string]*targetgroup.Group{ + fmt.Sprintf("endpoints/%s/testendpoints", ns): { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpoint_node_name": "foobar", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + { + "__address__": "2.3.4.5:9001", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "false", + }, + { + "__address__": "6.7.8.9:9002", + "__meta_kubernetes_endpoint_address_target_kind": "Node", + "__meta_kubernetes_endpoint_address_target_name": "barbaz", + "__meta_kubernetes_endpoint_port_name": "testport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": model.LabelValue(ns), + "__meta_kubernetes_namespace_annotation_owner": "sre", + "__meta_kubernetes_namespace_annotationpresent_owner": "true", + "__meta_kubernetes_namespace_annotation_version": "v2.1.0", + "__meta_kubernetes_namespace_annotationpresent_version": "true", + "__meta_kubernetes_namespace_annotation_cost_center": "engineering", + "__meta_kubernetes_namespace_annotationpresent_cost_center": "true", + "__meta_kubernetes_namespace_label_environment": "staging", + "__meta_kubernetes_namespace_labelpresent_environment": "true", + "__meta_kubernetes_namespace_label_team": "frontend", + "__meta_kubernetes_namespace_labelpresent_team": "true", + "__meta_kubernetes_namespace_label_region": "us-west", + "__meta_kubernetes_namespace_labelpresent_region": "true", + "__meta_kubernetes_endpoints_name": "testendpoints", + "__meta_kubernetes_endpoints_annotation_test_annotation": "test", + "__meta_kubernetes_endpoints_annotationpresent_test_annotation": "true", + }, + Source: fmt.Sprintf("endpoints/%s/testendpoints", ns), + }, + }, + }.Run(t) +} + func BenchmarkResolvePodRef(b *testing.B) { indexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil) e := &Endpoints{ diff --git a/discovery/kubernetes/endpointslice.go b/discovery/kubernetes/endpointslice.go index 625601abc1..191f29f288 100644 --- a/discovery/kubernetes/endpointslice.go +++ b/discovery/kubernetes/endpointslice.go @@ -38,11 +38,13 @@ const serviceIndex = "service" type EndpointSlice struct { logger *slog.Logger - endpointSliceInf cache.SharedIndexInformer - serviceInf cache.SharedInformer - podInf cache.SharedInformer - nodeInf cache.SharedInformer - withNodeMetadata bool + endpointSliceInf cache.SharedIndexInformer + serviceInf cache.SharedInformer + podInf cache.SharedInformer + nodeInf cache.SharedInformer + withNodeMetadata bool + namespaceInf cache.SharedInformer + withNamespaceMetadata bool podStore cache.Store endpointSliceStore cache.Store @@ -52,7 +54,7 @@ type EndpointSlice struct { } // NewEndpointSlice returns a new endpointslice discovery. -func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *EndpointSlice { +func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *EndpointSlice { if l == nil { l = promslog.NewNopLogger() } @@ -66,16 +68,18 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n svcDeleteCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleDelete) e := &EndpointSlice{ - logger: l, - endpointSliceInf: eps, - endpointSliceStore: eps.GetStore(), - serviceInf: svc, - serviceStore: svc.GetStore(), - podInf: pod, - podStore: pod.GetStore(), - nodeInf: node, - withNodeMetadata: node != nil, - queue: workqueue.NewNamed(RoleEndpointSlice.String()), + logger: l, + endpointSliceInf: eps, + endpointSliceStore: eps.GetStore(), + serviceInf: svc, + serviceStore: svc.GetStore(), + podInf: pod, + podStore: pod.GetStore(), + nodeInf: node, + withNodeMetadata: node != nil, + namespaceInf: namespace, + withNamespaceMetadata: namespace != nil, + queue: workqueue.NewNamed(RoleEndpointSlice.String()), } _, err := e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -154,6 +158,19 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n } } + if e.withNamespaceMetadata { + _, err = e.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + // Create and Delete should be covered by the other handlers. + UpdateFunc: func(_, o interface{}) { + namespace := o.(*apiv1.Namespace) + e.enqueueNamespace(namespace.Name) + }, + }) + if err != nil { + l.Error("Error adding namespaces event handler.", "err", err) + } + } + return e } @@ -169,6 +186,18 @@ func (e *EndpointSlice) enqueueNode(nodeName string) { } } +func (e *EndpointSlice) enqueueNamespace(namespace string) { + endpoints, err := e.endpointSliceInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace) + if err != nil { + e.logger.Error("Error getting endpoints in namespace", "namespace", namespace, "err", err) + return + } + + for _, endpoint := range endpoints { + e.enqueue(endpoint) + } +} + func (e *EndpointSlice) enqueue(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { @@ -186,6 +215,9 @@ func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) if e.withNodeMetadata { cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced) } + if e.withNamespaceMetadata { + cacheSyncs = append(cacheSyncs, e.namespaceInf.HasSynced) + } if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { if !errors.Is(ctx.Err(), context.Canceled) { e.logger.Error("endpointslice informer unable to sync cache") @@ -274,6 +306,10 @@ func (e *EndpointSlice) buildEndpointSlice(eps v1.EndpointSlice) *targetgroup.Gr e.addServiceLabels(eps, tg) + if e.withNamespaceMetadata { + tg.Labels = addNamespaceLabels(tg.Labels, e.namespaceInf, e.logger, eps.Namespace) + } + type podEntry struct { pod *apiv1.Pod servicePorts []v1.EndpointPort diff --git a/discovery/kubernetes/endpointslice_test.go b/discovery/kubernetes/endpointslice_test.go index 9eea9abd7b..cfd6be709e 100644 --- a/discovery/kubernetes/endpointslice_test.go +++ b/discovery/kubernetes/endpointslice_test.go @@ -15,6 +15,7 @@ package kubernetes import ( "context" + "fmt" "testing" "github.com/prometheus/common/model" @@ -44,11 +45,11 @@ func protocolptr(p corev1.Protocol) *corev1.Protocol { return &p } -func makeEndpointSliceV1() *v1.EndpointSlice { +func makeEndpointSliceV1(namespace string) *v1.EndpointSlice { return &v1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: "testendpoints", - Namespace: "default", + Namespace: namespace, Labels: map[string]string{ v1.LabelServiceName: "testendpoints", }, @@ -113,6 +114,16 @@ func makeEndpointSliceV1() *v1.EndpointSlice { } } +func makeNamespace(name string, labels, annotations map[string]string) *corev1.Namespace { + return &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: labels, + Annotations: annotations, + }, + } +} + func TestEndpointSliceDiscoveryBeforeRun(t *testing.T) { t.Parallel() n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}) @@ -120,7 +131,7 @@ func TestEndpointSliceDiscoveryBeforeRun(t *testing.T) { k8sDiscoveryTest{ discovery: n, beforeRun: func() { - obj := makeEndpointSliceV1() + obj := makeEndpointSliceV1("default") c.DiscoveryV1().EndpointSlices(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) }, expectedMaxItems: 1, @@ -325,12 +336,12 @@ func TestEndpointSliceDiscoveryAdd(t *testing.T) { func TestEndpointSliceDiscoveryDelete(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1()) + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default")) k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makeEndpointSliceV1() + obj := makeEndpointSliceV1("default") c.DiscoveryV1().EndpointSlices(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{}) }, expectedMaxItems: 2, @@ -344,12 +355,12 @@ func TestEndpointSliceDiscoveryDelete(t *testing.T) { func TestEndpointSliceDiscoveryUpdate(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1()) + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default")) k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makeEndpointSliceV1() + obj := makeEndpointSliceV1("default") obj.ObjectMeta.Labels = nil obj.ObjectMeta.Annotations = nil obj.Endpoints = obj.Endpoints[0:2] @@ -401,12 +412,12 @@ func TestEndpointSliceDiscoveryUpdate(t *testing.T) { func TestEndpointSliceDiscoveryEmptyEndpoints(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1()) + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default")) k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makeEndpointSliceV1() + obj := makeEndpointSliceV1("default") obj.Endpoints = []v1.Endpoint{} c.DiscoveryV1().EndpointSlices(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{}) }, @@ -430,7 +441,7 @@ func TestEndpointSliceDiscoveryEmptyEndpoints(t *testing.T) { func TestEndpointSliceDiscoveryWithService(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1()) + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default")) k8sDiscoveryTest{ discovery: n, @@ -523,7 +534,7 @@ func TestEndpointSliceDiscoveryWithService(t *testing.T) { func TestEndpointSliceDiscoveryWithServiceUpdate(t *testing.T) { t.Parallel() - n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1()) + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default")) k8sDiscoveryTest{ discovery: n, @@ -643,7 +654,7 @@ func TestEndpointsSlicesDiscoveryWithNodeMetadata(t *testing.T) { }, }, } - objs := []runtime.Object{makeEndpointSliceV1(), makeNode("foobar", "", "", nodeLabels1, nil), makeNode("barbaz", "", "", nodeLabels2, nil), svc} + objs := []runtime.Object{makeEndpointSliceV1("default"), makeNode("foobar", "", "", nodeLabels1, nil), makeNode("barbaz", "", "", nodeLabels2, nil), svc} n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...) k8sDiscoveryTest{ @@ -745,7 +756,7 @@ func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) { } node1 := makeNode("foobar", "", "", nodeLabels1, nil) node2 := makeNode("barbaz", "", "", nodeLabels2, nil) - objs := []runtime.Object{makeEndpointSliceV1(), node1, node2, svc} + objs := []runtime.Object{makeEndpointSliceV1("default"), node1, node2, svc} n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...) k8sDiscoveryTest{ @@ -837,7 +848,7 @@ func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) { func TestEndpointSliceDiscoveryNamespaces(t *testing.T) { t.Parallel() - epOne := makeEndpointSliceV1() + epOne := makeEndpointSliceV1("default") epOne.Namespace = "ns1" objs := []runtime.Object{ epOne, @@ -1014,10 +1025,10 @@ func TestEndpointSliceDiscoveryNamespaces(t *testing.T) { func TestEndpointSliceDiscoveryOwnNamespace(t *testing.T) { t.Parallel() - epOne := makeEndpointSliceV1() + epOne := makeEndpointSliceV1("default") epOne.Namespace = "own-ns" - epTwo := makeEndpointSliceV1() + epTwo := makeEndpointSliceV1("default") epTwo.Namespace = "non-own-ns" podOne := &corev1.Pod{ @@ -1135,7 +1146,7 @@ func TestEndpointSliceDiscoveryOwnNamespace(t *testing.T) { func TestEndpointSliceDiscoveryEmptyPodStatus(t *testing.T) { t.Parallel() - ep := makeEndpointSliceV1() + ep := makeEndpointSliceV1("default") ep.Namespace = "ns" pod := &corev1.Pod{ @@ -1380,3 +1391,223 @@ func TestEndpointSliceDiscoverySidecarContainer(t *testing.T) { }, }.Run(t) } + +func TestEndpointsSlicesDiscoveryWithNamespaceMetadata(t *testing.T) { + t.Parallel() + + ns := "test-ns" + nsLabels := map[string]string{"service": "web", "layer": "frontend"} + nsAnnotations := map[string]string{"contact": "platform", "release": "v5.6.7"} + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: ns, + Labels: map[string]string{ + "app/name": "test", + }, + }, + } + objs := []runtime.Object{makeNamespace(ns, nsLabels, nsAnnotations), svc, makeEndpointSliceV1(ns)} + n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, objs...) + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + fmt.Sprintf("endpointslice/%s/testendpoints", ns): { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_address_target_kind": "", + "__meta_kubernetes_endpointslice_address_target_name": "", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_endpoint_node_name": "foobar", + "__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true", + "__meta_kubernetes_endpointslice_endpoint_topology_topology": "value", + "__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false", + "__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1b", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "3.4.5.6:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "true", + "__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1c", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "4.5.6.7:9000", + "__meta_kubernetes_endpointslice_address_target_kind": "Node", + "__meta_kubernetes_endpointslice_address_target_name": "barbaz", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false", + "__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "testendpoints", + "__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true", + "__meta_kubernetes_endpointslice_annotation_test_annotation": "test", + "__meta_kubernetes_endpointslice_annotationpresent_test_annotation": "true", + "__meta_kubernetes_namespace": model.LabelValue(ns), + "__meta_kubernetes_namespace_annotation_contact": "platform", + "__meta_kubernetes_namespace_annotationpresent_contact": "true", + "__meta_kubernetes_namespace_annotation_release": "v5.6.7", + "__meta_kubernetes_namespace_annotationpresent_release": "true", + "__meta_kubernetes_namespace_label_service": "web", + "__meta_kubernetes_namespace_labelpresent_service": "true", + "__meta_kubernetes_namespace_label_layer": "frontend", + "__meta_kubernetes_namespace_labelpresent_layer": "true", + "__meta_kubernetes_service_label_app_name": "test", + "__meta_kubernetes_service_labelpresent_app_name": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: fmt.Sprintf("endpointslice/%s/testendpoints", ns), + }, + }, + }.Run(t) +} + +func TestEndpointsSlicesDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) { + t.Parallel() + + ns := "test-ns" + nsLabels := map[string]string{"component": "database", "layer": "backend"} + nsAnnotations := map[string]string{"contact": "dba", "release": "v6.7.8"} + metadataConfig := AttachMetadataConfig{Namespace: true} + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: ns, + Labels: map[string]string{ + "app/name": "test", + }, + }, + } + + namespace := makeNamespace(ns, nsLabels, nsAnnotations) + n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, namespace, svc, makeEndpointSliceV1(ns)) + + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 2, + afterStart: func() { + namespace.Labels["component"] = "cache" + namespace.Labels["region"] = "us-central" + namespace.Annotations["contact"] = "sre" + namespace.Annotations["monitoring"] = "enabled" + c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{}) + }, + expectedRes: map[string]*targetgroup.Group{ + fmt.Sprintf("endpointslice/%s/testendpoints", ns): { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_address_target_kind": "", + "__meta_kubernetes_endpointslice_address_target_name": "", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_endpoint_node_name": "foobar", + "__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true", + "__meta_kubernetes_endpointslice_endpoint_topology_topology": "value", + "__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false", + "__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1b", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "3.4.5.6:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "true", + "__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1c", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "4.5.6.7:9000", + "__meta_kubernetes_endpointslice_address_target_kind": "Node", + "__meta_kubernetes_endpointslice_address_target_name": "barbaz", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true", + "__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false", + "__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_app_protocol": "http", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "testendpoints", + "__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true", + "__meta_kubernetes_endpointslice_annotation_test_annotation": "test", + "__meta_kubernetes_endpointslice_annotationpresent_test_annotation": "true", + "__meta_kubernetes_namespace": model.LabelValue(ns), + "__meta_kubernetes_namespace_annotation_contact": "sre", + "__meta_kubernetes_namespace_annotationpresent_contact": "true", + "__meta_kubernetes_namespace_annotation_release": "v6.7.8", + "__meta_kubernetes_namespace_annotationpresent_release": "true", + "__meta_kubernetes_namespace_annotation_monitoring": "enabled", + "__meta_kubernetes_namespace_annotationpresent_monitoring": "true", + "__meta_kubernetes_namespace_label_component": "cache", + "__meta_kubernetes_namespace_labelpresent_component": "true", + "__meta_kubernetes_namespace_label_layer": "backend", + "__meta_kubernetes_namespace_labelpresent_layer": "true", + "__meta_kubernetes_namespace_label_region": "us-central", + "__meta_kubernetes_namespace_labelpresent_region": "true", + "__meta_kubernetes_service_label_app_name": "test", + "__meta_kubernetes_service_labelpresent_app_name": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: fmt.Sprintf("endpointslice/%s/testendpoints", ns), + }, + }, + }.Run(t) +} diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 2261fb3efe..e8737e48c0 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -153,9 +153,10 @@ type resourceSelector struct { } // AttachMetadataConfig is the configuration for attaching additional metadata -// coming from nodes on which the targets are scheduled. +// coming from namespaces or nodes on which the targets are scheduled. type AttachMetadataConfig struct { - Node bool `yaml:"node"` + Node bool `yaml:"node"` + Namespace bool `yaml:"namespace"` } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -397,7 +398,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return e.Watch(ctx, options) }, } - informer = d.newEndpointSlicesByNodeInformer(elw, &disv1.EndpointSlice{}) + informer = d.newIndexedEndpointSlicesInformer(elw, &disv1.EndpointSlice{}) s := d.client.CoreV1().Services(namespace) slw := &cache.ListWatch{ @@ -430,12 +431,18 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { nodeInf = d.newNodeInformer(context.Background()) go nodeInf.Run(ctx.Done()) } + var namespaceInf cache.SharedInformer + if d.attachMetadata.Namespace { + namespaceInf = d.newNamespaceInformer(context.Background()) + go namespaceInf.Run(ctx.Done()) + } eps := NewEndpointSlice( d.logger.With("role", "endpointslice"), informer, d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled), d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled), nodeInf, + namespaceInf, d.metrics.eventCount, ) d.discoverers = append(d.discoverers, eps) @@ -489,13 +496,19 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { nodeInf = d.newNodeInformer(ctx) go nodeInf.Run(ctx.Done()) } + var namespaceInf cache.SharedInformer + if d.attachMetadata.Namespace { + namespaceInf = d.newNamespaceInformer(ctx) + go namespaceInf.Run(ctx.Done()) + } eps := NewEndpoints( d.logger.With("role", "endpoint"), - d.newEndpointsByNodeInformer(elw), + d.newIndexedEndpointsInformer(elw), d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled), d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled), nodeInf, + namespaceInf, d.metrics.eventCount, ) d.discoverers = append(d.discoverers, eps) @@ -509,6 +522,11 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { nodeInformer = d.newNodeInformer(ctx) go nodeInformer.Run(ctx.Done()) } + var namespaceInformer cache.SharedInformer + if d.attachMetadata.Namespace { + namespaceInformer = d.newNamespaceInformer(ctx) + go namespaceInformer.Run(ctx.Done()) + } for _, namespace := range namespaces { p := d.client.CoreV1().Pods(namespace) @@ -526,8 +544,9 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { } pod := NewPod( d.logger.With("role", "pod"), - d.newPodsByNodeInformer(plw), + d.newIndexedPodsInformer(plw), nodeInformer, + namespaceInformer, d.metrics.eventCount, ) d.discoverers = append(d.discoverers, pod) @@ -651,7 +670,20 @@ func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer { return d.mustNewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled) } -func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer { +func (d *Discovery) newNamespaceInformer(ctx context.Context) cache.SharedInformer { + // We don't filter on NamespaceDiscovery. + nlw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return d.client.CoreV1().Namespaces().List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return d.client.CoreV1().Namespaces().Watch(ctx, options) + }, + } + return d.mustNewSharedInformer(nlw, &apiv1.Namespace{}, resyncDisabled) +} + +func (d *Discovery) newIndexedPodsInformer(plw *cache.ListWatch) cache.SharedIndexInformer { indexers := make(map[string]cache.IndexFunc) if d.attachMetadata.Node { indexers[nodeIndex] = func(obj interface{}) ([]string, error) { @@ -663,10 +695,14 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde } } + if d.attachMetadata.Namespace { + indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc + } + return d.mustNewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers) } -func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer { +func (d *Discovery) newIndexedEndpointsInformer(plw *cache.ListWatch) cache.SharedIndexInformer { indexers := make(map[string]cache.IndexFunc) indexers[podIndex] = func(obj interface{}) ([]string, error) { e, ok := obj.(*apiv1.Endpoints) @@ -683,37 +719,40 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share } return pods, nil } - if !d.attachMetadata.Node { - return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers) - } - indexers[nodeIndex] = func(obj interface{}) ([]string, error) { - e, ok := obj.(*apiv1.Endpoints) - if !ok { - return nil, errors.New("object is not endpoints") - } - var nodes []string - for _, target := range e.Subsets { - for _, addr := range target.Addresses { - if addr.TargetRef != nil { - switch addr.TargetRef.Kind { - case "Pod": - if addr.NodeName != nil { - nodes = append(nodes, *addr.NodeName) + if d.attachMetadata.Node { + indexers[nodeIndex] = func(obj interface{}) ([]string, error) { + e, ok := obj.(*apiv1.Endpoints) + if !ok { + return nil, errors.New("object is not endpoints") + } + var nodes []string + for _, target := range e.Subsets { + for _, addr := range target.Addresses { + if addr.TargetRef != nil { + switch addr.TargetRef.Kind { + case "Pod": + if addr.NodeName != nil { + nodes = append(nodes, *addr.NodeName) + } + case "Node": + nodes = append(nodes, addr.TargetRef.Name) } - case "Node": - nodes = append(nodes, addr.TargetRef.Name) } } } + return nodes, nil } - return nodes, nil + } + + if d.attachMetadata.Namespace { + indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc } return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers) } -func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer { +func (d *Discovery) newIndexedEndpointSlicesInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer { indexers := make(map[string]cache.IndexFunc) indexers[serviceIndex] = func(obj interface{}) ([]string, error) { e, ok := obj.(*disv1.EndpointSlice) @@ -728,31 +767,34 @@ func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object return []string{namespacedName(e.Namespace, svcName)}, nil } - if !d.attachMetadata.Node { - return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers) - } - indexers[nodeIndex] = func(obj interface{}) ([]string, error) { - e, ok := obj.(*disv1.EndpointSlice) - if !ok { - return nil, errors.New("object is not an endpointslice") - } + if d.attachMetadata.Node { + indexers[nodeIndex] = func(obj interface{}) ([]string, error) { + e, ok := obj.(*disv1.EndpointSlice) + if !ok { + return nil, errors.New("object is not an endpointslice") + } - var nodes []string - for _, target := range e.Endpoints { - if target.TargetRef != nil { - switch target.TargetRef.Kind { - case "Pod": - if target.NodeName != nil { - nodes = append(nodes, *target.NodeName) + var nodes []string + for _, target := range e.Endpoints { + if target.TargetRef != nil { + switch target.TargetRef.Kind { + case "Pod": + if target.NodeName != nil { + nodes = append(nodes, *target.NodeName) + } + case "Node": + nodes = append(nodes, target.TargetRef.Name) } - case "Node": - nodes = append(nodes, target.TargetRef.Name) } } - } - return nodes, nil + return nodes, nil + } + } + + if d.attachMetadata.Namespace { + indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc } return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers) @@ -783,22 +825,29 @@ func (d *Discovery) mustNewSharedIndexInformer(lw cache.ListerWatcher, exampleOb return informer } -func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, role Role) { - labelSet[model.LabelName(metaLabelPrefix+string(role)+"_name")] = lv(objectMeta.Name) - +func addObjectAnnotationsAndLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, resource string) { for k, v := range objectMeta.Labels { ln := strutil.SanitizeLabelName(k) - labelSet[model.LabelName(metaLabelPrefix+string(role)+"_label_"+ln)] = lv(v) - labelSet[model.LabelName(metaLabelPrefix+string(role)+"_labelpresent_"+ln)] = presentValue + labelSet[model.LabelName(metaLabelPrefix+resource+"_label_"+ln)] = lv(v) + labelSet[model.LabelName(metaLabelPrefix+resource+"_labelpresent_"+ln)] = presentValue } - for k, v := range objectMeta.Annotations { ln := strutil.SanitizeLabelName(k) - labelSet[model.LabelName(metaLabelPrefix+string(role)+"_annotation_"+ln)] = lv(v) - labelSet[model.LabelName(metaLabelPrefix+string(role)+"_annotationpresent_"+ln)] = presentValue + labelSet[model.LabelName(metaLabelPrefix+resource+"_annotation_"+ln)] = lv(v) + labelSet[model.LabelName(metaLabelPrefix+resource+"_annotationpresent_"+ln)] = presentValue } } +func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, role Role) { + labelSet[model.LabelName(metaLabelPrefix+string(role)+"_name")] = lv(objectMeta.Name) + addObjectAnnotationsAndLabels(labelSet, objectMeta, string(role)) +} + +func addNamespaceMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta) { + // Omitting the namespace name because should be already injected elsewhere. + addObjectAnnotationsAndLabels(labelSet, objectMeta, "namespace") +} + func namespacedName(namespace, name string) string { return namespace + "/" + name } diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go index 169c6a78a1..11d6e11eff 100644 --- a/discovery/kubernetes/pod.go +++ b/discovery/kubernetes/pod.go @@ -40,16 +40,18 @@ const ( // Pod discovers new pod targets. type Pod struct { - podInf cache.SharedIndexInformer - nodeInf cache.SharedInformer - withNodeMetadata bool - store cache.Store - logger *slog.Logger - queue *workqueue.Type + podInf cache.SharedIndexInformer + nodeInf cache.SharedInformer + withNodeMetadata bool + namespaceInf cache.SharedInformer + withNamespaceMetadata bool + store cache.Store + logger *slog.Logger + queue *workqueue.Type } // NewPod creates a new pod discovery. -func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedInformer, eventCount *prometheus.CounterVec) *Pod { +func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Pod { if l == nil { l = promslog.NewNopLogger() } @@ -59,12 +61,14 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedIn podUpdateCount := eventCount.WithLabelValues(RolePod.String(), MetricLabelRoleUpdate) p := &Pod{ - podInf: pods, - nodeInf: nodes, - withNodeMetadata: nodes != nil, - store: pods.GetStore(), - logger: l, - queue: workqueue.NewNamed(RolePod.String()), + podInf: pods, + nodeInf: nodes, + withNodeMetadata: nodes != nil, + namespaceInf: namespace, + withNamespaceMetadata: namespace != nil, + store: pods.GetStore(), + logger: l, + queue: workqueue.NewNamed(RolePod.String()), } _, err := p.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { @@ -107,6 +111,19 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedIn } } + if p.withNamespaceMetadata { + _, err = p.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + // Create and Delete should be covered by the other handlers. + UpdateFunc: func(_, o interface{}) { + namespace := o.(*apiv1.Namespace) + p.enqueuePodsForNamespace(namespace.Name) + }, + }) + if err != nil { + l.Error("Error adding namespaces event handler.", "err", err) + } + } + return p } @@ -127,6 +144,9 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { if p.withNodeMetadata { cacheSyncs = append(cacheSyncs, p.nodeInf.HasSynced) } + if p.withNamespaceMetadata { + cacheSyncs = append(cacheSyncs, p.namespaceInf.HasSynced) + } if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { if !errors.Is(ctx.Err(), context.Canceled) { @@ -269,6 +289,9 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group { if p.withNodeMetadata { tg.Labels = addNodeLabels(tg.Labels, p.nodeInf, p.logger, &pod.Spec.NodeName) } + if p.withNamespaceMetadata { + tg.Labels = addNamespaceLabels(tg.Labels, p.namespaceInf, p.logger, pod.Namespace) + } containers := append(pod.Spec.Containers, pod.Spec.InitContainers...) for i, c := range containers { @@ -327,6 +350,18 @@ func (p *Pod) enqueuePodsForNode(nodeName string) { } } +func (p *Pod) enqueuePodsForNamespace(namespace string) { + pods, err := p.podInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace) + if err != nil { + p.logger.Error("Error getting pods in namespace", "namespace", namespace, "err", err) + return + } + + for _, pod := range pods { + p.enqueue(pod.(*apiv1.Pod)) + } +} + func podSource(pod *apiv1.Pod) string { return podSourceFromNamespaceAndName(pod.Namespace, pod.Name) } diff --git a/discovery/kubernetes/pod_test.go b/discovery/kubernetes/pod_test.go index 7a3079a265..71f7f7e621 100644 --- a/discovery/kubernetes/pod_test.go +++ b/discovery/kubernetes/pod_test.go @@ -95,11 +95,11 @@ func makeMultiPortPods() *v1.Pod { } } -func makePods() *v1.Pod { +func makePods(namespace string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "testpod", - Namespace: "default", + Namespace: namespace, UID: types.UID("abc123"), }, Spec: v1.PodSpec{ @@ -337,7 +337,7 @@ func TestPodDiscoveryAdd(t *testing.T) { k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makePods() + obj := makePods("default") c.CoreV1().Pods(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) }, expectedMaxItems: 1, @@ -347,13 +347,13 @@ func TestPodDiscoveryAdd(t *testing.T) { func TestPodDiscoveryDelete(t *testing.T) { t.Parallel() - obj := makePods() + obj := makePods("default") n, c := makeDiscovery(RolePod, NamespaceDiscovery{}, obj) k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makePods() + obj := makePods("default") c.CoreV1().Pods(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{}) }, expectedMaxItems: 2, @@ -399,7 +399,7 @@ func TestPodDiscoveryUpdate(t *testing.T) { k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makePods() + obj := makePods("default") c.CoreV1().Pods(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{}) }, expectedMaxItems: 2, @@ -410,9 +410,9 @@ func TestPodDiscoveryUpdate(t *testing.T) { func TestPodDiscoveryUpdateEmptyPodIP(t *testing.T) { t.Parallel() n, c := makeDiscovery(RolePod, NamespaceDiscovery{}) - initialPod := makePods() + initialPod := makePods("default") - updatedPod := makePods() + updatedPod := makePods("default") updatedPod.Status.PodIP = "" k8sDiscoveryTest{ @@ -444,7 +444,7 @@ func TestPodDiscoveryNamespaces(t *testing.T) { discovery: n, beforeRun: func() { for _, ns := range []string{"ns1", "ns2"} { - pod := makePods() + pod := makePods("default") pod.Namespace = ns c.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{}) } @@ -463,7 +463,7 @@ func TestPodDiscoveryOwnNamespace(t *testing.T) { discovery: n, beforeRun: func() { for _, ns := range []string{"own-ns", "non-own-ns"} { - pod := makePods() + pod := makePods("default") pod.Namespace = ns c.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{}) } @@ -485,7 +485,7 @@ func TestPodDiscoveryWithNodeMetadata(t *testing.T) { nodes := makeNode("testnode", "", "", nodeLbls, nil) c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{}) - pods := makePods() + pods := makePods("default") c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{}) }, expectedMaxItems: 2, @@ -507,7 +507,7 @@ func TestPodDiscoveryWithNodeMetadataUpdateNode(t *testing.T) { c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{}) }, afterStart: func() { - pods := makePods() + pods := makePods("default") c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{}) nodes := makeNode("testnode", "", "", nodeLbls, nil) @@ -517,3 +517,114 @@ func TestPodDiscoveryWithNodeMetadataUpdateNode(t *testing.T) { expectedRes: expectedPodTargetGroupsWithNodeMeta("default", "testnode", nodeLbls), }.Run(t) } + +func TestPodDiscoveryWithNamespaceMetadata(t *testing.T) { + t.Parallel() + + ns := "test-ns" + nsLabels := map[string]string{"app": "web", "tier": "frontend"} + nsAnnotations := map[string]string{"maintainer": "devops", "build": "v3.4.5"} + + n, _ := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makePods(ns)) + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + fmt.Sprintf("pod/%s/testpod", ns): { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_pod_container_image": "testcontainer:latest", + "__meta_kubernetes_pod_container_name": "testcontainer", + "__meta_kubernetes_pod_container_port_name": "testport", + "__meta_kubernetes_pod_container_port_number": "9000", + "__meta_kubernetes_pod_container_port_protocol": "TCP", + "__meta_kubernetes_pod_container_init": "false", + "__meta_kubernetes_pod_container_id": "docker://a1b2c3d4e5f6", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": model.LabelValue(ns), + "__meta_kubernetes_namespace_annotation_maintainer": "devops", + "__meta_kubernetes_namespace_annotationpresent_maintainer": "true", + "__meta_kubernetes_namespace_annotation_build": "v3.4.5", + "__meta_kubernetes_namespace_annotationpresent_build": "true", + "__meta_kubernetes_namespace_label_app": "web", + "__meta_kubernetes_namespace_labelpresent_app": "true", + "__meta_kubernetes_namespace_label_tier": "frontend", + "__meta_kubernetes_namespace_labelpresent_tier": "true", + "__meta_kubernetes_pod_name": "testpod", + "__meta_kubernetes_pod_ip": "1.2.3.4", + "__meta_kubernetes_pod_ready": "true", + "__meta_kubernetes_pod_phase": "Running", + "__meta_kubernetes_pod_node_name": "testnode", + "__meta_kubernetes_pod_host_ip": "2.3.4.5", + "__meta_kubernetes_pod_uid": "abc123", + }, + Source: fmt.Sprintf("pod/%s/testpod", ns), + }, + }, + }.Run(t) +} + +func TestPodDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) { + t.Parallel() + + ns := "test-ns" + nsLabels := map[string]string{"app": "api", "tier": "backend"} + nsAnnotations := map[string]string{"maintainer": "platform", "build": "v4.5.6"} + + namespace := makeNamespace(ns, nsLabels, nsAnnotations) + n, c := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makePods(ns)) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + namespace.Labels["app"] = "service" + namespace.Labels["zone"] = "us-east" + namespace.Annotations["maintainer"] = "sre" + namespace.Annotations["deployment"] = "canary" + c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{}) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + fmt.Sprintf("pod/%s/testpod", ns): { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_pod_container_image": "testcontainer:latest", + "__meta_kubernetes_pod_container_name": "testcontainer", + "__meta_kubernetes_pod_container_port_name": "testport", + "__meta_kubernetes_pod_container_port_number": "9000", + "__meta_kubernetes_pod_container_port_protocol": "TCP", + "__meta_kubernetes_pod_container_init": "false", + "__meta_kubernetes_pod_container_id": "docker://a1b2c3d4e5f6", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": model.LabelValue(ns), + "__meta_kubernetes_namespace_annotation_maintainer": "sre", + "__meta_kubernetes_namespace_annotationpresent_maintainer": "true", + "__meta_kubernetes_namespace_annotation_build": "v4.5.6", + "__meta_kubernetes_namespace_annotationpresent_build": "true", + "__meta_kubernetes_namespace_annotation_deployment": "canary", + "__meta_kubernetes_namespace_annotationpresent_deployment": "true", + "__meta_kubernetes_namespace_label_app": "service", + "__meta_kubernetes_namespace_labelpresent_app": "true", + "__meta_kubernetes_namespace_label_tier": "backend", + "__meta_kubernetes_namespace_labelpresent_tier": "true", + "__meta_kubernetes_namespace_label_zone": "us-east", + "__meta_kubernetes_namespace_labelpresent_zone": "true", + "__meta_kubernetes_pod_name": "testpod", + "__meta_kubernetes_pod_ip": "1.2.3.4", + "__meta_kubernetes_pod_ready": "true", + "__meta_kubernetes_pod_phase": "Running", + "__meta_kubernetes_pod_node_name": "testnode", + "__meta_kubernetes_pod_host_ip": "2.3.4.5", + "__meta_kubernetes_pod_uid": "abc123", + }, + Source: fmt.Sprintf("pod/%s/testpod", ns), + }, + }, + }.Run(t) +} diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 45f099af4e..00f6c59353 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1967,6 +1967,9 @@ attach_metadata: # Attaches node metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice. # When set to true, Prometheus must have permissions to get Nodes. [ node: | default = false ] +# Attaches namespace metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice. +# When set to true, Prometheus must have permissions to list/watch Namespaces. + [ namespace: | default = false ] # HTTP client settings, including authentication methods (such as basic auth and # authorization), proxy configurations, TLS options, custom HTTP headers, etc.