test: use node informer instead of raw watch

This should improve watch reliability, as it was failing on channel
being closed.

Fixes #10039

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
This commit is contained in:
Andrey Smirnov 2024-12-25 18:52:07 +04:00
parent 5dc15e8db4
commit 27233cf0fc
No known key found for this signature in database
GPG Key ID: FE042E3D4085A811
4 changed files with 68 additions and 64 deletions

View File

@ -11,8 +11,6 @@ import (
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
@ -68,13 +66,7 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) {
suite.T().Logf("updating annotations on node %q (%q)", node, k8sNode.Name)
watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)
defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)
// set two new annotations
suite.setNodeAnnotations(node, map[string]string{
@ -82,7 +74,7 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) {
"talos.dev/ann2": "value2",
})
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "value1",
"talos.dev/ann2": "value2",
})
@ -92,7 +84,7 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) {
"talos.dev/ann1": "foo",
})
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "foo",
"talos.dev/ann2": "",
})
@ -100,20 +92,17 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) {
// remove all Talos annotations
suite.setNodeAnnotations(node, nil)
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "",
"talos.dev/ann2": "",
})
}
func (suite *NodeAnnotationsSuite) waitUntil(watcher watch.Interface, expectedAnnotations map[string]string) {
func (suite *NodeAnnotationsSuite) waitUntil(watchCh <-chan *v1.Node, expectedAnnotations map[string]string) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)
case k8sNode := <-watchCh:
suite.T().Logf("annotations %#v", k8sNode.Annotations)
for k, v := range expectedAnnotations {

View File

@ -12,8 +12,6 @@ import (
"github.com/siderolabs/go-pointer"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
@ -63,8 +61,6 @@ func (suite *NodeLabelsSuite) TestUpdateWorker() {
suite.testUpdate(node, false)
}
const metadataKeyName = "metadata.name="
// testUpdate cycles through a set of node label updates reverting the change in the end.
func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
@ -72,13 +68,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
suite.T().Logf("updating labels on node %q (%q)", node, k8sNode.Name)
watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)
defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)
const stdLabelName = "kubernetes.io/hostname"
@ -90,7 +80,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
"talos.dev/test2": "value2",
})
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "value1",
"talos.dev/test2": "value2",
}, isControlplane)
@ -100,7 +90,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
"talos.dev/test1": "foo",
})
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "foo",
"talos.dev/test2": "",
}, isControlplane)
@ -112,7 +102,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
stdLabelName: "bar",
})
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "foo2",
stdLabelName: stdLabelValue,
}, isControlplane)
@ -121,7 +111,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
// remove all Talos Labels
suite.setNodeLabels(node, nil)
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "",
"talos.dev/test2": "",
}, isControlplane)
@ -136,34 +126,25 @@ func (suite *NodeLabelsSuite) TestAllowScheduling() {
suite.T().Logf("updating taints on node %q (%q)", node, k8sNode.Name)
watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)
defer watcher.Stop()
suite.waitUntil(watcher, nil, true)
suite.waitUntil(watchCh, nil, true)
suite.setAllowScheduling(node, true)
suite.waitUntil(watcher, nil, false)
suite.waitUntil(watchCh, nil, false)
suite.setAllowScheduling(node, false)
suite.waitUntil(watcher, nil, true)
suite.waitUntil(watchCh, nil, true)
}
//nolint:gocyclo
func (suite *NodeLabelsSuite) waitUntil(watcher watch.Interface, expectedLabels map[string]string, taintNoSchedule bool) {
func (suite *NodeLabelsSuite) waitUntil(watchCh <-chan *v1.Node, expectedLabels map[string]string, taintNoSchedule bool) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)
case k8sNode := <-watchCh:
suite.T().Logf("labels %#v, taints %#v", k8sNode.Labels, k8sNode.Spec.Taints)
for k, v := range expectedLabels {

View File

@ -16,8 +16,6 @@ import (
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/gen/xtesting/must"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
@ -67,13 +65,7 @@ func (suite *NodeTaintsSuite) testUpdate(node string) {
suite.T().Logf("updating taints on node %q (%q)", node, k8sNode.Name)
watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)
defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)
// set two new taints
suite.setNodeTaints(node, map[string]string{
@ -81,7 +73,7 @@ func (suite *NodeTaintsSuite) testUpdate(node string) {
"talos.dev/test2": "NoSchedule",
})
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
"talos.dev/test1": "value1:NoSchedule",
"talos.dev/test2": "NoSchedule",
@ -92,7 +84,7 @@ func (suite *NodeTaintsSuite) testUpdate(node string) {
"talos.dev/test1": "value1:NoSchedule",
})
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
"talos.dev/test1": "value1:NoSchedule",
})
@ -100,19 +92,16 @@ func (suite *NodeTaintsSuite) testUpdate(node string) {
// remove all taints
suite.setNodeTaints(node, nil)
suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
})
}
func (suite *NodeTaintsSuite) waitUntil(watcher watch.Interface, expectedTaints map[string]string) {
func (suite *NodeTaintsSuite) waitUntil(watchCh <-chan *v1.Node, expectedTaints map[string]string) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)
case k8sNode := <-watchCh:
suite.T().Logf("labels %#v, taints %#v", k8sNode.Labels, k8sNode.Spec.Taints)
taints := xslices.ToMap(k8sNode.Spec.Taints, func(t v1.Taint) (string, string) {

View File

@ -21,6 +21,7 @@ import (
"strings"
"time"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-pointer"
"github.com/siderolabs/go-retry/retry"
@ -37,6 +38,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
@ -804,3 +806,46 @@ func (k8sSuite *K8sSuite) ToUnstructured(obj runtime.Object) unstructured.Unstru
return u
}
// SetupNodeInformer starts a shared informer restricted (via a metadata.name
// field selector) to a single node and returns a channel that receives the
// node object on every add/update event.
//
// The informer is shut down through t.Cleanup, and every send to the returned
// channel respects ctx cancellation, so neither the informer nor its event
// handlers outlive the test.
func (k8sSuite *K8sSuite) SetupNodeInformer(ctx context.Context, nodeName string) <-chan *corev1.Node {
	const metadataKeyName = "metadata.name="

	watchCh := make(chan *corev1.Node)

	// Limit the informer's list/watch to the single node of interest.
	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		k8sSuite.Clientset,
		30*time.Second,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = metadataKeyName + nodeName
		}),
	)

	nodeInformer := informerFactory.Core().V1().Nodes().Informer()

	// forward pushes the node to watchCh unless ctx is canceled; it serves both
	// the initial Add (delivered on cache sync) and subsequent Updates.
	forward := func(obj any) {
		node, ok := obj.(*corev1.Node)
		if !ok {
			return
		}

		channel.SendWithContext(ctx, watchCh, node)
	}

	_, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: forward,
		UpdateFunc: func(_, obj any) {
			forward(obj)
		},
	})
	k8sSuite.Require().NoError(err)

	informerFactory.Start(ctx.Done())
	k8sSuite.T().Cleanup(informerFactory.Shutdown)

	// Fail fast (Require, not Assert) if the cache never syncs: a non-fatal
	// assertion would let the test proceed and then block on watchCh until the
	// test context times out.
	for informerType, synced := range informerFactory.WaitForCacheSync(ctx.Done()) {
		k8sSuite.Require().Truef(synced, "informer %q failed to sync", informerType.String())
	}

	return watchCh
}