mirror of
https://github.com/prometheus/prometheus.git
synced 2025-08-05 05:37:10 +02:00
feat(discovery/kubernetes): allow attaching namespace metadata
to ingress and service roles. with the help of claude-4-sonnet Signed-off-by: machine424 <ayoubmrini424@gmail.com>
This commit is contained in:
parent
c2d6e528e4
commit
a9f6fdd910
@ -183,11 +183,12 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node,
|
||||
|
||||
if e.withNamespaceMetadata {
|
||||
_, err = e.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
// Create and Delete should be covered by the other handlers.
|
||||
UpdateFunc: func(_, o interface{}) {
|
||||
namespace := o.(*apiv1.Namespace)
|
||||
e.enqueueNamespace(namespace.Name)
|
||||
},
|
||||
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
|
||||
// No need to have additional handlers for them here.
|
||||
})
|
||||
if err != nil {
|
||||
l.Error("Error adding namespaces event handler.", "err", err)
|
||||
|
@ -160,11 +160,12 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n
|
||||
|
||||
if e.withNamespaceMetadata {
|
||||
_, err = e.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
// Create and Delete should be covered by the other handlers.
|
||||
UpdateFunc: func(_, o interface{}) {
|
||||
namespace := o.(*apiv1.Namespace)
|
||||
e.enqueueNamespace(namespace.Name)
|
||||
},
|
||||
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
|
||||
// No need to have additional handlers for them here.
|
||||
})
|
||||
if err != nil {
|
||||
l.Error("Error adding namespaces event handler.", "err", err)
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/networking/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
@ -31,23 +32,27 @@ import (
|
||||
|
||||
// Ingress implements discovery of Kubernetes ingress.
|
||||
type Ingress struct {
|
||||
logger *slog.Logger
|
||||
informer cache.SharedInformer
|
||||
store cache.Store
|
||||
queue *workqueue.Type
|
||||
logger *slog.Logger
|
||||
informer cache.SharedIndexInformer
|
||||
store cache.Store
|
||||
queue *workqueue.Type
|
||||
namespaceInf cache.SharedInformer
|
||||
withNamespaceMetadata bool
|
||||
}
|
||||
|
||||
// NewIngress returns a new ingress discovery.
|
||||
func NewIngress(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus.CounterVec) *Ingress {
|
||||
func NewIngress(l *slog.Logger, inf cache.SharedIndexInformer, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Ingress {
|
||||
ingressAddCount := eventCount.WithLabelValues(RoleIngress.String(), MetricLabelRoleAdd)
|
||||
ingressUpdateCount := eventCount.WithLabelValues(RoleIngress.String(), MetricLabelRoleUpdate)
|
||||
ingressDeleteCount := eventCount.WithLabelValues(RoleIngress.String(), MetricLabelRoleDelete)
|
||||
|
||||
s := &Ingress{
|
||||
logger: l,
|
||||
informer: inf,
|
||||
store: inf.GetStore(),
|
||||
queue: workqueue.NewNamed(RoleIngress.String()),
|
||||
logger: l,
|
||||
informer: inf,
|
||||
store: inf.GetStore(),
|
||||
queue: workqueue.NewNamed(RoleIngress.String()),
|
||||
namespaceInf: namespace,
|
||||
withNamespaceMetadata: namespace != nil,
|
||||
}
|
||||
|
||||
_, err := s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
@ -67,6 +72,21 @@ func NewIngress(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus
|
||||
if err != nil {
|
||||
l.Error("Error adding ingresses event handler.", "err", err)
|
||||
}
|
||||
|
||||
if s.withNamespaceMetadata {
|
||||
_, err = s.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(_, o interface{}) {
|
||||
namespace := o.(*apiv1.Namespace)
|
||||
s.enqueueNamespace(namespace.Name)
|
||||
},
|
||||
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
|
||||
// No need to have additional handlers for them here.
|
||||
})
|
||||
if err != nil {
|
||||
l.Error("Error adding namespaces event handler.", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
@ -79,11 +99,28 @@ func (i *Ingress) enqueue(obj interface{}) {
|
||||
i.queue.Add(key)
|
||||
}
|
||||
|
||||
func (i *Ingress) enqueueNamespace(namespace string) {
|
||||
ingresses, err := i.informer.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
|
||||
if err != nil {
|
||||
i.logger.Error("Error getting ingresses in namespace", "namespace", namespace, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, ingress := range ingresses {
|
||||
i.enqueue(ingress)
|
||||
}
|
||||
}
|
||||
|
||||
// Run implements the Discoverer interface.
|
||||
func (i *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
||||
defer i.queue.ShutDown()
|
||||
|
||||
if !cache.WaitForCacheSync(ctx.Done(), i.informer.HasSynced) {
|
||||
cacheSyncs := []cache.InformerSynced{i.informer.HasSynced}
|
||||
if i.withNamespaceMetadata {
|
||||
cacheSyncs = append(cacheSyncs, i.namespaceInf.HasSynced)
|
||||
}
|
||||
|
||||
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
|
||||
if !errors.Is(ctx.Err(), context.Canceled) {
|
||||
i.logger.Error("ingress informer unable to sync cache")
|
||||
}
|
||||
@ -200,6 +237,10 @@ func (i *Ingress) buildIngress(ingress v1.Ingress) *targetgroup.Group {
|
||||
}
|
||||
tg.Labels = ingressLabels(ingress)
|
||||
|
||||
if i.withNamespaceMetadata {
|
||||
tg.Labels = addNamespaceLabels(tg.Labels, i.namespaceInf, i.logger, ingress.Namespace)
|
||||
}
|
||||
|
||||
for _, rule := range ingress.Spec.Rules {
|
||||
scheme := "http"
|
||||
paths := pathsFromIngressPaths(rulePaths(rule))
|
||||
|
@ -34,11 +34,11 @@ const (
|
||||
TLSWildcard
|
||||
)
|
||||
|
||||
func makeIngress(tls TLSMode) *v1.Ingress {
|
||||
func makeIngress(namespace string, tls TLSMode) *v1.Ingress {
|
||||
ret := &v1.Ingress{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testingress",
|
||||
Namespace: "default",
|
||||
Namespace: namespace,
|
||||
Labels: map[string]string{"test/label": "testvalue"},
|
||||
Annotations: map[string]string{"test/annotation": "testannotationvalue"},
|
||||
},
|
||||
@ -150,7 +150,7 @@ func TestIngressDiscoveryAdd(t *testing.T) {
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := makeIngress(TLSNo)
|
||||
obj := makeIngress("default", TLSNo)
|
||||
c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
},
|
||||
expectedMaxItems: 1,
|
||||
@ -165,7 +165,7 @@ func TestIngressDiscoveryAddTLS(t *testing.T) {
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := makeIngress(TLSYes)
|
||||
obj := makeIngress("default", TLSYes)
|
||||
c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
},
|
||||
expectedMaxItems: 1,
|
||||
@ -180,7 +180,7 @@ func TestIngressDiscoveryAddMixed(t *testing.T) {
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := makeIngress(TLSMixed)
|
||||
obj := makeIngress("default", TLSMixed)
|
||||
c.NetworkingV1().Ingresses("default").Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
},
|
||||
expectedMaxItems: 1,
|
||||
@ -200,8 +200,7 @@ func TestIngressDiscoveryNamespaces(t *testing.T) {
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
for _, ns := range []string{"ns1", "ns2"} {
|
||||
obj := makeIngress(TLSNo)
|
||||
obj.Namespace = ns
|
||||
obj := makeIngress(ns, TLSNo)
|
||||
c.NetworkingV1().Ingresses(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
}
|
||||
},
|
||||
@ -219,8 +218,7 @@ func TestIngressDiscoveryOwnNamespace(t *testing.T) {
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
for _, ns := range []string{"own-ns", "non-own-ns"} {
|
||||
obj := makeIngress(TLSNo)
|
||||
obj.Namespace = ns
|
||||
obj := makeIngress(ns, TLSNo)
|
||||
c.NetworkingV1().Ingresses(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
}
|
||||
},
|
||||
@ -228,3 +226,128 @@ func TestIngressDiscoveryOwnNamespace(t *testing.T) {
|
||||
expectedRes: expected,
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestIngressDiscoveryWithNamespaceMetadata(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ns := "test-ns"
|
||||
nsLabels := map[string]string{"service": "web", "layer": "frontend"}
|
||||
nsAnnotations := map[string]string{"contact": "platform", "release": "v5.6.7"}
|
||||
|
||||
n, _ := makeDiscoveryWithMetadata(RoleIngress, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makeIngress(ns, TLSNo))
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 1,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
fmt.Sprintf("ingress/%s/testingress", ns): {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "example.com",
|
||||
"__meta_kubernetes_ingress_host": "example.com",
|
||||
"__meta_kubernetes_ingress_path": "/",
|
||||
"__meta_kubernetes_ingress_scheme": "http",
|
||||
},
|
||||
{
|
||||
"__address__": "example.com",
|
||||
"__meta_kubernetes_ingress_host": "example.com",
|
||||
"__meta_kubernetes_ingress_path": "/foo",
|
||||
"__meta_kubernetes_ingress_scheme": "http",
|
||||
},
|
||||
{
|
||||
"__address__": "test.example.com",
|
||||
"__meta_kubernetes_ingress_host": "test.example.com",
|
||||
"__meta_kubernetes_ingress_path": "/",
|
||||
"__meta_kubernetes_ingress_scheme": "http",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_namespace": model.LabelValue(ns),
|
||||
"__meta_kubernetes_namespace_annotation_contact": "platform",
|
||||
"__meta_kubernetes_namespace_annotationpresent_contact": "true",
|
||||
"__meta_kubernetes_namespace_annotation_release": "v5.6.7",
|
||||
"__meta_kubernetes_namespace_annotationpresent_release": "true",
|
||||
"__meta_kubernetes_namespace_label_service": "web",
|
||||
"__meta_kubernetes_namespace_labelpresent_service": "true",
|
||||
"__meta_kubernetes_namespace_label_layer": "frontend",
|
||||
"__meta_kubernetes_namespace_labelpresent_layer": "true",
|
||||
"__meta_kubernetes_ingress_name": "testingress",
|
||||
"__meta_kubernetes_ingress_label_test_label": "testvalue",
|
||||
"__meta_kubernetes_ingress_labelpresent_test_label": "true",
|
||||
"__meta_kubernetes_ingress_annotation_test_annotation": "testannotationvalue",
|
||||
"__meta_kubernetes_ingress_annotationpresent_test_annotation": "true",
|
||||
"__meta_kubernetes_ingress_class_name": "testclass",
|
||||
},
|
||||
Source: fmt.Sprintf("ingress/%s/testingress", ns),
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestIngressDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ns := "test-ns"
|
||||
nsLabels := map[string]string{"component": "database", "layer": "backend"}
|
||||
nsAnnotations := map[string]string{"contact": "dba", "release": "v6.7.8"}
|
||||
|
||||
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
|
||||
n, c := makeDiscoveryWithMetadata(RoleIngress, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makeIngress(ns, TLSNo))
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 2,
|
||||
afterStart: func() {
|
||||
namespace.Labels["component"] = "cache"
|
||||
namespace.Labels["region"] = "us-central"
|
||||
namespace.Annotations["contact"] = "sre"
|
||||
namespace.Annotations["monitoring"] = "enabled"
|
||||
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
|
||||
},
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
fmt.Sprintf("ingress/%s/testingress", ns): {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "example.com",
|
||||
"__meta_kubernetes_ingress_host": "example.com",
|
||||
"__meta_kubernetes_ingress_path": "/",
|
||||
"__meta_kubernetes_ingress_scheme": "http",
|
||||
},
|
||||
{
|
||||
"__address__": "example.com",
|
||||
"__meta_kubernetes_ingress_host": "example.com",
|
||||
"__meta_kubernetes_ingress_path": "/foo",
|
||||
"__meta_kubernetes_ingress_scheme": "http",
|
||||
},
|
||||
{
|
||||
"__address__": "test.example.com",
|
||||
"__meta_kubernetes_ingress_host": "test.example.com",
|
||||
"__meta_kubernetes_ingress_path": "/",
|
||||
"__meta_kubernetes_ingress_scheme": "http",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_namespace": model.LabelValue(ns),
|
||||
"__meta_kubernetes_namespace_annotation_contact": "sre",
|
||||
"__meta_kubernetes_namespace_annotationpresent_contact": "true",
|
||||
"__meta_kubernetes_namespace_annotation_release": "v6.7.8",
|
||||
"__meta_kubernetes_namespace_annotationpresent_release": "true",
|
||||
"__meta_kubernetes_namespace_annotation_monitoring": "enabled",
|
||||
"__meta_kubernetes_namespace_annotationpresent_monitoring": "true",
|
||||
"__meta_kubernetes_namespace_label_component": "cache",
|
||||
"__meta_kubernetes_namespace_labelpresent_component": "true",
|
||||
"__meta_kubernetes_namespace_label_layer": "backend",
|
||||
"__meta_kubernetes_namespace_labelpresent_layer": "true",
|
||||
"__meta_kubernetes_namespace_label_region": "us-central",
|
||||
"__meta_kubernetes_namespace_labelpresent_region": "true",
|
||||
"__meta_kubernetes_ingress_name": "testingress",
|
||||
"__meta_kubernetes_ingress_label_test_label": "testvalue",
|
||||
"__meta_kubernetes_ingress_labelpresent_test_label": "true",
|
||||
"__meta_kubernetes_ingress_annotation_test_annotation": "testannotationvalue",
|
||||
"__meta_kubernetes_ingress_annotationpresent_test_annotation": "true",
|
||||
"__meta_kubernetes_ingress_class_name": "testclass",
|
||||
},
|
||||
Source: fmt.Sprintf("ingress/%s/testingress", ns),
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
@ -553,6 +553,12 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
||||
go pod.podInf.Run(ctx.Done())
|
||||
}
|
||||
case RoleService:
|
||||
var namespaceInformer cache.SharedInformer
|
||||
if d.attachMetadata.Namespace {
|
||||
namespaceInformer = d.newNamespaceInformer(ctx)
|
||||
go namespaceInformer.Run(ctx.Done())
|
||||
}
|
||||
|
||||
for _, namespace := range namespaces {
|
||||
s := d.client.CoreV1().Services(namespace)
|
||||
slw := &cache.ListWatch{
|
||||
@ -569,15 +575,21 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
||||
}
|
||||
svc := NewService(
|
||||
d.logger.With("role", "service"),
|
||||
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
|
||||
d.newIndexedServicesInformer(slw),
|
||||
namespaceInformer,
|
||||
d.metrics.eventCount,
|
||||
)
|
||||
d.discoverers = append(d.discoverers, svc)
|
||||
go svc.informer.Run(ctx.Done())
|
||||
}
|
||||
case RoleIngress:
|
||||
var namespaceInformer cache.SharedInformer
|
||||
if d.attachMetadata.Namespace {
|
||||
namespaceInformer = d.newNamespaceInformer(ctx)
|
||||
go namespaceInformer.Run(ctx.Done())
|
||||
}
|
||||
|
||||
for _, namespace := range namespaces {
|
||||
var informer cache.SharedInformer
|
||||
i := d.client.NetworkingV1().Ingresses(namespace)
|
||||
ilw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
@ -591,10 +603,10 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
||||
return i.Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
informer = d.mustNewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
|
||||
ingress := NewIngress(
|
||||
d.logger.With("role", "ingress"),
|
||||
informer,
|
||||
d.newIndexedIngressesInformer(ilw),
|
||||
namespaceInformer,
|
||||
d.metrics.eventCount,
|
||||
)
|
||||
d.discoverers = append(d.discoverers, ingress)
|
||||
@ -800,6 +812,26 @@ func (d *Discovery) newIndexedEndpointSlicesInformer(plw *cache.ListWatch, objec
|
||||
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
|
||||
}
|
||||
|
||||
func (d *Discovery) newIndexedServicesInformer(slw *cache.ListWatch) cache.SharedIndexInformer {
|
||||
indexers := make(map[string]cache.IndexFunc)
|
||||
|
||||
if d.attachMetadata.Namespace {
|
||||
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
|
||||
}
|
||||
|
||||
return d.mustNewSharedIndexInformer(slw, &apiv1.Service{}, resyncDisabled, indexers)
|
||||
}
|
||||
|
||||
func (d *Discovery) newIndexedIngressesInformer(ilw *cache.ListWatch) cache.SharedIndexInformer {
|
||||
indexers := make(map[string]cache.IndexFunc)
|
||||
|
||||
if d.attachMetadata.Namespace {
|
||||
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc
|
||||
}
|
||||
|
||||
return d.mustNewSharedIndexInformer(ilw, &networkv1.Ingress{}, resyncDisabled, indexers)
|
||||
}
|
||||
|
||||
func (d *Discovery) informerWatchErrorHandler(r *cache.Reflector, err error) {
|
||||
d.metrics.failuresCount.Inc()
|
||||
cache.DefaultWatchErrorHandler(r, err)
|
||||
|
@ -113,11 +113,12 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes, namespace cac
|
||||
|
||||
if p.withNamespaceMetadata {
|
||||
_, err = p.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
// Create and Delete should be covered by the other handlers.
|
||||
UpdateFunc: func(_, o interface{}) {
|
||||
namespace := o.(*apiv1.Namespace)
|
||||
p.enqueuePodsForNamespace(namespace.Name)
|
||||
},
|
||||
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
|
||||
// No need to have additional handlers for them here.
|
||||
})
|
||||
if err != nil {
|
||||
l.Error("Error adding namespaces event handler.", "err", err)
|
||||
|
@ -33,14 +33,16 @@ import (
|
||||
|
||||
// Service implements discovery of Kubernetes services.
|
||||
type Service struct {
|
||||
logger *slog.Logger
|
||||
informer cache.SharedInformer
|
||||
store cache.Store
|
||||
queue *workqueue.Type
|
||||
logger *slog.Logger
|
||||
informer cache.SharedIndexInformer
|
||||
store cache.Store
|
||||
queue *workqueue.Type
|
||||
namespaceInf cache.SharedInformer
|
||||
withNamespaceMetadata bool
|
||||
}
|
||||
|
||||
// NewService returns a new service discovery.
|
||||
func NewService(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus.CounterVec) *Service {
|
||||
func NewService(l *slog.Logger, inf cache.SharedIndexInformer, namespace cache.SharedInformer, eventCount *prometheus.CounterVec) *Service {
|
||||
if l == nil {
|
||||
l = promslog.NewNopLogger()
|
||||
}
|
||||
@ -50,10 +52,12 @@ func NewService(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus
|
||||
svcDeleteCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleDelete)
|
||||
|
||||
s := &Service{
|
||||
logger: l,
|
||||
informer: inf,
|
||||
store: inf.GetStore(),
|
||||
queue: workqueue.NewNamed(RoleService.String()),
|
||||
logger: l,
|
||||
informer: inf,
|
||||
store: inf.GetStore(),
|
||||
queue: workqueue.NewNamed(RoleService.String()),
|
||||
namespaceInf: namespace,
|
||||
withNamespaceMetadata: namespace != nil,
|
||||
}
|
||||
|
||||
_, err := s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
@ -73,6 +77,21 @@ func NewService(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus
|
||||
if err != nil {
|
||||
l.Error("Error adding services event handler.", "err", err)
|
||||
}
|
||||
|
||||
if s.withNamespaceMetadata {
|
||||
_, err = s.namespaceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(_, o interface{}) {
|
||||
namespace := o.(*apiv1.Namespace)
|
||||
s.enqueueNamespace(namespace.Name)
|
||||
},
|
||||
// Creation and deletion will trigger events for the change handlers of the resources within the namespace.
|
||||
// No need to have additional handlers for them here.
|
||||
})
|
||||
if err != nil {
|
||||
l.Error("Error adding namespaces event handler.", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
@ -85,11 +104,28 @@ func (s *Service) enqueue(obj interface{}) {
|
||||
s.queue.Add(key)
|
||||
}
|
||||
|
||||
func (s *Service) enqueueNamespace(namespace string) {
|
||||
services, err := s.informer.GetIndexer().ByIndex(cache.NamespaceIndex, namespace)
|
||||
if err != nil {
|
||||
s.logger.Error("Error getting services in namespace", "namespace", namespace, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, service := range services {
|
||||
s.enqueue(service)
|
||||
}
|
||||
}
|
||||
|
||||
// Run implements the Discoverer interface.
|
||||
func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
||||
defer s.queue.ShutDown()
|
||||
|
||||
if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) {
|
||||
cacheSyncs := []cache.InformerSynced{s.informer.HasSynced}
|
||||
if s.withNamespaceMetadata {
|
||||
cacheSyncs = append(cacheSyncs, s.namespaceInf.HasSynced)
|
||||
}
|
||||
|
||||
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
|
||||
if !errors.Is(ctx.Err(), context.Canceled) {
|
||||
s.logger.Error("service informer unable to sync cache")
|
||||
}
|
||||
@ -175,6 +211,10 @@ func (s *Service) buildService(svc *apiv1.Service) *targetgroup.Group {
|
||||
}
|
||||
tg.Labels = serviceLabels(svc)
|
||||
|
||||
if s.withNamespaceMetadata {
|
||||
tg.Labels = addNamespaceLabels(tg.Labels, s.namespaceInf, s.logger, svc.Namespace)
|
||||
}
|
||||
|
||||
for _, port := range svc.Spec.Ports {
|
||||
addr := net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", strconv.FormatInt(int64(port.Port), 10))
|
||||
|
||||
|
@ -52,11 +52,11 @@ func makeMultiPortService() *v1.Service {
|
||||
}
|
||||
}
|
||||
|
||||
func makeSuffixedService(suffix string) *v1.Service {
|
||||
func makeService(namespace string) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("testservice%s", suffix),
|
||||
Namespace: "default",
|
||||
Name: "testservice",
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Ports: []v1.ServicePort{
|
||||
@ -72,10 +72,6 @@ func makeSuffixedService(suffix string) *v1.Service {
|
||||
}
|
||||
}
|
||||
|
||||
func makeService() *v1.Service {
|
||||
return makeSuffixedService("")
|
||||
}
|
||||
|
||||
func makeExternalService() *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@ -124,7 +120,7 @@ func TestServiceDiscoveryAdd(t *testing.T) {
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := makeService()
|
||||
obj := makeService("default")
|
||||
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
obj = makeExternalService()
|
||||
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
@ -191,12 +187,12 @@ func TestServiceDiscoveryAdd(t *testing.T) {
|
||||
|
||||
func TestServiceDiscoveryDelete(t *testing.T) {
|
||||
t.Parallel()
|
||||
n, c := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService())
|
||||
n, c := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService("default"))
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := makeService()
|
||||
obj := makeService("default")
|
||||
c.CoreV1().Services(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
|
||||
},
|
||||
expectedMaxItems: 2,
|
||||
@ -210,7 +206,7 @@ func TestServiceDiscoveryDelete(t *testing.T) {
|
||||
|
||||
func TestServiceDiscoveryUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
n, c := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService())
|
||||
n, c := makeDiscovery(RoleService, NamespaceDiscovery{}, makeService("default"))
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
@ -261,8 +257,7 @@ func TestServiceDiscoveryNamespaces(t *testing.T) {
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
for _, ns := range []string{"ns1", "ns2"} {
|
||||
obj := makeService()
|
||||
obj.Namespace = ns
|
||||
obj := makeService(ns)
|
||||
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
}
|
||||
},
|
||||
@ -314,8 +309,7 @@ func TestServiceDiscoveryOwnNamespace(t *testing.T) {
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
for _, ns := range []string{"own-ns", "non-own-ns"} {
|
||||
obj := makeService()
|
||||
obj.Namespace = ns
|
||||
obj := makeService(ns)
|
||||
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
}
|
||||
},
|
||||
@ -350,8 +344,7 @@ func TestServiceDiscoveryAllNamespaces(t *testing.T) {
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
for _, ns := range []string{"own-ns", "non-own-ns"} {
|
||||
obj := makeService()
|
||||
obj.Namespace = ns
|
||||
obj := makeService(ns)
|
||||
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
}
|
||||
},
|
||||
@ -394,3 +387,98 @@ func TestServiceDiscoveryAllNamespaces(t *testing.T) {
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestServiceDiscoveryWithNamespaceMetadata(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ns := "test-ns"
|
||||
nsLabels := map[string]string{"environment": "production", "team": "backend"}
|
||||
nsAnnotations := map[string]string{"owner": "platform", "version": "v1.2.3"}
|
||||
|
||||
n, _ := makeDiscoveryWithMetadata(RoleService, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, makeNamespace(ns, nsLabels, nsAnnotations), makeService(ns))
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 1,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
fmt.Sprintf("svc/%s/testservice", ns): {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "testservice.test-ns.svc:30900",
|
||||
"__meta_kubernetes_service_cluster_ip": "10.0.0.1",
|
||||
"__meta_kubernetes_service_port_name": "testport",
|
||||
"__meta_kubernetes_service_port_number": "30900",
|
||||
"__meta_kubernetes_service_port_protocol": "TCP",
|
||||
"__meta_kubernetes_service_type": "ClusterIP",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_namespace": model.LabelValue(ns),
|
||||
"__meta_kubernetes_namespace_annotation_owner": "platform",
|
||||
"__meta_kubernetes_namespace_annotationpresent_owner": "true",
|
||||
"__meta_kubernetes_namespace_annotation_version": "v1.2.3",
|
||||
"__meta_kubernetes_namespace_annotationpresent_version": "true",
|
||||
"__meta_kubernetes_namespace_label_environment": "production",
|
||||
"__meta_kubernetes_namespace_labelpresent_environment": "true",
|
||||
"__meta_kubernetes_namespace_label_team": "backend",
|
||||
"__meta_kubernetes_namespace_labelpresent_team": "true",
|
||||
"__meta_kubernetes_service_name": "testservice",
|
||||
},
|
||||
Source: fmt.Sprintf("svc/%s/testservice", ns),
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestServiceDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ns := "test-ns"
|
||||
nsLabels := map[string]string{"environment": "development", "team": "frontend"}
|
||||
nsAnnotations := map[string]string{"owner": "devops", "version": "v2.1.0"}
|
||||
|
||||
namespace := makeNamespace(ns, nsLabels, nsAnnotations)
|
||||
n, c := makeDiscoveryWithMetadata(RoleService, NamespaceDiscovery{}, AttachMetadataConfig{Namespace: true}, namespace, makeService(ns))
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 2,
|
||||
afterStart: func() {
|
||||
namespace.Labels["environment"] = "staging"
|
||||
namespace.Labels["region"] = "us-west"
|
||||
namespace.Annotations["owner"] = "sre"
|
||||
namespace.Annotations["cost-center"] = "engineering"
|
||||
c.CoreV1().Namespaces().Update(context.Background(), namespace, metav1.UpdateOptions{})
|
||||
},
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
fmt.Sprintf("svc/%s/testservice", ns): {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "testservice.test-ns.svc:30900",
|
||||
"__meta_kubernetes_service_cluster_ip": "10.0.0.1",
|
||||
"__meta_kubernetes_service_port_name": "testport",
|
||||
"__meta_kubernetes_service_port_number": "30900",
|
||||
"__meta_kubernetes_service_port_protocol": "TCP",
|
||||
"__meta_kubernetes_service_type": "ClusterIP",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_namespace": model.LabelValue(ns),
|
||||
"__meta_kubernetes_namespace_annotation_owner": "sre",
|
||||
"__meta_kubernetes_namespace_annotationpresent_owner": "true",
|
||||
"__meta_kubernetes_namespace_annotation_version": "v2.1.0",
|
||||
"__meta_kubernetes_namespace_annotationpresent_version": "true",
|
||||
"__meta_kubernetes_namespace_annotation_cost_center": "engineering",
|
||||
"__meta_kubernetes_namespace_annotationpresent_cost_center": "true",
|
||||
"__meta_kubernetes_namespace_label_environment": "staging",
|
||||
"__meta_kubernetes_namespace_labelpresent_environment": "true",
|
||||
"__meta_kubernetes_namespace_label_team": "frontend",
|
||||
"__meta_kubernetes_namespace_labelpresent_team": "true",
|
||||
"__meta_kubernetes_namespace_label_region": "us-west",
|
||||
"__meta_kubernetes_namespace_labelpresent_region": "true",
|
||||
"__meta_kubernetes_service_name": "testservice",
|
||||
},
|
||||
Source: fmt.Sprintf("svc/%s/testservice", ns),
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
@ -1967,7 +1967,7 @@ attach_metadata:
|
||||
# Attaches node metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice.
|
||||
# When set to true, Prometheus must have permissions to get Nodes.
|
||||
[ node: <boolean> | default = false ]
|
||||
# Attaches namespace metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice.
|
||||
# Attaches namespace metadata to discovered targets. Valid for roles: pod, endpoints, endpointslice, service, ingress.
|
||||
# When set to true, Prometheus must have permissions to list/watch Namespaces.
|
||||
[ namespace: <boolean> | default = false ]
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user