diff --git a/source/pod.go b/source/pod.go index baa0e58e5..b996b358e 100644 --- a/source/pod.go +++ b/source/pod.go @@ -24,6 +24,7 @@ import ( log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kubeinformers "k8s.io/client-go/informers" @@ -76,6 +77,40 @@ func NewPodSource( } _, _ = podInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) + + if fqdnTemplate == "" { + // Transformer is used to reduce the memory usage of the informer. + // The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster. + // If watchList is not used it will not prevent memory bursts on the initial informer sync. + // When fqdnTemplate is used the entire pod needs to be provided to the rendering call, but the informer itself becomes unneeded. + podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) { + pod, ok := i.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("object is not a pod") + } + if pod.UID == "" { + // Pod was already transformed and we must be idempotent. + return pod, nil + } + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + // Name/namespace must always be kept for the informer to work. + Name: pod.Name, + Namespace: pod.Namespace, + // Used by the controller. This includes non-external-dns prefixed annotations. + Annotations: pod.Annotations, + }, + Spec: corev1.PodSpec{ + HostNetwork: pod.Spec.HostNetwork, + NodeName: pod.Spec.NodeName, + }, + Status: corev1.PodStatus{ + PodIP: pod.Status.PodIP, + }, + }, nil + }) + } + _, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler()) informerFactory.Start(ctx.Done()) diff --git a/source/pod_test.go b/source/pod_test.go index e6e0f8b06..e9392bcf4 100644 --- a/source/pod_test.go +++ b/source/pod_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1lister "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -997,3 +998,147 @@ func nodesFixturesIPv4() []*corev1.Node { }, } } + +func TestPodTransformerInPodSource(t *testing.T) { + t.Run("transformer set", func(t *testing.T) { + ctx := t.Context() + fakeClient := fake.NewClientset() + + pod := &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "test", + }}, + Hostname: "test-hostname", + NodeName: "test-node", + HostNetwork: true, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-ns", + Name: "test-name", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + "label3": "value3", + }, + Annotations: map[string]string{ + "user-annotation": "value", + "external-dns.alpha.kubernetes.io/hostname": "test-hostname", + "external-dns.alpha.kubernetes.io/random": "value", + "other/annotation": "value", + }, + UID: "someuid", + }, + Status: v1.PodStatus{ + PodIP: "127.0.0.1", + HostIP: "127.0.0.2", + Conditions: []v1.PodCondition{{ + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, { + Type: v1.ContainersReady, + Status: v1.ConditionFalse, + }}, + }, + } + + _, err := fakeClient.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{}) + require.NoError(t, err) + + // Should not error when creating the source + src, err := NewPodSource(ctx, fakeClient, "", "", false, "", "", false, "", nil) + require.NoError(t, err) + ps, ok := src.(*podSource) + require.True(t, ok) + + retrieved, err := ps.podInformer.Lister().Pods("test-ns").Get("test-name") + require.NoError(t, err) + + // Metadata + assert.Equal(t, "test-name", retrieved.Name) + assert.Equal(t, "test-ns", retrieved.Namespace) + assert.Empty(t, retrieved.UID) + assert.Empty(t, retrieved.Labels) + // Filtered + assert.Equal(t, map[string]string{ + "user-annotation": "value", + "external-dns.alpha.kubernetes.io/hostname": "test-hostname", + "external-dns.alpha.kubernetes.io/random": "value", + "other/annotation": "value", + }, retrieved.Annotations) + + // Spec + assert.Empty(t, retrieved.Spec.Containers) + assert.Empty(t, retrieved.Spec.Hostname) + assert.Equal(t, "test-node", retrieved.Spec.NodeName) + assert.True(t, retrieved.Spec.HostNetwork) + + // Status + assert.Empty(t, retrieved.Status.ContainerStatuses) + assert.Empty(t, retrieved.Status.InitContainerStatuses) + assert.Empty(t, retrieved.Status.HostIP) + assert.Equal(t, "127.0.0.1", retrieved.Status.PodIP) + assert.Empty(t, retrieved.Status.Conditions) + }) + + t.Run("transormer is not used when fqdnTemplate is set", func(t *testing.T) { + ctx := t.Context() + fakeClient := fake.NewClientset() + + pod := &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "test", + }}, + Hostname: "test-hostname", + NodeName: "test-node", + HostNetwork: true, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-ns", + Name: "test-name", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + "label3": "value3", + }, + Annotations: map[string]string{ + "user-annotation": "value", + "external-dns.alpha.kubernetes.io/hostname": "test-hostname", + "external-dns.alpha.kubernetes.io/random": "value", + "other/annotation": "value", + }, + UID: "someuid", + }, + Status: v1.PodStatus{ + PodIP: "127.0.0.1", + HostIP: "127.0.0.2", + Conditions: []v1.PodCondition{{ + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, { + Type: v1.ContainersReady, + Status: v1.ConditionFalse, + }}, + }, + } + + _, err := fakeClient.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{}) + require.NoError(t, err) + + // Should not error when creating the source + src, err := NewPodSource(ctx, fakeClient, "", "", false, "", "template", false, "", nil) + require.NoError(t, err) + ps, ok := src.(*podSource) + require.True(t, ok) + + retrieved, err := ps.podInformer.Lister().Pods("test-ns").Get("test-name") + require.NoError(t, err) + + // Metadata + assert.Equal(t, "test-name", retrieved.Name) + assert.Equal(t, "test-ns", retrieved.Namespace) + assert.NotEmpty(t, retrieved.UID) + assert.NotEmpty(t, retrieved.Labels) + }) +} diff --git a/source/service.go b/source/service.go index a2a8f8f7f..29447e382 100644 --- a/source/service.go +++ b/source/service.go @@ -29,6 +29,7 @@ import ( log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" kubeinformers "k8s.io/client-go/informers" @@ -134,6 +135,48 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name if err != nil { return nil, err } + + // Transformer is used to reduce the memory usage of the informer. + // The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster. + // If watchList is not used it will not prevent memory bursts on the initial informer sync. + podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) { + pod, ok := i.(*v1.Pod) + if !ok { + return nil, fmt.Errorf("object is not a pod") + } + if pod.UID == "" { + // Pod was already transformed and we must be idempotent. + return pod, nil + } + + // All pod level annotations we're interested in start with a common prefix + podAnnotations := map[string]string{} + for key, value := range pod.Annotations { + if strings.HasPrefix(key, annotations.AnnotationKeyPrefix) { + podAnnotations[key] = value + } + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + // Name/namespace must always be kept for the informer to work. + Name: pod.Name, + Namespace: pod.Namespace, + // Used to match services. + Labels: pod.Labels, + Annotations: podAnnotations, + DeletionTimestamp: pod.DeletionTimestamp, + }, + Spec: v1.PodSpec{ + Hostname: pod.Spec.Hostname, + NodeName: pod.Spec.NodeName, + }, + Status: v1.PodStatus{ + HostIP: pod.Status.HostIP, + Phase: pod.Status.Phase, + Conditions: pod.Status.Conditions, + }, + }, nil + }) } var nodeInformer coreinformers.NodeInformer diff --git a/source/service_test.go b/source/service_test.go index 07df803bd..26e4320f6 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -4702,6 +4702,110 @@ func TestEndpointSlicesIndexer(t *testing.T) { }) } +func TestPodTransformerInServiceSource(t *testing.T) { + ctx := t.Context() + fakeClient := fake.NewClientset() + + pod := &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "test", + }}, + Hostname: "test-hostname", + NodeName: "test-node", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-ns", + Name: "test-name", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + "label3": "value3", + }, + Annotations: map[string]string{ + "user-annotation": "value", + "external-dns.alpha.kubernetes.io/hostname": "test-hostname", + "external-dns.alpha.kubernetes.io/random": "value", + "other/annotation": "value", + }, + UID: "someuid", + }, + Status: v1.PodStatus{ + PodIP: "127.0.0.1", + HostIP: "127.0.0.2", + Conditions: []v1.PodCondition{{ + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, { + Type: v1.ContainersReady, + Status: v1.ConditionFalse, + }}, + }, + } + + _, err := fakeClient.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{}) + require.NoError(t, err) + + // Should not error when creating the source + src, err := NewServiceSource( + ctx, + fakeClient, + "", + "", + "{{.Name}}", + false, + "", + false, + false, + false, + []string{}, + false, + labels.Everything(), + false, + false, + false, + ) + require.NoError(t, err) + ss, ok := src.(*serviceSource) + require.True(t, ok) + + retrieved, err := ss.podInformer.Lister().Pods("test-ns").Get("test-name") + require.NoError(t, err) + + // Metadata + assert.Equal(t, "test-name", retrieved.Name) + assert.Equal(t, "test-ns", retrieved.Namespace) + assert.Empty(t, retrieved.UID) + assert.Equal(t, map[string]string{ + "label1": "value1", + "label2": "value2", + "label3": "value3", + }, retrieved.Labels) + // Filtered + assert.Equal(t, map[string]string{ + "external-dns.alpha.kubernetes.io/hostname": "test-hostname", + "external-dns.alpha.kubernetes.io/random": "value", + }, retrieved.Annotations) + + // Spec + assert.Empty(t, retrieved.Spec.Containers) + assert.Equal(t, "test-hostname", retrieved.Spec.Hostname) + assert.Equal(t, "test-node", retrieved.Spec.NodeName) + + // Status + assert.Empty(t, retrieved.Status.ContainerStatuses) + assert.Empty(t, retrieved.Status.InitContainerStatuses) + assert.Equal(t, "127.0.0.2", retrieved.Status.HostIP) + assert.Empty(t, retrieved.Status.PodIP) + assert.ElementsMatch(t, []v1.PodCondition{{ + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, { + Type: v1.ContainersReady, + Status: v1.ConditionFalse, + }}, retrieved.Status.Conditions) +} + // createTestServicesByType creates the requested number of services per type in the given namespace. func createTestServicesByType(namespace string, typeCounts map[v1.ServiceType]int) []*v1.Service { var services []*v1.Service