feat: use transformers in pod informers to reduce memory footprint

Add a transformer to the pods informer of the pod and service sources.

Refs: #5595

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
This commit is contained in:
Valerian Roche 2025-07-01 18:44:02 -04:00
parent 9fc01b272b
commit 59d54158ee
No known key found for this signature in database
GPG Key ID: DBDC687C250F4CD2
4 changed files with 260 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import (
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubeinformers "k8s.io/client-go/informers"
@ -76,6 +77,36 @@ func NewPodSource(
}
_, _ = podInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
// Transformer is used to reduce the memory usage of the informer.
// The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster.
// If watchList is not used it will not prevent memory bursts on the initial informer sync.
podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) {
pod, ok := i.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("object is not a pod")
}
if pod.UID == "" {
// Pod was already transformed and we must be idempotent.
return pod, nil
}
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
// Name/namespace must always be kept for the informer to work.
Name: pod.Name,
Namespace: pod.Namespace,
// Used by the controller. This includes non-external-dns prefixed annotations.
Annotations: pod.Annotations,
},
Spec: corev1.PodSpec{
HostNetwork: pod.Spec.HostNetwork,
NodeName: pod.Spec.NodeName,
},
Status: corev1.PodStatus{
PodIP: pod.Status.PodIP,
},
}, nil
})
_, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
informerFactory.Start(ctx.Done())

View File

@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
@ -997,3 +998,84 @@ func nodesFixturesIPv4() []*corev1.Node {
},
}
}
// TestPodTransformerInPodSource checks the pod object held by the informer of
// a source built with NewPodSource: name, namespace, all annotations,
// Spec.NodeName, Spec.HostNetwork and Status.PodIP must be retained, while
// UID, labels, containers, hostname, host IP, conditions and container
// statuses must be stripped.
func TestPodTransformerInPodSource(t *testing.T) {
	ctx := t.Context()
	fakeClient := fake.NewClientset()

	// Every field asserted as "stripped" below is populated here, so that an
	// empty value in the cache can only come from the transformer.
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "test",
			}},
			Hostname:    "test-hostname",
			NodeName:    "test-node",
			HostNetwork: true,
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "test-ns",
			Name:      "test-name",
			Labels: map[string]string{
				"label1": "value1",
				"label2": "value2",
				"label3": "value3",
			},
			Annotations: map[string]string{
				"user-annotation": "value",
				"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
				"external-dns.alpha.kubernetes.io/random":   "value",
				"other/annotation":                          "value",
			},
			UID: "someuid",
		},
		Status: v1.PodStatus{
			PodIP:  "127.0.0.1",
			HostIP: "127.0.0.2",
			Conditions: []v1.PodCondition{{
				Type:   v1.PodReady,
				Status: v1.ConditionTrue,
			}, {
				Type:   v1.ContainersReady,
				Status: v1.ConditionFalse,
			}},
			ContainerStatuses: []v1.ContainerStatus{{
				Name: "test",
			}},
			InitContainerStatuses: []v1.ContainerStatus{{
				Name: "test-init",
			}},
		},
	}

	// Use the test-scoped context rather than context.Background().
	_, err := fakeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
	require.NoError(t, err)

	// Should not error when creating the source
	src, err := NewPodSource(ctx, fakeClient, "", "", false, "", "", false, "", nil)
	require.NoError(t, err)
	ps, ok := src.(*podSource)
	require.True(t, ok)

	// NOTE(review): reading the lister right away presumes NewPodSource
	// returns only after the informer cache has synced — confirm.
	retrieved, err := ps.podInformer.Lister().Pods("test-ns").Get("test-name")
	require.NoError(t, err)

	// Metadata
	assert.Equal(t, "test-name", retrieved.Name)
	assert.Equal(t, "test-ns", retrieved.Namespace)
	assert.Empty(t, retrieved.UID)
	assert.Empty(t, retrieved.Labels)
	// Annotations are kept as-is, including non-external-dns prefixed ones.
	assert.Equal(t, map[string]string{
		"user-annotation": "value",
		"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
		"external-dns.alpha.kubernetes.io/random":   "value",
		"other/annotation":                          "value",
	}, retrieved.Annotations)

	// Spec
	assert.Empty(t, retrieved.Spec.Containers)
	assert.Empty(t, retrieved.Spec.Hostname)
	assert.Equal(t, "test-node", retrieved.Spec.NodeName)
	assert.True(t, retrieved.Spec.HostNetwork)

	// Status
	assert.Empty(t, retrieved.Status.ContainerStatuses)
	assert.Empty(t, retrieved.Status.InitContainerStatuses)
	assert.Empty(t, retrieved.Status.HostIP)
	assert.Equal(t, "127.0.0.1", retrieved.Status.PodIP)
	assert.Empty(t, retrieved.Status.Conditions)
}

View File

