From 7c19c318e810937959464f4d279d58dbbf672c6e Mon Sep 17 00:00:00 2001
From: Utku Ozdemir
Date: Tue, 6 May 2025 12:12:18 +0200
Subject: [PATCH] test: improve workload proxying tests

Add many more services and test scenarios to the workload proxying feature tests:

- Use two clusters, a 1+1 and a 1+2.
- Use multiple nginx workloads.
- Each workload serves its name in its `index.html`, and the tests assert
  that the correct service was hit.
- Multiple exposed services per workload.
- Multiple parallel requests per exposed service.
- Toggle the feature off and on, assert service accessibility.
- Toggle an exposed service off and on by removing/re-adding the k8s
  service annotation, assert accessibility.
- Test explicit prefixes.

Additionally:

- Fix two bugs in workload services:
  - Check the cookies before returning 404 for a non-existing exposed
    service prefix.
  - Add timeouts to the `inmem` proxy transport, so that requests cannot
    hang forever.
- Bring back the logic of saving a support bundle when an integration test
  fails, and fix its save path.

Signed-off-by: Utku Ozdemir
---
 client/api/common/omni.pb.go | 2 +-
 client/api/omni/management/management.pb.go | 2 +-
 .../api/omni/management/management_grpc.pb.go | 2 +-
 client/api/omni/oidc/oidc.pb.go | 2 +-
 client/api/omni/oidc/oidc_grpc.pb.go | 2 +-
 client/api/omni/resources/resources.pb.go | 2 +-
 .../api/omni/resources/resources_grpc.pb.go | 2 +-
 client/api/omni/specs/auth.pb.go | 2 +-
 client/api/omni/specs/ephemeral.pb.go | 2 +-
 client/api/omni/specs/infra.pb.go | 2 +-
 client/api/omni/specs/oidc.pb.go | 2 +-
 client/api/omni/specs/omni.pb.go | 2 +-
 client/api/omni/specs/siderolink.pb.go | 2 +-
 client/api/omni/specs/system.pb.go | 2 +-
 client/api/omni/specs/virtual.pb.go | 2 +-
 client/pkg/constants/exposedservice.go | 28 +
 frontend/src/api/resources.ts | 8 +-
 .../ClusterWorkloadProxyingCheckbox.vue | 8 +-
 .../omni/cluster_workload_proxy.go | 4 +-
 .../internal/exposedservice/exposedservice.go | 30 +-
 .../exposedservice/exposedservice_test.go | 23 +-
 .../internal/exposedservice/reconciler.go | 11 +-
 internal/backend/workloadproxy/handler.go | 14 +-
 internal/backend/workloadproxy/reconciler.go | 7 +-
 internal/integration/blocks_test.go | 3 +-
 internal/integration/cluster_test.go | 88 +--
 internal/integration/kubernetes/kubernetes.go | 100 +++
 .../integration/kubernetes_node_audit_test.go | 8 +-
 internal/integration/kubernetes_test.go | 51 +-
 internal/integration/suites_test.go | 108 +--
 internal/integration/talos_test.go | 4 +-
 internal/integration/workload_proxy_test.go | 299 --------
 .../testdata/sidero-labs-icon.svg | 0
 .../workloadproxy/workloadproxy.go | 669 ++++++++++++++++++
 34 files changed, 993 insertions(+), 500 deletions(-)
 create mode 100644 client/pkg/constants/exposedservice.go
 create mode 100644 internal/integration/kubernetes/kubernetes.go
 delete mode 100644 internal/integration/workload_proxy_test.go
 rename internal/integration/{ => workloadproxy}/testdata/sidero-labs-icon.svg (100%)
 create mode 100644 internal/integration/workloadproxy/workloadproxy.go

diff --git a/client/api/common/omni.pb.go b/client/api/common/omni.pb.go
index eff94642..fd383a95 100644
--- a/client/api/common/omni.pb.go
+++ b/client/api/common/omni.pb.go
@@ -1,7 +1,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: common/omni.proto package common diff --git a/client/api/omni/management/management.pb.go b/client/api/omni/management/management.pb.go index 2c28be32..81cf13d9 100644 --- a/client/api/omni/management/management.pb.go +++ b/client/api/omni/management/management.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/management/management.proto package management diff --git a/client/api/omni/management/management_grpc.pb.go b/client/api/omni/management/management_grpc.pb.go index 11c20d71..3b96366c 100644 --- a/client/api/omni/management/management_grpc.pb.go +++ b/client/api/omni/management/management_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v4.24.4 +// - protoc v5.29.4 // source: omni/management/management.proto package management diff --git a/client/api/omni/oidc/oidc.pb.go b/client/api/omni/oidc/oidc.pb.go index 05abe31a..68b56461 100644 --- a/client/api/omni/oidc/oidc.pb.go +++ b/client/api/omni/oidc/oidc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/oidc/oidc.proto package oidc diff --git a/client/api/omni/oidc/oidc_grpc.pb.go b/client/api/omni/oidc/oidc_grpc.pb.go index c422284d..739540b9 100644 --- a/client/api/omni/oidc/oidc_grpc.pb.go +++ b/client/api/omni/oidc/oidc_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v4.24.4 +// - protoc v5.29.4 // source: omni/oidc/oidc.proto package oidc diff --git a/client/api/omni/resources/resources.pb.go b/client/api/omni/resources/resources.pb.go index 3194c4df..259856a6 100644 --- a/client/api/omni/resources/resources.pb.go +++ b/client/api/omni/resources/resources.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/resources/resources.proto package resources diff --git a/client/api/omni/resources/resources_grpc.pb.go b/client/api/omni/resources/resources_grpc.pb.go index 7b70e5bf..c2bb485a 100644 --- a/client/api/omni/resources/resources_grpc.pb.go +++ b/client/api/omni/resources/resources_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v4.24.4 +// - protoc v5.29.4 // source: omni/resources/resources.proto package resources diff --git a/client/api/omni/specs/auth.pb.go b/client/api/omni/specs/auth.pb.go index ce813d4d..76f8d914 100644 --- a/client/api/omni/specs/auth.pb.go +++ b/client/api/omni/specs/auth.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/specs/auth.proto package specs diff --git a/client/api/omni/specs/ephemeral.pb.go b/client/api/omni/specs/ephemeral.pb.go index e9a4de8b..9c88ae26 100644 --- a/client/api/omni/specs/ephemeral.pb.go +++ b/client/api/omni/specs/ephemeral.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/specs/ephemeral.proto package specs diff --git a/client/api/omni/specs/infra.pb.go b/client/api/omni/specs/infra.pb.go index 9460fb6b..f3b803ba 100644 --- a/client/api/omni/specs/infra.pb.go +++ b/client/api/omni/specs/infra.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/specs/infra.proto package specs diff --git a/client/api/omni/specs/oidc.pb.go b/client/api/omni/specs/oidc.pb.go index eebf7c1c..5bdef8a5 100644 --- a/client/api/omni/specs/oidc.pb.go +++ b/client/api/omni/specs/oidc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/specs/oidc.proto package specs diff --git a/client/api/omni/specs/omni.pb.go b/client/api/omni/specs/omni.pb.go index 4d14f1a0..e61879f0 100644 --- a/client/api/omni/specs/omni.pb.go +++ b/client/api/omni/specs/omni.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/specs/omni.proto package specs diff --git a/client/api/omni/specs/siderolink.pb.go b/client/api/omni/specs/siderolink.pb.go index 66d1add8..b5190196 100644 --- a/client/api/omni/specs/siderolink.pb.go +++ b/client/api/omni/specs/siderolink.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/specs/siderolink.proto package specs diff --git a/client/api/omni/specs/system.pb.go b/client/api/omni/specs/system.pb.go index e00ee83a..c53220c2 100644 --- a/client/api/omni/specs/system.pb.go +++ b/client/api/omni/specs/system.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/specs/system.proto package specs diff --git a/client/api/omni/specs/virtual.pb.go b/client/api/omni/specs/virtual.pb.go index c92a99a8..1ca443b0 100644 --- a/client/api/omni/specs/virtual.pb.go +++ b/client/api/omni/specs/virtual.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v4.24.4 +// protoc v5.29.4 // source: omni/specs/virtual.proto package specs diff --git a/client/pkg/constants/exposedservice.go b/client/pkg/constants/exposedservice.go new file mode 100644 index 00000000..f3431eb7 --- /dev/null +++ b/client/pkg/constants/exposedservice.go @@ -0,0 +1,28 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package constants + +const ( + // ExposedServiceLabelAnnotationKey is the annotation to define the human-readable label of Kubernetes Services to expose them to Omni. + // + // tsgen:ExposedServiceLabelAnnotationKey + ExposedServiceLabelAnnotationKey = "omni-kube-service-exposer.sidero.dev/label" + + // ExposedServicePortAnnotationKey is the annotation to define the port of Kubernetes Services to expose them to Omni. 
+ // + // tsgen:ExposedServicePortAnnotationKey + ExposedServicePortAnnotationKey = "omni-kube-service-exposer.sidero.dev/port" + + // ExposedServiceIconAnnotationKey is the annotation to define the icon of Kubernetes Services to expose them to Omni. + // + // tsgen:ExposedServiceIconAnnotationKey + ExposedServiceIconAnnotationKey = "omni-kube-service-exposer.sidero.dev/icon" + + // ExposedServicePrefixAnnotationKey is the annotation to define the prefix of Kubernetes Services to expose them to Omni. + // When it is not defined, a prefix will be generated automatically. + // + // tsgen:ExposedServicePrefixAnnotationKey + ExposedServicePrefixAnnotationKey = "omni-kube-service-exposer.sidero.dev/prefix" +) diff --git a/frontend/src/api/resources.ts b/frontend/src/api/resources.ts index 7741b4da..972fc863 100644 --- a/frontend/src/api/resources.ts +++ b/frontend/src/api/resources.ts @@ -25,10 +25,6 @@ export const SignedRedirect = "v1:"; export const workloadProxyPublicKeyIdCookie = "publicKeyId"; export const workloadProxyPublicKeyIdSignatureBase64Cookie = "publicKeyIdSignatureBase64"; export const DefaultKubernetesVersion = "1.32.3"; -export const ServiceLabelAnnotationKey = "omni-kube-service-exposer.sidero.dev/label"; -export const ServicePortAnnotationKey = "omni-kube-service-exposer.sidero.dev/port"; -export const ServiceIconAnnotationKey = "omni-kube-service-exposer.sidero.dev/icon"; -export const ServicePrefixAnnotationKey = "omni-kube-service-exposer.sidero.dev/prefix"; export const installDiskMinSize = 5e+09; export const authPublicKeyIDQueryParam = "public-key-id"; export const SecureBoot = "secureboot"; @@ -37,6 +33,10 @@ export const PatchWeightInstallDisk = 0; export const PatchBaseWeightCluster = 200; export const PatchBaseWeightMachineSet = 400; export const PatchBaseWeightClusterMachine = 400; +export const ExposedServiceLabelAnnotationKey = "omni-kube-service-exposer.sidero.dev/label"; +export const ExposedServicePortAnnotationKey = "omni-kube-service-exposer.sidero.dev/port"; +export const ExposedServiceIconAnnotationKey = "omni-kube-service-exposer.sidero.dev/icon"; +export const ExposedServicePrefixAnnotationKey = "omni-kube-service-exposer.sidero.dev/prefix"; export const TalosServiceType = "Services.v1alpha1.talos.dev"; export const TalosCPUType = "CPUStats.perf.talos.dev"; export const TalosMemoryType = "MemoryStats.perf.talos.dev"; diff --git a/frontend/src/views/omni/Clusters/ClusterWorkloadProxyingCheckbox.vue b/frontend/src/views/omni/Clusters/ClusterWorkloadProxyingCheckbox.vue index 41d8640d..d8eac835 100644 --- a/frontend/src/views/omni/Clusters/ClusterWorkloadProxyingCheckbox.vue +++ b/frontend/src/views/omni/Clusters/ClusterWorkloadProxyingCheckbox.vue @@ -12,9 +12,9 @@ included in the LICENSE file.

        When enabled, the Services annotated with the following annotations
        will be listed and accessible from the Omni Web interface:
-         {{ ServicePortAnnotationKey }} (required)
-         {{ ServiceLabelAnnotationKey }} (optional)
-         {{ ServiceIconAnnotationKey }} (optional)
+         {{ ExposedServicePortAnnotationKey }} (required)
+         {{ ExposedServiceLabelAnnotationKey }} (optional)
+         {{ ExposedServiceIconAnnotationKey }} (optional)
        If the icon is specified, it must be a valid base64 of either a gzipped or uncompressed svg image.
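
(Editor's aside, not part of the patch: the tooltip above lists the annotations that opt a Service into workload proxying. Below is a minimal sketch of such a Service, using the annotation constants this change introduces in `client/pkg/constants`; the service name, namespace, port values, selector, and kubeconfig loading are illustrative assumptions, not taken from the patch.)

```go
package main

import (
	"context"
	"fmt"
	"strconv"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/siderolabs/omni/client/pkg/constants"
)

func main() {
	// Load a kubeconfig from the default location (illustrative; any rest.Config works).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	cli, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// A Service annotated for exposure: the port annotation is required,
	// label and prefix are optional (a prefix is generated when omitted),
	// and an optional icon must be a base64-encoded, optionally gzipped SVG.
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nginx",   // hypothetical workload name
			Namespace: "default", // hypothetical namespace
			Annotations: map[string]string{
				constants.ExposedServicePortAnnotationKey:   strconv.Itoa(12345),
				constants.ExposedServiceLabelAnnotationKey:  "test-nginx",
				constants.ExposedServicePrefixAnnotationKey: "test-nginx",
			},
		},
		Spec: corev1.ServiceSpec{
			Selector: map[string]string{"app": "nginx"},
			Ports:    []corev1.ServicePort{{Port: 80, Protocol: corev1.ProtocolTCP}},
		},
	}

	if _, err := cli.CoreV1().Services(svc.Namespace).Create(context.Background(), svc, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	fmt.Printf("annotated service %s/%s for Omni workload proxying\n", svc.Namespace, svc.Name)
}
```

With workload proxying enabled on the cluster, a Service annotated like this should be picked up by the exposed-service reconciler and surfaced in the Omni UI under the given label; the updated integration test exercises this flow with multiple nginx workloads.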

@@ -27,7 +27,7 @@ included in the LICENSE file. import { setupWorkloadProxyingEnabledFeatureWatch } from "@/methods/features"; import TCheckbox from "@/components/common/Checkbox/TCheckbox.vue"; import Tooltip from "@/components/common/Tooltip/Tooltip.vue"; -import { ServiceIconAnnotationKey, ServiceLabelAnnotationKey, ServicePortAnnotationKey } from "@/api/resources"; +import { ExposedServiceIconAnnotationKey, ExposedServiceLabelAnnotationKey, ExposedServicePortAnnotationKey } from "@/api/resources"; type Props = { checked?: boolean; diff --git a/internal/backend/runtime/omni/controllers/omni/cluster_workload_proxy.go b/internal/backend/runtime/omni/controllers/omni/cluster_workload_proxy.go index 553d1242..0b8ebb20 100644 --- a/internal/backend/runtime/omni/controllers/omni/cluster_workload_proxy.go +++ b/internal/backend/runtime/omni/controllers/omni/cluster_workload_proxy.go @@ -18,9 +18,9 @@ import ( "github.com/hashicorp/go-multierror" "go.uber.org/zap" + "github.com/siderolabs/omni/client/pkg/constants" "github.com/siderolabs/omni/client/pkg/omni/resources" "github.com/siderolabs/omni/client/pkg/omni/resources/omni" - "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/exposedservice" ) //go:embed data/kube-service-exposer-config-patch.tmpl.yaml @@ -134,7 +134,7 @@ func (ctrl *ClusterWorkloadProxyController) getConfigPatchData() ([]byte, error) } opts := tmplOptions{ - AnnotationKey: exposedservice.ServicePortAnnotationKey, + AnnotationKey: constants.ExposedServicePortAnnotationKey, } var buf bytes.Buffer diff --git a/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/exposedservice.go b/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/exposedservice.go index 74876e61..ace6ccc2 100644 --- a/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/exposedservice.go +++ b/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/exposedservice.go @@ -11,29 +11,8 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" -) -const ( - // ServiceLabelAnnotationKey is the annotation to define the human-readable label of Kubernetes Services to expose them to Omni. - // - // tsgen:ServiceLabelAnnotationKey - ServiceLabelAnnotationKey = "omni-kube-service-exposer.sidero.dev/label" - - // ServicePortAnnotationKey is the annotation to define the port of Kubernetes Services to expose them to Omni. - // - // tsgen:ServicePortAnnotationKey - ServicePortAnnotationKey = "omni-kube-service-exposer.sidero.dev/port" - - // ServiceIconAnnotationKey is the annotation to define the icon of Kubernetes Services to expose them to Omni. - // - // tsgen:ServiceIconAnnotationKey - ServiceIconAnnotationKey = "omni-kube-service-exposer.sidero.dev/icon" - - // ServicePrefixAnnotationKey is the annotation to define the prefix of Kubernetes Services to expose them to Omni. - // When it is not defined, a prefix will be generated automatically. 
- // - // tsgen:ServicePrefixAnnotationKey - ServicePrefixAnnotationKey = "omni-kube-service-exposer.sidero.dev/prefix" + "github.com/siderolabs/omni/client/pkg/constants" ) // IsExposedServiceEvent returns true if there is a change on the Kubernetes Services @@ -50,7 +29,7 @@ func IsExposedServiceEvent(k8sObject, oldK8sObject any, logger *zap.Logger) bool } // check for ServicePortAnnotationKey annotation - the only annotation required for the exposed services - _, isAnnotated := service.GetObjectMeta().GetAnnotations()[ServicePortAnnotationKey] + _, isAnnotated := service.GetObjectMeta().GetAnnotations()[constants.ExposedServicePortAnnotationKey] return isAnnotated } @@ -76,7 +55,10 @@ func IsExposedServiceEvent(k8sObject, oldK8sObject any, logger *zap.Logger) bool oldAnnotations := oldK8sObject.(*corev1.Service).GetObjectMeta().GetAnnotations() //nolint:forcetypeassert,errcheck newAnnotations := k8sObject.(*corev1.Service).GetObjectMeta().GetAnnotations() //nolint:forcetypeassert,errcheck - for _, key := range []string{ServiceLabelAnnotationKey, ServicePortAnnotationKey, ServiceIconAnnotationKey, ServicePrefixAnnotationKey} { + for _, key := range []string{ + constants.ExposedServiceLabelAnnotationKey, constants.ExposedServicePortAnnotationKey, + constants.ExposedServiceIconAnnotationKey, constants.ExposedServicePrefixAnnotationKey, + } { if oldAnnotations[key] != newAnnotations[key] { return true } diff --git a/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/exposedservice_test.go b/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/exposedservice_test.go index 2eb1583b..8c616661 100644 --- a/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/exposedservice_test.go +++ b/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/exposedservice_test.go @@ -13,6 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/siderolabs/omni/client/pkg/constants" "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/exposedservice" ) @@ -42,8 +43,8 @@ func TestIsExposedServiceEvent(t *testing.T) { obj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - exposedservice.ServiceLabelAnnotationKey: "foo", - exposedservice.ServiceIconAnnotationKey: "bar", + constants.ExposedServiceLabelAnnotationKey: "foo", + constants.ExposedServiceIconAnnotationKey: "bar", }, }, }, @@ -54,7 +55,7 @@ func TestIsExposedServiceEvent(t *testing.T) { obj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - exposedservice.ServicePortAnnotationKey: "8080", + constants.ExposedServicePortAnnotationKey: "8080", }, }, }, @@ -65,8 +66,8 @@ func TestIsExposedServiceEvent(t *testing.T) { obj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - exposedservice.ServicePortAnnotationKey: "8080", - exposedservice.ServiceLabelAnnotationKey: "foo", + constants.ExposedServicePortAnnotationKey: "8080", + constants.ExposedServiceLabelAnnotationKey: "foo", }, }, Spec: corev1.ServiceSpec{ClusterIP: "10.0.0.1"}, @@ -74,8 +75,8 @@ func TestIsExposedServiceEvent(t *testing.T) { oldObj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - exposedservice.ServicePortAnnotationKey: "8080", - exposedservice.ServiceLabelAnnotationKey: "foo", + constants.ExposedServicePortAnnotationKey: "8080", + constants.ExposedServiceLabelAnnotationKey: "foo", }, }, Spec: 
corev1.ServiceSpec{ClusterIP: "10.0.0.2"}, @@ -87,16 +88,16 @@ func TestIsExposedServiceEvent(t *testing.T) { obj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - exposedservice.ServicePortAnnotationKey: "8080", - exposedservice.ServiceLabelAnnotationKey: "foo", + constants.ExposedServicePortAnnotationKey: "8080", + constants.ExposedServiceLabelAnnotationKey: "foo", }, }, }, oldObj: &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - exposedservice.ServicePortAnnotationKey: "8080", - exposedservice.ServiceLabelAnnotationKey: "bar", // different label + constants.ExposedServicePortAnnotationKey: "8080", + constants.ExposedServiceLabelAnnotationKey: "bar", // different label }, }, }, diff --git a/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/reconciler.go b/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/reconciler.go index cb9d1d49..4cebc8a6 100644 --- a/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/reconciler.go +++ b/internal/backend/runtime/omni/controllers/omni/internal/exposedservice/reconciler.go @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/rand" + "github.com/siderolabs/omni/client/pkg/constants" "github.com/siderolabs/omni/client/pkg/omni/resources" "github.com/siderolabs/omni/client/pkg/omni/resources/omni" ) @@ -113,25 +114,25 @@ func (reconciler *Reconciler) reconcileService(ctx context.Context, exposedServi version = existingExposedService.Metadata().Version() } - port, err := strconv.Atoi(service.Annotations[ServicePortAnnotationKey]) + port, err := strconv.Atoi(service.Annotations[constants.ExposedServicePortAnnotationKey]) if err != nil || port < 1 || port > 65535 { - logger.Warn("invalid port on Service", zap.String("port", service.Annotations[ServicePortAnnotationKey])) + logger.Warn("invalid port on Service", zap.String("port", service.Annotations[constants.ExposedServicePortAnnotationKey])) return true, nil //nolint:nilerr } - label, labelOk := service.Annotations[ServiceLabelAnnotationKey] + label, labelOk := service.Annotations[constants.ExposedServiceLabelAnnotationKey] if !labelOk { label = service.Name + "." + service.Namespace } - icon, err := reconciler.parseIcon(service.Annotations[ServiceIconAnnotationKey]) + icon, err := reconciler.parseIcon(service.Annotations[constants.ExposedServiceIconAnnotationKey]) if err != nil { logger.Debug("invalid icon on Service", zap.Error(err)) } explicitAliasOpt := optional.None[string]() - if explicitAlias, ok := service.Annotations[ServicePrefixAnnotationKey]; ok { + if explicitAlias, ok := service.Annotations[constants.ExposedServicePrefixAnnotationKey]; ok { explicitAliasOpt = optional.Some(explicitAlias) } diff --git a/internal/backend/workloadproxy/handler.go b/internal/backend/workloadproxy/handler.go index 48fb508a..265646fb 100644 --- a/internal/backend/workloadproxy/handler.go +++ b/internal/backend/workloadproxy/handler.go @@ -110,6 +110,10 @@ func (h *HTTPHandler) ServeHTTP(writer http.ResponseWriter, request *http.Reques return } + if !h.checkCookies(writer, request, clusterID) { + return + } + if proxy == nil { h.logger.Debug("proxy is nil", zap.String("alias", alias)) @@ -118,7 +122,7 @@ func (h *HTTPHandler) ServeHTTP(writer http.ResponseWriter, request *http.Reques return } - h.checkCookies(writer, request, proxy, clusterID) + proxy.ServeHTTP(writer, request) } // isWorkloadProxyRequest checks if the request is for the workload proxy. 
@@ -145,12 +149,12 @@ func (h *HTTPHandler) isWorkloadProxyRequest(request *http.Request) bool { return false } -func (h *HTTPHandler) checkCookies(writer http.ResponseWriter, request *http.Request, proxy http.Handler, clusterID resource.ID) { +func (h *HTTPHandler) checkCookies(writer http.ResponseWriter, request *http.Request, clusterID resource.ID) (valid bool) { publicKeyID, publicKeyIDSignatureBase64 := h.getSignatureCookies(request) if publicKeyID == "" || publicKeyIDSignatureBase64 == "" { h.redirectToLogin(writer, request) - return + return false } if err := h.accessValidator.ValidateAccess(request.Context(), publicKeyID, publicKeyIDSignatureBase64, clusterID); err != nil { @@ -160,10 +164,10 @@ func (h *HTTPHandler) checkCookies(writer http.ResponseWriter, request *http.Req http.Redirect(writer, request, forbiddenURL, http.StatusSeeOther) - return + return false } - proxy.ServeHTTP(writer, request) + return true } // parseServiceAliasFromHost parses the service alias from the request host. diff --git a/internal/backend/workloadproxy/reconciler.go b/internal/backend/workloadproxy/reconciler.go index 09210aef..11ca39e8 100644 --- a/internal/backend/workloadproxy/reconciler.go +++ b/internal/backend/workloadproxy/reconciler.go @@ -185,7 +185,7 @@ func (registry *Reconciler) GetProxy(alias string) (http.Handler, resource.ID, e lbSts := registry.clusterToAliasToLBStatus[clusterID][alias] if lbSts == nil || lbSts.lb == nil { - return nil, "", nil + return nil, clusterID, nil } hostPort := registry.hostPortForAlias(clusterID, alias) @@ -197,7 +197,10 @@ func (registry *Reconciler) GetProxy(alias string) (http.Handler, resource.ID, e proxy := httputil.NewSingleHostReverseProxy(targetURL) proxy.Transport = &http.Transport{ - DialContext: registry.connProvider.DialContext, + DialContext: registry.connProvider.DialContext, + IdleConnTimeout: 90 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, } return proxy, clusterID, nil diff --git a/internal/integration/blocks_test.go b/internal/integration/blocks_test.go index a7d7046e..fbfe9679 100644 --- a/internal/integration/blocks_test.go +++ b/internal/integration/blocks_test.go @@ -239,6 +239,7 @@ func AssertClusterCreateAndReady( name string, options ClusterOptions, testOutputDir string, + doSaveSupportBundle bool, ) []subTest { //nolint:nolintlint,revive clusterName := "integration-" + name options.Name = clusterName @@ -258,7 +259,7 @@ func AssertClusterCreateAndReady( ).Append( subTest{ "ClusterShouldBeDestroyed", - AssertDestroyCluster(ctx, rootClient.Omni().State(), clusterName, options.InfraProvider != "", false), + AssertDestroyCluster(ctx, rootClient, clusterName, testOutputDir, options.InfraProvider != "", false, doSaveSupportBundle), }, ) } diff --git a/internal/integration/cluster_test.go b/internal/integration/cluster_test.go index 0741f4d2..bafc3077 100644 --- a/internal/integration/cluster_test.go +++ b/internal/integration/cluster_test.go @@ -12,6 +12,7 @@ import ( "fmt" "math/rand/v2" "os" + "path/filepath" "sync" "testing" "time" @@ -63,6 +64,8 @@ type ClusterOptions struct { ScalingTimeout time.Duration SkipExtensionCheckOnCreate bool + + AllowSchedulingOnControlPlanes bool } // CreateCluster verifies cluster creation. 
@@ -95,6 +98,10 @@ func CreateCluster(testCtx context.Context, cli *client.Client, options ClusterO require.NoError(st.Create(ctx, cluster)) + if options.AllowSchedulingOnControlPlanes { + ensureAllowSchedulingOnControlPlanesConfigPatch(ctx, t, st, options.Name) + } + for i := range options.ControlPlanes { t.Logf("Adding machine '%s' to control plane (cluster %q)", machineIDs[i], options.Name) bindMachine(ctx, t, st, bindMachineOptions{ @@ -159,6 +166,10 @@ func CreateClusterWithMachineClass(testCtx context.Context, st state.State, opti require.NoError(st.Create(ctx, cluster)) require.NoError(st.Create(ctx, kubespanEnabler)) + if options.AllowSchedulingOnControlPlanes { + ensureAllowSchedulingOnControlPlanesConfigPatch(ctx, t, st, options.Name) + } + machineClass := omni.NewMachineClass(resources.DefaultNamespace, options.Name) if options.InfraProvider != "" { @@ -184,6 +195,17 @@ func CreateClusterWithMachineClass(testCtx context.Context, st state.State, opti } } +func ensureAllowSchedulingOnControlPlanesConfigPatch(ctx context.Context, t *testing.T, st state.State, clusterID resource.ID) { + createOrUpdate(ctx, t, st, omni.NewConfigPatch(resources.DefaultNamespace, fmt.Sprintf("400-%s-control-planes-untaint", clusterID)), func(patch *omni.ConfigPatch) error { + patch.Metadata().Labels().Set(omni.LabelCluster, clusterID) + patch.Metadata().Labels().Set(omni.LabelMachineSet, omni.ControlPlanesResourceID(clusterID)) + + return patch.TypedSpec().Value.SetUncompressedData([]byte(`cluster: + allowSchedulingOnControlPlanes: true +`)) + }) +} + // ScaleClusterMachineSets scales the cluster with machine sets which are using machine classes. func ScaleClusterMachineSets(testCtx context.Context, st state.State, options ClusterOptions) TestFunc { return func(t *testing.T) { @@ -511,53 +533,6 @@ func AssertClusterKubernetesUsage(testCtx context.Context, st state.State, clust } } -// DestroyCluster destroys a cluster and waits for it to be destroyed. -// -// It is used as a finalizer when the test group fails. 
-func DestroyCluster(testCtx context.Context, client *client.Client, supportBundleDir, clusterName string) TestFunc { - return func(t *testing.T) { - ctx, cancel := context.WithTimeout(testCtx, 6*time.Minute) - defer cancel() - - st := client.Omni().State() - - if err := saveSupportBundle(ctx, client, supportBundleDir, clusterName); err != nil { - t.Logf("failed to save support bundle: %v", err) - } - - clusterMachineIDs := rtestutils.ResourceIDs[*omni.ClusterMachine](ctx, t, st, state.WithLabelQuery( - resource.LabelEqual(omni.LabelCluster, clusterName), - )) - - t.Log("destroying cluster", clusterName) - - rtestutils.Teardown[*omni.Cluster](ctx, t, st, []resource.ID{clusterName}) - - rtestutils.AssertNoResource[*omni.Cluster](ctx, t, st, clusterName) - - // wait for all machines to returned to the pool as 'available' or be part of a different cluster - rtestutils.AssertResources(ctx, t, st, clusterMachineIDs, func(machine *omni.MachineStatus, asrt *assert.Assertions) { - _, isAvailable := machine.Metadata().Labels().Get(omni.MachineStatusLabelAvailable) - machineCluster, machineBound := machine.Metadata().Labels().Get(omni.LabelCluster) - - asrt.True(isAvailable || (machineBound && machineCluster != clusterName), - "machine %q: available %v, bound %v, cluster %q", machine.Metadata().ID(), isAvailable, machineBound, machineCluster, - ) - }) - - _, err := st.Get(ctx, omni.NewMachineClass(resources.DefaultNamespace, clusterName).Metadata()) - if state.IsNotFoundError(err) { - return - } - - require.NoError(t, err) - - t.Log("destroying related machine class", clusterName) - - rtestutils.Destroy[*omni.MachineClass](ctx, t, st, []string{clusterName}) - } -} - func saveSupportBundle(ctx context.Context, cli *client.Client, dir, cluster string) error { supportBundle, err := cli.Management().GetSupportBundle(ctx, cluster, nil) if err != nil { @@ -568,7 +543,10 @@ func saveSupportBundle(ctx context.Context, cli *client.Client, dir, cluster str return fmt.Errorf("failed to create directory %q: %w", dir, err) } - if err = os.WriteFile("support-bundle-"+cluster+".zip", supportBundle, 0o644); err != nil { + fileName := time.Now().Format("2006-01-02_15-04-05") + "-support-bundle-" + cluster + ".zip" + bundlePath := filepath.Join(dir, fileName) + + if err = os.WriteFile(bundlePath, supportBundle, 0o644); err != nil { return fmt.Errorf("failed to write support bundle file %q: %w", cluster, err) } @@ -576,11 +554,21 @@ func saveSupportBundle(ctx context.Context, cli *client.Client, dir, cluster str } // AssertDestroyCluster destroys a cluster and verifies that all dependent resources are gone. 
-func AssertDestroyCluster(testCtx context.Context, st state.State, clusterName string, expectMachinesRemoved, assertInfraMachinesState bool) TestFunc { +func AssertDestroyCluster(testCtx context.Context, omniClient *client.Client, clusterName, outputDir string, expectMachinesRemoved, assertInfraMachinesState, doSaveSupportBundle bool) TestFunc { return func(t *testing.T) { ctx, cancel := context.WithTimeout(testCtx, 300*time.Second) defer cancel() + st := omniClient.Omni().State() + + if doSaveSupportBundle { + t.Logf("saving support bundle for cluster %q into %q before destruction", clusterName, outputDir) + + if err := saveSupportBundle(ctx, omniClient, outputDir, clusterName); err != nil { + t.Logf("failed to save support bundle: %v", err) + } + } + patches := rtestutils.ResourceIDs[*omni.ConfigPatch](ctx, t, st, state.WithLabelQuery( resource.LabelEqual(omni.LabelCluster, clusterName), )) @@ -617,7 +605,7 @@ func AssertDestroyCluster(testCtx context.Context, st state.State, clusterName s return } - // wait for all machines to returned to the pool as 'available' or be part of a different cluster + // wait for all machines to be returned to the pool as 'available' or be part of a different cluster rtestutils.AssertResources(ctx, t, st, clusterMachineIDs, func(machine *omni.MachineStatus, asrt *assert.Assertions) { _, isAvailable := machine.Metadata().Labels().Get(omni.MachineStatusLabelAvailable) machineCluster, machineBound := machine.Metadata().Labels().Get(omni.LabelCluster) diff --git a/internal/integration/kubernetes/kubernetes.go b/internal/integration/kubernetes/kubernetes.go new file mode 100644 index 00000000..bdf2dcbb --- /dev/null +++ b/internal/integration/kubernetes/kubernetes.go @@ -0,0 +1,100 @@ +// Copyright (c) 2025 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +// Package kubernetes provides utilities for testing Kubernetes clusters. +package kubernetes + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/go-logr/logr" + "github.com/go-logr/zapr" + "github.com/siderolabs/go-pointer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + + "github.com/siderolabs/omni/client/pkg/client/management" + "github.com/siderolabs/omni/client/pkg/constants" +) + +// WrapContext wraps the provided context with a zap logger so that the Kubernetes client sends its logs to that, complying with the go test logging format. +func WrapContext(ctx context.Context, t *testing.T) context.Context { + logger := zaptest.NewLogger(t) + ctx = logr.NewContext(ctx, zapr.NewLogger(logger.With(zap.String("component", "k8s_client")))) + + return ctx +} + +// GetClient retrieves a Kubernetes clientset for the specified cluster using the management client. 
+func GetClient(ctx context.Context, t *testing.T, managementClient *management.Client, clusterName string) *kubernetes.Clientset { + // use service account kubeconfig to bypass oidc flow + kubeconfigBytes, err := managementClient.WithCluster(clusterName).Kubeconfig(ctx, + management.WithServiceAccount(24*time.Hour, "integration-test", constants.DefaultAccessGroup), + ) + require.NoError(t, err) + + kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) { + return clientcmd.Load(kubeconfigBytes) + }) + require.NoError(t, err) + + cli, err := kubernetes.NewForConfig(kubeconfig) + require.NoError(t, err) + + return cli +} + +// ScaleDeployment scales a Kubernetes deployment to the specified number of replicas and waits until the deployment is scaled. +func ScaleDeployment(ctx context.Context, t *testing.T, kubeClient kubernetes.Interface, namespace, name string, numReplicas uint8) { + deployment, err := kubeClient.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) + require.NoError(t, err, "failed to get deployment %q/%q", namespace, name) + + // scale down the deployment to 0 replicas + deployment.Spec.Replicas = pointer.To(int32(numReplicas)) + + if _, err := kubeClient.AppsV1().Deployments(deployment.Namespace).Update(ctx, deployment, metav1.UpdateOptions{}); !apierrors.IsNotFound(err) { + require.NoError(t, err) + } + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + dep, err := kubeClient.AppsV1().Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + require.NoError(collect, err) + } + + if !assert.NoError(collect, err) { + t.Logf("failed to get deployment %q/%q: %q", deployment.Namespace, deployment.Name, err) + + return + } + + if !assert.Equal(collect, int(numReplicas), int(dep.Status.ReadyReplicas)) { + t.Logf("deployment %q/%q does not have expected number of replicas: expected %d, found %d", + deployment.Namespace, deployment.Name, numReplicas, dep.Status.ReadyReplicas) + } + }, 3*time.Minute, 5*time.Second) +} + +// UpdateService updates a Kubernetes service to add or modify the specified annotations and waits until the service is updated. +func UpdateService(ctx context.Context, t *testing.T, kubeClient kubernetes.Interface, namespace, name string, f func(*corev1.Service)) { + service, err := kubeClient.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) + require.NoError(t, err, "failed to get service %q/%q", namespace, name) + + f(service) + + _, err = kubeClient.CoreV1().Services(namespace).Update(ctx, service, metav1.UpdateOptions{}) + require.NoError(t, err, "failed to update service %q/%q", namespace, name) +} diff --git a/internal/integration/kubernetes_node_audit_test.go b/internal/integration/kubernetes_node_audit_test.go index acb1ea73..c58cf892 100644 --- a/internal/integration/kubernetes_node_audit_test.go +++ b/internal/integration/kubernetes_node_audit_test.go @@ -23,6 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/siderolabs/omni/client/pkg/omni/resources/omni" + "github.com/siderolabs/omni/internal/integration/kubernetes" ) // AssertKubernetesNodeAudit tests the Kubernetes node audit feature (KubernetesNodeAuditController) by doing the following: @@ -32,10 +33,11 @@ import ( // 3. 
Assert that the ClusterMachine resource is deleted - the ClusterMachineTeardownController did not block its deletion despite failing to remove the node from Kubernetes. // 4. Wake the control plane back up. // 5. Assert that the worker node eventually gets removed from Kubernetes due to node audit. -func AssertKubernetesNodeAudit(ctx context.Context, clusterName string, options *TestOptions) TestFunc { +func AssertKubernetesNodeAudit(testCtx context.Context, clusterName string, options *TestOptions) TestFunc { st := options.omniClient.Omni().State() - return func(t *testing.T) { + ctx := kubernetes.WrapContext(testCtx, t) + if options.FreezeAMachineFunc == nil || options.RestartAMachineFunc == nil { t.Skip("skip the test as FreezeAMachineFunc or RestartAMachineFunc is not set") } @@ -85,7 +87,7 @@ func AssertKubernetesNodeAudit(ctx context.Context, clusterName string, options require.NoError(t, options.RestartAMachineFunc(ctx, id)) } - kubernetesClient := getKubernetesClient(ctx, t, options.omniClient.Management(), clusterName) + kubernetesClient := kubernetes.GetClient(ctx, t, options.omniClient.Management(), clusterName) logger.Info("assert that the node is removed from Kubernetes due to node audit") diff --git a/internal/integration/kubernetes_test.go b/internal/integration/kubernetes_test.go index edd503e8..6ea788b0 100644 --- a/internal/integration/kubernetes_test.go +++ b/internal/integration/kubernetes_test.go @@ -27,9 +27,6 @@ import ( corev1 "k8s.io/api/core/v1" kubeerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" managementpb "github.com/siderolabs/omni/client/api/omni/management" "github.com/siderolabs/omni/client/api/omni/specs" @@ -38,6 +35,7 @@ import ( "github.com/siderolabs/omni/client/pkg/constants" "github.com/siderolabs/omni/client/pkg/omni/resources" "github.com/siderolabs/omni/client/pkg/omni/resources/omni" + "github.com/siderolabs/omni/internal/integration/kubernetes" ) // AssertKubernetesAPIAccessViaOmni verifies that cluster kubeconfig works. 
@@ -48,10 +46,12 @@ func AssertKubernetesAPIAccessViaOmni(testCtx context.Context, rootClient *clien ctx, cancel := context.WithTimeout(testCtx, timeout) defer cancel() + ctx = kubernetes.WrapContext(ctx, t) + k8sClient := kubernetes.GetClient(ctx, t, rootClient.Management(), clusterName) + var ( - k8sClient = getKubernetesClient(ctx, t, rootClient.Management(), clusterName) - k8sNodes *corev1.NodeList - err error + k8sNodes *corev1.NodeList + err error ) require.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -321,7 +321,8 @@ func AssertKubernetesDeploymentIsCreated(testCtx context.Context, managementClie ctx, cancel := context.WithTimeout(testCtx, 120*time.Second) defer cancel() - kubeClient := getKubernetesClient(ctx, t, managementClient, clusterName) + ctx = kubernetes.WrapContext(ctx, t) + kubeClient := kubernetes.GetClient(ctx, t, managementClient, clusterName) deployment := appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -370,7 +371,8 @@ func AssertKubernetesSecretIsCreated(testCtx context.Context, managementClient * ctx, cancel := context.WithTimeout(testCtx, 120*time.Second) defer cancel() - kubeClient := getKubernetesClient(ctx, t, managementClient, clusterName) + ctx = kubernetes.WrapContext(ctx, t) + kubeClient := kubernetes.GetClient(ctx, t, managementClient, clusterName) valBase64 := base64.StdEncoding.EncodeToString([]byte(testValue)) @@ -394,7 +396,8 @@ func AssertKubernetesSecretHasValue(testCtx context.Context, managementClient *m ctx, cancel := context.WithTimeout(testCtx, 120*time.Second) defer cancel() - kubeClient := getKubernetesClient(ctx, t, managementClient, clusterName) + ctx = kubernetes.WrapContext(ctx, t) + kubeClient := kubernetes.GetClient(ctx, t, managementClient, clusterName) secret, err := kubeClient.CoreV1().Secrets(ns).Get(ctx, name, metav1.GetOptions{}) require.NoError(t, err, "failed to get secret") @@ -413,8 +416,10 @@ func AssertKubernetesSecretHasValue(testCtx context.Context, managementClient *m // 2. All the extra (stale) Kubernetes nodes are in NotReady state // // This assertion is useful to assert the expected nodes state when a cluster is created from an etcd backup. -func AssertKubernetesNodesState(ctx context.Context, rootClient *client.Client, newClusterName string) func(t *testing.T) { +func AssertKubernetesNodesState(testCtx context.Context, rootClient *client.Client, newClusterName string) func(t *testing.T) { return func(t *testing.T) { + ctx := kubernetes.WrapContext(testCtx, t) + identityList, err := safe.StateListAll[*omni.ClusterMachineIdentity](ctx, rootClient.Omni().State(), state.WithLabelQuery(resource.LabelEqual(omni.LabelCluster, newClusterName))) require.NoError(t, err) @@ -435,7 +440,7 @@ func AssertKubernetesNodesState(ctx context.Context, rootClient *client.Client, return false } - kubeClient := getKubernetesClient(ctx, t, rootClient.Management(), newClusterName) + kubeClient := kubernetes.GetClient(ctx, t, rootClient.Management(), newClusterName) require.EventuallyWithT(t, func(collect *assert.CollectT) { kubernetesNodes, listErr := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) @@ -467,9 +472,11 @@ func AssertKubernetesNodesState(ctx context.Context, rootClient *client.Client, } // AssertKubernetesDeploymentHasRunningPods verifies that a deployment has running pods. 
-func AssertKubernetesDeploymentHasRunningPods(ctx context.Context, managementClient *management.Client, clusterName, ns, name string) TestFunc { +func AssertKubernetesDeploymentHasRunningPods(testCtx context.Context, managementClient *management.Client, clusterName, ns, name string) TestFunc { return func(t *testing.T) { - deps := getKubernetesClient(ctx, t, managementClient, clusterName).AppsV1().Deployments(ns) + ctx := kubernetes.WrapContext(testCtx, t) + kubeClient := kubernetes.GetClient(ctx, t, managementClient, clusterName) + deps := kubeClient.AppsV1().Deployments(ns) // restart the deployment, in case the pod is scheduled on a NotReady node (a node that is no longer valid, which was restored from an etcd backup) require.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -514,21 +521,3 @@ func AssertKubernetesDeploymentHasRunningPods(ctx context.Context, managementCli }, 2*time.Minute, 1*time.Second) } } - -func getKubernetesClient(ctx context.Context, t *testing.T, managementClient *management.Client, clusterName string) *kubernetes.Clientset { - // use service account kubeconfig to bypass oidc flow - kubeconfigBytes, err := managementClient.WithCluster(clusterName).Kubeconfig(ctx, - management.WithServiceAccount(24*time.Hour, "integration-test", constants.DefaultAccessGroup), - ) - require.NoError(t, err) - - kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) { - return clientcmd.Load(kubeconfigBytes) - }) - require.NoError(t, err) - - cli, err := kubernetes.NewForConfig(kubeconfig) - require.NoError(t, err) - - return cli -} diff --git a/internal/integration/suites_test.go b/internal/integration/suites_test.go index d0016455..53e38b7f 100644 --- a/internal/integration/suites_test.go +++ b/internal/integration/suites_test.go @@ -13,10 +13,12 @@ import ( "testing" "time" + "google.golang.org/protobuf/types/known/durationpb" + "github.com/siderolabs/omni/client/api/omni/specs" "github.com/siderolabs/omni/client/pkg/omni/resources/omni" + "github.com/siderolabs/omni/internal/integration/workloadproxy" "github.com/siderolabs/omni/internal/pkg/clientconfig" - "google.golang.org/protobuf/types/known/durationpb" ) type assertClusterReadyOptions struct { @@ -195,7 +197,7 @@ Test the auditing of the Kubernetes nodes, i.e. 
when a node is gone from the Omn t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -271,7 +273,7 @@ In the tests, we wipe and reboot the VMs to bring them back as available for the t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -303,7 +305,7 @@ Regression test: create a cluster and destroy it without waiting for the cluster t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -332,6 +334,7 @@ Don't do any changes to the cluster.`) "default", clusterOptions, options.OutputDir, + t.Failed(), )) } } @@ -363,6 +366,7 @@ Don't do any changes to the cluster.`) "encrypted", clusterOptions, options.OutputDir, + t.Failed(), )) } } @@ -391,6 +395,7 @@ Don't do any changes to the cluster.`) "singlenode", clusterOptions, options.OutputDir, + t.Failed(), )) } } @@ -484,7 +489,7 @@ In between the scaling operations, assert that the cluster is ready and accessib t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -578,7 +583,7 @@ In between the scaling operations, assert that the cluster is ready and accessib t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -678,7 +683,7 @@ In between the scaling operations, assert that the cluster is ready and accessib t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, true, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, true, false, t.Failed()), ) } } @@ -726,7 +731,7 @@ Tests rolling update & scale down strategies for concurrency control for worker t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -776,7 +781,7 @@ In between the scaling operations, assert that the cluster is ready and accessib t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -843,7 +848,7 @@ Tests applying various config patching, including "broken" config patches which t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -940,7 +945,7 @@ Tests upgrading 
Talos version, including reverting a failed upgrade.`) t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -995,7 +1000,7 @@ Tests upgrading Kubernetes version, including reverting a failed upgrade.`) t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -1079,7 +1084,7 @@ Finally, a completely new cluster is created using the same backup to test the " t.Run( "NewClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), secondClusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, secondClusterName, options.OutputDir, false, false, t.Failed()), ) runTests( @@ -1094,7 +1099,7 @@ Finally, a completely new cluster is created using the same backup to test the " t.Run( "RestoredClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -1208,7 +1213,7 @@ Test authorization on accessing Omni API, some tests run without a cluster, some t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()), ) } } @@ -1236,39 +1241,58 @@ Test workload service proxying feature`) t.Parallel() - options.claimMachines(t, 1) + options.claimMachines(t, 6) - clusterName := "integration-workload-proxy" + omniClient := options.omniClient + cluster1 := "integration-workload-proxy-1" + cluster2 := "integration-workload-proxy-2" - t.Run( - "ClusterShouldBeCreated", - CreateCluster(t.Context(), options.omniClient, ClusterOptions{ - Name: clusterName, - ControlPlanes: 1, - Workers: 0, + t.Run("ClusterShouldBeCreated-"+cluster1, CreateCluster(t.Context(), omniClient, ClusterOptions{ + Name: cluster1, + ControlPlanes: 1, + Workers: 1, - Features: &specs.ClusterSpec_Features{ - EnableWorkloadProxy: true, - }, + Features: &specs.ClusterSpec_Features{ + EnableWorkloadProxy: true, + }, - MachineOptions: options.MachineOptions, - ScalingTimeout: options.ScalingTimeout, + MachineOptions: options.MachineOptions, + ScalingTimeout: options.ScalingTimeout, - SkipExtensionCheckOnCreate: options.SkipExtensionsCheckOnCreate, - }), - ) + SkipExtensionCheckOnCreate: options.SkipExtensionsCheckOnCreate, - assertClusterAndAPIReady(t, clusterName, options) + AllowSchedulingOnControlPlanes: true, + })) + t.Run("ClusterShouldBeCreated-"+cluster2, CreateCluster(t.Context(), omniClient, ClusterOptions{ + Name: cluster2, + ControlPlanes: 1, + Workers: 2, - t.Run( - "WorkloadProxyShouldBeTested", - AssertWorkloadProxy(t.Context(), options.omniClient, clusterName), - ) + Features: &specs.ClusterSpec_Features{ + EnableWorkloadProxy: true, + }, - t.Run( - "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false), - ) + MachineOptions: options.MachineOptions, + ScalingTimeout: options.ScalingTimeout, + + SkipExtensionCheckOnCreate: 
options.SkipExtensionsCheckOnCreate, + + AllowSchedulingOnControlPlanes: true, + })) + + runTests(t, AssertBlockClusterAndTalosAPIAndKubernetesShouldBeReady(t.Context(), omniClient, cluster1, options.MachineOptions.TalosVersion, + options.MachineOptions.KubernetesVersion, options.talosAPIKeyPrepare)) + runTests(t, AssertBlockClusterAndTalosAPIAndKubernetesShouldBeReady(t.Context(), omniClient, cluster2, options.MachineOptions.TalosVersion, + options.MachineOptions.KubernetesVersion, options.talosAPIKeyPrepare)) + + parentCtx := t.Context() + + t.Run("WorkloadProxyShouldBeTested", func(t *testing.T) { + workloadproxy.Test(parentCtx, t, omniClient, cluster1, cluster2) + }) + + t.Run("ClusterShouldBeDestroyed-"+cluster1, AssertDestroyCluster(t.Context(), options.omniClient, cluster1, options.OutputDir, false, false, t.Failed())) + t.Run("ClusterShouldBeDestroyed-"+cluster2, AssertDestroyCluster(t.Context(), options.omniClient, cluster2, options.OutputDir, false, false, t.Failed())) } } @@ -1346,7 +1370,7 @@ Note: this test expects all machines to be provisioned by the bare-metal infra p t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, true), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, true, t.Failed()), ) t.Run( @@ -1367,7 +1391,7 @@ Note: this test expects all machines to be provisioned by the bare-metal infra p t.Run( "ClusterShouldBeDestroyed", - AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, true), + AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, true, t.Failed()), ) } } diff --git a/internal/integration/talos_test.go b/internal/integration/talos_test.go index bde8e645..fc267247 100644 --- a/internal/integration/talos_test.go +++ b/internal/integration/talos_test.go @@ -789,7 +789,7 @@ func AssertMachineShouldBeUpgradedInMaintenanceMode( assertClusterReady(talosVersion1) // destroy the cluster - AssertDestroyCluster(ctx, st, clusterName, false, false)(t) + AssertDestroyCluster(ctx, rootClient, clusterName, "", false, false, false)(t) t.Logf("creating a cluster on version %q using same machines", talosVersion2) @@ -810,7 +810,7 @@ func AssertMachineShouldBeUpgradedInMaintenanceMode( // wait for cluster on talosVersion2 to be ready assertClusterReady(talosVersion2) - AssertDestroyCluster(ctx, st, clusterName, false, false)(t) + AssertDestroyCluster(ctx, rootClient, clusterName, "", false, false, false)(t) } } diff --git a/internal/integration/workload_proxy_test.go b/internal/integration/workload_proxy_test.go deleted file mode 100644 index 666e7b32..00000000 --- a/internal/integration/workload_proxy_test.go +++ /dev/null @@ -1,299 +0,0 @@ -// Copyright (c) 2025 Sidero Labs, Inc. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. 
- -//go:build integration - -package integration_test - -import ( - "compress/gzip" - "context" - "crypto/tls" - _ "embed" - "encoding/base64" - "errors" - "io" - "net/http" - "net/url" - "strconv" - "strings" - "testing" - "time" - - "github.com/cosi-project/runtime/pkg/resource/rtestutils" - "github.com/hashicorp/go-cleanhttp" - "github.com/siderolabs/go-pointer" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" - - "github.com/siderolabs/omni/client/pkg/client" - "github.com/siderolabs/omni/client/pkg/omni/resources/omni" - "github.com/siderolabs/omni/internal/backend/workloadproxy" - "github.com/siderolabs/omni/internal/pkg/clientconfig" -) - -//go:embed testdata/sidero-labs-icon.svg -var sideroLabsIconSVG []byte - -// AssertWorkloadProxy tests the workload proxy feature. -func AssertWorkloadProxy(testCtx context.Context, apiClient *client.Client, clusterName string) TestFunc { - return func(t *testing.T) { - ctx, cancel := context.WithTimeout(testCtx, 150*time.Second) - t.Cleanup(cancel) - - st := apiClient.Omni().State() - kubeClient := getKubernetesClient(ctx, t, apiClient.Management(), clusterName) - port := 12345 - label := "test-nginx" - icon := base64.StdEncoding.EncodeToString(doGzip(t, sideroLabsIconSVG)) // base64(gzip(Sidero Labs icon SVG)) - deployment, service := workloadProxyManifests(port, label, icon) - deps := kubeClient.AppsV1().Deployments(deployment.Namespace) - - _, err := deps.Create(ctx, &deployment, metav1.CreateOptions{}) - if !apierrors.IsAlreadyExists(err) { - require.NoError(t, err) - } - - // assert that deployment has a running (Ready) pod - require.EventuallyWithT(t, func(collect *assert.CollectT) { - dep, depErr := deps.Get(ctx, deployment.Name, metav1.GetOptions{}) - if errors.Is(depErr, context.Canceled) || errors.Is(depErr, context.DeadlineExceeded) { - require.NoError(collect, depErr) - } - - if !assert.NoError(collect, depErr) { - t.Logf("failed to get deployment %q/%q: %q", deployment.Namespace, deployment.Name, depErr) - - return - } - - if !assert.Greater(collect, dep.Status.ReadyReplicas, int32(0)) { - t.Logf("deployment %q/%q has no ready replicas", dep.Namespace, dep.Name) - } - }, 2*time.Minute, 1*time.Second) - - _, err = kubeClient.CoreV1().Services(service.Namespace).Create(ctx, &service, metav1.CreateOptions{}) - if !apierrors.IsAlreadyExists(err) { - require.NoError(t, err) - } - - expectedID := clusterName + "/" + service.Name + "." 
+ service.Namespace - expectedIconBase64 := base64.StdEncoding.EncodeToString(sideroLabsIconSVG) - - var svcURL string - - rtestutils.AssertResource[*omni.ExposedService](ctx, t, st, expectedID, func(r *omni.ExposedService, assertion *assert.Assertions) { - assertion.Equal(port, int(r.TypedSpec().Value.Port)) - assertion.Equal("test-nginx", r.TypedSpec().Value.Label) - assertion.Equal(expectedIconBase64, r.TypedSpec().Value.IconBase64) - assertion.NotEmpty(r.TypedSpec().Value.Url) - - svcURL = r.TypedSpec().Value.Url - }) - - clientTransport := cleanhttp.DefaultPooledTransport() - clientTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - - httpClient := &http.Client{ - CheckRedirect: func(*http.Request, []*http.Request) error { - return http.ErrUseLastResponse // disable follow redirects - }, - Timeout: 5 * time.Second, - Transport: clientTransport, - } - t.Cleanup(httpClient.CloseIdleConnections) - - t.Logf("do a GET request to the exposed service URL, expect a redirect to authentication page") - - doRequestAssertResponseWithRetries(ctx, t, httpClient, svcURL, http.StatusSeeOther, "", "") - - t.Logf("do the same request but with proper cookies set, expect Omni to proxy the request to nginx") - - keyID, keyIDSignatureBase64, err := clientconfig.RegisterKeyGetIDSignatureBase64(ctx, apiClient) - require.NoError(t, err) - - t.Logf("public key ID: %q, signature base64: %q", keyID, keyIDSignatureBase64) - - cookies := []*http.Cookie{ - {Name: workloadproxy.PublicKeyIDCookie, Value: keyID}, - {Name: workloadproxy.PublicKeyIDSignatureBase64Cookie, Value: keyIDSignatureBase64}, - } - - doRequestAssertResponseWithRetries(ctx, t, httpClient, svcURL, http.StatusOK, "Welcome to nginx!", "", cookies...) - - // Remove the service, assert that ExposedService resource is removed as well - require.NoError(t, kubeClient.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{})) - - rtestutils.AssertNoResource[*omni.ExposedService](ctx, t, st, expectedID) - - t.Logf("do a GET request to the exposed service URL, expect 404 Not Found") - - doRequestAssertResponseWithRetries(ctx, t, httpClient, svcURL, http.StatusNotFound, "", "", cookies...) 
- } -} - -func doRequestAssertResponseWithRetries(ctx context.Context, t *testing.T, httpClient *http.Client, svcURL string, expectedStatusCode int, - expectedBodyContent, expectedLocationHeaderContent string, cookies ...*http.Cookie, -) { - require.EventuallyWithT(t, func(collect *assert.CollectT) { - req := prepareWorkloadProxyRequest(ctx, t, svcURL) - - for _, cookie := range cookies { - req.AddCookie(cookie) - } - - resp, err := httpClient.Do(req) - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - require.NoError(collect, err) - } - - if !assert.NoError(collect, err) { - t.Logf("request failed: %v", err) - - return - } - - defer resp.Body.Close() //nolint:errcheck - - if !assert.Equal(collect, expectedStatusCode, resp.StatusCode) { - t.Logf("unexpected status code: not %d, but %d", expectedStatusCode, resp.StatusCode) - } - - if expectedBodyContent != "" { - body, bodyErr := io.ReadAll(resp.Body) - require.NoError(collect, bodyErr) - - if !assert.Contains(collect, string(body), expectedBodyContent) { - t.Logf("content %q not found in body: %q", expectedBodyContent, string(body)) - } - } - - if expectedLocationHeaderContent != "" { - location := resp.Header.Get("Location") - - if !assert.Contains(collect, location, expectedLocationHeaderContent) { - t.Logf("content %q not found in Location header: %q", expectedLocationHeaderContent, location) - } - } - }, 3*time.Minute, 3*time.Second) -} - -// prepareWorkloadProxyRequest prepares a request to the workload proxy. -// It uses the Omni base URL to access Omni with proper name resolution, but set the Host header to the original URL to test the workload proxy logic -// -// sample svcURL: https://j2s7hf-local.proxy-us.localhost:8099/ -func prepareWorkloadProxyRequest(ctx context.Context, t *testing.T, svcURL string) *http.Request { - t.Logf("url of the exposed service: %q", svcURL) - - parsedURL, err := url.Parse(svcURL) - require.NoError(t, err) - - hostParts := strings.SplitN(parsedURL.Host, ".", 3) - require.GreaterOrEqual(t, len(hostParts), 3) - - svcHost := parsedURL.Host - baseHost := hostParts[2] - - parsedURL.Host = baseHost - - baseURL := parsedURL.String() - - t.Logf("base URL which will be used to reach exposed service: %q, host: %q", baseURL, svcHost) - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL, nil) - require.NoError(t, err) - - req.Host = svcHost - - return req -} - -func workloadProxyManifests(port int, label, icon string) (appsv1.Deployment, corev1.Service) { - deployment := appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "nginx", - Namespace: "default", - }, - Spec: appsv1.DeploymentSpec{ - Replicas: pointer.To(int32(1)), - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": "nginx", - }, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app": "nginx", - }, - }, - Spec: corev1.PodSpec{ - Tolerations: []corev1.Toleration{ - { - Operator: corev1.TolerationOpExists, - }, - }, - Containers: []corev1.Container{ - { - Name: "nginx", - Image: "nginx:stable-alpine-slim", - Ports: []corev1.ContainerPort{ - { - ContainerPort: 80, - Protocol: corev1.ProtocolTCP, - }, - }, - }, - }, - }, - }, - }, - } - - service := corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "nginx", - Namespace: "default", - Annotations: map[string]string{ - "omni-kube-service-exposer.sidero.dev/port": strconv.Itoa(port), - "omni-kube-service-exposer.sidero.dev/label": label, - 
"omni-kube-service-exposer.sidero.dev/icon": icon, - }, - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "app": "nginx", - }, - Ports: []corev1.ServicePort{ - { - Port: 80, - TargetPort: intstr.FromInt32(80), - }, - }, - }, - } - - return deployment, service -} - -func doGzip(t *testing.T, input []byte) []byte { - t.Helper() - - var buf strings.Builder - - writer := gzip.NewWriter(&buf) - - _, err := writer.Write(input) - require.NoError(t, err) - - require.NoError(t, writer.Close()) - - return []byte(buf.String()) -} diff --git a/internal/integration/testdata/sidero-labs-icon.svg b/internal/integration/workloadproxy/testdata/sidero-labs-icon.svg similarity index 100% rename from internal/integration/testdata/sidero-labs-icon.svg rename to internal/integration/workloadproxy/testdata/sidero-labs-icon.svg diff --git a/internal/integration/workloadproxy/workloadproxy.go b/internal/integration/workloadproxy/workloadproxy.go new file mode 100644 index 00000000..60c52723 --- /dev/null +++ b/internal/integration/workloadproxy/workloadproxy.go @@ -0,0 +1,669 @@ +// Copyright (c) 2025 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +// Package workloadproxy provides tests for the workload proxy (exposed services) functionality in Omni. +package workloadproxy + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/tls" + _ "embed" + "encoding/base64" + "errors" + "fmt" + "io" + "math/rand/v2" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/resource/rtestutils" + "github.com/cosi-project/runtime/pkg/safe" + "github.com/hashicorp/go-cleanhttp" + "github.com/siderolabs/gen/maps" + "github.com/siderolabs/gen/xslices" + "github.com/siderolabs/go-pointer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + clientgokubernetes "k8s.io/client-go/kubernetes" + + "github.com/siderolabs/omni/client/pkg/client" + "github.com/siderolabs/omni/client/pkg/constants" + "github.com/siderolabs/omni/client/pkg/omni/resources" + "github.com/siderolabs/omni/client/pkg/omni/resources/omni" + "github.com/siderolabs/omni/internal/backend/workloadproxy" + "github.com/siderolabs/omni/internal/integration/kubernetes" + "github.com/siderolabs/omni/internal/pkg/clientconfig" +) + +type serviceContext struct { + res *omni.ExposedService + svc *corev1.Service + deployment *deploymentContext +} + +type deploymentContext struct { + deployment *appsv1.Deployment + cluster *clusterContext + services []serviceContext +} + +type clusterContext struct { + kubeClient clientgokubernetes.Interface + clusterID string + deployments []deploymentContext +} + +//go:embed testdata/sidero-labs-icon.svg +var sideroLabsIconSVG []byte + +// Test tests the exposed services functionality in Omni. 
+func Test(ctx context.Context, t *testing.T, omniClient *client.Client, clusterIDs ...string) { + ctx, cancel := context.WithTimeout(ctx, 20*time.Minute) + t.Cleanup(cancel) + + if len(clusterIDs) == 0 { + require.Fail(t, "no cluster IDs provided for the test, please provide at least one cluster ID") + } + + ctx = kubernetes.WrapContext(ctx, t) + logger := zaptest.NewLogger(t) + + clusters := make([]clusterContext, 0, len(clusterIDs)) + + for _, clusterID := range clusterIDs { + cluster := prepareServices(ctx, t, logger, omniClient, clusterID) + + clusters = append(clusters, cluster) + } + + var ( + allServices []serviceContext + allExposedServices []*omni.ExposedService + deploymentsToScaleDown []deploymentContext + ) + + for _, cluster := range clusters { + for i, deployment := range cluster.deployments { + allServices = append(allServices, deployment.services...) + + for _, service := range deployment.services { + allExposedServices = append(allExposedServices, service.res) + } + + if i%2 == 0 { // scale down every second deployment + deploymentsToScaleDown = append(deploymentsToScaleDown, deployment) + } + } + } + + rand.Shuffle(len(allExposedServices), func(i, j int) { + allExposedServices[i], allExposedServices[j] = allExposedServices[j], allExposedServices[i] + }) + + testAccess(ctx, t, logger, omniClient, allExposedServices, http.StatusOK) + + inaccessibleExposedServices := make([]*omni.ExposedService, 0, len(allExposedServices)) + + for _, deployment := range deploymentsToScaleDown { + logger.Info("scale deployment down to zero replicas", zap.String("deployment", deployment.deployment.Name), zap.String("clusterID", deployment.cluster.clusterID)) + + kubernetes.ScaleDeployment(ctx, t, deployment.cluster.kubeClient, deployment.deployment.Namespace, deployment.deployment.Name, 0) + + for _, service := range deployment.services { + inaccessibleExposedServices = append(inaccessibleExposedServices, service.res) + } + } + + testAccess(ctx, t, logger, omniClient, inaccessibleExposedServices, http.StatusBadGateway) + + for _, deployment := range deploymentsToScaleDown { + logger.Info("scale deployment back up", zap.String("deployment", deployment.deployment.Name), zap.String("clusterID", deployment.cluster.clusterID)) + + kubernetes.ScaleDeployment(ctx, t, deployment.cluster.kubeClient, deployment.deployment.Namespace, deployment.deployment.Name, 1) + } + + testAccess(ctx, t, logger, omniClient, allExposedServices, http.StatusOK) + testToggleFeature(ctx, t, logger, omniClient, clusters[0]) + testToggleKubernetesServiceAnnotation(ctx, t, logger, omniClient, allServices[:len(allServices)/2]) +} + +// testToggleFeature tests toggling off/on the workload proxy feature for a cluster. 
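+// It flips Cluster.Features.EnableWorkloadProxy: with the feature disabled the exposed services are
+// expected to return 404, and after re-enabling it they are expected to be reachable again (200).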
+func testToggleFeature(ctx context.Context, t *testing.T, logger *zap.Logger, omniClient *client.Client, cluster clusterContext) {
+	logger.Info("test turning off and on the feature for the cluster", zap.String("clusterID", cluster.clusterID))
+
+	setFeatureToggle := func(enabled bool) {
+		_, err := safe.StateUpdateWithConflicts(ctx, omniClient.Omni().State(), omni.NewCluster(resources.DefaultNamespace, cluster.clusterID).Metadata(), func(res *omni.Cluster) error {
+			res.TypedSpec().Value.Features.EnableWorkloadProxy = enabled
+
+			return nil
+		})
+		require.NoErrorf(t, err, "failed to toggle workload proxy feature for cluster %q", cluster.clusterID)
+	}
+
+	setFeatureToggle(false)
+
+	var services []*omni.ExposedService
+
+	for _, deployment := range cluster.deployments {
+		for _, service := range deployment.services {
+			services = append(services, service.res)
+		}
+	}
+
+	if len(services) > 4 {
+		services = services[:4]
+	}
+
+	testAccess(ctx, t, logger, omniClient, services, http.StatusNotFound)
+
+	setFeatureToggle(true)
+
+	testAccess(ctx, t, logger, omniClient, services, http.StatusOK)
+}
+
+func testToggleKubernetesServiceAnnotation(ctx context.Context, t *testing.T, logger *zap.Logger, omniClient *client.Client, services []serviceContext) {
+	logger.Info("test toggling Kubernetes service annotation for exposed services", zap.Int("numServices", len(services)))
+
+	for _, service := range services {
+		kubernetes.UpdateService(ctx, t, service.deployment.cluster.kubeClient, service.svc.Namespace, service.svc.Name, func(svc *corev1.Service) {
+			delete(svc.Annotations, constants.ExposedServicePortAnnotationKey)
+		})
+	}
+
+	omniState := omniClient.Omni().State()
+	for _, service := range services {
+		rtestutils.AssertNoResource[*omni.ExposedService](ctx, t, omniState, service.res.Metadata().ID())
+	}
+
+	exposedServices := xslices.Map(services, func(svc serviceContext) *omni.ExposedService { return svc.res })
+
+	testAccess(ctx, t, logger, omniClient, exposedServices, http.StatusNotFound)
+
+	for _, service := range services {
+		kubernetes.UpdateService(ctx, t, service.deployment.cluster.kubeClient, service.svc.Namespace, service.svc.Name, func(svc *corev1.Service) {
+			svc.Annotations[constants.ExposedServicePortAnnotationKey] = service.svc.Annotations[constants.ExposedServicePortAnnotationKey]
+		})
+	}
+
+	updatedServicesMap := make(map[resource.ID]*omni.ExposedService)
+
+	for _, service := range services {
+		rtestutils.AssertResource[*omni.ExposedService](ctx, t, omniState, service.res.Metadata().ID(), func(r *omni.ExposedService, assertion *assert.Assertions) {
+			assertion.Equal(service.res.TypedSpec().Value.Port, r.TypedSpec().Value.Port, "exposed service port should be restored after toggling the annotation back on")
+			assertion.Equal(service.res.TypedSpec().Value.Label, r.TypedSpec().Value.Label, "exposed service label should be restored after toggling the annotation back on")
+			assertion.Equal(service.res.TypedSpec().Value.IconBase64, r.TypedSpec().Value.IconBase64, "exposed service icon should be restored after toggling the annotation back on")
+			assertion.Equal(service.res.TypedSpec().Value.HasExplicitAlias, r.TypedSpec().Value.HasExplicitAlias, "exposed service explicit alias flag should be restored after toggling the annotation back on")
+			assertion.Empty(r.TypedSpec().Value.Error, "exposed service should not have an error after toggling the annotation back on")
+
+			if r.TypedSpec().Value.HasExplicitAlias {
+				assertion.Equal(service.res.TypedSpec().Value.Url, r.TypedSpec().Value.Url, "exposed service URL should be restored 
after toggling the annotation back on") + } + + updatedServicesMap[r.Metadata().ID()] = r + }) + } + + updatedServices := maps.Values(updatedServicesMap) + + testAccess(ctx, t, logger, omniClient, updatedServices, http.StatusOK) +} + +func prepareServices(ctx context.Context, t *testing.T, logger *zap.Logger, omniClient *client.Client, clusterID string) clusterContext { + ctx, cancel := context.WithTimeout(ctx, 150*time.Second) + t.Cleanup(cancel) + + omniState := omniClient.Omni().State() + kubeClient := kubernetes.GetClient(ctx, t, omniClient.Management(), clusterID) + + iconBase64 := base64.StdEncoding.EncodeToString(doGzip(t, sideroLabsIconSVG)) // base64(gzip(Sidero Labs icon SVG)) + expectedIconBase64 := base64.StdEncoding.EncodeToString(sideroLabsIconSVG) + + numWorkloads := 5 + numReplicasPerWorkload := 4 + numServicePerWorkload := 10 + startPort := 12345 + + cluster := clusterContext{ + clusterID: clusterID, + deployments: make([]deploymentContext, 0, numWorkloads), + kubeClient: kubeClient, + } + + for i := range numWorkloads { + identifier := fmt.Sprintf("%s-w%02d", clusterID, i) + + deployment := deploymentContext{ + cluster: &cluster, + } + + firstPort := startPort + i*numServicePerWorkload + + var services []*corev1.Service + + deployment.deployment, services = createKubernetesResources(ctx, t, logger, kubeClient, firstPort, numReplicasPerWorkload, numServicePerWorkload, identifier, iconBase64) + + for _, service := range services { + expectedID := clusterID + "/" + service.Name + "." + service.Namespace + + expectedPort, err := strconv.Atoi(service.Annotations[constants.ExposedServicePortAnnotationKey]) + require.NoError(t, err) + + expectedLabel := service.Annotations[constants.ExposedServiceLabelAnnotationKey] + explicitAlias, hasExplicitAlias := service.Annotations[constants.ExposedServicePrefixAnnotationKey] + + var res *omni.ExposedService + + rtestutils.AssertResource[*omni.ExposedService](ctx, t, omniState, expectedID, func(r *omni.ExposedService, assertion *assert.Assertions) { + assertion.Equal(expectedPort, int(r.TypedSpec().Value.Port)) + assertion.Equal(expectedLabel, r.TypedSpec().Value.Label) + assertion.Equal(expectedIconBase64, r.TypedSpec().Value.IconBase64) + assertion.NotEmpty(r.TypedSpec().Value.Url) + assertion.Empty(r.TypedSpec().Value.Error) + assertion.Equal(hasExplicitAlias, r.TypedSpec().Value.HasExplicitAlias) + + if hasExplicitAlias { + assertion.Contains(r.TypedSpec().Value.Url, explicitAlias+"-") + } + + res = r + }) + + deployment.services = append(deployment.services, serviceContext{ + res: res, + svc: service, + deployment: &deployment, + }) + } + + cluster.deployments = append(cluster.deployments, deployment) + } + + return cluster +} + +func testAccess(ctx context.Context, t *testing.T, logger *zap.Logger, omniClient *client.Client, exposedServices []*omni.ExposedService, expectedStatusCode int) { + keyID, keyIDSignatureBase64, err := clientconfig.RegisterKeyGetIDSignatureBase64(ctx, omniClient) + require.NoError(t, err) + + logger.Debug("registered public key for workload proxy", zap.String("keyID", keyID), zap.String("keyIDSignatureBase64", keyIDSignatureBase64)) + + cookies := []*http.Cookie{ + {Name: workloadproxy.PublicKeyIDCookie, Value: keyID}, + {Name: workloadproxy.PublicKeyIDSignatureBase64Cookie, Value: keyIDSignatureBase64}, + } + + clientTransport := cleanhttp.DefaultTransport() + clientTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + httpClient := &http.Client{ + CheckRedirect: func(*http.Request, 
[]*http.Request) error { + return http.ErrUseLastResponse // disable follow redirects + }, + Timeout: 30 * time.Second, + Transport: clientTransport, + } + t.Cleanup(httpClient.CloseIdleConnections) + + parallelTestBatchSize := 8 + + var wg sync.WaitGroup + + errs := make([]error, parallelTestBatchSize) + + for i, exposedService := range exposedServices { + logger.Info("test exposed service", + zap.String("id", exposedService.Metadata().ID()), + zap.String("url", exposedService.TypedSpec().Value.Url), + zap.Int("expectedStatusCode", expectedStatusCode), + ) + + wg.Add(1) + + go func() { + defer wg.Done() + + if testErr := testAccessParallel(ctx, httpClient, exposedService, expectedStatusCode, cookies...); testErr != nil { + errs[i%parallelTestBatchSize] = fmt.Errorf("failed to access exposed service %q over %q [%d]: %w", + exposedService.Metadata().ID(), exposedService.TypedSpec().Value.Url, i, testErr) + } + }() + + if i == len(exposedServices)-1 || ((i+1)%parallelTestBatchSize == 0) { + logger.Info("wait for the batch of exposed service tests to finish", zap.Int("batchSize", (i%parallelTestBatchSize)+1)) + + wg.Wait() + + for _, testErr := range errs { + assert.NoError(t, testErr) + } + + // reset the errors for the next batch + for j := range errs { + errs[j] = nil + } + } + } +} + +func testAccessParallel(ctx context.Context, httpClient *http.Client, exposedService *omni.ExposedService, expectedStatusCode int, cookies ...*http.Cookie) error { + svcURL := exposedService.TypedSpec().Value.Url + + // test redirect to the authentication page when cookies are not set + if err := testAccessSingle(ctx, httpClient, svcURL, http.StatusSeeOther, ""); err != nil { + return fmt.Errorf("failed to access exposed service %q over %q: %w", exposedService.Metadata().ID(), svcURL, err) + } + + expectedBodyContent := "" + reqPerExposedService := 128 + numRetries := 3 + label := exposedService.TypedSpec().Value.Label + + if expectedStatusCode == http.StatusOK { + expectedBodyContent = label[:strings.LastIndex(label, "-")] // for the label "integration-workload-proxy-2-w03-s01", the content must be "integration-workload-proxy-2-w03" + } + + var wg sync.WaitGroup + + wg.Add(reqPerExposedService) + + lock := sync.Mutex{} + svcErrs := make(map[string]error, reqPerExposedService) + + for range reqPerExposedService { + go func() { + defer wg.Done() + + if err := testAccessSingleWithRetries(ctx, httpClient, svcURL, expectedStatusCode, expectedBodyContent, numRetries, cookies...); err != nil { + lock.Lock() + svcErrs[err.Error()] = err + lock.Unlock() + } + }() + } + + wg.Wait() + + return errors.Join(maps.Values(svcErrs)...) 
+} + +func testAccessSingleWithRetries(ctx context.Context, httpClient *http.Client, svcURL string, expectedStatusCode int, expectedBodyContent string, retries int, cookies ...*http.Cookie) error { + if retries <= 0 { + return fmt.Errorf("retries must be greater than 0, got %d", retries) + } + + var err error + + for range retries { + if err = testAccessSingle(ctx, httpClient, svcURL, expectedStatusCode, expectedBodyContent, cookies...); err == nil { + return nil + } + + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled while trying to access %q: %w", svcURL, ctx.Err()) + case <-time.After(500 * time.Millisecond): + } + } + + return fmt.Errorf("failed to access %q after %d retries: %w", svcURL, retries, err) +} + +func testAccessSingle(ctx context.Context, httpClient *http.Client, svcURL string, expectedStatusCode int, expectedBodyContent string, cookies ...*http.Cookie) error { + req, err := prepareRequest(ctx, svcURL) + if err != nil { + return fmt.Errorf("failed to prepare request: %w", err) + } + + for _, cookie := range cookies { + req.AddCookie(cookie) + } + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to do request: %w", err) + } + + defer resp.Body.Close() //nolint:errcheck + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + bodyStr := string(body) + + if resp.StatusCode != expectedStatusCode { + return fmt.Errorf("unexpected status code: expected %d, got %d, body: %q", expectedStatusCode, resp.StatusCode, bodyStr) + } + + if expectedBodyContent == "" { + return nil + } + + if !strings.Contains(string(body), expectedBodyContent) { + return fmt.Errorf("content %q not found in body: %q", expectedBodyContent, bodyStr) + } + + return nil +} + +// prepareRequest prepares a request to the workload proxy. 
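+// The exposed service hostname may not be resolvable in the test environment.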
+// It uses the Omni base URL to access Omni with proper name resolution, but set the Host header to the original URL to test the workload proxy logic +// +// sample svcURL: https://j2s7hf-local.proxy-us.localhost:8099/ +func prepareRequest(ctx context.Context, svcURL string) (*http.Request, error) { + parsedURL, err := url.Parse(svcURL) + if err != nil { + return nil, fmt.Errorf("failed to parse URL %q: %w", svcURL, err) + } + + hostParts := strings.SplitN(parsedURL.Host, ".", 3) + if len(hostParts) < 3 { + return nil, fmt.Errorf("failed to parse host %q: expected at least 3 parts", parsedURL.Host) + } + + svcHost := parsedURL.Host + baseHost := hostParts[2] + + parsedURL.Host = baseHost + + baseURL := parsedURL.String() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Host = svcHost + + return req, nil +} + +func createKubernetesResources(ctx context.Context, t *testing.T, logger *zap.Logger, kubeClient clientgokubernetes.Interface, + firstPort, numReplicas, numServices int, identifier, icon string, +) (*appsv1.Deployment, []*corev1.Service) { + namespace := "default" + + _, err := kubeClient.CoreV1().ConfigMaps(namespace).Create(ctx, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: identifier, + Namespace: namespace, + }, + Data: map[string]string{ + "index.html": fmt.Sprintf("%s%s", identifier, identifier), + }, + }, metav1.CreateOptions{}) + if !apierrors.IsAlreadyExists(err) { + require.NoError(t, err) + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: identifier, + Namespace: namespace, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: pointer.To(int32(numReplicas)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": identifier, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": identifier, + }, + }, + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Operator: corev1.TolerationOpExists, + }, + }, + Containers: []corev1.Container{ + { + Name: identifier, + Image: "nginx:stable-alpine-slim", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "contents", + MountPath: "/usr/share/nginx/html", + ReadOnly: true, + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "contents", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: identifier, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if _, err = kubeClient.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil { + if !apierrors.IsAlreadyExists(err) { + require.NoErrorf(t, err, "failed to create deployment %q in namespace %q", identifier, namespace) + } + + // if the deployment already exists, update it with the new spec + deployment, err = kubeClient.AppsV1().Deployments(namespace).Update(ctx, deployment, metav1.UpdateOptions{}) + if !apierrors.IsNotFound(err) { + require.NoError(t, err) + } + } + + services := make([]*corev1.Service, 0, numServices) + + for i := range numServices { + svcIdentifier := fmt.Sprintf("%s-s%02d", identifier, i) + port := firstPort + i + + service := corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: svcIdentifier, + Namespace: namespace, + Annotations: map[string]string{ + 
constants.ExposedServicePortAnnotationKey: strconv.Itoa(port), + constants.ExposedServiceLabelAnnotationKey: svcIdentifier, + constants.ExposedServiceIconAnnotationKey: icon, + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": identifier, + }, + Ports: []corev1.ServicePort{ + { + Port: 80, + TargetPort: intstr.FromInt32(80), + }, + }, + }, + } + + if usePrefix := i%2 == 0; usePrefix { + service.Annotations[constants.ExposedServicePrefixAnnotationKey] = strings.ReplaceAll(svcIdentifier, "-", "") + } + + services = append(services, &service) + + _, err = kubeClient.CoreV1().Services(namespace).Create(ctx, &service, metav1.CreateOptions{}) + if !apierrors.IsAlreadyExists(err) { + require.NoError(t, err) + } + } + + // assert that all pods of the deployment are ready + require.EventuallyWithT(t, func(collect *assert.CollectT) { + dep, depErr := kubeClient.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) + if errors.Is(depErr, context.Canceled) || errors.Is(depErr, context.DeadlineExceeded) { + require.NoError(collect, depErr) + } + + if !assert.NoError(collect, depErr) { + logger.Error("failed to get deployment", zap.String("namespace", deployment.Namespace), zap.String("name", deployment.Name), zap.Error(depErr)) + + return + } + + if !assert.Equal(collect, int(dep.Status.ReadyReplicas), numReplicas) { + logger.Debug("deployment does not have expected number of replicas", + zap.String("namespace", deployment.Namespace), + zap.String("name", deployment.Name), + zap.Int("expected", numReplicas), + zap.Int("found", int(dep.Status.ReadyReplicas)), + ) + } + }, 3*time.Minute, 5*time.Second) + + return deployment, services +} + +func doGzip(t *testing.T, input []byte) []byte { + t.Helper() + + var buf bytes.Buffer + + writer := gzip.NewWriter(&buf) + + _, err := writer.Write(input) + require.NoError(t, err) + + require.NoError(t, writer.Close()) + + return buf.Bytes() +}