talos/internal/integration/base/k8s.go

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build integration_k8s

package base

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"slices"
	"strings"
	"time"

	"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-retry/retry"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/remotecommand"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubectl/pkg/scheme"

	taloskubernetes "github.com/siderolabs/talos/pkg/kubernetes"
)

// K8sSuite is a base suite for K8s tests.
type K8sSuite struct {
	APISuite

	Clientset       *kubernetes.Clientset
	DynamicClient   dynamic.Interface
	DiscoveryClient *discovery.DiscoveryClient
	RestConfig      *rest.Config
	Mapper          *restmapper.DeferredDiscoveryRESTMapper
}

// SetupSuite initializes Kubernetes client.
func (k8sSuite *K8sSuite) SetupSuite() {
k8sSuite.APISuite.SetupSuite()
kubeconfig, err := k8sSuite.Client.Kubeconfig(context.Background())
k8sSuite.Require().NoError(err)
config, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return clientcmd.Load(kubeconfig)
})
k8sSuite.Require().NoError(err)
if k8sSuite.K8sEndpoint != "" {
config.Host = k8sSuite.K8sEndpoint
}
k8sSuite.RestConfig = config
k8sSuite.Clientset, err = kubernetes.NewForConfig(config)
k8sSuite.Require().NoError(err)
k8sSuite.DynamicClient, err = dynamic.NewForConfig(config)
k8sSuite.Require().NoError(err)
k8sSuite.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(config)
k8sSuite.Require().NoError(err)
k8sSuite.Mapper = restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(k8sSuite.DiscoveryClient))
}
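
// A usage sketch (hypothetical, not part of the upstream suite): a concrete test
// suite embeds K8sSuite, and SetupSuite wires up all of the clients before any
// test methods run.
//
//	type MySuite struct {
//		base.K8sSuite
//	}
//
//	func (suite *MySuite) TestListNodes() {
//		nodes, err := suite.Clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
//		suite.Require().NoError(err)
//		suite.Require().NotEmpty(nodes.Items)
//	}
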
// GetK8sNodeByInternalIP returns the Kubernetes node with the given internal IP, or an error if it is not found.
func (k8sSuite *K8sSuite) GetK8sNodeByInternalIP(ctx context.Context, internalIP string) (*corev1.Node, error) {
nodeList, err := k8sSuite.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
	for _, item := range nodeList.Items {
		for _, address := range item.Status.Addresses {
			if address.Type == corev1.NodeInternalIP && address.Address == internalIP {
				return &item, nil
			}
		}
	}
return nil, fmt.Errorf("node with internal IP %s not found", internalIP)
}

// WaitForK8sNodeReadinessStatus waits for the node to have the given readiness status.
// It retries until the node with the given name is found and its status matches the expected condition.
func (k8sSuite *K8sSuite) WaitForK8sNodeReadinessStatus(ctx context.Context, nodeName string, checkFn func(corev1.ConditionStatus) bool) error {
return retry.Constant(5 * time.Minute).Retry(func() error {
readinessStatus, err := k8sSuite.GetK8sNodeReadinessStatus(ctx, nodeName)
if errors.IsNotFound(err) {
return retry.ExpectedError(err)
}
if taloskubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}
if err != nil {
return err
}
if !checkFn(readinessStatus) {
return retry.ExpectedErrorf("node readiness status is %s", readinessStatus)
}
return nil
})
}
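
// A usage sketch (the node name is hypothetical): wait for a node to report Ready.
// The checkFn receives the status of the node's Ready condition.
//
//	err := k8sSuite.WaitForK8sNodeReadinessStatus(ctx, "worker-1", func(status corev1.ConditionStatus) bool {
//		return status == corev1.ConditionTrue
//	})
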
// GetK8sNodeReadinessStatus returns the node readiness status of the node.
func (k8sSuite *K8sSuite) GetK8sNodeReadinessStatus(ctx context.Context, nodeName string) (corev1.ConditionStatus, error) {
node, err := k8sSuite.Clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return "", err
}
for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady {
return condition.Status, nil
}
}
return "", fmt.Errorf("node %s has no readiness condition", nodeName)
}

// DeleteResource deletes the resource with the given GroupVersionResource, namespace and name.
// Does not return an error if the resource is not found.
func (k8sSuite *K8sSuite) DeleteResource(ctx context.Context, gvr schema.GroupVersionResource, ns, name string) error {
err := k8sSuite.DynamicClient.Resource(gvr).Namespace(ns).Delete(ctx, name, metav1.DeleteOptions{})
if errors.IsNotFound(err) {
return nil
}
return err
}

// EnsureResourceIsDeleted ensures that the resource with the given GroupVersionResource, namespace and name does not exist on Kubernetes.
// It repeatedly checks the resource for the given duration.
func (k8sSuite *K8sSuite) EnsureResourceIsDeleted(
ctx context.Context,
duration time.Duration,
gvr schema.GroupVersionResource,
ns, name string,
) error {
return retry.Constant(duration).RetryWithContext(ctx, func(ctx context.Context) error {
_, err := k8sSuite.DynamicClient.Resource(gvr).Namespace(ns).Get(ctx, name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return nil
}
return err
})
}

// WaitForEventExists waits for an event matching the given check function to appear in the given namespace.
func (k8sSuite *K8sSuite) WaitForEventExists(ctx context.Context, ns string, checkFn func(event eventsv1.Event) bool) error {
return retry.Constant(15*time.Second).RetryWithContext(ctx, func(ctx context.Context) error {
		events, err := k8sSuite.Clientset.EventsV1().Events(ns).List(ctx, metav1.ListOptions{})
		if err != nil {
			return retry.ExpectedError(err)
		}

		filteredEvents := xslices.Filter(events.Items, func(item eventsv1.Event) bool {
			return checkFn(item)
		})

		if len(filteredEvents) == 0 {
			return retry.ExpectedErrorf("no matching events found in namespace %s", ns)
		}
return nil
})
}
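
// A usage sketch (the reason string is hypothetical): wait for any event in the
// namespace with a specific reason.
//
//	err := k8sSuite.WaitForEventExists(ctx, "default", func(event eventsv1.Event) bool {
//		return event.Reason == "NodeReady"
//	})
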
// WaitForPodToBeRunning waits for the pod with the given namespace and name to be running.
func (k8sSuite *K8sSuite) WaitForPodToBeRunning(ctx context.Context, timeout time.Duration, namespace, podName string) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
watcher, err := k8sSuite.Clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
})
if err != nil {
return err
}
defer watcher.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
		case event, ok := <-watcher.ResultChan():
			if !ok {
				return fmt.Errorf("watch channel closed unexpectedly for pod %s/%s", namespace, podName)
			}

			if event.Type == watch.Error {
				return fmt.Errorf("error watching pod: %v", event.Object)
			}
pod, ok := event.Object.(*corev1.Pod)
if !ok {
continue
}
if pod.Name == podName && pod.Status.Phase == corev1.PodRunning {
return nil
}
}
}
}

// LogPodLogs logs the logs of the pod with the given namespace and name.
func (k8sSuite *K8sSuite) LogPodLogs(ctx context.Context, namespace, podName string) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
req := k8sSuite.Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{})
	readCloser, err := req.Stream(ctx)
	if err != nil {
		k8sSuite.T().Logf("failed to get pod logs: %s", err)

		return
	}

	defer readCloser.Close() //nolint:errcheck
scanner := bufio.NewScanner(readCloser)
for scanner.Scan() {
k8sSuite.T().Logf("%s/%s: %s", namespace, podName, scanner.Text())
}
}
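
// A usage sketch (namespace and pod name are hypothetical): wait for a pod to
// start, dumping its logs for debugging if it never does.
//
//	if err := k8sSuite.WaitForPodToBeRunning(ctx, 5*time.Minute, "default", "test-pod"); err != nil {
//		k8sSuite.LogPodLogs(ctx, "default", "test-pod")
//		k8sSuite.T().Fatalf("pod did not start: %v", err)
//	}
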
// WaitForPodToBeDeleted waits for the pod with the given namespace and name to be deleted.
func (k8sSuite *K8sSuite) WaitForPodToBeDeleted(ctx context.Context, timeout time.Duration, namespace, podName string) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
watcher, err := k8sSuite.Clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
})
if err != nil {
return err
}
defer watcher.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
		case event, ok := <-watcher.ResultChan():
			if !ok {
				return fmt.Errorf("watch channel closed unexpectedly for pod %s/%s", namespace, podName)
			}

			if event.Type == watch.Deleted {
				return nil
			}

			if event.Type == watch.Error {
				return fmt.Errorf("error watching pod: %v", event.Object)
			}
}
}
}

// ExecuteCommandInPod executes the given command in the pod with the given namespace and name.
func (k8sSuite *K8sSuite) ExecuteCommandInPod(ctx context.Context, namespace, podName, command string) (string, string, error) {
cmd := []string{
"/bin/sh",
"-c",
command,
}
req := k8sSuite.Clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).
Namespace(namespace).SubResource("exec")
option := &corev1.PodExecOptions{
Command: cmd,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}
req.VersionedParams(
option,
scheme.ParameterCodec,
)
exec, err := remotecommand.NewSPDYExecutor(k8sSuite.RestConfig, "POST", req.URL())
if err != nil {
return "", "", err
}
var stdout, stderr strings.Builder
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: &stderr,
})
if err != nil {
k8sSuite.T().Logf(
"error executing command in pod %s/%s: %v\n\ncommand %q stdout:\n%s\n\ncommand %q stderr:\n%s",
namespace,
podName,
err,
command,
stdout.String(),
command,
stderr.String(),
)
}
return stdout.String(), stderr.String(), err
}
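
// A usage sketch (names are hypothetical): run a shell command in a pod and
// assert on its output. Both stdout and stderr are returned even when the
// command fails, so they can be included in assertion messages.
//
//	stdout, stderr, err := k8sSuite.ExecuteCommandInPod(ctx, "default", "test-pod", "uname -a")
//	k8sSuite.Require().NoError(err, "stderr: %s", stderr)
//	k8sSuite.Require().Contains(stdout, "Linux")
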
// GetPodsWithLabel returns the pods with the given label in the specified namespace.
func (k8sSuite *K8sSuite) GetPodsWithLabel(ctx context.Context, namespace, label string) (*corev1.PodList, error) {
podList, err := k8sSuite.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: label,
})
if err != nil {
return nil, err
}
return podList, nil
}

// ParseManifests parses YAML manifest bytes into unstructured objects.
func (k8sSuite *K8sSuite) ParseManifests(manifests []byte) []unstructured.Unstructured {
reader := yaml.NewYAMLReader(bufio.NewReader(bytes.NewReader(manifests)))
var parsedManifests []unstructured.Unstructured
for {
yamlManifest, err := reader.Read()
if err != nil {
if err == io.EOF {
break
}
k8sSuite.Require().NoError(err)
}
yamlManifest = bytes.TrimSpace(yamlManifest)
if len(yamlManifest) == 0 {
continue
}
jsonManifest, err := yaml.ToJSON(yamlManifest)
if err != nil {
k8sSuite.Require().NoError(err, "error converting manifest to JSON")
}
if bytes.Equal(jsonManifest, []byte("null")) || bytes.Equal(jsonManifest, []byte("{}")) {
// skip YAML docs which contain only comments
continue
}
var obj unstructured.Unstructured
if err = json.Unmarshal(jsonManifest, &obj); err != nil {
k8sSuite.Require().NoError(err, "error loading JSON manifest into unstructured")
}
parsedManifests = append(parsedManifests, obj)
}
return parsedManifests
}

// ApplyManifests applies the given manifests to the Kubernetes cluster.
func (k8sSuite *K8sSuite) ApplyManifests(ctx context.Context, manifests []unstructured.Unstructured) {
for _, obj := range manifests {
mapping, err := k8sSuite.Mapper.RESTMapping(obj.GetObjectKind().GroupVersionKind().GroupKind(), obj.GetObjectKind().GroupVersionKind().Version)
if err != nil {
k8sSuite.Require().NoError(err, "error creating mapping for object %s", obj.GetName())
}
dr := k8sSuite.DynamicClient.Resource(mapping.Resource).Namespace(obj.GetNamespace())
_, err = dr.Create(ctx, &obj, metav1.CreateOptions{})
k8sSuite.Require().NoError(err, "error creating object %s", obj.GetName())
k8sSuite.T().Logf("created object %s/%s/%s", obj.GetObjectKind().GroupVersionKind(), obj.GetNamespace(), obj.GetName())
}
}
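
// A usage sketch (the manifest bytes are hypothetical): parse raw YAML, apply
// it, and clean up at the end of the test.
//
//	manifests := k8sSuite.ParseManifests(podManifestYAML)
//	k8sSuite.ApplyManifests(ctx, manifests)
//	defer k8sSuite.DeleteManifests(ctx, manifests)
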
// DeleteManifests deletes the given manifests from the Kubernetes cluster.
func (k8sSuite *K8sSuite) DeleteManifests(ctx context.Context, manifests []unstructured.Unstructured) {
	// process in reverse order
manifests = slices.Clone(manifests)
slices.Reverse(manifests)
for _, obj := range manifests {
mapping, err := k8sSuite.Mapper.RESTMapping(obj.GetObjectKind().GroupVersionKind().GroupKind(), obj.GetObjectKind().GroupVersionKind().Version)
if err != nil {
k8sSuite.Require().NoError(err, "error creating mapping for object %s", obj.GetName())
}
dr := k8sSuite.DynamicClient.Resource(mapping.Resource).Namespace(obj.GetNamespace())
err = dr.Delete(ctx, obj.GetName(), metav1.DeleteOptions{})
if errors.IsNotFound(err) {
continue
}
k8sSuite.Require().NoError(err, "error deleting object %s", obj.GetName())
// wait for the object to be deleted
fieldSelector := fields.OneTermEqualSelector("metadata.name", obj.GetName()).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return dr.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return dr.Watch(ctx, options)
},
}
preconditionFunc := func(store cache.Store) (bool, error) {
var exists bool
_, exists, err = store.Get(&metav1.ObjectMeta{Namespace: obj.GetNamespace(), Name: obj.GetName()})
if err != nil {
return true, err
}
if !exists {
// since we're looking for it to disappear we just return here if it no longer exists
return true, nil
}
return false, nil
}
_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(event watch.Event) (bool, error) {
return event.Type == watch.Deleted, nil
})
k8sSuite.Require().NoError(err, "error waiting for the object to be deleted %s", obj.GetName())
k8sSuite.T().Logf("deleted object %s/%s/%s", obj.GetObjectKind().GroupVersionKind(), obj.GetNamespace(), obj.GetName())
}
}

// ToUnstructured converts the given runtime.Object to unstructured.Unstructured.
func (k8sSuite *K8sSuite) ToUnstructured(obj runtime.Object) unstructured.Unstructured {
unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
k8sSuite.Require().NoError(err, "error converting object to unstructured")
}
u := unstructured.Unstructured{Object: unstructuredObj}
u.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
return u
}
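
// A usage sketch (the pod fields are hypothetical): convert a typed object for
// use with the dynamic client. Note that TypeMeta must be populated on the
// typed object, since the GroupVersionKind is copied from it.
//
//	pod := corev1.Pod{
//		TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Pod"},
//		ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"},
//	}
//	u := k8sSuite.ToUnstructured(&pod)
//	k8sSuite.ApplyManifests(ctx, []unstructured.Unstructured{u})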