From d417d68e0dca26b7518dcc3660a5aa139dde2f5a Mon Sep 17 00:00:00 2001 From: Orzelius Date: Fri, 27 Feb 2026 13:45:26 +0900 Subject: [PATCH] feat: bring in new ssa logic drop the old cli-utils based manifest apply logic and replace it with the new fluxcd/pkg/ssa based implementation Signed-off-by: Orzelius <33936483+Orzelius@users.noreply.github.com> --- cmd/talosctl/cmd/talos/upgrade-k8s.go | 35 +-- go.mod | 6 +- go.sum | 12 +- .../pkg/controllers/k8s/manifest_apply.go | 216 ++++++++---------- .../serviceaccount/crd_controller.go | 21 +- internal/integration/base/api.go | 4 + internal/integration/k8s/manifests.go | 5 +- internal/integration/provision/provision.go | 4 + pkg/cluster/kubernetes/talos_managed.go | 84 +++++-- pkg/cluster/kubernetes/upgrade.go | 12 +- website/content/v1.13/reference/cli.md | 5 +- 11 files changed, 207 insertions(+), 197 deletions(-) diff --git a/cmd/talosctl/cmd/talos/upgrade-k8s.go b/cmd/talosctl/cmd/talos/upgrade-k8s.go index a25246712..931696f67 100644 --- a/cmd/talosctl/cmd/talos/upgrade-k8s.go +++ b/cmd/talosctl/cmd/talos/upgrade-k8s.go @@ -7,11 +7,11 @@ package talos import ( "context" "fmt" + "time" - "github.com/siderolabs/go-kubernetes/kubernetes/manifests" + "github.com/siderolabs/go-kubernetes/kubernetes/ssa" "github.com/siderolabs/go-kubernetes/kubernetes/upgrade" "github.com/spf13/cobra" - "sigs.k8s.io/cli-utils/pkg/inventory" "github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers" "github.com/siderolabs/talos/pkg/cluster" @@ -44,8 +44,6 @@ var upgradeK8sCmdFlags struct { } func init() { - ssaDefaults := manifests.DefaultSSApplyBehaviorOptions() - upgradeK8sCmd.Flags().StringVar(&upgradeK8sCmdFlags.FromVersion, "from", "", "the Kubernetes control plane version to upgrade from") upgradeK8sCmd.Flags().StringVar(&upgradeK8sCmdFlags.ToVersion, "to", constants.DefaultKubernetesVersion, "the Kubernetes control plane version to upgrade to") upgradeK8sCmd.Flags().StringVar(&upgradeOptions.ControlPlaneEndpoint, "endpoint", "", "the cluster control plane endpoint") @@ -62,13 +60,11 @@ func init() { upgradeK8sCmd.Flags().StringVar(&upgradeOptions.ProxyImage, "proxy-image", constants.KubeProxyImage, "kube-proxy image to use") // manifest sync related options - upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.ForceConflicts, "manifests-force-conflicts", ssaDefaults.ForceConflicts, "overwrite the fields when applying even if the field manager differs") - upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.NoPrune, "manifests-no-prune", ssaDefaults.NoPrune, "whether pruning of previously applied objects should happen after apply") - upgradeK8sCmd.Flags().StringVar(&upgradeK8sCmdFlags.inventoryPolicy, "manifests-inventory-policy", ssaDefaults.InventoryPolicy.String(), + upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.ForceManifests, "manifests-force", false, "whether to recreate objects that contain immutable field changes") + upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.NoPrune, "manifests-no-prune", false, "whether pruning of previously applied objects should happen after apply") + upgradeK8sCmd.Flags().StringVar(&upgradeK8sCmdFlags.inventoryPolicy, "manifests-inventory-policy", "AdoptIfNoInventory", "kubernetes SSA inventory policy (one of 'MustMatch', 'AdoptIfNoInventory' or 'AdoptAll')") - upgradeK8sCmd.Flags().DurationVar(&upgradeOptions.PruneTimeout, "manifests-prune-timeout", ssaDefaults.PruneTimeout, - "how long to wait for resources to be fully deleted (set to zero to disable waiting)") - upgradeK8sCmd.Flags().DurationVar(&upgradeOptions.ReconcileTimeout, "manifests-reconcile-timeout", ssaDefaults.ReconcileTimeout, + upgradeK8sCmd.Flags().DurationVar(&upgradeOptions.ReconcileTimeout, "manifests-reconcile-timeout", 5*time.Minute, "how long to wait for resources to be fully reconciled (set to zero to disable waiting)") addCommand(upgradeK8sCmd) @@ -124,7 +120,7 @@ func upgradeKubernetes(ctx context.Context, c *client.Client) error { commentsFlags |= encoder.CommentsExamples } - policy, err := parseInventoryPolicy(upgradeK8sCmdFlags.inventoryPolicy) + policy, err := ssa.ParseInventoryPolicy(upgradeK8sCmdFlags.inventoryPolicy) if err != nil { return err } @@ -132,18 +128,9 @@ func upgradeKubernetes(ctx context.Context, c *client.Client) error { upgradeOptions.InventoryPolicy = policy upgradeOptions.EncoderOpt = encoder.WithComments(commentsFlags) + if upgradeOptions.ReconcileTimeout == 0 { + upgradeOptions.SkipManifestWait = true + } + return k8s.Upgrade(ctx, &state, upgradeOptions) } - -func parseInventoryPolicy(policy string) (inventory.Policy, error) { - switch policy { - case "MustMatch": - return inventory.PolicyMustMatch, nil - case "AdoptIfNoInventory": - return inventory.PolicyAdoptIfNoInventory, nil - case "AdoptAll": - return inventory.PolicyAdoptAll, nil - default: - return 0, fmt.Errorf("invalid inventory policy %q: must be one of 'MustMatch', 'AdoptIfNoInventory', or 'AdoptAll'", policy) - } -} diff --git a/go.mod b/go.mod index 282b32a5d..f6cda683b 100644 --- a/go.mod +++ b/go.mod @@ -147,7 +147,7 @@ require ( github.com/siderolabs/go-debug v0.6.2 github.com/siderolabs/go-kmsg v0.1.4 github.com/siderolabs/go-kubeconfig v0.1.1 - github.com/siderolabs/go-kubernetes v0.2.32 + github.com/siderolabs/go-kubernetes v0.2.33 github.com/siderolabs/go-loadbalancer v0.5.0 github.com/siderolabs/go-pcidb v0.3.2 github.com/siderolabs/go-pointer v1.0.1 @@ -191,7 +191,6 @@ require ( gopkg.in/typ.v4 v4.4.0 k8s.io/klog/v2 v2.130.1 kernel.org/pub/linux/libs/security/libcap/cap v1.2.77 - sigs.k8s.io/cli-utils v0.37.3-0.20250918194211-77c836a69463 sigs.k8s.io/hydrophone v0.7.0 ) @@ -250,7 +249,6 @@ require ( github.com/evanphx/json-patch v5.9.11+incompatible // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect - github.com/fatih/camelcase v1.0.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fluxcd/cli-utils v0.36.0-flux.15 // indirect github.com/fluxcd/pkg/ssa v0.60.0 // indirect @@ -329,7 +327,6 @@ require ( github.com/siderolabs/protoenc v0.2.4 // indirect github.com/siderolabs/tcpproxy v0.1.0 // indirect github.com/spf13/afero v1.14.0 // indirect - github.com/spyzhov/ajson v0.9.6 // indirect github.com/u-root/uio v0.0.0-20240224005618-d2acac8f3701 // indirect github.com/vbatts/tar-split v0.12.2 // indirect github.com/vishvananda/netlink v1.3.1 // indirect @@ -364,7 +361,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/cli-runtime v0.35.0 // indirect - k8s.io/component-helpers v0.35.0 // indirect k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect k8s.io/utils v0.0.0-20260108192941-914a6e750570 // indirect kernel.org/pub/linux/libs/security/libcap/psx v1.2.77 // indirect diff --git a/go.sum b/go.sum index 4dfda6f96..bb5190bd5 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,6 @@ github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjT github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM= github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f h1:Wl78ApPPB2Wvf/TIe2xdyJxTlb6obmF18d8QdkxNDu4= github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f/go.mod h1:OSYXu++VVOHnXeitef/D8n/6y4QV8uLHSFXX4NeXMGc= -github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8= -github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -642,8 +640,8 @@ github.com/siderolabs/go-kmsg v0.1.4 h1:RLAa90O9bWuhA3pXPAYAdrI+kzcqTshZASRA5yso github.com/siderolabs/go-kmsg v0.1.4/go.mod h1:BLkt2N2DHT0wsFMz32lMw6vNEZL90c8ZnBjpIUoBb/M= github.com/siderolabs/go-kubeconfig v0.1.1 h1:tZlgpelj/OqrcHVUwISPN0NRgObcflpH9WtE41mtQZ0= github.com/siderolabs/go-kubeconfig v0.1.1/go.mod h1:QaGp4i9L95oDbcU7jDn30aw4gnREkb3O5otgxw8imOk= -github.com/siderolabs/go-kubernetes v0.2.32 h1:nReXnZNmCCVS9iymIFLYPAhVegaFgAnLI+48W8p9bC8= -github.com/siderolabs/go-kubernetes v0.2.32/go.mod h1:yDWt6+46mnekoOj6GIPSaPLMrxNSQKOJuOsHxvM5NL8= +github.com/siderolabs/go-kubernetes v0.2.33 h1:I5J1EW6McjQeTzQCaZ7Vc0ZTbwypsCburRshTkb6HS0= +github.com/siderolabs/go-kubernetes v0.2.33/go.mod h1:Ow/1iR+kJmMvKno9whL8WTRBoCPKPrHqxy1a/7SYacs= github.com/siderolabs/go-loadbalancer v0.5.0 h1:0v7E6GrxoONyqwcmHiA+J0vIDPWbkTmevHGCFb4tjdc= github.com/siderolabs/go-loadbalancer v0.5.0/go.mod h1:tRVouZ9i2R/TRbNUF9MqyBlV2wsjX0cxkYTjPXcI9P0= github.com/siderolabs/go-pcidb v0.3.2 h1:18KMjsc+AO2r6/pl0KLBR9xOXO0ULLCXwmGhIukoAbw= @@ -692,8 +690,6 @@ github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiT github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spyzhov/ajson v0.9.6 h1:iJRDaLa+GjhCDAt1yFtU/LKMtLtsNVKkxqlpvrHHlpQ= -github.com/spyzhov/ajson v0.9.6/go.mod h1:a6oSw0MMb7Z5aD2tPoPO+jq11ETKgXUr2XktHdT8Wt8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -1056,8 +1052,6 @@ k8s.io/client-go v0.35.0 h1:IAW0ifFbfQQwQmga0UdoH0yvdqrbwMdq9vIFEhRpxBE= k8s.io/client-go v0.35.0/go.mod h1:q2E5AAyqcbeLGPdoRB+Nxe3KYTfPce1Dnu1myQdqz9o= k8s.io/component-base v0.35.0 h1:+yBrOhzri2S1BVqyVSvcM3PtPyx5GUxCK2tinZz1G94= k8s.io/component-base v0.35.0/go.mod h1:85SCX4UCa6SCFt6p3IKAPej7jSnF3L8EbfSyMZayJR0= -k8s.io/component-helpers v0.35.0 h1:wcXv7HJRksgVjM4VlXJ1CNFBpyDHruRI99RrBtrJceA= -k8s.io/component-helpers v0.35.0/go.mod h1:ahX0m/LTYmu7fL3W8zYiIwnQ/5gT28Ex4o2pymF63Co= k8s.io/cri-api v0.35.0 h1:fxLSKyJHqbyCSUsg1rW4DRpmjSEM/elZ1GXzYTSLoDQ= k8s.io/cri-api v0.35.0/go.mod h1:Cnt29u/tYl1Se1cBRL30uSZ/oJ5TaIp4sZm1xDLvcMc= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= @@ -1082,8 +1076,6 @@ pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk= pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs= -sigs.k8s.io/cli-utils v0.37.3-0.20250918194211-77c836a69463 h1:cogPZ9yOTBY+1TxAXD7DnqQqTrN83iwNTdq08z3lFao= -sigs.k8s.io/cli-utils v0.37.3-0.20250918194211-77c836a69463/go.mod h1:u5LTcoijf7f18rMNL7PVNyJzoGEriT+tS57ZSVG3nc4= sigs.k8s.io/controller-runtime v0.22.2 h1:cK2l8BGWsSWkXz09tcS4rJh95iOLney5eawcK5A33r4= sigs.k8s.io/controller-runtime v0.22.2/go.mod h1:+QX1XUpTXN4mLoblf4tqr5CQcyHPAki2HLXqQMY6vh8= sigs.k8s.io/hydrophone v0.7.0 h1:BKEb8m6mcVL6kFEZ4jUCk5VD81bqm2XPtNxFT52ifxc= diff --git a/internal/app/machined/pkg/controllers/k8s/manifest_apply.go b/internal/app/machined/pkg/controllers/k8s/manifest_apply.go index 792ed72c1..d356e5f0a 100644 --- a/internal/app/machined/pkg/controllers/k8s/manifest_apply.go +++ b/internal/app/machined/pkg/controllers/k8s/manifest_apply.go @@ -19,7 +19,8 @@ import ( "github.com/hashicorp/go-multierror" "github.com/siderolabs/gen/optional" "github.com/siderolabs/gen/xslices" - gokube "github.com/siderolabs/go-kubernetes/kubernetes/manifests" + "github.com/siderolabs/go-kubernetes/kubernetes/ssa" + "github.com/siderolabs/go-kubernetes/kubernetes/ssa/object" "go.uber.org/zap" "go.uber.org/zap/zapcore" corev1 "k8s.io/api/core/v1" @@ -28,18 +29,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - "k8s.io/kubectl/pkg/cmd/util" - "sigs.k8s.io/cli-utils/pkg/apis/actuation" - "sigs.k8s.io/cli-utils/pkg/inventory" - "sigs.k8s.io/cli-utils/pkg/object" k8sadapter "github.com/siderolabs/talos/internal/app/machined/pkg/adapters/k8s" "github.com/siderolabs/talos/internal/pkg/etcd" @@ -137,52 +134,7 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti }) if len(manifests.Items) > 0 { - var ( - kubeconfig *rest.Config - dyn dynamic.Interface - ) - - kubeconfig, err = clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) { - return clientcmd.Load([]byte(secrets.LocalhostAdminKubeconfig)) - }) - if err != nil { - return fmt.Errorf("error loading kubeconfig: %w", err) - } - - kubeconfig.WarningHandler = rest.NewWarningWriter(logging.NewWriter(logger, zapcore.WarnLevel), rest.WarningWriterOptions{ - Deduplicate: true, - }) - - discoveryClient, err := discovery.NewDiscoveryClientForConfig(kubeconfig) - if err != nil { - return fmt.Errorf("error building discovery client: %w", err) - } - - dc := memory.NewMemCacheClient(discoveryClient) - - mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc)) - - dyn, err = dynamic.NewForConfig(kubeconfig) - if err != nil { - return fmt.Errorf("error building dynamic client: %w", err) - } - - if err = etcd.WithLock(ctx, constants.EtcdTalosManifestApplyMutex, logger, func() error { - inventoryClient, inv, err := getInventory(ctx, kubeconfig, mapper, dc) - if err != nil { - return err - } - - applyErr := ctrl.apply(ctx, logger, mapper, dyn, manifests, inv) - - // update inventory even if the apply process failed half way through - err = inventoryClient.CreateOrUpdate(ctx, inv, inventory.UpdateOptions{}) - if err != nil { - err = fmt.Errorf("updating inventory failed: %w", err) - } - - return errors.Join(applyErr, err) - }); err != nil { + if err = ctrl.applyManifests(ctx, logger, manifests, secrets); err != nil { return err } } @@ -203,6 +155,74 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti } } +func (ctrl *ManifestApplyController) applyManifests( + ctx context.Context, + logger *zap.Logger, + manifests resource.List, + secrets *secrets.KubernetesCertsSpec, +) error { + kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) { + return clientcmd.Load([]byte(secrets.LocalhostAdminKubeconfig)) + }) + if err != nil { + return fmt.Errorf("error loading kubeconfig: %w", err) + } + + kubeconfig.WarningHandler = rest.NewWarningWriter(logging.NewWriter(logger, zapcore.WarnLevel), rest.WarningWriterOptions{ + Deduplicate: true, + }) + + httpClient, err := rest.HTTPClientFor(kubeconfig) + if err != nil { + return fmt.Errorf("error building HTTP client for kubeconfig: %w", err) + } + + defer httpClient.CloseIdleConnections() + + discoveryClient, err := discovery.NewDiscoveryClientForConfigAndClient(kubeconfig, httpClient) + if err != nil { + return fmt.Errorf("error building discovery client: %w", err) + } + + dyn, err := dynamic.NewForConfigAndClient(kubeconfig, httpClient) + if err != nil { + return fmt.Errorf("error building dynamic client: %w", err) + } + + dc := memory.NewMemCacheClient(discoveryClient) + mapper := restmapper.NewDeferredDiscoveryRESTMapper(dc) + + k8sClient, err := kubernetes.NewForConfigAndClient(kubeconfig, httpClient) + if err != nil { + return err + } + + if err = etcd.WithLock(ctx, constants.EtcdTalosManifestApplyMutex, logger, func() error { + inv, err := ssa.GetInventory(ctx, k8sClient, constants.KubernetesInventoryNamespace, constants.KubernetesBootstrapManifestsInventoryName) + if err != nil { + return fmt.Errorf("error getting inventory: %w", err) + } + + inventoryContents := inv.Get() + + inventoryContents, applyErr := ctrl.apply(ctx, logger, mapper, dyn, manifests, inventoryContents) + + inv.Update(inventoryContents) + + // update inventory even if the apply process failed half way through + err = inv.Write(ctx) + if err != nil { + err = fmt.Errorf("updating inventory failed: %w", err) + } + + return errors.Join(applyErr, err) + }); err != nil { + return err + } + + return nil +} + //nolint:gocyclo,cyclop func (ctrl *ManifestApplyController) apply( ctx context.Context, @@ -210,8 +230,8 @@ func (ctrl *ManifestApplyController) apply( mapper *restmapper.DeferredDiscoveryRESTMapper, dyn dynamic.Interface, manifests resource.List, - inv inventory.Inventory, -) error { + inv object.ObjMetadataSet, +) (object.ObjMetadataSet, error) { // flatten list of objects to be applied objects := xslices.FlatMap(manifests.Items, func(m resource.Resource) []*unstructured.Unstructured { return k8sadapter.Manifest(m.(*k8s.Manifest)).Objects() @@ -255,16 +275,19 @@ func (ctrl *ManifestApplyController) apply( var multiErr *multierror.Error for _, obj := range objects { - objMeta := object.UnstructuredToObjMetadata(obj) - - // check if the resource is already in the inventory, if so, skip applying it - if inv.GetObjectRefs().Contains(objMeta) { - continue - } - gvk := obj.GroupVersionKind() objName := fmt.Sprintf("%s/%s/%s/%s", gvk.Group, gvk.Version, gvk.Kind, obj.GetName()) + objMeta, err := object.RuntimeToObjMeta(obj) + if err != nil { + return nil, fmt.Errorf("failed to retrieve object metadata of %q: %w", objName, err) + } + + // check if the resource is already in the inventory, if so, skip applying it + if inv.Contains(objMeta) { + continue + } + mapping, err := mapper.RESTMapping(obj.GroupVersionKind().GroupKind(), obj.GroupVersionKind().Version) if err != nil { switch { @@ -279,7 +302,7 @@ func (ctrl *ManifestApplyController) apply( continue default: // connection errors, etc.; it makes no sense to continue with other manifests - return fmt.Errorf("error creating mapping for object %s: %w", objName, err) + return nil, fmt.Errorf("error creating mapping for object %s: %w", objName, err) } } @@ -302,13 +325,13 @@ func (ctrl *ManifestApplyController) apply( if err == nil { // already exists, // backfill the inventory if the resource is missing (to migrate to inventory-based apply) - inventoryAdd(inv, objMeta, obj, actuation.ActuationSucceeded) + inv = inv.Union(object.ObjMetadataSet{objMeta}) continue } if !apierrors.IsNotFound(err) { - return fmt.Errorf("error checking resource existence: %w", err) + return nil, fmt.Errorf("error checking resource existence: %w", err) } // Set inventory annotation. @@ -317,13 +340,15 @@ func (ctrl *ManifestApplyController) apply( annotations = make(map[string]string) } - inventoryAnnotation, inventoryAnnotationSet := annotations["config.k8s.io/owning-inventory"] + inventoryAnnotation, inventoryAnnotationSet := annotations[ssa.InventoryAnnotationKey] if inventoryAnnotationSet && inventoryAnnotation != constants.KubernetesBootstrapManifestsInventoryName { - return fmt.Errorf("unexpected foreign inventory annotation on %s ", objName) + multiErr = multierror.Append(multiErr, fmt.Errorf("unexpected foreign inventory annotation on %s ", objName)) + + continue } - annotations["config.k8s.io/owning-inventory"] = constants.KubernetesBootstrapManifestsInventoryName + annotations[ssa.InventoryAnnotationKey] = constants.KubernetesBootstrapManifestsInventoryName obj.SetAnnotations(annotations) _, err = dr.Apply(ctx, obj.GetName(), obj, metav1.ApplyOptions{ @@ -340,67 +365,16 @@ func (ctrl *ManifestApplyController) apply( multiErr = multierror.Append(multiErr, fmt.Errorf("error creating %s: %w", objName, err)) default: // connection errors, etc.; it makes no sense to continue with other manifests - return fmt.Errorf("error creating %s: %w", objName, err) + return nil, fmt.Errorf("error creating %s: %w", objName, err) } } else { logger.Sugar().Infof("created %s", objName) - inventoryAdd(inv, objMeta, obj, actuation.ActuationSucceeded) + inv = inv.Union(object.ObjMetadataSet{objMeta}) } } - return multiErr.ErrorOrNil() -} - -func getInventory( - ctx context.Context, - kubeconfig *rest.Config, - mapper *restmapper.DeferredDiscoveryRESTMapper, - dc discovery.CachedDiscoveryInterface, -) (inventory.Client, inventory.Inventory, error) { - clientGetter := gokube.K8sRESTClientGetter{ - RestConfig: kubeconfig, - Mapper: mapper, - DiscoveryClient: dc, - } - - factory := util.NewFactory(clientGetter) - - inventoryClient, err := inventory.ConfigMapClientFactory{StatusEnabled: true}.NewClient(factory) - if err != nil { - return nil, nil, err - } - - inventoryInfo := inventory.NewSingleObjectInfo(inventory.ID( - constants.KubernetesBootstrapManifestsInventoryName), - types.NamespacedName{ - Namespace: constants.KubernetesInventoryNamespace, - Name: constants.KubernetesBootstrapManifestsInventoryName, - }) - - err = gokube.AssureInventory(ctx, inventoryClient, inventoryInfo) - if err != nil { - return nil, nil, err - } - - inv, err := inventoryClient.Get(ctx, inventoryInfo, inventory.GetOptions{}) - if err != nil { - return nil, nil, err - } - - return inventoryClient, inv, err -} - -func inventoryAdd(inv inventory.Inventory, objMeta object.ObjMetadata, obj *unstructured.Unstructured, actuationStatus actuation.ActuationStatus) { - inv.SetObjectRefs(slices.Concat(inv.GetObjectRefs(), object.ObjMetadataSet{objMeta})) - inv.SetObjectStatuses(slices.Concat(inv.GetObjectStatuses(), object.ObjectStatusSet{actuation.ObjectStatus{ - ObjectReference: inventory.ObjectReferenceFromObjMetadata(objMeta), - Strategy: actuation.ActuationStrategyApply, - Actuation: actuationStatus, - Reconcile: actuation.ReconcileUnknown, - UID: obj.GetUID(), - Generation: obj.GetGeneration(), - }})) + return inv, multiErr.ErrorOrNil() } func isNamespace(gvk schema.GroupVersionKind) bool { diff --git a/internal/app/machined/pkg/controllers/kubeaccess/serviceaccount/crd_controller.go b/internal/app/machined/pkg/controllers/kubeaccess/serviceaccount/crd_controller.go index da958c4f1..6a1a97a78 100644 --- a/internal/app/machined/pkg/controllers/kubeaccess/serviceaccount/crd_controller.go +++ b/internal/app/machined/pkg/controllers/kubeaccess/serviceaccount/crd_controller.go @@ -12,13 +12,13 @@ import ( "encoding/pem" "errors" "fmt" + "net/http" "slices" "sync" "time" "github.com/siderolabs/crypto/x509" "github.com/siderolabs/gen/xslices" - taloskubernetes "github.com/siderolabs/go-kubernetes/kubernetes" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" kubeerrors "k8s.io/apimachinery/pkg/api/errors" @@ -38,7 +38,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/connrotation" "k8s.io/client-go/util/workqueue" clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config" @@ -99,7 +98,7 @@ type CRDController struct { kubeClient kubernetes.Interface dynamicClient dynamic.Interface - dialer *connrotation.Dialer + httpClient *http.Client secretsSynced cache.InformerSynced talosSAsSynced cache.InformerSynced @@ -120,15 +119,17 @@ func NewCRDController( allowedRoles []string, logger *zap.Logger, ) (*CRDController, error) { - dialer := taloskubernetes.NewDialer() - kubeconfig.Dial = dialer.DialContext - - kubeCli, err := kubernetes.NewForConfig(kubeconfig) + httpClient, err := rest.HTTPClientFor(kubeconfig) if err != nil { return nil, err } - dynCli, err := dynamic.NewForConfig(kubeconfig) + kubeCli, err := kubernetes.NewForConfigAndClient(kubeconfig, httpClient) + if err != nil { + return nil, err + } + + dynCli, err := dynamic.NewForConfigAndClient(kubeconfig, httpClient) if err != nil { return nil, err } @@ -159,7 +160,7 @@ func NewCRDController( kubeInformerFactory: kubeInformerFactory, kubeClient: kubeCli, dynamicClient: dynCli, - dialer: dialer, + httpClient: httpClient, dynamicLister: lister, queue: workqueue.NewTypedRateLimitingQueue( workqueue.DefaultTypedControllerRateLimiter[string](), @@ -206,7 +207,7 @@ func (t *CRDController) Run(ctx context.Context, workers int) error { defer func() { t.queue.ShutDown() - t.dialer.CloseAll() + t.httpClient.CloseIdleConnections() wg.Wait() t.logger.Debug("all workers have shut down") diff --git a/internal/integration/base/api.go b/internal/integration/base/api.go index 79c2205ca..de4b04236 100644 --- a/internal/integration/base/api.go +++ b/internal/integration/base/api.go @@ -397,6 +397,8 @@ func (apiSuite *APISuite) ClearConnectionRefused(ctx context.Context, nodes ...s numMasterNodes = 3 } + apiSuite.T().Log("run ClearConnectionRefused") + apiSuite.Require().NoError(retry.Constant(backoff.DefaultConfig.MaxDelay, retry.WithUnits(time.Second)).Retry(func() error { for range numMasterNodes * 5 { _, err := apiSuite.Client.Version(client.WithNodes(ctx, nodes...)) @@ -404,6 +406,8 @@ func (apiSuite *APISuite) ClearConnectionRefused(ctx context.Context, nodes ...s continue } + apiSuite.T().Log(err.Error()) + if client.StatusCode(err) == codes.Unavailable || client.StatusCode(err) == codes.Canceled { return retry.ExpectedError(err) } diff --git a/internal/integration/k8s/manifests.go b/internal/integration/k8s/manifests.go index 52568add8..fc0f2f264 100644 --- a/internal/integration/k8s/manifests.go +++ b/internal/integration/k8s/manifests.go @@ -12,7 +12,6 @@ import ( "time" "github.com/cosi-project/runtime/pkg/resource/rtestutils" - "github.com/siderolabs/go-kubernetes/kubernetes/manifests" "github.com/stretchr/testify/assert" "go.yaml.in/yaml/v4" appsv1 "k8s.io/api/apps/v1" @@ -116,8 +115,8 @@ func (suite *ManifestsSuite) TestSync() { clusterAccess, true, kubernetes.UpgradeOptions{ - LogOutput: manifestSyncWriter{t: suite.T()}, - SSApplyBehaviorOptions: manifests.DefaultSSApplyBehaviorOptions(), + LogOutput: manifestSyncWriter{t: suite.T()}, + ReconcileTimeout: 30 * time.Second, }, )) diff --git a/internal/integration/provision/provision.go b/internal/integration/provision/provision.go index c8b3c74bf..c4830290b 100644 --- a/internal/integration/provision/provision.go +++ b/internal/integration/provision/provision.go @@ -21,6 +21,7 @@ import ( "github.com/cosi-project/runtime/pkg/safe" "github.com/siderolabs/gen/xslices" "github.com/siderolabs/go-blockdevice/v2/encryption" + "github.com/siderolabs/go-kubernetes/kubernetes/ssa" "github.com/siderolabs/go-kubernetes/kubernetes/upgrade" "github.com/siderolabs/go-procfs/procfs" "github.com/siderolabs/go-retry/retry" @@ -429,6 +430,9 @@ func (suite *BaseSuite) upgradeKubernetes(fromVersion, toVersion string, skipKub ProxyImage: constants.KubeProxyImage, EncoderOpt: encoder.WithComments(encoder.CommentsAll), + + InventoryPolicy: ssa.InventoryPolicyAdoptIfNoInventory, + ReconcileTimeout: 3 * time.Minute, } suite.Require().NoError(kubernetes.Upgrade(suite.ctx, suite.clusterAccess, options)) diff --git a/pkg/cluster/kubernetes/talos_managed.go b/pkg/cluster/kubernetes/talos_managed.go index a0eb9ee4c..65311db3b 100644 --- a/pkg/cluster/kubernetes/talos_managed.go +++ b/pkg/cluster/kubernetes/talos_managed.go @@ -19,6 +19,8 @@ import ( "github.com/siderolabs/gen/channel" "github.com/siderolabs/gen/xiter" "github.com/siderolabs/go-kubernetes/kubernetes/manifests" + "github.com/siderolabs/go-kubernetes/kubernetes/ssa" + ssacli "github.com/siderolabs/go-kubernetes/kubernetes/ssa/cli" "github.com/siderolabs/go-kubernetes/kubernetes/upgrade" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -504,31 +506,77 @@ func syncManifests(ctx context.Context, objects []*unstructured.Unstructured, cl return manifests.SyncWithLog(ctx, objects, config, options.DryRun, options.Log) } +//nolint:gocyclo func syncManifestsSSA(ctx context.Context, objects []*unstructured.Unstructured, cluster UpgradeProvider, options UpgradeOptions) error { config, err := cluster.K8sRestConfig(ctx) if err != nil { return err } - return manifests.SyncAndDiffWithLogSSA( - ctx, - objects, - config, - manifests.SSAOptions{ - FieldManagerName: constants.KubernetesFieldManagerName, - InventoryNamespace: constants.KubernetesInventoryNamespace, - InventoryName: constants.KubernetesBootstrapManifestsInventoryName, - SSApplyBehaviorOptions: manifests.SSApplyBehaviorOptions{ - DryRun: options.DryRun, - InventoryPolicy: options.InventoryPolicy, - ReconcileTimeout: options.ReconcileTimeout, - PruneTimeout: options.PruneTimeout, - ForceConflicts: options.ForceConflicts, - NoPrune: options.NoPrune, - }, - }, - options.Log, + updatingManifestsLogline := "updating manifests" + if options.DryRun { + updatingManifestsLogline += " (dry run)" + } + + options.Log("%s", updatingManifestsLogline) + + manager, err := ssa.NewManager(ctx, config, + constants.KubernetesFieldManagerName, + constants.KubernetesInventoryNamespace, + constants.KubernetesBootstrapManifestsInventoryName, ) + if err != nil { + return fmt.Errorf("error creating SSA manager: %w", err) + } + + defer manager.Close() + + if options.DryRun { + // only do the diff in dry-run mode + changes, err := manager.Diff(ctx, objects, ssa.DiffOptions{ + NoPrune: options.NoPrune, + InventoryPolicy: options.InventoryPolicy, + }) + if err != nil { + return fmt.Errorf("error diffing manifests: %w", err) + } + + for _, change := range changes { + options.Log(" < %s %s", change.Action, change.Subject) + + if change.Diff != "" { + options.Log("%s", change.Diff) + } + } + + return nil + } + + changes, err := manager.Apply(ctx, objects, ssa.ApplyOptions{ + InventoryPolicy: options.InventoryPolicy, + WaitTimeout: options.ReconcileTimeout, + NoPrune: options.NoPrune, + Force: options.ForceManifests, + }) + if err != nil { + return fmt.Errorf("error applying manifests: %w", err) + } + + ssacli.LogApplyResults(ctx, changes, manager, options.Log) + + if options.SkipManifestWait { + options.Log("skipping waiting for manifest reconciliation") + + return nil + } + + waitOptions := ssa.WaitOptions{ + Interval: 2 * time.Second, + Timeout: options.ReconcileTimeout, + FailFast: true, + } + + return ssacli.Wait(ctx, changes, options.Log, manager, waitOptions) } //nolint:gocyclo diff --git a/pkg/cluster/kubernetes/upgrade.go b/pkg/cluster/kubernetes/upgrade.go index b82daf245..d914b161b 100644 --- a/pkg/cluster/kubernetes/upgrade.go +++ b/pkg/cluster/kubernetes/upgrade.go @@ -7,8 +7,9 @@ package kubernetes import ( "fmt" "io" + "time" - "github.com/siderolabs/go-kubernetes/kubernetes/manifests" + "github.com/siderolabs/go-kubernetes/kubernetes/ssa" "github.com/siderolabs/go-kubernetes/kubernetes/upgrade" "github.com/siderolabs/talos/pkg/machinery/config/encoder" @@ -24,10 +25,9 @@ const ( // UpgradeOptions represents Kubernetes control plane upgrade settings. type UpgradeOptions struct { - manifests.SSApplyBehaviorOptions - Path *upgrade.Path + DryRun bool ControlPlaneEndpoint string LogOutput io.Writer PrePullImages bool @@ -40,6 +40,12 @@ type UpgradeOptions struct { SchedulerImage string ProxyImage string + NoPrune bool + ForceManifests bool + ReconcileTimeout time.Duration + InventoryPolicy ssa.InventoryPolicy + SkipManifestWait bool + controlPlaneNodes []string workerNodes []string } diff --git a/website/content/v1.13/reference/cli.md b/website/content/v1.13/reference/cli.md index e76add5c6..196dfedc0 100644 --- a/website/content/v1.13/reference/cli.md +++ b/website/content/v1.13/reference/cli.md @@ -3371,11 +3371,10 @@ talosctl upgrade-k8s [flags] --from string the Kubernetes control plane version to upgrade from -h, --help help for upgrade-k8s --kubelet-image string kubelet image to use (default "ghcr.io/siderolabs/kubelet") - --manifests-force-conflicts overwrite the fields when applying even if the field manager differs + --manifests-force whether to recreate objects that contain immutable field changes --manifests-inventory-policy string kubernetes SSA inventory policy (one of 'MustMatch', 'AdoptIfNoInventory' or 'AdoptAll') (default "AdoptIfNoInventory") --manifests-no-prune whether pruning of previously applied objects should happen after apply - --manifests-prune-timeout duration how long to wait for resources to be fully deleted (set to zero to disable waiting) (default 3m0s) - --manifests-reconcile-timeout duration how long to wait for resources to be fully reconciled (set to zero to disable waiting) (default 3m0s) + --manifests-reconcile-timeout duration how long to wait for resources to be fully reconciled (set to zero to disable waiting) (default 5m0s) -n, --nodes strings target the specified nodes --pre-pull-images pre-pull images before upgrade (default true) --proxy-image string kube-proxy image to use (default "registry.k8s.io/kube-proxy")