@ -29,6 +29,7 @@ import (
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
@ -134,6 +135,48 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
if err != nil {
return nil, err
}
// Transformer is used to reduce the memory usage of the informer.
// The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster.
// If watchList is not used it will not prevent memory bursts on the initial informer sync.
podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) {
pod, ok := i.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("object is not a pod")
}
if pod.UID == "" {
// Pod was already transformed and we must be idempotent.
return pod, nil
}
// All pod level annotations we're interested in start with a common prefix
podAnnotations := map[string]string{}
for key, value := range pod.Annotations {
if strings.HasPrefix(key, annotations.AnnotationKeyPrefix) {
podAnnotations[key] = value
}
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
// Name/namespace must always be kept for the informer to work.
Name: pod.Name,
Namespace: pod.Namespace,
// Used to match services.
Labels: pod.Labels,
Annotations: podAnnotations,
DeletionTimestamp: pod.DeletionTimestamp,
},
Spec: v1.PodSpec{
Hostname: pod.Spec.Hostname,
NodeName: pod.Spec.NodeName,
},
Status: v1.PodStatus{
HostIP: pod.Status.HostIP,
Phase: pod.Status.Phase,
Conditions: pod.Status.Conditions,
},
}, nil
})
}
var nodeInformer coreinformers.NodeInformer

View File

@ -4702,6 +4702,110 @@ func TestEndpointSlicesIndexer(t *testing.T) {
})
}
// TestPodTransformerInServiceSource checks the pod object held by the pod
// informer of a source built with NewServiceSource: name, namespace, labels,
// external-dns prefixed annotations, Spec.Hostname, Spec.NodeName,
// Status.HostIP, Status.Phase and Status.Conditions must be retained, while
// UID, other annotations, containers, pod IP and container statuses must be
// stripped.
func TestPodTransformerInServiceSource(t *testing.T) {
	ctx := t.Context()
	fakeClient := fake.NewClientset()

	// Every field asserted as "stripped" below is populated here, so that an
	// empty value in the cache can only come from the transformer.
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "test",
			}},
			Hostname: "test-hostname",
			NodeName: "test-node",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "test-ns",
			Name:      "test-name",
			Labels: map[string]string{
				"label1": "value1",
				"label2": "value2",
				"label3": "value3",
			},
			Annotations: map[string]string{
				"user-annotation": "value",
				"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
				"external-dns.alpha.kubernetes.io/random":   "value",
				"other/annotation":                          "value",
			},
			UID: "someuid",
		},
		Status: v1.PodStatus{
			Phase:  v1.PodRunning,
			PodIP:  "127.0.0.1",
			HostIP: "127.0.0.2",
			Conditions: []v1.PodCondition{{
				Type:   v1.PodReady,
				Status: v1.ConditionTrue,
			}, {
				Type:   v1.ContainersReady,
				Status: v1.ConditionFalse,
			}},
			ContainerStatuses: []v1.ContainerStatus{{
				Name: "test",
			}},
			InitContainerStatuses: []v1.ContainerStatus{{
				Name: "test-init",
			}},
		},
	}

	// Use the test-scoped context rather than context.Background().
	_, err := fakeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
	require.NoError(t, err)

	// Should not error when creating the source
	src, err := NewServiceSource(
		ctx,
		fakeClient,
		"",
		"",
		"{{.Name}}",
		false,
		"",
		false,
		false,
		false,
		[]string{},
		false,
		labels.Everything(),
		false,
		false,
		false,
	)
	require.NoError(t, err)
	ss, ok := src.(*serviceSource)
	require.True(t, ok)

	// NOTE(review): reading the lister right away presumes NewServiceSource
	// returns only after the informer cache has synced — confirm.
	retrieved, err := ss.podInformer.Lister().Pods("test-ns").Get("test-name")
	require.NoError(t, err)

	// Metadata
	assert.Equal(t, "test-name", retrieved.Name)
	assert.Equal(t, "test-ns", retrieved.Namespace)
	assert.Empty(t, retrieved.UID)
	assert.Equal(t, map[string]string{
		"label1": "value1",
		"label2": "value2",
		"label3": "value3",
	}, retrieved.Labels)
	// Filtered: only external-dns prefixed annotations remain.
	assert.Equal(t, map[string]string{
		"external-dns.alpha.kubernetes.io/hostname": "test-hostname",
		"external-dns.alpha.kubernetes.io/random":   "value",
	}, retrieved.Annotations)

	// Spec
	assert.Empty(t, retrieved.Spec.Containers)
	assert.Equal(t, "test-hostname", retrieved.Spec.Hostname)
	assert.Equal(t, "test-node", retrieved.Spec.NodeName)

	// Status
	assert.Empty(t, retrieved.Status.ContainerStatuses)
	assert.Empty(t, retrieved.Status.InitContainerStatuses)
	assert.Equal(t, "127.0.0.2", retrieved.Status.HostIP)
	assert.Empty(t, retrieved.Status.PodIP)
	assert.Equal(t, v1.PodRunning, retrieved.Status.Phase)
	assert.ElementsMatch(t, []v1.PodCondition{{
		Type:   v1.PodReady,
		Status: v1.ConditionTrue,
	}, {
		Type:   v1.ContainersReady,
		Status: v1.ConditionFalse,
	}}, retrieved.Status.Conditions)
}
// createTestServicesByType creates the requested number of services per type in the given namespace.
func createTestServicesByType(namespace string, typeCounts map[v1.ServiceType]int) []*v1.Service {
var services []*v1.Service