feat(discovery/kubernetes): allow attaching namespace metadata

to endpointslice, endpoints and pod roles

after injecting the labels for endpointslice, claude-4-sonnet
helped transpose the code and tests to endpoints and pod roles

fixes https://github.com/prometheus/prometheus/issues/9510
supersedes https://github.com/prometheus/prometheus/pull/13798

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
Co-authored-by: Paul BARRIE <paul.barrie.calmels@gmail.com>
This commit is contained in:
machine424 2025-07-03 17:06:44 +02:00
parent 61064cb774
commit c2d6e528e4
No known key found for this signature in database
GPG Key ID: A4B001A4FDEE017D
8 changed files with 801 additions and 143 deletions

View File

@ -40,6 +40,8 @@ type Endpoints struct {
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
namespaceInf cache.SharedInformer
withNamespaceMetadata bool
podStore cache.Store
endpointsStore cache.Store
@ -50,7 +52,7 @@ type Endpoints struct {
// NewEndpoints returns a new endpoints discovery.
// Endpoints API is deprecated in k8s v1.33+, but we should still support it.
func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *Endpoints {
func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Endpoints {
if l == nil {
l = promslog.NewNopLogger()
}
@ -75,6 +77,8 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
namespaceInf: namespace,
withNamespaceMetadata: namespace != nil,
queue: workqueue.NewNamed(RoleEndpoint.String()),
}
@ -177,6 +181,19 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node
}
}
if e.withNamespaceMetadata {
_, err = e.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// Create and Delete should be covered by the other handlers.
UpdateFunc: func(_, o interface{}) {
namespace := o.(*apiv1.Namespace)
e.enqueueNamespace(namespace.Name)
},
})
if err != nil {
l.Error("Error adding namespaces event handler.", "err", err)
}
}
return e
}
@ -192,6 +209,18 @@ func (e *Endpoints) enqueueNode(nodeName string) {
}
}
func (e *Endpoints) enqueueNamespace(namespace string) {
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
if err != nil {
e.logger.Error("Error getting endpoints in namespace", "namespace", namespace, "err", err)
return
}
for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
func (e *Endpoints) enqueuePod(podNamespacedName string) {
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(podIndex, podNamespacedName)
if err != nil {
@ -221,6 +250,9 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if e.withNamespaceMetadata {
cacheSyncs = append(cacheSyncs, e.namespaceInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
@ -308,6 +340,10 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group {
// Add endpoints labels metadata.
addObjectMetaLabels(tg.Labels, eps.ObjectMeta, RoleEndpoint)
if e.withNamespaceMetadata {
tg.Labels = addNamespaceLabels(tg.Labels, e.namespaceInf, e.logger, eps.Namespace)
}
type podEntry struct {
pod *apiv1.Pod
servicePorts []apiv1.EndpointPort
@ -502,3 +538,20 @@ func addNodeLabels(tg model.LabelSet, nodeInf cache.SharedInformer, logger *slog
addObjectMetaLabels(nodeLabelset, node.ObjectMeta, RoleNode)
return tg.Merge(nodeLabelset)
}
func addNamespaceLabels(tg model.LabelSet, namespaceInf cache.SharedInformer, logger *slog.Logger, namespace string) model.LabelSet {
obj, exists, err := namespaceInf.GetStore().GetByKey(namespace)
if err != nil {
logger.Error("Error getting namespace", "namespace", namespace, "err", err)
return tg
}
if !exists {
return tg
}
n := obj.(*apiv1.Namespace)
namespaceLabelset := make(model.LabelSet)
addNamespaceMetaLabels(namespaceLabelset, n.ObjectMeta)
return tg.Merge(namespaceLabelset)
}

View File

@ -15,6 +15,7 @@ package kubernetes
import (
"context"
"fmt"
"testing"
"github.com/prometheus/common/model"
@ -28,12 +29,12 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
)
func makeEndpoints() *v1.Endpoints {
func makeEndpoints(namespace string) *v1.Endpoints {
nodeName := "foobar"
return &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Namespace: namespace,
Annotations: map[string]string{
"test.annotation": "test",
},
@ -103,7 +104,7 @@ func TestEndpointsDiscoveryBeforeRun(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
beforeRun: func() {
obj := makeEndpoints()
obj := makeEndpoints("default")
c.CoreV1().Endpoints(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -279,12 +280,12 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
func TestEndpointsDiscoveryDelete(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeEndpoints()
obj := makeEndpoints("default")
c.CoreV1().Endpoints(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
},
expectedMaxItems: 2,
@ -298,7 +299,7 @@ func TestEndpointsDiscoveryDelete(t *testing.T) {
func TestEndpointsDiscoveryUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
@ -370,7 +371,7 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) {
func TestEndpointsDiscoveryEmptySubsets(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
@ -399,7 +400,7 @@ func TestEndpointsDiscoveryEmptySubsets(t *testing.T) {
func TestEndpointsDiscoveryWithService(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
@ -465,7 +466,7 @@ func TestEndpointsDiscoveryWithService(t *testing.T) {
func TestEndpointsDiscoveryWithServiceUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints())
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, makeEndpoints("default"))
k8sDiscoveryTest{
discovery: n,
@ -560,7 +561,7 @@ func TestEndpointsDiscoveryWithNodeMetadata(t *testing.T) {
},
},
}
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), svc, node1, node2)
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints("default"), svc, node1, node2)
k8sDiscoveryTest{
discovery: n,
@ -634,7 +635,7 @@ func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
},
},
}
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints(), node1, node2, svc)
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, metadataConfig, makeEndpoints("default"), node1, node2, svc)
k8sDiscoveryTest{
discovery: n,
@ -698,7 +699,7 @@ func TestEndpointsDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
func TestEndpointsDiscoveryNamespaces(t *testing.T) {
t.Parallel()
epOne := makeEndpoints()
epOne := makeEndpoints("default")
epOne.Namespace = "ns1"
objs := []runtime.Object{
epOne,
@ -850,10 +851,10 @@ func TestEndpointsDiscoveryNamespaces(t *testing.T) {
func TestEndpointsDiscoveryOwnNamespace(t *testing.T) {
t.Parallel()
epOne := makeEndpoints()
epOne := makeEndpoints("default")
epOne.Namespace = "own-ns"
epTwo := makeEndpoints()
epTwo := makeEndpoints("default")
epTwo.Namespace = "non-own-ns"
podOne := &v1.Pod{
@ -945,7 +946,7 @@ func TestEndpointsDiscoveryOwnNamespace(t *testing.T) {
func TestEndpointsDiscoveryEmptyPodStatus(t *testing.T) {
t.Parallel()
ep := makeEndpoints()
ep := makeEndpoints("default")
ep.Namespace = "ns"
pod := &v1.Pod{
@ -1274,6 +1275,145 @@ func TestEndpointsDiscoverySidecarContainer(t *testing.T) {
}.Run(t)
}
func TestEndpointsDiscoveryWithNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"environment": "production", "team": "backend"}
nsAnnotations := map[string]string{"owner": "platform", "version": "v1.2.3"}
n, _ := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makeEndpoints(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("endpoints/%s/testendpoints", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
{
"__address__": "6.7.8.9:9002",
"__meta_kubernetes_endpoint_address_target_kind": "Node",
"__meta_kubernetes_endpoint_address_target_name": "barbaz",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_owner": "platform",
"__meta_kubernetes_namespace_annotationpresent_owner": "true",
"__meta_kubernetes_namespace_annotation_version": "v1.2.3",
"__meta_kubernetes_namespace_annotationpresent_version": "true",
"__meta_kubernetes_namespace_label_environment": "production",
"__meta_kubernetes_namespace_labelpresent_environment": "true",
"__meta_kubernetes_namespace_label_team": "backend",
"__meta_kubernetes_namespace_labelpresent_team": "true",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_endpoints_annotation_test_annotation": "test",
"__meta_kubernetes_endpoints_annotationpresent_test_annotation": "true",
},
Source: fmt.Sprintf("endpoints/%s/testendpoints", ns),
},
},
}.Run(t)
}
func TestEndpointsDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"environment": "development", "team": "frontend"}
nsAnnotations := map[string]string{"owner": "devops", "version": "v2.1.0"}
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
n, c := makeDiscoveryWithMetadata(RoleEndpoint, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makeEndpoints(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
afterStart: func() {
namespace.Labels["environment"] = "staging"
namespace.Labels["region"] = "us-west"
namespace.Annotations["owner"] = "sre"
namespace.Annotations["cost-center"] = "engineering"
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
},
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("endpoints/%s/testendpoints", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpoint_node_name": "foobar",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
{
"__address__": "6.7.8.9:9002",
"__meta_kubernetes_endpoint_address_target_kind": "Node",
"__meta_kubernetes_endpoint_address_target_name": "barbaz",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_owner": "sre",
"__meta_kubernetes_namespace_annotationpresent_owner": "true",
"__meta_kubernetes_namespace_annotation_version": "v2.1.0",
"__meta_kubernetes_namespace_annotationpresent_version": "true",
"__meta_kubernetes_namespace_annotation_cost_center": "engineering",
"__meta_kubernetes_namespace_annotationpresent_cost_center": "true",
"__meta_kubernetes_namespace_label_environment": "staging",
"__meta_kubernetes_namespace_labelpresent_environment": "true",
"__meta_kubernetes_namespace_label_team": "frontend",
"__meta_kubernetes_namespace_labelpresent_team": "true",
"__meta_kubernetes_namespace_label_region": "us-west",
"__meta_kubernetes_namespace_labelpresent_region": "true",
"__meta_kubernetes_endpoints_name": "testendpoints",
"__meta_kubernetes_endpoints_annotation_test_annotation": "test",
"__meta_kubernetes_endpoints_annotationpresent_test_annotation": "true",
},
Source: fmt.Sprintf("endpoints/%s/testendpoints", ns),
},
},
}.Run(t)
}
func BenchmarkResolvePodRef(b *testing.B) {
indexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil)
e := &Endpoints{

View File

@ -43,6 +43,8 @@ type EndpointSlice struct {
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
namespaceInf cache.SharedInformer
withNamespaceMetadata bool
podStore cache.Store
endpointSliceStore cache.Store
@ -52,7 +54,7 @@ type EndpointSlice struct {
}
// NewEndpointSlice returns a new endpointslice discovery.
func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node cache.SharedInformer, eventCount *prometheus.CounterVec) *EndpointSlice {
func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *EndpointSlice {
if l == nil {
l = promslog.NewNopLogger()
}
@ -75,6 +77,8 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n
podStore: pod.GetStore(),
nodeInf: node,
withNodeMetadata: node != nil,
namespaceInf: namespace,
withNamespaceMetadata: namespace != nil,
queue: workqueue.NewNamed(RoleEndpointSlice.String()),
}
@ -154,6 +158,19 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n
}
}
if e.withNamespaceMetadata {
_, err = e.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// Create and Delete should be covered by the other handlers.
UpdateFunc: func(_, o interface{}) {
namespace := o.(*apiv1.Namespace)
e.enqueueNamespace(namespace.Name)
},
})
if err != nil {
l.Error("Error adding namespaces event handler.", "err", err)
}
}
return e
}
@ -169,6 +186,18 @@ func (e *EndpointSlice) enqueueNode(nodeName string) {
}
}
func (e *EndpointSlice) enqueueNamespace(namespace string) {
endpoints, err := e.endpointSliceInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
if err != nil {
e.logger.Error("Error getting endpoints in namespace", "namespace", namespace, "err", err)
return
}
for _, endpoint := range endpoints {
e.enqueue(endpoint)
}
}
func (e *EndpointSlice) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
@ -186,6 +215,9 @@ func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group)
if e.withNodeMetadata {
cacheSyncs = append(cacheSyncs, e.nodeInf.HasSynced)
}
if e.withNamespaceMetadata {
cacheSyncs = append(cacheSyncs, e.namespaceInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
e.logger.Error("endpointslice informer unable to sync cache")
@ -274,6 +306,10 @@ func (e *EndpointSlice) buildEndpointSlice(eps v1.EndpointSlice) *targetgroup.Gr
e.addServiceLabels(eps, tg)
if e.withNamespaceMetadata {
tg.Labels = addNamespaceLabels(tg.Labels, e.namespaceInf, e.logger, eps.Namespace)
}
type podEntry struct {
pod *apiv1.Pod
servicePorts []v1.EndpointPort

View File

@ -15,6 +15,7 @@ package kubernetes
import (
"context"
"fmt"
"testing"
"github.com/prometheus/common/model"
@ -44,11 +45,11 @@ func protocolptr(p corev1.Protocol) *corev1.Protocol {
return &p
}
func makeEndpointSliceV1() *v1.EndpointSlice {
func makeEndpointSliceV1(namespace string) *v1.EndpointSlice {
return &v1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: "default",
Namespace: namespace,
Labels: map[string]string{
v1.LabelServiceName: "testendpoints",
},
@ -113,6 +114,16 @@ func makeEndpointSliceV1() *v1.EndpointSlice {
}
}
func makeNamespace(name string, labels, annotations map[string]string) *corev1.Namespace {
return &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
Annotations: annotations,
},
}
}
func TestEndpointSliceDiscoveryBeforeRun(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}})
@ -120,7 +131,7 @@ func TestEndpointSliceDiscoveryBeforeRun(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
beforeRun: func() {
obj := makeEndpointSliceV1()
obj := makeEndpointSliceV1("default")
c.DiscoveryV1().EndpointSlices(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -325,12 +336,12 @@ func TestEndpointSliceDiscoveryAdd(t *testing.T) {
func TestEndpointSliceDiscoveryDelete(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeEndpointSliceV1()
obj := makeEndpointSliceV1("default")
c.DiscoveryV1().EndpointSlices(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
},
expectedMaxItems: 2,
@ -344,12 +355,12 @@ func TestEndpointSliceDiscoveryDelete(t *testing.T) {
func TestEndpointSliceDiscoveryUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeEndpointSliceV1()
obj := makeEndpointSliceV1("default")
obj.ObjectMeta.Labels = nil
obj.ObjectMeta.Annotations = nil
obj.Endpoints = obj.Endpoints[0:2]
@ -401,12 +412,12 @@ func TestEndpointSliceDiscoveryUpdate(t *testing.T) {
func TestEndpointSliceDiscoveryEmptyEndpoints(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makeEndpointSliceV1()
obj := makeEndpointSliceV1("default")
obj.Endpoints = []v1.Endpoint{}
c.DiscoveryV1().EndpointSlices(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{})
},
@ -430,7 +441,7 @@ func TestEndpointSliceDiscoveryEmptyEndpoints(t *testing.T) {
func TestEndpointSliceDiscoveryWithService(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
@ -523,7 +534,7 @@ func TestEndpointSliceDiscoveryWithService(t *testing.T) {
func TestEndpointSliceDiscoveryWithServiceUpdate(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1())
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, makeEndpointSliceV1("default"))
k8sDiscoveryTest{
discovery: n,
@ -643,7 +654,7 @@ func TestEndpointsSlicesDiscoveryWithNodeMetadata(t *testing.T) {
},
},
}
objs := []runtime.Object{makeEndpointSliceV1(), makeNode("foobar", "", "", nodeLabels1, nil), makeNode("barbaz", "", "", nodeLabels2, nil), svc}
objs := []runtime.Object{makeEndpointSliceV1("default"), makeNode("foobar", "", "", nodeLabels1, nil), makeNode("barbaz", "", "", nodeLabels2, nil), svc}
n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...)
k8sDiscoveryTest{
@ -745,7 +756,7 @@ func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
}
node1 := makeNode("foobar", "", "", nodeLabels1, nil)
node2 := makeNode("barbaz", "", "", nodeLabels2, nil)
objs := []runtime.Object{makeEndpointSliceV1(), node1, node2, svc}
objs := []runtime.Object{makeEndpointSliceV1("default"), node1, node2, svc}
n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, objs...)
k8sDiscoveryTest{
@ -837,7 +848,7 @@ func TestEndpointsSlicesDiscoveryWithUpdatedNodeMetadata(t *testing.T) {
func TestEndpointSliceDiscoveryNamespaces(t *testing.T) {
t.Parallel()
epOne := makeEndpointSliceV1()
epOne := makeEndpointSliceV1("default")
epOne.Namespace = "ns1"
objs := []runtime.Object{
epOne,
@ -1014,10 +1025,10 @@ func TestEndpointSliceDiscoveryNamespaces(t *testing.T) {
func TestEndpointSliceDiscoveryOwnNamespace(t *testing.T) {
t.Parallel()
epOne := makeEndpointSliceV1()
epOne := makeEndpointSliceV1("default")
epOne.Namespace = "own-ns"
epTwo := makeEndpointSliceV1()
epTwo := makeEndpointSliceV1("default")
epTwo.Namespace = "non-own-ns"
podOne := &corev1.Pod{
@ -1135,7 +1146,7 @@ func TestEndpointSliceDiscoveryOwnNamespace(t *testing.T) {
func TestEndpointSliceDiscoveryEmptyPodStatus(t *testing.T) {
t.Parallel()
ep := makeEndpointSliceV1()
ep := makeEndpointSliceV1("default")
ep.Namespace = "ns"
pod := &corev1.Pod{
@ -1380,3 +1391,223 @@ func TestEndpointSliceDiscoverySidecarContainer(t *testing.T) {
},
}.Run(t)
}
func TestEndpointsSlicesDiscoveryWithNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"service": "web", "layer": "frontend"}
nsAnnotations := map[string]string{"contact": "platform", "release": "v5.6.7"}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: ns,
Labels: map[string]string{
"app/name": "test",
},
},
}
objs := []runtime.Object{makeNamespace(ns, nsLabels, nsAnnotations), svc, makeEndpointSliceV1(ns)}
n, _ := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, objs...)
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("endpointslice/%s/testendpoints", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "",
"__meta_kubernetes_endpointslice_address_target_name": "",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpointslice_endpoint_node_name": "foobar",
"__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_topology": "value",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "2.3.4.5:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1b",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "3.4.5.6:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "true",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1c",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "4.5.6.7:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "Node",
"__meta_kubernetes_endpointslice_address_target_name": "barbaz",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_name": "testendpoints",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "testendpoints",
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
"__meta_kubernetes_endpointslice_annotation_test_annotation": "test",
"__meta_kubernetes_endpointslice_annotationpresent_test_annotation": "true",
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_contact": "platform",
"__meta_kubernetes_namespace_annotationpresent_contact": "true",
"__meta_kubernetes_namespace_annotation_release": "v5.6.7",
"__meta_kubernetes_namespace_annotationpresent_release": "true",
"__meta_kubernetes_namespace_label_service": "web",
"__meta_kubernetes_namespace_labelpresent_service": "true",
"__meta_kubernetes_namespace_label_layer": "frontend",
"__meta_kubernetes_namespace_labelpresent_layer": "true",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: fmt.Sprintf("endpointslice/%s/testendpoints", ns),
},
},
}.Run(t)
}
func TestEndpointsSlicesDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"component": "database", "layer": "backend"}
nsAnnotations := map[string]string{"contact": "dba", "release": "v6.7.8"}
metadataConfig := AttachMetadataConfig{Namespace: true}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "testendpoints",
Namespace: ns,
Labels: map[string]string{
"app/name": "test",
},
},
}
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
n, c := makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, metadataConfig, namespace, svc, makeEndpointSliceV1(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 2,
afterStart: func() {
namespace.Labels["component"] = "cache"
namespace.Labels["region"] = "us-central"
namespace.Annotations["contact"] = "sre"
namespace.Annotations["monitoring"] = "enabled"
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
},
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("endpointslice/%s/testendpoints", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "",
"__meta_kubernetes_endpointslice_address_target_name": "",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
"__meta_kubernetes_endpointslice_endpoint_node_name": "foobar",
"__meta_kubernetes_endpointslice_endpoint_topology_present_topology": "true",
"__meta_kubernetes_endpointslice_endpoint_topology_topology": "value",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "2.3.4.5:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1b",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "3.4.5.6:9000",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "true",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1c",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
{
"__address__": "4.5.6.7:9000",
"__meta_kubernetes_endpointslice_address_target_kind": "Node",
"__meta_kubernetes_endpointslice_address_target_name": "barbaz",
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_serving": "true",
"__meta_kubernetes_endpointslice_endpoint_conditions_terminating": "false",
"__meta_kubernetes_endpointslice_endpoint_zone": "us-east-1a",
"__meta_kubernetes_endpointslice_port": "9000",
"__meta_kubernetes_endpointslice_port_app_protocol": "http",
"__meta_kubernetes_endpointslice_port_name": "testport",
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_endpointslice_address_type": "IPv4",
"__meta_kubernetes_endpointslice_name": "testendpoints",
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": "testendpoints",
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
"__meta_kubernetes_endpointslice_annotation_test_annotation": "test",
"__meta_kubernetes_endpointslice_annotationpresent_test_annotation": "true",
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_contact": "sre",
"__meta_kubernetes_namespace_annotationpresent_contact": "true",
"__meta_kubernetes_namespace_annotation_release": "v6.7.8",
"__meta_kubernetes_namespace_annotationpresent_release": "true",
"__meta_kubernetes_namespace_annotation_monitoring": "enabled",
"__meta_kubernetes_namespace_annotationpresent_monitoring": "true",
"__meta_kubernetes_namespace_label_component": "cache",
"__meta_kubernetes_namespace_labelpresent_component": "true",
"__meta_kubernetes_namespace_label_layer": "backend",
"__meta_kubernetes_namespace_labelpresent_layer": "true",
"__meta_kubernetes_namespace_label_region": "us-central",
"__meta_kubernetes_namespace_labelpresent_region": "true",
"__meta_kubernetes_service_label_app_name": "test",
"__meta_kubernetes_service_labelpresent_app_name": "true",
"__meta_kubernetes_service_name": "testendpoints",
},
Source: fmt.Sprintf("endpointslice/%s/testendpoints", ns),
},
},
}.Run(t)
}

View File

@ -153,9 +153,10 @@ type resourceSelector struct {
}
// AttachMetadataConfig is the configuration for attaching additional metadata
// coming from nodes on which the targets are scheduled.
// coming from namespaces or nodes on which the targets are scheduled.
type AttachMetadataConfig struct {
Node bool `yaml:"node"`
Namespace bool `yaml:"namespace"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -397,7 +398,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return e.Watch(ctx, options)
},
}
informer = d.newEndpointSlicesByNodeInformer(elw, &disv1.EndpointSlice{})
informer = d.newIndexedEndpointSlicesInformer(elw, &disv1.EndpointSlice{})
s := d.client.CoreV1().Services(namespace)
slw := &cache.ListWatch{
@ -430,12 +431,18 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
nodeInf = d.newNodeInformer(context.Background())
go nodeInf.Run(ctx.Done())
}
var namespaceInf cache.SharedInformer
if d.attachMetadata.Namespace {
namespaceInf = d.newNamespaceInformer(context.Background())
go namespaceInf.Run(ctx.Done())
}
eps := NewEndpointSlice(
d.logger.With("role", "endpointslice"),
informer,
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf,
namespaceInf,
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, eps)
@ -489,13 +496,19 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
nodeInf = d.newNodeInformer(ctx)
go nodeInf.Run(ctx.Done())
}
var namespaceInf cache.SharedInformer
if d.attachMetadata.Namespace {
namespaceInf = d.newNamespaceInformer(ctx)
go namespaceInf.Run(ctx.Done())
}
eps := NewEndpoints(
d.logger.With("role", "endpoint"),
d.newEndpointsByNodeInformer(elw),
d.newIndexedEndpointsInformer(elw),
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
nodeInf,
namespaceInf,
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, eps)
@ -509,6 +522,11 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
nodeInformer = d.newNodeInformer(ctx)
go nodeInformer.Run(ctx.Done())
}
var namespaceInformer cache.SharedInformer
if d.attachMetadata.Namespace {
namespaceInformer = d.newNamespaceInformer(ctx)
go namespaceInformer.Run(ctx.Done())
}
for _, namespace := range namespaces {
p := d.client.CoreV1().Pods(namespace)
@ -526,8 +544,9 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
pod := NewPod(
d.logger.With("role", "pod"),
d.newPodsByNodeInformer(plw),
d.newIndexedPodsInformer(plw),
nodeInformer,
namespaceInformer,
d.metrics.eventCount,
)
d.discoverers = append(d.discoverers, pod)
@ -651,7 +670,20 @@ func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
return d.mustNewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled)
}
func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
func (d *Discovery) newNamespaceInformer(ctx context.Context) cache.SharedInformer {
// We don't filter on NamespaceDiscovery.
nlw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Namespaces().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return d.client.CoreV1().Namespaces().Watch(ctx, options)
},
}
return d.mustNewSharedInformer(nlw, &apiv1.Namespace{}, resyncDisabled)
}
func (d *Discovery) newIndexedPodsInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if d.attachMetadata.Node {
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
@ -663,10 +695,14 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
}
}
if d.attachMetadata.Namespace {
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
}
return d.mustNewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers)
}
func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
func (d *Discovery) newIndexedEndpointsInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
indexers[podIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*apiv1.Endpoints)
@ -683,10 +719,8 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
}
return pods, nil
}
if !d.attachMetadata.Node {
return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
}
if d.attachMetadata.Node {
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*apiv1.Endpoints)
if !ok {
@ -709,11 +743,16 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
}
return nodes, nil
}
}
if d.attachMetadata.Namespace {
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
}
return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
}
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
func (d *Discovery) newIndexedEndpointSlicesInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
indexers[serviceIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*disv1.EndpointSlice)
@ -728,10 +767,8 @@ func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object
return []string{namespacedName(e.Namespace, svcName)}, nil
}
if !d.attachMetadata.Node {
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
}
if d.attachMetadata.Node {
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
e, ok := obj.(*disv1.EndpointSlice)
if !ok {
@ -754,6 +791,11 @@ func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object
return nodes, nil
}
}
if d.attachMetadata.Namespace {
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
}
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
}
@ -783,22 +825,29 @@ func (d *Discovery) mustNewSharedIndexInformer(lw cache.ListerWatcher, exampleOb
return informer
}
func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, role Role) {
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_name")] = lv(objectMeta.Name)
func addObjectAnnotationsAndLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, resource string) {
for k, v := range objectMeta.Labels {
ln := strutil.SanitizeLabelName(k)
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_label_"+ln)] = lv(v)
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_labelpresent_"+ln)] = presentValue
labelSet[model.LabelName(metaLabelPrefix+resource+"_label_"+ln)] = lv(v)
labelSet[model.LabelName(metaLabelPrefix+resource+"_labelpresent_"+ln)] = presentValue
}
for k, v := range objectMeta.Annotations {
ln := strutil.SanitizeLabelName(k)
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_annotation_"+ln)] = lv(v)
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_annotationpresent_"+ln)] = presentValue
labelSet[model.LabelName(metaLabelPrefix+resource+"_annotation_"+ln)] = lv(v)
labelSet[model.LabelName(metaLabelPrefix+resource+"_annotationpresent_"+ln)] = presentValue
}
}
func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, role Role) {
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_name")] = lv(objectMeta.Name)
addObjectAnnotationsAndLabels(labelSet, objectMeta, string(role))
}
func addNamespaceMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta) {
// Omitting the namespace name because should be already injected elsewhere.
addObjectAnnotationsAndLabels(labelSet, objectMeta, "namespace")
}
func namespacedName(namespace, name string) string {
return namespace + "/" + name
}

View File

@ -43,13 +43,15 @@ type Pod struct {
podInf cache.SharedIndexInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
namespaceInf cache.SharedInformer
withNamespaceMetadata bool
store cache.Store
logger *slog.Logger
queue *workqueue.Type
}
// NewPod creates a new pod discovery.
func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedInformer, eventCount *prometheus.CounterVec) *Pod {
func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Pod {
if l == nil {
l = promslog.NewNopLogger()
}
@ -62,6 +64,8 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedIn
podInf: pods,
nodeInf: nodes,
withNodeMetadata: nodes != nil,
namespaceInf: namespace,
withNamespaceMetadata: namespace != nil,
store: pods.GetStore(),
logger: l,
queue: workqueue.NewNamed(RolePod.String()),
@ -107,6 +111,19 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedIn
}
}
if p.withNamespaceMetadata {
_, err = p.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// Create and Delete should be covered by the other handlers.
UpdateFunc: func(_, o interface{}) {
namespace := o.(*apiv1.Namespace)
p.enqueuePodsForNamespace(namespace.Name)
},
})
if err != nil {
l.Error("Error adding namespaces event handler.", "err", err)
}
}
return p
}
@ -127,6 +144,9 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
if p.withNodeMetadata {
cacheSyncs = append(cacheSyncs, p.nodeInf.HasSynced)
}
if p.withNamespaceMetadata {
cacheSyncs = append(cacheSyncs, p.namespaceInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !errors.Is(ctx.Err(), context.Canceled) {
@ -269,6 +289,9 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group {
if p.withNodeMetadata {
tg.Labels = addNodeLabels(tg.Labels, p.nodeInf, p.logger, &pod.Spec.NodeName)
}
if p.withNamespaceMetadata {
tg.Labels = addNamespaceLabels(tg.Labels, p.namespaceInf, p.logger, pod.Namespace)
}
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
for i, c := range containers {
@ -327,6 +350,18 @@ func (p *Pod) enqueuePodsForNode(nodeName string) {
}
}
func (p *Pod) enqueuePodsForNamespace(namespace string) {
pods, err := p.podInf.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
if err != nil {
p.logger.Error("Error getting pods in namespace", "namespace", namespace, "err", err)
return
}
for _, pod := range pods {
p.enqueue(pod.(*apiv1.Pod))
}
}
func podSource(pod *apiv1.Pod) string {
return podSourceFromNamespaceAndName(pod.Namespace, pod.Name)
}

View File

@ -95,11 +95,11 @@ func makeMultiPortPods() *v1.Pod {
}
}
func makePods() *v1.Pod {
func makePods(namespace string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
Namespace: "default",
Namespace: namespace,
UID: types.UID("abc123"),
},
Spec: v1.PodSpec{
@ -337,7 +337,7 @@ func TestPodDiscoveryAdd(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makePods()
obj := makePods("default")
c.CoreV1().Pods(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
},
expectedMaxItems: 1,
@ -347,13 +347,13 @@ func TestPodDiscoveryAdd(t *testing.T) {
func TestPodDiscoveryDelete(t *testing.T) {
t.Parallel()
obj := makePods()
obj := makePods("default")
n, c := makeDiscovery(RolePod, NamespaceDiscovery{}, obj)
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makePods()
obj := makePods("default")
c.CoreV1().Pods(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
},
expectedMaxItems: 2,
@ -399,7 +399,7 @@ func TestPodDiscoveryUpdate(t *testing.T) {
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
obj := makePods()
obj := makePods("default")
c.CoreV1().Pods(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{})
},
expectedMaxItems: 2,
@ -410,9 +410,9 @@ func TestPodDiscoveryUpdate(t *testing.T) {
func TestPodDiscoveryUpdateEmptyPodIP(t *testing.T) {
t.Parallel()
n, c := makeDiscovery(RolePod, NamespaceDiscovery{})
initialPod := makePods()
initialPod := makePods("default")
updatedPod := makePods()
updatedPod := makePods("default")
updatedPod.Status.PodIP = ""
k8sDiscoveryTest{
@ -444,7 +444,7 @@ func TestPodDiscoveryNamespaces(t *testing.T) {
discovery: n,
beforeRun: func() {
for _, ns := range []string{"ns1", "ns2"} {
pod := makePods()
pod := makePods("default")
pod.Namespace = ns
c.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
}
@ -463,7 +463,7 @@ func TestPodDiscoveryOwnNamespace(t *testing.T) {
discovery: n,
beforeRun: func() {
for _, ns := range []string{"own-ns", "non-own-ns"} {
pod := makePods()
pod := makePods("default")
pod.Namespace = ns
c.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
}
@ -485,7 +485,7 @@ func TestPodDiscoveryWithNodeMetadata(t *testing.T) {
nodes := makeNode("testnode", "", "", nodeLbls, nil)
c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{})
pods := makePods()
pods := makePods("default")
c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{})
},
expectedMaxItems: 2,
@ -507,7 +507,7 @@ func TestPodDiscoveryWithNodeMetadataUpdateNode(t *testing.T) {
c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{})
},
afterStart: func() {
pods := makePods()
pods := makePods("default")
c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{})
nodes := makeNode("testnode", "", "", nodeLbls, nil)
@ -517,3 +517,114 @@ func TestPodDiscoveryWithNodeMetadataUpdateNode(t *testing.T) {
expectedRes: expectedPodTargetGroupsWithNodeMeta("default", "testnode", nodeLbls),
}.Run(t)
}
func TestPodDiscoveryWithNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"app": "web", "tier": "frontend"}
nsAnnotations := map[string]string{"maintainer": "devops", "build": "v3.4.5"}
n, _ := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makePods(ns))
k8sDiscoveryTest{
discovery: n,
expectedMaxItems: 1,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("pod/%s/testpod", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_image": "testcontainer:latest",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
"__meta_kubernetes_pod_container_init": "false",
"__meta_kubernetes_pod_container_id": "docker://a1b2c3d4e5f6",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_maintainer": "devops",
"__meta_kubernetes_namespace_annotationpresent_maintainer": "true",
"__meta_kubernetes_namespace_annotation_build": "v3.4.5",
"__meta_kubernetes_namespace_annotationpresent_build": "true",
"__meta_kubernetes_namespace_label_app": "web",
"__meta_kubernetes_namespace_labelpresent_app": "true",
"__meta_kubernetes_namespace_label_tier": "frontend",
"__meta_kubernetes_namespace_labelpresent_tier": "true",
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_phase": "Running",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: fmt.Sprintf("pod/%s/testpod", ns),
},
},
}.Run(t)
}
func TestPodDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
t.Parallel()
ns := "test-ns"
nsLabels := map[string]string{"app": "api", "tier": "backend"}
nsAnnotations := map[string]string{"maintainer": "platform", "build": "v4.5.6"}
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
n, c := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makePods(ns))
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
namespace.Labels["app"] = "service"
namespace.Labels["zone"] = "us-east"
namespace.Annotations["maintainer"] = "sre"
namespace.Annotations["deployment"] = "canary"
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
},
expectedMaxItems: 2,
expectedRes: map[string]*targetgroup.Group{
fmt.Sprintf("pod/%s/testpod", ns): {
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_image": "testcontainer:latest",
"__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport",
"__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP",
"__meta_kubernetes_pod_container_init": "false",
"__meta_kubernetes_pod_container_id": "docker://a1b2c3d4e5f6",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": model.LabelValue(ns),
"__meta_kubernetes_namespace_annotation_maintainer": "sre",
"__meta_kubernetes_namespace_annotationpresent_maintainer": "true",
"__meta_kubernetes_namespace_annotation_build": "v4.5.6",
"__meta_kubernetes_namespace_annotationpresent_build": "true",
"__meta_kubernetes_namespace_annotation_deployment": "canary",
"__meta_kubernetes_namespace_annotationpresent_deployment": "true",
"__meta_kubernetes_namespace_label_app": "service",
"__meta_kubernetes_namespace_labelpresent_app": "true",
"__meta_kubernetes_namespace_label_tier": "backend",
"__meta_kubernetes_namespace_labelpresent_tier": "true",
"__meta_kubernetes_namespace_label_zone": "us-east",
"__meta_kubernetes_namespace_labelpresent_zone": "true",
"__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_pod_ip": "1.2.3.4",
"__meta_kubernetes_pod_ready": "true",
"__meta_kubernetes_pod_phase": "Running",
"__meta_kubernetes_pod_node_name": "testnode",
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
"__meta_kubernetes_pod_uid": "abc123",
},
Source: fmt.Sprintf("pod/%s/testpod", ns),
},
},
}.Run(t)
}

View File

@ -1967,6 +1967,9 @@ attach_metadata:
# Attaches node metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice.
# When set to true, Prometheus must have permissions to get Nodes.
[ node: <boolean> | default = false ]
# Attaches namespace metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice.
# When set to true, Prometheus must have permissions to list/watch Namespaces.
[ namespace: <boolean> | default = false ]
# HTTP client settings, including authentication methods (such as basic auth and
# authorization), proxy configurations, TLS options, custom HTTP headers, etc.