test: improve workload proxying tests

Add many more services and test scenarios to the workload proxying feature:
- Use two clusters: a 1+1 (one control plane, one worker) and a 1+2 (one control plane, two workers).
- Use multiple nginx workloads.
- Each workload serves its name in its `index.html`, and the test asserts the response body (i.e., we assert that we hit the correct service); see the sketch after this list.
- Multiple exposed services per workload.
- Multiple parallel requests per exposed service.
- Toggle the feature off and on, assert service accessibility.
- Toggle an exposed service off and on by removing/re-adding the Kubernetes Service annotation, assert accessibility.
- Test explicit prefixes.
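
A minimal sketch of the "correct service" assertion described above (assuming each nginx workload serves its own name in `index.html`); the helper name is hypothetical and not part of this commit:

```go
package workloadproxy_test

import (
	"context"
	"io"
	"net/http"
	"testing"

	"github.com/stretchr/testify/require"
)

// assertHitsCorrectWorkload is a hypothetical helper: the exposed service URL
// must return the name of the workload backing it, proving that the proxy
// routed the request to the right Kubernetes Service.
func assertHitsCorrectWorkload(ctx context.Context, t *testing.T, httpClient *http.Client, svcURL, workloadName string) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, svcURL, nil)
	require.NoError(t, err)

	resp, err := httpClient.Do(req)
	require.NoError(t, err)

	defer resp.Body.Close() //nolint:errcheck

	body, err := io.ReadAll(resp.Body)
	require.NoError(t, err)

	// each nginx workload serves its own name in index.html
	require.Equal(t, http.StatusOK, resp.StatusCode)
	require.Contains(t, string(body), workloadName)
}
```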

Additionally:
- Fix two bugs in workload services:
  - Check the cookies before returning 404 for a non-existing exposed service prefix.
  - Add timeouts to the `inmem` proxy transport so that requests cannot hang forever (a sketch of this part of the fix follows after this list).
- Bring back the logic for saving a support bundle when an integration test fails, and fix its save path.
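
A minimal, self-contained sketch of the transport-timeout part of the fix, using the same timeout values as the change further down in this diff; the package and function names are illustrative:

```go
package proxysketch

import (
	"context"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"time"
)

// newProxyWithTimeouts builds a reverse proxy whose transport gives up instead
// of hanging forever when the backend never answers. dialContext stands in for
// the in-memory connection provider used by the workload proxy.
func newProxyWithTimeouts(target *url.URL, dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) *httputil.ReverseProxy {
	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.Transport = &http.Transport{
		DialContext:           dialContext,
		IdleConnTimeout:       90 * time.Second, // close idle keep-alive connections
		ResponseHeaderTimeout: 10 * time.Second, // fail if the backend never sends response headers
		ExpectContinueTimeout: 1 * time.Second,
	}

	return proxy
}
```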

Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
Utku Ozdemir 2025-05-06 12:12:18 +02:00
parent c9c4c8e10d
commit 7c19c318e8
GPG Key ID: DBD13117B0A14E93
34 changed files with 993 additions and 500 deletions

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: common/omni.proto
package common

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/management/management.proto
package management

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v4.24.4
// - protoc v5.29.4
// source: omni/management/management.proto
package management

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/oidc/oidc.proto
package oidc

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v4.24.4
// - protoc v5.29.4
// source: omni/oidc/oidc.proto
package oidc

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/resources/resources.proto
package resources

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v4.24.4
// - protoc v5.29.4
// source: omni/resources/resources.proto
package resources

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/specs/auth.proto
package specs

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/specs/ephemeral.proto
package specs

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/specs/infra.proto
package specs

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/specs/oidc.proto
package specs

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/specs/omni.proto
package specs

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/specs/siderolink.proto
package specs

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/specs/system.proto
package specs

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v4.24.4
// protoc v5.29.4
// source: omni/specs/virtual.proto
package specs

View File

@ -0,0 +1,28 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package constants
const (
// ExposedServiceLabelAnnotationKey is the annotation to define the human-readable label of Kubernetes Services to expose them to Omni.
//
// tsgen:ExposedServiceLabelAnnotationKey
ExposedServiceLabelAnnotationKey = "omni-kube-service-exposer.sidero.dev/label"
// ExposedServicePortAnnotationKey is the annotation to define the port of Kubernetes Services to expose them to Omni.
//
// tsgen:ExposedServicePortAnnotationKey
ExposedServicePortAnnotationKey = "omni-kube-service-exposer.sidero.dev/port"
// ExposedServiceIconAnnotationKey is the annotation to define the icon of Kubernetes Services to expose them to Omni.
//
// tsgen:ExposedServiceIconAnnotationKey
ExposedServiceIconAnnotationKey = "omni-kube-service-exposer.sidero.dev/icon"
// ExposedServicePrefixAnnotationKey is the annotation to define the prefix of Kubernetes Services to expose them to Omni.
// When it is not defined, a prefix will be generated automatically.
//
// tsgen:ExposedServicePrefixAnnotationKey
ExposedServicePrefixAnnotationKey = "omni-kube-service-exposer.sidero.dev/prefix"
)
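
As a usage reference (not part of the diff), a minimal sketch of a Kubernetes Service carrying these annotations. Only the port annotation is required; the label defaults to `<name>.<namespace>` and the prefix is auto-generated when omitted. The names and port below are illustrative:

```go
package exposersketch

import (
	"strconv"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"

	"github.com/siderolabs/omni/client/pkg/constants"
)

// exposedNginxService builds a Service that the kube-service-exposer picks up:
// the port annotation is required, label and prefix are optional.
func exposedNginxService(port int) corev1.Service {
	return corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nginx",
			Namespace: "default",
			Annotations: map[string]string{
				constants.ExposedServicePortAnnotationKey:   strconv.Itoa(port), // required, 1-65535
				constants.ExposedServiceLabelAnnotationKey:  "test-nginx",       // optional, defaults to "<name>.<namespace>"
				constants.ExposedServicePrefixAnnotationKey: "nginx",            // optional, auto-generated when omitted
			},
		},
		Spec: corev1.ServiceSpec{
			Selector: map[string]string{"app": "nginx"},
			Ports: []corev1.ServicePort{{
				Port:       80,
				TargetPort: intstr.FromInt32(80),
			}},
		},
	}
}
```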

View File

@ -25,10 +25,6 @@ export const SignedRedirect = "v1:";
export const workloadProxyPublicKeyIdCookie = "publicKeyId";
export const workloadProxyPublicKeyIdSignatureBase64Cookie = "publicKeyIdSignatureBase64";
export const DefaultKubernetesVersion = "1.32.3";
export const ServiceLabelAnnotationKey = "omni-kube-service-exposer.sidero.dev/label";
export const ServicePortAnnotationKey = "omni-kube-service-exposer.sidero.dev/port";
export const ServiceIconAnnotationKey = "omni-kube-service-exposer.sidero.dev/icon";
export const ServicePrefixAnnotationKey = "omni-kube-service-exposer.sidero.dev/prefix";
export const installDiskMinSize = 5e+09;
export const authPublicKeyIDQueryParam = "public-key-id";
export const SecureBoot = "secureboot";
@ -37,6 +33,10 @@ export const PatchWeightInstallDisk = 0;
export const PatchBaseWeightCluster = 200;
export const PatchBaseWeightMachineSet = 400;
export const PatchBaseWeightClusterMachine = 400;
export const ExposedServiceLabelAnnotationKey = "omni-kube-service-exposer.sidero.dev/label";
export const ExposedServicePortAnnotationKey = "omni-kube-service-exposer.sidero.dev/port";
export const ExposedServiceIconAnnotationKey = "omni-kube-service-exposer.sidero.dev/icon";
export const ExposedServicePrefixAnnotationKey = "omni-kube-service-exposer.sidero.dev/prefix";
export const TalosServiceType = "Services.v1alpha1.talos.dev";
export const TalosCPUType = "CPUStats.perf.talos.dev";
export const TalosMemoryType = "MemoryStats.perf.talos.dev";

View File

@ -12,9 +12,9 @@ included in the LICENSE file.
<p>When enabled, the Services annotated with the following annotations</p>
<p>will be listed and accessible from the Omni Web interface:</p>
<div class="bg-naturals-N5 rounded px-2 py-1">
<p class="font-roboto">{{ ServicePortAnnotationKey }} (required)</p>
<p class="font-roboto">{{ ServiceLabelAnnotationKey }} (optional)</p>
<p class="font-roboto">{{ ServiceIconAnnotationKey }} (optional)</p>
<p class="font-roboto">{{ ExposedServicePortAnnotationKey }} (required)</p>
<p class="font-roboto">{{ ExposedServiceLabelAnnotationKey }} (optional)</p>
<p class="font-roboto">{{ ExposedServiceIconAnnotationKey }} (optional)</p>
</div>
<p>If the icon is specified, it must be a valid base64 of either a gzipped or uncompressed svg image.</p>
</div>
@ -27,7 +27,7 @@ included in the LICENSE file.
import { setupWorkloadProxyingEnabledFeatureWatch } from "@/methods/features";
import TCheckbox from "@/components/common/Checkbox/TCheckbox.vue";
import Tooltip from "@/components/common/Tooltip/Tooltip.vue";
import { ServiceIconAnnotationKey, ServiceLabelAnnotationKey, ServicePortAnnotationKey } from "@/api/resources";
import { ExposedServiceIconAnnotationKey, ExposedServiceLabelAnnotationKey, ExposedServicePortAnnotationKey } from "@/api/resources";
type Props = {
checked?: boolean;

View File

@ -18,9 +18,9 @@ import (
"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
"github.com/siderolabs/omni/client/pkg/constants"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/exposedservice"
)
//go:embed data/kube-service-exposer-config-patch.tmpl.yaml
@ -134,7 +134,7 @@ func (ctrl *ClusterWorkloadProxyController) getConfigPatchData() ([]byte, error)
}
opts := tmplOptions{
AnnotationKey: exposedservice.ServicePortAnnotationKey,
AnnotationKey: constants.ExposedServicePortAnnotationKey,
}
var buf bytes.Buffer

View File

@ -11,29 +11,8 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
)
const (
// ServiceLabelAnnotationKey is the annotation to define the human-readable label of Kubernetes Services to expose them to Omni.
//
// tsgen:ServiceLabelAnnotationKey
ServiceLabelAnnotationKey = "omni-kube-service-exposer.sidero.dev/label"
// ServicePortAnnotationKey is the annotation to define the port of Kubernetes Services to expose them to Omni.
//
// tsgen:ServicePortAnnotationKey
ServicePortAnnotationKey = "omni-kube-service-exposer.sidero.dev/port"
// ServiceIconAnnotationKey is the annotation to define the icon of Kubernetes Services to expose them to Omni.
//
// tsgen:ServiceIconAnnotationKey
ServiceIconAnnotationKey = "omni-kube-service-exposer.sidero.dev/icon"
// ServicePrefixAnnotationKey is the annotation to define the prefix of Kubernetes Services to expose them to Omni.
// When it is not defined, a prefix will be generated automatically.
//
// tsgen:ServicePrefixAnnotationKey
ServicePrefixAnnotationKey = "omni-kube-service-exposer.sidero.dev/prefix"
"github.com/siderolabs/omni/client/pkg/constants"
)
// IsExposedServiceEvent returns true if there is a change on the Kubernetes Services
@ -50,7 +29,7 @@ func IsExposedServiceEvent(k8sObject, oldK8sObject any, logger *zap.Logger) bool
}
// check for ServicePortAnnotationKey annotation - the only annotation required for the exposed services
_, isAnnotated := service.GetObjectMeta().GetAnnotations()[ServicePortAnnotationKey]
_, isAnnotated := service.GetObjectMeta().GetAnnotations()[constants.ExposedServicePortAnnotationKey]
return isAnnotated
}
@ -76,7 +55,10 @@ func IsExposedServiceEvent(k8sObject, oldK8sObject any, logger *zap.Logger) bool
oldAnnotations := oldK8sObject.(*corev1.Service).GetObjectMeta().GetAnnotations() //nolint:forcetypeassert,errcheck
newAnnotations := k8sObject.(*corev1.Service).GetObjectMeta().GetAnnotations() //nolint:forcetypeassert,errcheck
for _, key := range []string{ServiceLabelAnnotationKey, ServicePortAnnotationKey, ServiceIconAnnotationKey, ServicePrefixAnnotationKey} {
for _, key := range []string{
constants.ExposedServiceLabelAnnotationKey, constants.ExposedServicePortAnnotationKey,
constants.ExposedServiceIconAnnotationKey, constants.ExposedServicePrefixAnnotationKey,
} {
if oldAnnotations[key] != newAnnotations[key] {
return true
}

View File

@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/siderolabs/omni/client/pkg/constants"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/exposedservice"
)
@ -42,8 +43,8 @@ func TestIsExposedServiceEvent(t *testing.T) {
obj: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
exposedservice.ServiceLabelAnnotationKey: "foo",
exposedservice.ServiceIconAnnotationKey: "bar",
constants.ExposedServiceLabelAnnotationKey: "foo",
constants.ExposedServiceIconAnnotationKey: "bar",
},
},
},
@ -54,7 +55,7 @@ func TestIsExposedServiceEvent(t *testing.T) {
obj: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
exposedservice.ServicePortAnnotationKey: "8080",
constants.ExposedServicePortAnnotationKey: "8080",
},
},
},
@ -65,8 +66,8 @@ func TestIsExposedServiceEvent(t *testing.T) {
obj: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
exposedservice.ServicePortAnnotationKey: "8080",
exposedservice.ServiceLabelAnnotationKey: "foo",
constants.ExposedServicePortAnnotationKey: "8080",
constants.ExposedServiceLabelAnnotationKey: "foo",
},
},
Spec: corev1.ServiceSpec{ClusterIP: "10.0.0.1"},
@ -74,8 +75,8 @@ func TestIsExposedServiceEvent(t *testing.T) {
oldObj: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
exposedservice.ServicePortAnnotationKey: "8080",
exposedservice.ServiceLabelAnnotationKey: "foo",
constants.ExposedServicePortAnnotationKey: "8080",
constants.ExposedServiceLabelAnnotationKey: "foo",
},
},
Spec: corev1.ServiceSpec{ClusterIP: "10.0.0.2"},
@ -87,16 +88,16 @@ func TestIsExposedServiceEvent(t *testing.T) {
obj: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
exposedservice.ServicePortAnnotationKey: "8080",
exposedservice.ServiceLabelAnnotationKey: "foo",
constants.ExposedServicePortAnnotationKey: "8080",
constants.ExposedServiceLabelAnnotationKey: "foo",
},
},
},
oldObj: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
exposedservice.ServicePortAnnotationKey: "8080",
exposedservice.ServiceLabelAnnotationKey: "bar", // different label
constants.ExposedServicePortAnnotationKey: "8080",
constants.ExposedServiceLabelAnnotationKey: "bar", // different label
},
},
},

View File

@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/rand"
"github.com/siderolabs/omni/client/pkg/constants"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
)
@ -113,25 +114,25 @@ func (reconciler *Reconciler) reconcileService(ctx context.Context, exposedServi
version = existingExposedService.Metadata().Version()
}
port, err := strconv.Atoi(service.Annotations[ServicePortAnnotationKey])
port, err := strconv.Atoi(service.Annotations[constants.ExposedServicePortAnnotationKey])
if err != nil || port < 1 || port > 65535 {
logger.Warn("invalid port on Service", zap.String("port", service.Annotations[ServicePortAnnotationKey]))
logger.Warn("invalid port on Service", zap.String("port", service.Annotations[constants.ExposedServicePortAnnotationKey]))
return true, nil //nolint:nilerr
}
label, labelOk := service.Annotations[ServiceLabelAnnotationKey]
label, labelOk := service.Annotations[constants.ExposedServiceLabelAnnotationKey]
if !labelOk {
label = service.Name + "." + service.Namespace
}
icon, err := reconciler.parseIcon(service.Annotations[ServiceIconAnnotationKey])
icon, err := reconciler.parseIcon(service.Annotations[constants.ExposedServiceIconAnnotationKey])
if err != nil {
logger.Debug("invalid icon on Service", zap.Error(err))
}
explicitAliasOpt := optional.None[string]()
if explicitAlias, ok := service.Annotations[ServicePrefixAnnotationKey]; ok {
if explicitAlias, ok := service.Annotations[constants.ExposedServicePrefixAnnotationKey]; ok {
explicitAliasOpt = optional.Some(explicitAlias)
}

View File

@ -110,6 +110,10 @@ func (h *HTTPHandler) ServeHTTP(writer http.ResponseWriter, request *http.Reques
return
}
if !h.checkCookies(writer, request, clusterID) {
return
}
if proxy == nil {
h.logger.Debug("proxy is nil", zap.String("alias", alias))
@ -118,7 +122,7 @@ func (h *HTTPHandler) ServeHTTP(writer http.ResponseWriter, request *http.Reques
return
}
h.checkCookies(writer, request, proxy, clusterID)
proxy.ServeHTTP(writer, request)
}
// isWorkloadProxyRequest checks if the request is for the workload proxy.
@ -145,12 +149,12 @@ func (h *HTTPHandler) isWorkloadProxyRequest(request *http.Request) bool {
return false
}
func (h *HTTPHandler) checkCookies(writer http.ResponseWriter, request *http.Request, proxy http.Handler, clusterID resource.ID) {
func (h *HTTPHandler) checkCookies(writer http.ResponseWriter, request *http.Request, clusterID resource.ID) (valid bool) {
publicKeyID, publicKeyIDSignatureBase64 := h.getSignatureCookies(request)
if publicKeyID == "" || publicKeyIDSignatureBase64 == "" {
h.redirectToLogin(writer, request)
return
return false
}
if err := h.accessValidator.ValidateAccess(request.Context(), publicKeyID, publicKeyIDSignatureBase64, clusterID); err != nil {
@ -160,10 +164,10 @@ func (h *HTTPHandler) checkCookies(writer http.ResponseWriter, request *http.Req
http.Redirect(writer, request, forbiddenURL, http.StatusSeeOther)
return
return false
}
proxy.ServeHTTP(writer, request)
return true
}
// parseServiceAliasFromHost parses the service alias from the request host.

View File

@ -185,7 +185,7 @@ func (registry *Reconciler) GetProxy(alias string) (http.Handler, resource.ID, e
lbSts := registry.clusterToAliasToLBStatus[clusterID][alias]
if lbSts == nil || lbSts.lb == nil {
return nil, "", nil
return nil, clusterID, nil
}
hostPort := registry.hostPortForAlias(clusterID, alias)
@ -197,7 +197,10 @@ func (registry *Reconciler) GetProxy(alias string) (http.Handler, resource.ID, e
proxy := httputil.NewSingleHostReverseProxy(targetURL)
proxy.Transport = &http.Transport{
DialContext: registry.connProvider.DialContext,
DialContext: registry.connProvider.DialContext,
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
return proxy, clusterID, nil

View File

@ -239,6 +239,7 @@ func AssertClusterCreateAndReady(
name string,
options ClusterOptions,
testOutputDir string,
doSaveSupportBundle bool,
) []subTest { //nolint:nolintlint,revive
clusterName := "integration-" + name
options.Name = clusterName
@ -258,7 +259,7 @@ func AssertClusterCreateAndReady(
).Append(
subTest{
"ClusterShouldBeDestroyed",
AssertDestroyCluster(ctx, rootClient.Omni().State(), clusterName, options.InfraProvider != "", false),
AssertDestroyCluster(ctx, rootClient, clusterName, testOutputDir, options.InfraProvider != "", false, doSaveSupportBundle),
},
)
}

View File

@ -12,6 +12,7 @@ import (
"fmt"
"math/rand/v2"
"os"
"path/filepath"
"sync"
"testing"
"time"
@ -63,6 +64,8 @@ type ClusterOptions struct {
ScalingTimeout time.Duration
SkipExtensionCheckOnCreate bool
AllowSchedulingOnControlPlanes bool
}
// CreateCluster verifies cluster creation.
@ -95,6 +98,10 @@ func CreateCluster(testCtx context.Context, cli *client.Client, options ClusterO
require.NoError(st.Create(ctx, cluster))
if options.AllowSchedulingOnControlPlanes {
ensureAllowSchedulingOnControlPlanesConfigPatch(ctx, t, st, options.Name)
}
for i := range options.ControlPlanes {
t.Logf("Adding machine '%s' to control plane (cluster %q)", machineIDs[i], options.Name)
bindMachine(ctx, t, st, bindMachineOptions{
@ -159,6 +166,10 @@ func CreateClusterWithMachineClass(testCtx context.Context, st state.State, opti
require.NoError(st.Create(ctx, cluster))
require.NoError(st.Create(ctx, kubespanEnabler))
if options.AllowSchedulingOnControlPlanes {
ensureAllowSchedulingOnControlPlanesConfigPatch(ctx, t, st, options.Name)
}
machineClass := omni.NewMachineClass(resources.DefaultNamespace, options.Name)
if options.InfraProvider != "" {
@ -184,6 +195,17 @@ func CreateClusterWithMachineClass(testCtx context.Context, st state.State, opti
}
}
func ensureAllowSchedulingOnControlPlanesConfigPatch(ctx context.Context, t *testing.T, st state.State, clusterID resource.ID) {
createOrUpdate(ctx, t, st, omni.NewConfigPatch(resources.DefaultNamespace, fmt.Sprintf("400-%s-control-planes-untaint", clusterID)), func(patch *omni.ConfigPatch) error {
patch.Metadata().Labels().Set(omni.LabelCluster, clusterID)
patch.Metadata().Labels().Set(omni.LabelMachineSet, omni.ControlPlanesResourceID(clusterID))
return patch.TypedSpec().Value.SetUncompressedData([]byte(`cluster:
allowSchedulingOnControlPlanes: true
`))
})
}
// ScaleClusterMachineSets scales the cluster with machine sets which are using machine classes.
func ScaleClusterMachineSets(testCtx context.Context, st state.State, options ClusterOptions) TestFunc {
return func(t *testing.T) {
@ -511,53 +533,6 @@ func AssertClusterKubernetesUsage(testCtx context.Context, st state.State, clust
}
}
// DestroyCluster destroys a cluster and waits for it to be destroyed.
//
// It is used as a finalizer when the test group fails.
func DestroyCluster(testCtx context.Context, client *client.Client, supportBundleDir, clusterName string) TestFunc {
return func(t *testing.T) {
ctx, cancel := context.WithTimeout(testCtx, 6*time.Minute)
defer cancel()
st := client.Omni().State()
if err := saveSupportBundle(ctx, client, supportBundleDir, clusterName); err != nil {
t.Logf("failed to save support bundle: %v", err)
}
clusterMachineIDs := rtestutils.ResourceIDs[*omni.ClusterMachine](ctx, t, st, state.WithLabelQuery(
resource.LabelEqual(omni.LabelCluster, clusterName),
))
t.Log("destroying cluster", clusterName)
rtestutils.Teardown[*omni.Cluster](ctx, t, st, []resource.ID{clusterName})
rtestutils.AssertNoResource[*omni.Cluster](ctx, t, st, clusterName)
// wait for all machines to returned to the pool as 'available' or be part of a different cluster
rtestutils.AssertResources(ctx, t, st, clusterMachineIDs, func(machine *omni.MachineStatus, asrt *assert.Assertions) {
_, isAvailable := machine.Metadata().Labels().Get(omni.MachineStatusLabelAvailable)
machineCluster, machineBound := machine.Metadata().Labels().Get(omni.LabelCluster)
asrt.True(isAvailable || (machineBound && machineCluster != clusterName),
"machine %q: available %v, bound %v, cluster %q", machine.Metadata().ID(), isAvailable, machineBound, machineCluster,
)
})
_, err := st.Get(ctx, omni.NewMachineClass(resources.DefaultNamespace, clusterName).Metadata())
if state.IsNotFoundError(err) {
return
}
require.NoError(t, err)
t.Log("destroying related machine class", clusterName)
rtestutils.Destroy[*omni.MachineClass](ctx, t, st, []string{clusterName})
}
}
func saveSupportBundle(ctx context.Context, cli *client.Client, dir, cluster string) error {
supportBundle, err := cli.Management().GetSupportBundle(ctx, cluster, nil)
if err != nil {
@ -568,7 +543,10 @@ func saveSupportBundle(ctx context.Context, cli *client.Client, dir, cluster str
return fmt.Errorf("failed to create directory %q: %w", dir, err)
}
if err = os.WriteFile("support-bundle-"+cluster+".zip", supportBundle, 0o644); err != nil {
fileName := time.Now().Format("2006-01-02_15-04-05") + "-support-bundle-" + cluster + ".zip"
bundlePath := filepath.Join(dir, fileName)
if err = os.WriteFile(bundlePath, supportBundle, 0o644); err != nil {
return fmt.Errorf("failed to write support bundle file %q: %w", cluster, err)
}
@ -576,11 +554,21 @@ func saveSupportBundle(ctx context.Context, cli *client.Client, dir, cluster str
}
// AssertDestroyCluster destroys a cluster and verifies that all dependent resources are gone.
func AssertDestroyCluster(testCtx context.Context, st state.State, clusterName string, expectMachinesRemoved, assertInfraMachinesState bool) TestFunc {
func AssertDestroyCluster(testCtx context.Context, omniClient *client.Client, clusterName, outputDir string, expectMachinesRemoved, assertInfraMachinesState, doSaveSupportBundle bool) TestFunc {
return func(t *testing.T) {
ctx, cancel := context.WithTimeout(testCtx, 300*time.Second)
defer cancel()
st := omniClient.Omni().State()
if doSaveSupportBundle {
t.Logf("saving support bundle for cluster %q into %q before destruction", clusterName, outputDir)
if err := saveSupportBundle(ctx, omniClient, outputDir, clusterName); err != nil {
t.Logf("failed to save support bundle: %v", err)
}
}
patches := rtestutils.ResourceIDs[*omni.ConfigPatch](ctx, t, st, state.WithLabelQuery(
resource.LabelEqual(omni.LabelCluster, clusterName),
))
@ -617,7 +605,7 @@ func AssertDestroyCluster(testCtx context.Context, st state.State, clusterName s
return
}
// wait for all machines to returned to the pool as 'available' or be part of a different cluster
// wait for all machines to be returned to the pool as 'available' or be part of a different cluster
rtestutils.AssertResources(ctx, t, st, clusterMachineIDs, func(machine *omni.MachineStatus, asrt *assert.Assertions) {
_, isAvailable := machine.Metadata().Labels().Get(omni.MachineStatusLabelAvailable)
machineCluster, machineBound := machine.Metadata().Labels().Get(omni.LabelCluster)

View File

@ -0,0 +1,100 @@
// Copyright (c) 2025 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// Package kubernetes provides utilities for testing Kubernetes clusters.
package kubernetes
import (
"context"
"errors"
"testing"
"time"
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"github.com/siderolabs/go-pointer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"github.com/siderolabs/omni/client/pkg/client/management"
"github.com/siderolabs/omni/client/pkg/constants"
)
// WrapContext wraps the provided context with a zap logger so that the Kubernetes client sends its logs to that, complying with the go test logging format.
func WrapContext(ctx context.Context, t *testing.T) context.Context {
logger := zaptest.NewLogger(t)
ctx = logr.NewContext(ctx, zapr.NewLogger(logger.With(zap.String("component", "k8s_client"))))
return ctx
}
// GetClient retrieves a Kubernetes clientset for the specified cluster using the management client.
func GetClient(ctx context.Context, t *testing.T, managementClient *management.Client, clusterName string) *kubernetes.Clientset {
// use service account kubeconfig to bypass oidc flow
kubeconfigBytes, err := managementClient.WithCluster(clusterName).Kubeconfig(ctx,
management.WithServiceAccount(24*time.Hour, "integration-test", constants.DefaultAccessGroup),
)
require.NoError(t, err)
kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return clientcmd.Load(kubeconfigBytes)
})
require.NoError(t, err)
cli, err := kubernetes.NewForConfig(kubeconfig)
require.NoError(t, err)
return cli
}
// ScaleDeployment scales a Kubernetes deployment to the specified number of replicas and waits until the deployment is scaled.
func ScaleDeployment(ctx context.Context, t *testing.T, kubeClient kubernetes.Interface, namespace, name string, numReplicas uint8) {
deployment, err := kubeClient.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
require.NoError(t, err, "failed to get deployment %q/%q", namespace, name)
// scale the deployment to the desired number of replicas
deployment.Spec.Replicas = pointer.To(int32(numReplicas))
if _, err := kubeClient.AppsV1().Deployments(deployment.Namespace).Update(ctx, deployment, metav1.UpdateOptions{}); !apierrors.IsNotFound(err) {
require.NoError(t, err)
}
require.EventuallyWithT(t, func(collect *assert.CollectT) {
dep, err := kubeClient.AppsV1().Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
require.NoError(collect, err)
}
if !assert.NoError(collect, err) {
t.Logf("failed to get deployment %q/%q: %q", deployment.Namespace, deployment.Name, err)
return
}
if !assert.Equal(collect, int(numReplicas), int(dep.Status.ReadyReplicas)) {
t.Logf("deployment %q/%q does not have expected number of replicas: expected %d, found %d",
deployment.Namespace, deployment.Name, numReplicas, dep.Status.ReadyReplicas)
}
}, 3*time.Minute, 5*time.Second)
}
// UpdateService updates a Kubernetes Service by applying the given modification function to it, e.g., to add or modify annotations.
func UpdateService(ctx context.Context, t *testing.T, kubeClient kubernetes.Interface, namespace, name string, f func(*corev1.Service)) {
service, err := kubeClient.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
require.NoError(t, err, "failed to get service %q/%q", namespace, name)
f(service)
_, err = kubeClient.CoreV1().Services(namespace).Update(ctx, service, metav1.UpdateOptions{})
require.NoError(t, err, "failed to update service %q/%q", namespace, name)
}
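
A hedged usage sketch (not part of the diff) of `UpdateService`, toggling an exposed service off and on by removing and re-adding the port annotation; the namespace, name, and port value are illustrative:

```go
package workloadproxy_test

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	clientgokubernetes "k8s.io/client-go/kubernetes"

	"github.com/siderolabs/omni/client/pkg/constants"
	"github.com/siderolabs/omni/internal/integration/kubernetes"
)

// toggleExposedServiceAnnotation removes and then re-adds the port annotation,
// which should make the corresponding ExposedService disappear and come back.
// It assumes the Service already exists and has a non-nil annotations map.
func toggleExposedServiceAnnotation(ctx context.Context, t *testing.T, kubeClient clientgokubernetes.Interface) {
	kubernetes.UpdateService(ctx, t, kubeClient, "default", "nginx", func(svc *corev1.Service) {
		delete(svc.Annotations, constants.ExposedServicePortAnnotationKey)
	})

	kubernetes.UpdateService(ctx, t, kubeClient, "default", "nginx", func(svc *corev1.Service) {
		svc.Annotations[constants.ExposedServicePortAnnotationKey] = "12345"
	})
}
```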

View File

@ -23,6 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/integration/kubernetes"
)
// AssertKubernetesNodeAudit tests the Kubernetes node audit feature (KubernetesNodeAuditController) by doing the following:
@ -32,10 +33,11 @@ import (
// 3. Assert that the ClusterMachine resource is deleted - the ClusterMachineTeardownController did not block its deletion despite failing to remove the node from Kubernetes.
// 4. Wake the control plane back up.
// 5. Assert that the worker node eventually gets removed from Kubernetes due to node audit.
func AssertKubernetesNodeAudit(ctx context.Context, clusterName string, options *TestOptions) TestFunc {
func AssertKubernetesNodeAudit(testCtx context.Context, clusterName string, options *TestOptions) TestFunc {
st := options.omniClient.Omni().State()
return func(t *testing.T) {
ctx := kubernetes.WrapContext(testCtx, t)
if options.FreezeAMachineFunc == nil || options.RestartAMachineFunc == nil {
t.Skip("skip the test as FreezeAMachineFunc or RestartAMachineFunc is not set")
}
@ -85,7 +87,7 @@ func AssertKubernetesNodeAudit(ctx context.Context, clusterName string, options
require.NoError(t, options.RestartAMachineFunc(ctx, id))
}
kubernetesClient := getKubernetesClient(ctx, t, options.omniClient.Management(), clusterName)
kubernetesClient := kubernetes.GetClient(ctx, t, options.omniClient.Management(), clusterName)
logger.Info("assert that the node is removed from Kubernetes due to node audit")

View File

@ -27,9 +27,6 @@ import (
corev1 "k8s.io/api/core/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
managementpb "github.com/siderolabs/omni/client/api/omni/management"
"github.com/siderolabs/omni/client/api/omni/specs"
@ -38,6 +35,7 @@ import (
"github.com/siderolabs/omni/client/pkg/constants"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/integration/kubernetes"
)
// AssertKubernetesAPIAccessViaOmni verifies that cluster kubeconfig works.
@ -48,10 +46,12 @@ func AssertKubernetesAPIAccessViaOmni(testCtx context.Context, rootClient *clien
ctx, cancel := context.WithTimeout(testCtx, timeout)
defer cancel()
ctx = kubernetes.WrapContext(ctx, t)
k8sClient := kubernetes.GetClient(ctx, t, rootClient.Management(), clusterName)
var (
k8sClient = getKubernetesClient(ctx, t, rootClient.Management(), clusterName)
k8sNodes *corev1.NodeList
err error
k8sNodes *corev1.NodeList
err error
)
require.EventuallyWithT(t, func(collect *assert.CollectT) {
@ -321,7 +321,8 @@ func AssertKubernetesDeploymentIsCreated(testCtx context.Context, managementClie
ctx, cancel := context.WithTimeout(testCtx, 120*time.Second)
defer cancel()
kubeClient := getKubernetesClient(ctx, t, managementClient, clusterName)
ctx = kubernetes.WrapContext(ctx, t)
kubeClient := kubernetes.GetClient(ctx, t, managementClient, clusterName)
deployment := appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
@ -370,7 +371,8 @@ func AssertKubernetesSecretIsCreated(testCtx context.Context, managementClient *
ctx, cancel := context.WithTimeout(testCtx, 120*time.Second)
defer cancel()
kubeClient := getKubernetesClient(ctx, t, managementClient, clusterName)
ctx = kubernetes.WrapContext(ctx, t)
kubeClient := kubernetes.GetClient(ctx, t, managementClient, clusterName)
valBase64 := base64.StdEncoding.EncodeToString([]byte(testValue))
@ -394,7 +396,8 @@ func AssertKubernetesSecretHasValue(testCtx context.Context, managementClient *m
ctx, cancel := context.WithTimeout(testCtx, 120*time.Second)
defer cancel()
kubeClient := getKubernetesClient(ctx, t, managementClient, clusterName)
ctx = kubernetes.WrapContext(ctx, t)
kubeClient := kubernetes.GetClient(ctx, t, managementClient, clusterName)
secret, err := kubeClient.CoreV1().Secrets(ns).Get(ctx, name, metav1.GetOptions{})
require.NoError(t, err, "failed to get secret")
@ -413,8 +416,10 @@ func AssertKubernetesSecretHasValue(testCtx context.Context, managementClient *m
// 2. All the extra (stale) Kubernetes nodes are in NotReady state
//
// This assertion is useful to assert the expected nodes state when a cluster is created from an etcd backup.
func AssertKubernetesNodesState(ctx context.Context, rootClient *client.Client, newClusterName string) func(t *testing.T) {
func AssertKubernetesNodesState(testCtx context.Context, rootClient *client.Client, newClusterName string) func(t *testing.T) {
return func(t *testing.T) {
ctx := kubernetes.WrapContext(testCtx, t)
identityList, err := safe.StateListAll[*omni.ClusterMachineIdentity](ctx, rootClient.Omni().State(), state.WithLabelQuery(resource.LabelEqual(omni.LabelCluster, newClusterName)))
require.NoError(t, err)
@ -435,7 +440,7 @@ func AssertKubernetesNodesState(ctx context.Context, rootClient *client.Client,
return false
}
kubeClient := getKubernetesClient(ctx, t, rootClient.Management(), newClusterName)
kubeClient := kubernetes.GetClient(ctx, t, rootClient.Management(), newClusterName)
require.EventuallyWithT(t, func(collect *assert.CollectT) {
kubernetesNodes, listErr := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
@ -467,9 +472,11 @@ func AssertKubernetesNodesState(ctx context.Context, rootClient *client.Client,
}
// AssertKubernetesDeploymentHasRunningPods verifies that a deployment has running pods.
func AssertKubernetesDeploymentHasRunningPods(ctx context.Context, managementClient *management.Client, clusterName, ns, name string) TestFunc {
func AssertKubernetesDeploymentHasRunningPods(testCtx context.Context, managementClient *management.Client, clusterName, ns, name string) TestFunc {
return func(t *testing.T) {
deps := getKubernetesClient(ctx, t, managementClient, clusterName).AppsV1().Deployments(ns)
ctx := kubernetes.WrapContext(testCtx, t)
kubeClient := kubernetes.GetClient(ctx, t, managementClient, clusterName)
deps := kubeClient.AppsV1().Deployments(ns)
// restart the deployment, in case the pod is scheduled on a NotReady node (a node that is no longer valid, which was restored from an etcd backup)
require.EventuallyWithT(t, func(collect *assert.CollectT) {
@ -514,21 +521,3 @@ func AssertKubernetesDeploymentHasRunningPods(ctx context.Context, managementCli
}, 2*time.Minute, 1*time.Second)
}
}
func getKubernetesClient(ctx context.Context, t *testing.T, managementClient *management.Client, clusterName string) *kubernetes.Clientset {
// use service account kubeconfig to bypass oidc flow
kubeconfigBytes, err := managementClient.WithCluster(clusterName).Kubeconfig(ctx,
management.WithServiceAccount(24*time.Hour, "integration-test", constants.DefaultAccessGroup),
)
require.NoError(t, err)
kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return clientcmd.Load(kubeconfigBytes)
})
require.NoError(t, err)
cli, err := kubernetes.NewForConfig(kubeconfig)
require.NoError(t, err)
return cli
}

View File

@ -13,10 +13,12 @@ import (
"testing"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/integration/workloadproxy"
"github.com/siderolabs/omni/internal/pkg/clientconfig"
"google.golang.org/protobuf/types/known/durationpb"
)
type assertClusterReadyOptions struct {
@ -195,7 +197,7 @@ Test the auditing of the Kubernetes nodes, i.e. when a node is gone from the Omn
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -271,7 +273,7 @@ In the tests, we wipe and reboot the VMs to bring them back as available for the
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -303,7 +305,7 @@ Regression test: create a cluster and destroy it without waiting for the cluster
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -332,6 +334,7 @@ Don't do any changes to the cluster.`)
"default",
clusterOptions,
options.OutputDir,
t.Failed(),
))
}
}
@ -363,6 +366,7 @@ Don't do any changes to the cluster.`)
"encrypted",
clusterOptions,
options.OutputDir,
t.Failed(),
))
}
}
@ -391,6 +395,7 @@ Don't do any changes to the cluster.`)
"singlenode",
clusterOptions,
options.OutputDir,
t.Failed(),
))
}
}
@ -484,7 +489,7 @@ In between the scaling operations, assert that the cluster is ready and accessib
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -578,7 +583,7 @@ In between the scaling operations, assert that the cluster is ready and accessib
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -678,7 +683,7 @@ In between the scaling operations, assert that the cluster is ready and accessib
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, true, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, true, false, t.Failed()),
)
}
}
@ -726,7 +731,7 @@ Tests rolling update & scale down strategies for concurrency control for worker
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -776,7 +781,7 @@ In between the scaling operations, assert that the cluster is ready and accessib
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -843,7 +848,7 @@ Tests applying various config patching, including "broken" config patches which
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -940,7 +945,7 @@ Tests upgrading Talos version, including reverting a failed upgrade.`)
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -995,7 +1000,7 @@ Tests upgrading Kubernetes version, including reverting a failed upgrade.`)
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -1079,7 +1084,7 @@ Finally, a completely new cluster is created using the same backup to test the "
t.Run(
"NewClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), secondClusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, secondClusterName, options.OutputDir, false, false, t.Failed()),
)
runTests(
@ -1094,7 +1099,7 @@ Finally, a completely new cluster is created using the same backup to test the "
t.Run(
"RestoredClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -1208,7 +1213,7 @@ Test authorization on accessing Omni API, some tests run without a cluster, some
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, false, t.Failed()),
)
}
}
@ -1236,39 +1241,58 @@ Test workload service proxying feature`)
t.Parallel()
options.claimMachines(t, 1)
options.claimMachines(t, 6)
clusterName := "integration-workload-proxy"
omniClient := options.omniClient
cluster1 := "integration-workload-proxy-1"
cluster2 := "integration-workload-proxy-2"
t.Run(
"ClusterShouldBeCreated",
CreateCluster(t.Context(), options.omniClient, ClusterOptions{
Name: clusterName,
ControlPlanes: 1,
Workers: 0,
t.Run("ClusterShouldBeCreated-"+cluster1, CreateCluster(t.Context(), omniClient, ClusterOptions{
Name: cluster1,
ControlPlanes: 1,
Workers: 1,
Features: &specs.ClusterSpec_Features{
EnableWorkloadProxy: true,
},
Features: &specs.ClusterSpec_Features{
EnableWorkloadProxy: true,
},
MachineOptions: options.MachineOptions,
ScalingTimeout: options.ScalingTimeout,
MachineOptions: options.MachineOptions,
ScalingTimeout: options.ScalingTimeout,
SkipExtensionCheckOnCreate: options.SkipExtensionsCheckOnCreate,
}),
)
SkipExtensionCheckOnCreate: options.SkipExtensionsCheckOnCreate,
assertClusterAndAPIReady(t, clusterName, options)
AllowSchedulingOnControlPlanes: true,
}))
t.Run("ClusterShouldBeCreated-"+cluster2, CreateCluster(t.Context(), omniClient, ClusterOptions{
Name: cluster2,
ControlPlanes: 1,
Workers: 2,
t.Run(
"WorkloadProxyShouldBeTested",
AssertWorkloadProxy(t.Context(), options.omniClient, clusterName),
)
Features: &specs.ClusterSpec_Features{
EnableWorkloadProxy: true,
},
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, false),
)
MachineOptions: options.MachineOptions,
ScalingTimeout: options.ScalingTimeout,
SkipExtensionCheckOnCreate: options.SkipExtensionsCheckOnCreate,
AllowSchedulingOnControlPlanes: true,
}))
runTests(t, AssertBlockClusterAndTalosAPIAndKubernetesShouldBeReady(t.Context(), omniClient, cluster1, options.MachineOptions.TalosVersion,
options.MachineOptions.KubernetesVersion, options.talosAPIKeyPrepare))
runTests(t, AssertBlockClusterAndTalosAPIAndKubernetesShouldBeReady(t.Context(), omniClient, cluster2, options.MachineOptions.TalosVersion,
options.MachineOptions.KubernetesVersion, options.talosAPIKeyPrepare))
parentCtx := t.Context()
t.Run("WorkloadProxyShouldBeTested", func(t *testing.T) {
workloadproxy.Test(parentCtx, t, omniClient, cluster1, cluster2)
})
t.Run("ClusterShouldBeDestroyed-"+cluster1, AssertDestroyCluster(t.Context(), options.omniClient, cluster1, options.OutputDir, false, false, t.Failed()))
t.Run("ClusterShouldBeDestroyed-"+cluster2, AssertDestroyCluster(t.Context(), options.omniClient, cluster2, options.OutputDir, false, false, t.Failed()))
}
}
@ -1346,7 +1370,7 @@ Note: this test expects all machines to be provisioned by the bare-metal infra p
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, true),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, true, t.Failed()),
)
t.Run(
@ -1367,7 +1391,7 @@ Note: this test expects all machines to be provisioned by the bare-metal infra p
t.Run(
"ClusterShouldBeDestroyed",
AssertDestroyCluster(t.Context(), options.omniClient.Omni().State(), clusterName, false, true),
AssertDestroyCluster(t.Context(), options.omniClient, clusterName, options.OutputDir, false, true, t.Failed()),
)
}
}

View File

@ -789,7 +789,7 @@ func AssertMachineShouldBeUpgradedInMaintenanceMode(
assertClusterReady(talosVersion1)
// destroy the cluster
AssertDestroyCluster(ctx, st, clusterName, false, false)(t)
AssertDestroyCluster(ctx, rootClient, clusterName, "", false, false, false)(t)
t.Logf("creating a cluster on version %q using same machines", talosVersion2)
@ -810,7 +810,7 @@ func AssertMachineShouldBeUpgradedInMaintenanceMode(
// wait for cluster on talosVersion2 to be ready
assertClusterReady(talosVersion2)
AssertDestroyCluster(ctx, st, clusterName, false, false)(t)
AssertDestroyCluster(ctx, rootClient, clusterName, "", false, false, false)(t)
}
}

View File

@ -1,299 +0,0 @@
// Copyright (c) 2025 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//go:build integration
package integration_test
import (
"compress/gzip"
"context"
"crypto/tls"
_ "embed"
"encoding/base64"
"errors"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"testing"
"time"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/hashicorp/go-cleanhttp"
"github.com/siderolabs/go-pointer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"github.com/siderolabs/omni/client/pkg/client"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend/workloadproxy"
"github.com/siderolabs/omni/internal/pkg/clientconfig"
)
//go:embed testdata/sidero-labs-icon.svg
var sideroLabsIconSVG []byte
// AssertWorkloadProxy tests the workload proxy feature.
func AssertWorkloadProxy(testCtx context.Context, apiClient *client.Client, clusterName string) TestFunc {
return func(t *testing.T) {
ctx, cancel := context.WithTimeout(testCtx, 150*time.Second)
t.Cleanup(cancel)
st := apiClient.Omni().State()
kubeClient := getKubernetesClient(ctx, t, apiClient.Management(), clusterName)
port := 12345
label := "test-nginx"
icon := base64.StdEncoding.EncodeToString(doGzip(t, sideroLabsIconSVG)) // base64(gzip(Sidero Labs icon SVG))
deployment, service := workloadProxyManifests(port, label, icon)
deps := kubeClient.AppsV1().Deployments(deployment.Namespace)
_, err := deps.Create(ctx, &deployment, metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
require.NoError(t, err)
}
// assert that deployment has a running (Ready) pod
require.EventuallyWithT(t, func(collect *assert.CollectT) {
dep, depErr := deps.Get(ctx, deployment.Name, metav1.GetOptions{})
if errors.Is(depErr, context.Canceled) || errors.Is(depErr, context.DeadlineExceeded) {
require.NoError(collect, depErr)
}
if !assert.NoError(collect, depErr) {
t.Logf("failed to get deployment %q/%q: %q", deployment.Namespace, deployment.Name, depErr)
return
}
if !assert.Greater(collect, dep.Status.ReadyReplicas, int32(0)) {
t.Logf("deployment %q/%q has no ready replicas", dep.Namespace, dep.Name)
}
}, 2*time.Minute, 1*time.Second)
_, err = kubeClient.CoreV1().Services(service.Namespace).Create(ctx, &service, metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
require.NoError(t, err)
}
expectedID := clusterName + "/" + service.Name + "." + service.Namespace
expectedIconBase64 := base64.StdEncoding.EncodeToString(sideroLabsIconSVG)
var svcURL string
rtestutils.AssertResource[*omni.ExposedService](ctx, t, st, expectedID, func(r *omni.ExposedService, assertion *assert.Assertions) {
assertion.Equal(port, int(r.TypedSpec().Value.Port))
assertion.Equal("test-nginx", r.TypedSpec().Value.Label)
assertion.Equal(expectedIconBase64, r.TypedSpec().Value.IconBase64)
assertion.NotEmpty(r.TypedSpec().Value.Url)
svcURL = r.TypedSpec().Value.Url
})
clientTransport := cleanhttp.DefaultPooledTransport()
clientTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
httpClient := &http.Client{
CheckRedirect: func(*http.Request, []*http.Request) error {
return http.ErrUseLastResponse // disable follow redirects
},
Timeout: 5 * time.Second,
Transport: clientTransport,
}
t.Cleanup(httpClient.CloseIdleConnections)
t.Logf("do a GET request to the exposed service URL, expect a redirect to authentication page")
doRequestAssertResponseWithRetries(ctx, t, httpClient, svcURL, http.StatusSeeOther, "", "")
t.Logf("do the same request but with proper cookies set, expect Omni to proxy the request to nginx")
keyID, keyIDSignatureBase64, err := clientconfig.RegisterKeyGetIDSignatureBase64(ctx, apiClient)
require.NoError(t, err)
t.Logf("public key ID: %q, signature base64: %q", keyID, keyIDSignatureBase64)
cookies := []*http.Cookie{
{Name: workloadproxy.PublicKeyIDCookie, Value: keyID},
{Name: workloadproxy.PublicKeyIDSignatureBase64Cookie, Value: keyIDSignatureBase64},
}
doRequestAssertResponseWithRetries(ctx, t, httpClient, svcURL, http.StatusOK, "Welcome to nginx!", "", cookies...)
// Remove the service, assert that ExposedService resource is removed as well
require.NoError(t, kubeClient.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{}))
rtestutils.AssertNoResource[*omni.ExposedService](ctx, t, st, expectedID)
t.Logf("do a GET request to the exposed service URL, expect 404 Not Found")
doRequestAssertResponseWithRetries(ctx, t, httpClient, svcURL, http.StatusNotFound, "", "", cookies...)
}
}
func doRequestAssertResponseWithRetries(ctx context.Context, t *testing.T, httpClient *http.Client, svcURL string, expectedStatusCode int,
expectedBodyContent, expectedLocationHeaderContent string, cookies ...*http.Cookie,
) {
require.EventuallyWithT(t, func(collect *assert.CollectT) {
req := prepareWorkloadProxyRequest(ctx, t, svcURL)
for _, cookie := range cookies {
req.AddCookie(cookie)
}
resp, err := httpClient.Do(req)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
require.NoError(collect, err)
}
if !assert.NoError(collect, err) {
t.Logf("request failed: %v", err)
return
}
defer resp.Body.Close() //nolint:errcheck
if !assert.Equal(collect, expectedStatusCode, resp.StatusCode) {
t.Logf("unexpected status code: not %d, but %d", expectedStatusCode, resp.StatusCode)
}
if expectedBodyContent != "" {
body, bodyErr := io.ReadAll(resp.Body)
require.NoError(collect, bodyErr)
if !assert.Contains(collect, string(body), expectedBodyContent) {
t.Logf("content %q not found in body: %q", expectedBodyContent, string(body))
}
}
if expectedLocationHeaderContent != "" {
location := resp.Header.Get("Location")
if !assert.Contains(collect, location, expectedLocationHeaderContent) {
t.Logf("content %q not found in Location header: %q", expectedLocationHeaderContent, location)
}
}
}, 3*time.Minute, 3*time.Second)
}
// prepareWorkloadProxyRequest prepares a request to the workload proxy.
// It uses the Omni base URL to access Omni with proper name resolution, but set the Host header to the original URL to test the workload proxy logic
//
// sample svcURL: https://j2s7hf-local.proxy-us.localhost:8099/
func prepareWorkloadProxyRequest(ctx context.Context, t *testing.T, svcURL string) *http.Request {
t.Logf("url of the exposed service: %q", svcURL)
parsedURL, err := url.Parse(svcURL)
require.NoError(t, err)
hostParts := strings.SplitN(parsedURL.Host, ".", 3)
require.GreaterOrEqual(t, len(hostParts), 3)
svcHost := parsedURL.Host
baseHost := hostParts[2]
parsedURL.Host = baseHost
baseURL := parsedURL.String()
t.Logf("base URL which will be used to reach exposed service: %q, host: %q", baseURL, svcHost)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL, nil)
require.NoError(t, err)
req.Host = svcHost
return req
}
func workloadProxyManifests(port int, label, icon string) (appsv1.Deployment, corev1.Service) {
deployment := appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx",
Namespace: "default",
},
Spec: appsv1.DeploymentSpec{
Replicas: pointer.To(int32(1)),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "nginx",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "nginx",
},
},
Spec: corev1.PodSpec{
Tolerations: []corev1.Toleration{
{
Operator: corev1.TolerationOpExists,
},
},
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:stable-alpine-slim",
Ports: []corev1.ContainerPort{
{
ContainerPort: 80,
Protocol: corev1.ProtocolTCP,
},
},
},
},
},
},
},
}
service := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx",
Namespace: "default",
Annotations: map[string]string{
"omni-kube-service-exposer.sidero.dev/port": strconv.Itoa(port),
"omni-kube-service-exposer.sidero.dev/label": label,
"omni-kube-service-exposer.sidero.dev/icon": icon,
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "nginx",
},
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.FromInt32(80),
},
},
},
}
return deployment, service
}
func doGzip(t *testing.T, input []byte) []byte {
t.Helper()
var buf strings.Builder
writer := gzip.NewWriter(&buf)
_, err := writer.Write(input)
require.NoError(t, err)
require.NoError(t, writer.Close())
return []byte(buf.String())
}

View File

Image file (SVG icon): 1.3 KiB before and after.

View File

@ -0,0 +1,669 @@
// Copyright (c) 2025 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// Package workloadproxy provides tests for the workload proxy (exposed services) functionality in Omni.
package workloadproxy
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
_ "embed"
"encoding/base64"
"errors"
"fmt"
"io"
"math/rand/v2"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/hashicorp/go-cleanhttp"
"github.com/siderolabs/gen/maps"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-pointer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
clientgokubernetes "k8s.io/client-go/kubernetes"
"github.com/siderolabs/omni/client/pkg/client"
"github.com/siderolabs/omni/client/pkg/constants"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend/workloadproxy"
"github.com/siderolabs/omni/internal/integration/kubernetes"
"github.com/siderolabs/omni/internal/pkg/clientconfig"
)
type serviceContext struct {
res *omni.ExposedService
svc *corev1.Service
deployment *deploymentContext
}
type deploymentContext struct {
deployment *appsv1.Deployment
cluster *clusterContext
services []serviceContext
}
type clusterContext struct {
kubeClient clientgokubernetes.Interface
clusterID string
deployments []deploymentContext
}
//go:embed testdata/sidero-labs-icon.svg
var sideroLabsIconSVG []byte
// Test tests the exposed services functionality in Omni.
func Test(ctx context.Context, t *testing.T, omniClient *client.Client, clusterIDs ...string) {
ctx, cancel := context.WithTimeout(ctx, 20*time.Minute)
t.Cleanup(cancel)
if len(clusterIDs) == 0 {
require.Fail(t, "no cluster IDs provided for the test, please provide at least one cluster ID")
}
ctx = kubernetes.WrapContext(ctx, t)
logger := zaptest.NewLogger(t)
clusters := make([]clusterContext, 0, len(clusterIDs))
for _, clusterID := range clusterIDs {
cluster := prepareServices(ctx, t, logger, omniClient, clusterID)
clusters = append(clusters, cluster)
}
var (
allServices []serviceContext
allExposedServices []*omni.ExposedService
deploymentsToScaleDown []deploymentContext
)
for _, cluster := range clusters {
for i, deployment := range cluster.deployments {
allServices = append(allServices, deployment.services...)
for _, service := range deployment.services {
allExposedServices = append(allExposedServices, service.res)
}
if i%2 == 0 { // scale down every second deployment
deploymentsToScaleDown = append(deploymentsToScaleDown, deployment)
}
}
}
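// shuffle the exposed services so that the access checks hit clusters and workloads in mixed order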
rand.Shuffle(len(allExposedServices), func(i, j int) {
allExposedServices[i], allExposedServices[j] = allExposedServices[j], allExposedServices[i]
})
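// initially, all exposed services must be accessible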
testAccess(ctx, t, logger, omniClient, allExposedServices, http.StatusOK)
inaccessibleExposedServices := make([]*omni.ExposedService, 0, len(allExposedServices))
for _, deployment := range deploymentsToScaleDown {
logger.Info("scale deployment down to zero replicas", zap.String("deployment", deployment.deployment.Name), zap.String("clusterID", deployment.cluster.clusterID))
kubernetes.ScaleDeployment(ctx, t, deployment.cluster.kubeClient, deployment.deployment.Namespace, deployment.deployment.Name, 0)
for _, service := range deployment.services {
inaccessibleExposedServices = append(inaccessibleExposedServices, service.res)
}
}
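// services backed by the scaled-down deployments must now return 502 Bad Gateway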
testAccess(ctx, t, logger, omniClient, inaccessibleExposedServices, http.StatusBadGateway)
for _, deployment := range deploymentsToScaleDown {
logger.Info("scale deployment back up", zap.String("deployment", deployment.deployment.Name), zap.String("clusterID", deployment.cluster.clusterID))
kubernetes.ScaleDeployment(ctx, t, deployment.cluster.kubeClient, deployment.deployment.Namespace, deployment.deployment.Name, 1)
}
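// after scaling back up, all exposed services must be accessible again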
testAccess(ctx, t, logger, omniClient, allExposedServices, http.StatusOK)
testToggleFeature(ctx, t, logger, omniClient, clusters[0])
testToggleKubernetesServiceAnnotation(ctx, t, logger, omniClient, allServices[:len(allServices)/2])
}
// testToggleFeature tests toggling off/on the workload proxy feature for a cluster.
func testToggleFeature(ctx context.Context, t *testing.T, logger *zap.Logger, omniClient *client.Client, cluster clusterContext) {
logger.Info("test turning off and on the feature for the cluster", zap.String("clusterID", cluster.clusterID))
setFeatureToggle := func(enabled bool) {
_, err := safe.StateUpdateWithConflicts(ctx, omniClient.Omni().State(), omni.NewCluster(resources.DefaultNamespace, cluster.clusterID).Metadata(), func(res *omni.Cluster) error {
res.TypedSpec().Value.Features.EnableWorkloadProxy = enabled
return nil
})
require.NoErrorf(t, err, "failed to set workload proxy feature to %t for cluster %q", enabled, cluster.clusterID)
}
setFeatureToggle(false)
var services []*omni.ExposedService
for _, deployment := range cluster.deployments {
for _, service := range deployment.services {
services = append(services, service.res)
}
}
if len(services) > 4 {
services = services[:4]
}
testAccess(ctx, t, logger, omniClient, services, http.StatusNotFound)
setFeatureToggle(true)
testAccess(ctx, t, logger, omniClient, services, http.StatusOK)
}
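// testToggleKubernetesServiceAnnotation removes the exposed-service port annotation from the given Kubernetes services, asserts that the
// ExposedService resources disappear and the services become inaccessible, then restores the annotation and asserts that both are restored.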
func testToggleKubernetesServiceAnnotation(ctx context.Context, t *testing.T, logger *zap.Logger, omniClient *client.Client, services []serviceContext) {
logger.Info("test toggling Kubernetes service annotation for exposed services", zap.Int("numServices", len(services)))
for _, service := range services {
kubernetes.UpdateService(ctx, t, service.deployment.cluster.kubeClient, service.svc.Namespace, service.svc.Name, func(svc *corev1.Service) {
delete(svc.Annotations, constants.ExposedServicePortAnnotationKey)
})
}
omniState := omniClient.Omni().State()
for _, service := range services {
rtestutils.AssertNoResource[*omni.ExposedService](ctx, t, omniState, service.res.Metadata().ID())
}
exposedServices := xslices.Map(services, func(svc serviceContext) *omni.ExposedService { return svc.res })
testAccess(ctx, t, logger, omniClient, exposedServices, http.StatusNotFound)
for _, service := range services {
kubernetes.UpdateService(ctx, t, service.deployment.cluster.kubeClient, service.svc.Namespace, service.svc.Name, func(svc *corev1.Service) {
svc.Annotations[constants.ExposedServicePortAnnotationKey] = service.svc.Annotations[constants.ExposedServicePortAnnotationKey]
})
}
updatedServicesMap := make(map[resource.ID]*omni.ExposedService)
for _, service := range services {
rtestutils.AssertResource[*omni.ExposedService](ctx, t, omniState, service.res.Metadata().ID(), func(r *omni.ExposedService, assertion *assert.Assertions) {
assertion.Equal(service.res.TypedSpec().Value.Port, r.TypedSpec().Value.Port, "exposed service port should be restored after toggling the annotation back on")
assertion.Equal(service.res.TypedSpec().Value.Label, r.TypedSpec().Value.Label, "exposed service label should be restored after toggling the annotation back on")
assertion.Equal(service.res.TypedSpec().Value.IconBase64, r.TypedSpec().Value.IconBase64, "exposed service icon should be restored after toggling the annotation back on")
assertion.Equal(service.res.TypedSpec().Value.HasExplicitAlias, r.TypedSpec().Value.HasExplicitAlias, "exposed service explicit alias flag should be preserved after toggling the annotation back on")
assertion.Empty(r.TypedSpec().Value.Error, "exposed service should not have an error after toggling the annotation back on")
if r.TypedSpec().Value.HasExplicitAlias {
assertion.Equal(service.res.TypedSpec().Value.Url, r.TypedSpec().Value.Url, "exposed service URL should be restored after toggling the annotation back on")
}
updatedServicesMap[r.Metadata().ID()] = r
})
}
updatedServices := maps.Values(updatedServicesMap)
testAccess(ctx, t, logger, omniClient, updatedServices, http.StatusOK)
}
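// prepareServices creates the nginx workloads and annotated Kubernetes services in the given cluster and waits for the
// corresponding ExposedService resources to appear with the expected port, label, icon and alias.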
func prepareServices(ctx context.Context, t *testing.T, logger *zap.Logger, omniClient *client.Client, clusterID string) clusterContext {
ctx, cancel := context.WithTimeout(ctx, 150*time.Second)
t.Cleanup(cancel)
omniState := omniClient.Omni().State()
kubeClient := kubernetes.GetClient(ctx, t, omniClient.Management(), clusterID)
iconBase64 := base64.StdEncoding.EncodeToString(doGzip(t, sideroLabsIconSVG)) // base64(gzip(Sidero Labs icon SVG))
expectedIconBase64 := base64.StdEncoding.EncodeToString(sideroLabsIconSVG)
numWorkloads := 5
numReplicasPerWorkload := 4
numServicePerWorkload := 10
startPort := 12345
cluster := clusterContext{
clusterID: clusterID,
deployments: make([]deploymentContext, 0, numWorkloads),
kubeClient: kubeClient,
}
for i := range numWorkloads {
identifier := fmt.Sprintf("%s-w%02d", clusterID, i)
deployment := deploymentContext{
cluster: &cluster,
}
firstPort := startPort + i*numServicePerWorkload
var services []*corev1.Service
deployment.deployment, services = createKubernetesResources(ctx, t, logger, kubeClient, firstPort, numReplicasPerWorkload, numServicePerWorkload, identifier, iconBase64)
for _, service := range services {
expectedID := clusterID + "/" + service.Name + "." + service.Namespace
expectedPort, err := strconv.Atoi(service.Annotations[constants.ExposedServicePortAnnotationKey])
require.NoError(t, err)
expectedLabel := service.Annotations[constants.ExposedServiceLabelAnnotationKey]
explicitAlias, hasExplicitAlias := service.Annotations[constants.ExposedServicePrefixAnnotationKey]
var res *omni.ExposedService
rtestutils.AssertResource[*omni.ExposedService](ctx, t, omniState, expectedID, func(r *omni.ExposedService, assertion *assert.Assertions) {
assertion.Equal(expectedPort, int(r.TypedSpec().Value.Port))
assertion.Equal(expectedLabel, r.TypedSpec().Value.Label)
assertion.Equal(expectedIconBase64, r.TypedSpec().Value.IconBase64)
assertion.NotEmpty(r.TypedSpec().Value.Url)
assertion.Empty(r.TypedSpec().Value.Error)
assertion.Equal(hasExplicitAlias, r.TypedSpec().Value.HasExplicitAlias)
if hasExplicitAlias {
assertion.Contains(r.TypedSpec().Value.Url, explicitAlias+"-")
}
res = r
})
deployment.services = append(deployment.services, serviceContext{
res: res,
svc: service,
deployment: &deployment,
})
}
cluster.deployments = append(cluster.deployments, deployment)
}
return cluster
}
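// testAccess asserts that each given exposed service responds with the expected HTTP status code, probing the services in parallel batches.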
func testAccess(ctx context.Context, t *testing.T, logger *zap.Logger, omniClient *client.Client, exposedServices []*omni.ExposedService, expectedStatusCode int) {
keyID, keyIDSignatureBase64, err := clientconfig.RegisterKeyGetIDSignatureBase64(ctx, omniClient)
require.NoError(t, err)
logger.Debug("registered public key for workload proxy", zap.String("keyID", keyID), zap.String("keyIDSignatureBase64", keyIDSignatureBase64))
cookies := []*http.Cookie{
{Name: workloadproxy.PublicKeyIDCookie, Value: keyID},
{Name: workloadproxy.PublicKeyIDSignatureBase64Cookie, Value: keyIDSignatureBase64},
}
clientTransport := cleanhttp.DefaultTransport()
clientTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
httpClient := &http.Client{
CheckRedirect: func(*http.Request, []*http.Request) error {
return http.ErrUseLastResponse // disable follow redirects
},
Timeout: 30 * time.Second,
Transport: clientTransport,
}
t.Cleanup(httpClient.CloseIdleConnections)
parallelTestBatchSize := 8
var wg sync.WaitGroup
errs := make([]error, parallelTestBatchSize)
for i, exposedService := range exposedServices {
logger.Info("test exposed service",
zap.String("id", exposedService.Metadata().ID()),
zap.String("url", exposedService.TypedSpec().Value.Url),
zap.Int("expectedStatusCode", expectedStatusCode),
)
wg.Add(1)
go func() {
defer wg.Done()
if testErr := testAccessParallel(ctx, httpClient, exposedService, expectedStatusCode, cookies...); testErr != nil {
errs[i%parallelTestBatchSize] = fmt.Errorf("failed to access exposed service %q over %q [%d]: %w",
exposedService.Metadata().ID(), exposedService.TypedSpec().Value.Url, i, testErr)
}
}()
if i == len(exposedServices)-1 || ((i+1)%parallelTestBatchSize == 0) {
logger.Info("wait for the batch of exposed service tests to finish", zap.Int("batchSize", (i%parallelTestBatchSize)+1))
wg.Wait()
for _, testErr := range errs {
assert.NoError(t, testErr)
}
// reset the errors for the next batch
for j := range errs {
errs[j] = nil
}
}
}
}
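// testAccessParallel first asserts that a request without auth cookies is redirected to the authentication page,
// then fires many parallel requests with the cookies set and collects the unique errors.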
func testAccessParallel(ctx context.Context, httpClient *http.Client, exposedService *omni.ExposedService, expectedStatusCode int, cookies ...*http.Cookie) error {
svcURL := exposedService.TypedSpec().Value.Url
// test redirect to the authentication page when cookies are not set
if err := testAccessSingle(ctx, httpClient, svcURL, http.StatusSeeOther, ""); err != nil {
return fmt.Errorf("failed to access exposed service %q over %q: %w", exposedService.Metadata().ID(), svcURL, err)
}
expectedBodyContent := ""
reqPerExposedService := 128
numRetries := 3
label := exposedService.TypedSpec().Value.Label
if expectedStatusCode == http.StatusOK {
expectedBodyContent = label[:strings.LastIndex(label, "-")] // for the label "integration-workload-proxy-2-w03-s01", the content must be "integration-workload-proxy-2-w03"
}
var wg sync.WaitGroup
wg.Add(reqPerExposedService)
lock := sync.Mutex{}
svcErrs := make(map[string]error, reqPerExposedService)
for range reqPerExposedService {
go func() {
defer wg.Done()
if err := testAccessSingleWithRetries(ctx, httpClient, svcURL, expectedStatusCode, expectedBodyContent, numRetries, cookies...); err != nil {
lock.Lock()
svcErrs[err.Error()] = err
lock.Unlock()
}
}()
}
wg.Wait()
return errors.Join(maps.Values(svcErrs)...)
}
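// testAccessSingleWithRetries calls testAccessSingle up to the given number of times, waiting 500ms between attempts, and returns the last error if all attempts fail.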
func testAccessSingleWithRetries(ctx context.Context, httpClient *http.Client, svcURL string, expectedStatusCode int, expectedBodyContent string, retries int, cookies ...*http.Cookie) error {
if retries <= 0 {
return fmt.Errorf("retries must be greater than 0, got %d", retries)
}
var err error
for range retries {
if err = testAccessSingle(ctx, httpClient, svcURL, expectedStatusCode, expectedBodyContent, cookies...); err == nil {
return nil
}
select {
case <-ctx.Done():
return fmt.Errorf("context canceled while trying to access %q: %w", svcURL, ctx.Err())
case <-time.After(500 * time.Millisecond):
}
}
return fmt.Errorf("failed to access %q after %d retries: %w", svcURL, retries, err)
}
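// testAccessSingle performs a single request against the exposed service URL and verifies the response status code and, if given, that the body contains the expected content.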
func testAccessSingle(ctx context.Context, httpClient *http.Client, svcURL string, expectedStatusCode int, expectedBodyContent string, cookies ...*http.Cookie) error {
req, err := prepareRequest(ctx, svcURL)
if err != nil {
return fmt.Errorf("failed to prepare request: %w", err)
}
for _, cookie := range cookies {
req.AddCookie(cookie)
}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to do request: %w", err)
}
defer resp.Body.Close() //nolint:errcheck
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
bodyStr := string(body)
if resp.StatusCode != expectedStatusCode {
return fmt.Errorf("unexpected status code: expected %d, got %d, body: %q", expectedStatusCode, resp.StatusCode, bodyStr)
}
if expectedBodyContent == "" {
return nil
}
if !strings.Contains(string(body), expectedBodyContent) {
return fmt.Errorf("content %q not found in body: %q", expectedBodyContent, bodyStr)
}
return nil
}
// prepareRequest prepares a request to the workload proxy.
// It uses the Omni base URL to access Omni with proper name resolution, but sets the Host header to the host of the original URL to exercise the workload proxy routing logic.
//
// sample svcURL: https://j2s7hf-local.proxy-us.localhost:8099/
func prepareRequest(ctx context.Context, svcURL string) (*http.Request, error) {
parsedURL, err := url.Parse(svcURL)
if err != nil {
return nil, fmt.Errorf("failed to parse URL %q: %w", svcURL, err)
}
hostParts := strings.SplitN(parsedURL.Host, ".", 3)
if len(hostParts) < 3 {
return nil, fmt.Errorf("failed to parse host %q: expected at least 3 parts", parsedURL.Host)
}
svcHost := parsedURL.Host
baseHost := hostParts[2]
parsedURL.Host = baseHost
baseURL := parsedURL.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Host = svcHost
return req, nil
}
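// createKubernetesResources creates a ConfigMap serving the workload identifier as index.html, an nginx Deployment mounting it,
// and the annotated Services exposing it, then waits for all replicas of the deployment to become ready.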
func createKubernetesResources(ctx context.Context, t *testing.T, logger *zap.Logger, kubeClient clientgokubernetes.Interface,
firstPort, numReplicas, numServices int, identifier, icon string,
) (*appsv1.Deployment, []*corev1.Service) {
namespace := "default"
_, err := kubeClient.CoreV1().ConfigMaps(namespace).Create(ctx, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: identifier,
Namespace: namespace,
},
Data: map[string]string{
"index.html": fmt.Sprintf("<!doctype html><meta charset=utf-8><title>%s</title><body>%s</body>", identifier, identifier),
},
}, metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
require.NoError(t, err)
}
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: identifier,
Namespace: namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: pointer.To(int32(numReplicas)),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": identifier,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": identifier,
},
},
Spec: corev1.PodSpec{
Tolerations: []corev1.Toleration{
{
Operator: corev1.TolerationOpExists,
},
},
Containers: []corev1.Container{
{
Name: identifier,
Image: "nginx:stable-alpine-slim",
Ports: []corev1.ContainerPort{
{
ContainerPort: 80,
Protocol: corev1.ProtocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "contents",
MountPath: "/usr/share/nginx/html",
ReadOnly: true,
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "contents",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: identifier,
},
},
},
},
},
},
},
},
}
if _, err = kubeClient.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
require.NoErrorf(t, err, "failed to create deployment %q in namespace %q", identifier, namespace)
}
// if the deployment already exists, update it with the new spec
deployment, err = kubeClient.AppsV1().Deployments(namespace).Update(ctx, deployment, metav1.UpdateOptions{})
if !apierrors.IsNotFound(err) {
require.NoError(t, err)
}
}
services := make([]*corev1.Service, 0, numServices)
for i := range numServices {
svcIdentifier := fmt.Sprintf("%s-s%02d", identifier, i)
port := firstPort + i
service := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: svcIdentifier,
Namespace: namespace,
Annotations: map[string]string{
constants.ExposedServicePortAnnotationKey: strconv.Itoa(port),
constants.ExposedServiceLabelAnnotationKey: svcIdentifier,
constants.ExposedServiceIconAnnotationKey: icon,
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": identifier,
},
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.FromInt32(80),
},
},
},
}
if usePrefix := i%2 == 0; usePrefix {
service.Annotations[constants.ExposedServicePrefixAnnotationKey] = strings.ReplaceAll(svcIdentifier, "-", "")
}
services = append(services, &service)
_, err = kubeClient.CoreV1().Services(namespace).Create(ctx, &service, metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
require.NoError(t, err)
}
}
// assert that all pods of the deployment are ready
require.EventuallyWithT(t, func(collect *assert.CollectT) {
dep, depErr := kubeClient.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
if errors.Is(depErr, context.Canceled) || errors.Is(depErr, context.DeadlineExceeded) {
require.NoError(collect, depErr)
}
if !assert.NoError(collect, depErr) {
logger.Error("failed to get deployment", zap.String("namespace", deployment.Namespace), zap.String("name", deployment.Name), zap.Error(depErr))
return
}
if !assert.Equal(collect, numReplicas, int(dep.Status.ReadyReplicas)) {
logger.Debug("deployment does not have expected number of replicas",
zap.String("namespace", deployment.Namespace),
zap.String("name", deployment.Name),
zap.Int("expected", numReplicas),
zap.Int("found", int(dep.Status.ReadyReplicas)),
)
}
}, 3*time.Minute, 5*time.Second)
return deployment, services
}
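// doGzip compresses the input with gzip and returns the compressed bytes.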
func doGzip(t *testing.T, input []byte) []byte {
t.Helper()
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
_, err := writer.Write(input)
require.NoError(t, err)
require.NoError(t, writer.Close())
return buf.Bytes()
}