mirror of
https://github.com/kubernetes-sigs/external-dns.git
synced 2026-05-04 22:26:11 +02:00
refactor(source/istio): add transformers (#5728)
* chore(source/istio): added transfomrers Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * chore(source/istio): added transfomrers Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * chore(source/istio): added transfomrers Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * chore(source/istio): added transfomrers Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> --------- Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
parent
829b5a946f
commit
d2d2b40a96
@ -15,6 +15,9 @@ package informers
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
corev1lister "k8s.io/client-go/listers/core/v1"
|
||||
discoveryv1lister "k8s.io/client-go/listers/discovery/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
@ -58,3 +61,50 @@ func (f *FakeNodeInformer) Informer() cache.SharedIndexInformer {
|
||||
func (f *FakeNodeInformer) Lister() corev1lister.NodeLister {
|
||||
return corev1lister.NewNodeLister(f.Informer().GetIndexer())
|
||||
}
|
||||
|
||||
func fakeService() *corev1.Service {
|
||||
return &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "fake-service",
|
||||
Namespace: "ns",
|
||||
Labels: map[string]string{"env": "prod", "team": "devops"},
|
||||
Annotations: map[string]string{"description": "Enriched service object"},
|
||||
UID: "1234",
|
||||
},
|
||||
Spec: corev1.ServiceSpec{
|
||||
Selector: map[string]string{"app": "demo"},
|
||||
ExternalIPs: []string{"1.2.3.4"},
|
||||
Ports: []corev1.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt32(8080),
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
},
|
||||
{
|
||||
Name: "https",
|
||||
Port: 443,
|
||||
TargetPort: intstr.FromInt32(8443),
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
Type: corev1.ServiceTypeLoadBalancer,
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{
|
||||
{IP: "5.6.7.8", Hostname: "lb.example.com"},
|
||||
},
|
||||
},
|
||||
Conditions: []metav1.Condition{
|
||||
{
|
||||
Type: "Available",
|
||||
Status: metav1.ConditionTrue,
|
||||
Reason: "MinimumReplicasAvailable",
|
||||
Message: "Service is available",
|
||||
LastTransitionTime: metav1.Now(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
84
source/informers/transfomers.go
Normal file
84
source/informers/transfomers.go
Normal file
@ -0,0 +1,84 @@
|
||||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package informers
|
||||
|
||||
import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type TransformOptions struct {
|
||||
specSelector bool
|
||||
specExternalIps bool
|
||||
statusLb bool
|
||||
}
|
||||
|
||||
func TransformerWithOptions[T metav1.Object](optFns ...func(options *TransformOptions)) cache.TransformFunc {
|
||||
options := TransformOptions{}
|
||||
for _, fn := range optFns {
|
||||
fn(&options)
|
||||
}
|
||||
return func(obj any) (any, error) {
|
||||
// only transform if the object is a Service at the moment
|
||||
entity, ok := obj.(*corev1.Service)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
if entity.UID == "" {
|
||||
// Pod was already transformed and we must be idempotent.
|
||||
return entity, nil
|
||||
}
|
||||
svc := &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: entity.Name,
|
||||
Namespace: entity.Namespace,
|
||||
DeletionTimestamp: entity.DeletionTimestamp,
|
||||
},
|
||||
Spec: corev1.ServiceSpec{},
|
||||
Status: corev1.ServiceStatus{},
|
||||
}
|
||||
if options.specSelector {
|
||||
svc.Spec.Selector = entity.Spec.Selector
|
||||
}
|
||||
if options.specExternalIps {
|
||||
svc.Spec.ExternalIPs = entity.Spec.ExternalIPs
|
||||
}
|
||||
if options.statusLb {
|
||||
svc.Status.LoadBalancer = entity.Status.LoadBalancer
|
||||
}
|
||||
return svc, nil
|
||||
}
|
||||
}
|
||||
|
||||
// TransformWithSpecSelector enables copying the Service's .spec.selector field.
|
||||
func TransformWithSpecSelector() func(options *TransformOptions) {
|
||||
return func(options *TransformOptions) {
|
||||
options.specSelector = true
|
||||
}
|
||||
}
|
||||
|
||||
// TransformWithSpecExternalIPs enables copying the Service's .spec.externalIPs field.
|
||||
func TransformWithSpecExternalIPs() func(options *TransformOptions) {
|
||||
return func(options *TransformOptions) {
|
||||
options.specExternalIps = true
|
||||
}
|
||||
}
|
||||
|
||||
// TransformWithStatusLoadBalancer enables copying the Service's .status.loadBalancer field.
|
||||
func TransformWithStatusLoadBalancer() func(options *TransformOptions) {
|
||||
return func(options *TransformOptions) {
|
||||
options.statusLb = true
|
||||
}
|
||||
}
|
||||
176
source/informers/transformers_test.go
Normal file
176
source/informers/transformers_test.go
Normal file
@ -0,0 +1,176 @@
|
||||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package informers
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
)
|
||||
|
||||
func TestTransformerWithOptions_Service(t *testing.T) {
|
||||
base := fakeService()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
options []func(*TransformOptions)
|
||||
asserts func(any)
|
||||
}{
|
||||
{
|
||||
name: "minimalistic object",
|
||||
options: nil,
|
||||
asserts: func(obj any) {
|
||||
svc, ok := obj.(*corev1.Service)
|
||||
assert.True(t, ok)
|
||||
assert.Empty(t, svc.UID)
|
||||
assert.NotEmpty(t, svc.Name)
|
||||
assert.NotEmpty(t, svc.Namespace)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with selector",
|
||||
options: []func(*TransformOptions){TransformWithSpecSelector()},
|
||||
asserts: func(obj any) {
|
||||
svc, ok := obj.(*corev1.Service)
|
||||
assert.True(t, ok)
|
||||
assert.NotEmpty(t, svc.Spec.Selector)
|
||||
assert.Empty(t, svc.Spec.ExternalIPs)
|
||||
assert.Empty(t, svc.Status.LoadBalancer.Ingress)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with selector",
|
||||
options: []func(*TransformOptions){TransformWithSpecSelector()},
|
||||
asserts: func(obj any) {
|
||||
svc, ok := obj.(*corev1.Service)
|
||||
assert.True(t, ok)
|
||||
assert.NotEmpty(t, svc.Spec.Selector)
|
||||
assert.Empty(t, svc.Spec.ExternalIPs)
|
||||
assert.Empty(t, svc.Status.LoadBalancer.Ingress)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with loadBalancer",
|
||||
options: []func(*TransformOptions){TransformWithStatusLoadBalancer()},
|
||||
asserts: func(obj any) {
|
||||
svc, ok := obj.(*corev1.Service)
|
||||
assert.True(t, ok)
|
||||
assert.Empty(t, svc.Spec.Selector)
|
||||
assert.Empty(t, svc.Spec.ExternalIPs)
|
||||
assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "all options",
|
||||
options: []func(*TransformOptions){
|
||||
TransformWithSpecSelector(),
|
||||
TransformWithSpecExternalIPs(),
|
||||
TransformWithStatusLoadBalancer(),
|
||||
},
|
||||
asserts: func(obj any) {
|
||||
svc, ok := obj.(*corev1.Service)
|
||||
assert.True(t, ok)
|
||||
assert.NotEmpty(t, svc.Spec.Selector)
|
||||
assert.NotEmpty(t, svc.Spec.ExternalIPs)
|
||||
assert.NotEmpty(t, svc.Status.LoadBalancer.Ingress)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
transform := TransformerWithOptions[*corev1.Service](tt.options...)
|
||||
got, err := transform(base)
|
||||
require.NoError(t, err)
|
||||
tt.asserts(got)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("non-service input", func(t *testing.T) {
|
||||
transform := TransformerWithOptions[*corev1.Service]()
|
||||
out, err := transform("not-a-service")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if out != nil {
|
||||
t.Errorf("expected nil output for non-service input, got %v", out)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestTransformer_Service_WithFakeClient(t *testing.T) {
|
||||
t.Run("with transformer", func(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
svc := fakeService()
|
||||
fakeClient := fake.NewClientset()
|
||||
|
||||
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace))
|
||||
serviceInformer := factory.Core().V1().Services()
|
||||
err = serviceInformer.Informer().SetTransform(TransformerWithOptions[*corev1.Service](
|
||||
TransformWithSpecSelector(),
|
||||
TransformWithSpecExternalIPs(),
|
||||
TransformWithStatusLoadBalancer(),
|
||||
))
|
||||
require.NoError(t, err)
|
||||
|
||||
factory.Start(ctx.Done())
|
||||
err = WaitForCacheSync(ctx, factory)
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, svc.Spec.Selector, got.Spec.Selector)
|
||||
assert.Equal(t, svc.Spec.ExternalIPs, got.Spec.ExternalIPs)
|
||||
assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress)
|
||||
assert.NotEqual(t, svc.Annotations, got.Annotations)
|
||||
assert.NotEqual(t, svc.Labels, got.Labels)
|
||||
})
|
||||
|
||||
t.Run("without transformer", func(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
svc := fakeService()
|
||||
fakeClient := fake.NewClientset()
|
||||
|
||||
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := kubeinformers.NewSharedInformerFactoryWithOptions(fakeClient, 0, kubeinformers.WithNamespace(svc.Namespace))
|
||||
serviceInformer := factory.Core().V1().Services()
|
||||
|
||||
err = serviceInformer.Informer().GetIndexer().Add(svc)
|
||||
require.NoError(t, err)
|
||||
|
||||
factory.Start(ctx.Done())
|
||||
err = WaitForCacheSync(ctx, factory)
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, map[string]string{"app": "demo"}, got.Spec.Selector)
|
||||
assert.Equal(t, []string{"1.2.3.4"}, got.Spec.ExternalIPs)
|
||||
assert.Equal(t, svc.Status.LoadBalancer.Ingress, got.Status.LoadBalancer.Ingress)
|
||||
assert.Equal(t, svc.Annotations, got.Annotations)
|
||||
assert.Equal(t, svc.Labels, got.Labels)
|
||||
})
|
||||
}
|
||||
@ -28,12 +28,12 @@ import (
|
||||
istioclient "istio.io/client-go/pkg/clientset/versioned"
|
||||
istioinformers "istio.io/client-go/pkg/informers/externalversions"
|
||||
networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
@ -84,30 +84,26 @@ func NewIstioGatewaySource(
|
||||
gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways()
|
||||
|
||||
// Add default resource event handlers to properly initialize informer.
|
||||
_, _ = serviceInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
log.Debug("service added")
|
||||
},
|
||||
},
|
||||
)
|
||||
_, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
|
||||
err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service](
|
||||
informers.TransformWithSpecSelector(),
|
||||
informers.TransformWithSpecExternalIPs(),
|
||||
informers.TransformWithStatusLoadBalancer(),
|
||||
))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, _ = gatewayInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
log.Debug("gateway added")
|
||||
},
|
||||
},
|
||||
)
|
||||
_, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
|
||||
|
||||
informerFactory.Start(ctx.Done())
|
||||
istioInformerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@ -32,6 +32,7 @@ import (
|
||||
networkv1 "k8s.io/api/networking/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
@ -1634,6 +1635,99 @@ func TestGatewaySource_GWSelectorMatchServiceSelector(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransformerInIstioGatewaySource(t *testing.T) {
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "fake-service",
|
||||
Namespace: "default",
|
||||
Labels: map[string]string{
|
||||
"label1": "value1",
|
||||
"label2": "value2",
|
||||
"label3": "value3",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"user-annotation": "value",
|
||||
"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
|
||||
"external-dns.alpha.kubernetes.io/random": "value",
|
||||
"other/annotation": "value",
|
||||
},
|
||||
UID: "someuid",
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: map[string]string{
|
||||
"selector": "one",
|
||||
"selector2": "two",
|
||||
"selector3": "three",
|
||||
},
|
||||
ExternalIPs: []string{"1.2.3.4"},
|
||||
Ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt32(8080),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
{
|
||||
Name: "https",
|
||||
Port: 443,
|
||||
TargetPort: intstr.FromInt32(8443),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
Type: v1.ServiceTypeLoadBalancer,
|
||||
},
|
||||
Status: v1.ServiceStatus{
|
||||
LoadBalancer: v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{
|
||||
{IP: "5.6.7.8", Hostname: "lb.example.com"},
|
||||
},
|
||||
},
|
||||
Conditions: []metav1.Condition{
|
||||
{
|
||||
Type: "Available",
|
||||
Status: metav1.ConditionTrue,
|
||||
Reason: "MinimumReplicasAvailable",
|
||||
Message: "Service is available",
|
||||
LastTransitionTime: metav1.Now(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fakeClient := fake.NewClientset()
|
||||
|
||||
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(context.Background(), svc, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
src, err := NewIstioGatewaySource(
|
||||
t.Context(),
|
||||
fakeClient,
|
||||
istiofake.NewSimpleClientset(),
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
false,
|
||||
false)
|
||||
require.NoError(t, err)
|
||||
gwSource, ok := src.(*gatewaySource)
|
||||
require.True(t, ok)
|
||||
|
||||
rService, err := gwSource.serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, "fake-service", rService.Name)
|
||||
assert.Empty(t, rService.Labels)
|
||||
assert.Empty(t, rService.Annotations)
|
||||
assert.Empty(t, rService.UID)
|
||||
assert.NotEmpty(t, rService.Status.LoadBalancer)
|
||||
assert.Empty(t, rService.Status.Conditions)
|
||||
assert.Equal(t, map[string]string{
|
||||
"selector": "one",
|
||||
"selector2": "two",
|
||||
"selector3": "three",
|
||||
}, rService.Spec.Selector)
|
||||
}
|
||||
|
||||
// gateway specific helper functions
|
||||
func newTestGatewaySource(loadBalancerList []fakeIngressGatewayService, ingressList []fakeIngress) (*gatewaySource, error) {
|
||||
fakeKubernetesClient := fake.NewClientset()
|
||||
|
||||
@ -30,13 +30,13 @@ import (
|
||||
istioclient "istio.io/client-go/pkg/clientset/versioned"
|
||||
istioinformers "istio.io/client-go/pkg/informers/externalversions"
|
||||
networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
"sigs.k8s.io/external-dns/source/annotations"
|
||||
@ -88,38 +88,28 @@ func NewIstioVirtualServiceSource(
|
||||
gatewayInformer := istioInformerFactory.Networking().V1beta1().Gateways()
|
||||
|
||||
// Add default resource event handlers to properly initialize informer.
|
||||
_, _ = serviceInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
log.Debug("service added")
|
||||
},
|
||||
},
|
||||
)
|
||||
_, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
|
||||
err = serviceInformer.Informer().SetTransform(informers.TransformerWithOptions[*corev1.Service](
|
||||
informers.TransformWithSpecSelector(),
|
||||
informers.TransformWithSpecExternalIPs(),
|
||||
informers.TransformWithStatusLoadBalancer(),
|
||||
))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, _ = virtualServiceInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
log.Debug("virtual service added")
|
||||
},
|
||||
},
|
||||
)
|
||||
_, _ = virtualServiceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
|
||||
|
||||
_, _ = gatewayInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
log.Debug("gateway added")
|
||||
},
|
||||
},
|
||||
)
|
||||
_, _ = gatewayInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
|
||||
|
||||
informerFactory.Start(ctx.Done())
|
||||
istioInformerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := informers.WaitForCacheSync(context.Background(), istioInformerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, istioInformerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@ -33,6 +33,7 @@ import (
|
||||
networkv1 "k8s.io/api/networking/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
|
||||
"sigs.k8s.io/external-dns/endpoint"
|
||||
@ -2335,3 +2336,96 @@ func TestIstioVirtualServiceSource_GWServiceSelectorMatchServiceSelector(t *test
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransformerInIstioGatewayVirtualServiceSource(t *testing.T) {
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "fake-service",
|
||||
Namespace: "default",
|
||||
Labels: map[string]string{
|
||||
"label1": "value1",
|
||||
"label2": "value2",
|
||||
"label3": "value3",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"user-annotation": "value",
|
||||
"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
|
||||
"external-dns.alpha.kubernetes.io/random": "value",
|
||||
"other/annotation": "value",
|
||||
},
|
||||
UID: "someuid",
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: map[string]string{
|
||||
"selector": "one",
|
||||
"selector2": "two",
|
||||
"selector3": "three",
|
||||
},
|
||||
ExternalIPs: []string{"1.2.3.4"},
|
||||
Ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt32(8080),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
{
|
||||
Name: "https",
|
||||
Port: 443,
|
||||
TargetPort: intstr.FromInt32(8443),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
Type: v1.ServiceTypeLoadBalancer,
|
||||
},
|
||||
Status: v1.ServiceStatus{
|
||||
LoadBalancer: v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{
|
||||
{IP: "5.6.7.8", Hostname: "lb.example.com"},
|
||||
},
|
||||
},
|
||||
Conditions: []metav1.Condition{
|
||||
{
|
||||
Type: "Available",
|
||||
Status: metav1.ConditionTrue,
|
||||
Reason: "MinimumReplicasAvailable",
|
||||
Message: "Service is available",
|
||||
LastTransitionTime: metav1.Now(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fakeClient := fake.NewClientset()
|
||||
|
||||
_, err := fakeClient.CoreV1().Services(svc.Namespace).Create(t.Context(), svc, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
src, err := NewIstioVirtualServiceSource(
|
||||
t.Context(),
|
||||
fakeClient,
|
||||
istiofake.NewSimpleClientset(),
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
false,
|
||||
false)
|
||||
require.NoError(t, err)
|
||||
gwSource, ok := src.(*virtualServiceSource)
|
||||
require.True(t, ok)
|
||||
|
||||
rService, err := gwSource.serviceInformer.Lister().Services(svc.Namespace).Get(svc.Name)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, svc.Name, rService.Name)
|
||||
assert.Empty(t, rService.Labels)
|
||||
assert.Empty(t, rService.Annotations)
|
||||
assert.Empty(t, rService.UID)
|
||||
assert.NotEmpty(t, rService.Status.LoadBalancer)
|
||||
assert.Empty(t, rService.Status.Conditions)
|
||||
assert.Equal(t, map[string]string{
|
||||
"selector": "one",
|
||||
"selector2": "two",
|
||||
"selector3": "three",
|
||||
}, rService.Spec.Selector)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user