feat: bring in new ssa logic

Drop the old cli-utils-based manifest apply logic and replace it with the new fluxcd/pkg/ssa-based implementation.

Signed-off-by: Orzelius <33936483+Orzelius@users.noreply.github.com>
Orzelius 2026-02-27 13:45:26 +09:00
parent 0bb6413ff7
commit d417d68e0d
11 changed files with 207 additions and 197 deletions
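For orientation, here is the core of the replacement condensed into one sketch. Every call is taken verbatim from the syncManifestsSSA hunk further down; the wrapper name syncSSA and the exact import set are assumptions, so treat this as illustrative rather than as the library's documented API.

// condensed sketch of the new SSA apply path (illustrative only)
func syncSSA(ctx context.Context, config *rest.Config, objects []*unstructured.Unstructured, options UpgradeOptions) error {
	// one manager per field manager + inventory location
	manager, err := ssa.NewManager(ctx, config,
		constants.KubernetesFieldManagerName,
		constants.KubernetesInventoryNamespace,
		constants.KubernetesBootstrapManifestsInventoryName,
	)
	if err != nil {
		return fmt.Errorf("error creating SSA manager: %w", err)
	}

	defer manager.Close()

	// server-side apply with inventory-based pruning
	changes, err := manager.Apply(ctx, objects, ssa.ApplyOptions{
		InventoryPolicy: options.InventoryPolicy, // MustMatch, AdoptIfNoInventory or AdoptAll
		WaitTimeout:     options.ReconcileTimeout,
		NoPrune:         options.NoPrune,
		Force:           options.ForceManifests,
	})
	if err != nil {
		return fmt.Errorf("error applying manifests: %w", err)
	}

	ssacli.LogApplyResults(ctx, changes, manager, options.Log)

	// wait for reconciliation unless disabled (ReconcileTimeout == 0)
	return ssacli.Wait(ctx, changes, options.Log, manager, ssa.WaitOptions{
		Interval: 2 * time.Second,
		Timeout:  options.ReconcileTimeout,
		FailFast: true,
	})
}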


@@ -7,11 +7,11 @@ package talos
import (
"context"
"fmt"
"time"
"github.com/siderolabs/go-kubernetes/kubernetes/manifests"
"github.com/siderolabs/go-kubernetes/kubernetes/ssa"
"github.com/siderolabs/go-kubernetes/kubernetes/upgrade"
"github.com/spf13/cobra"
"sigs.k8s.io/cli-utils/pkg/inventory"
"github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/siderolabs/talos/pkg/cluster"
@@ -44,8 +44,6 @@ var upgradeK8sCmdFlags struct {
}
func init() {
ssaDefaults := manifests.DefaultSSApplyBehaviorOptions()
upgradeK8sCmd.Flags().StringVar(&upgradeK8sCmdFlags.FromVersion, "from", "", "the Kubernetes control plane version to upgrade from")
upgradeK8sCmd.Flags().StringVar(&upgradeK8sCmdFlags.ToVersion, "to", constants.DefaultKubernetesVersion, "the Kubernetes control plane version to upgrade to")
upgradeK8sCmd.Flags().StringVar(&upgradeOptions.ControlPlaneEndpoint, "endpoint", "", "the cluster control plane endpoint")
@@ -62,13 +60,11 @@ func init() {
upgradeK8sCmd.Flags().StringVar(&upgradeOptions.ProxyImage, "proxy-image", constants.KubeProxyImage, "kube-proxy image to use")
// manifest sync related options
upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.ForceConflicts, "manifests-force-conflicts", ssaDefaults.ForceConflicts, "overwrite the fields when applying even if the field manager differs")
upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.NoPrune, "manifests-no-prune", ssaDefaults.NoPrune, "whether pruning of previously applied objects should happen after apply")
upgradeK8sCmd.Flags().StringVar(&upgradeK8sCmdFlags.inventoryPolicy, "manifests-inventory-policy", ssaDefaults.InventoryPolicy.String(),
upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.ForceManifests, "manifests-force", false, "whether to recreate objects that contain immutable field changes")
upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.NoPrune, "manifests-no-prune", false, "whether pruning of previously applied objects should happen after apply")
upgradeK8sCmd.Flags().StringVar(&upgradeK8sCmdFlags.inventoryPolicy, "manifests-inventory-policy", "AdoptIfNoInventory",
"kubernetes SSA inventory policy (one of 'MustMatch', 'AdoptIfNoInventory' or 'AdoptAll')")
upgradeK8sCmd.Flags().DurationVar(&upgradeOptions.PruneTimeout, "manifests-prune-timeout", ssaDefaults.PruneTimeout,
"how long to wait for resources to be fully deleted (set to zero to disable waiting)")
upgradeK8sCmd.Flags().DurationVar(&upgradeOptions.ReconcileTimeout, "manifests-reconcile-timeout", ssaDefaults.ReconcileTimeout,
upgradeK8sCmd.Flags().DurationVar(&upgradeOptions.ReconcileTimeout, "manifests-reconcile-timeout", 5*time.Minute,
"how long to wait for resources to be fully reconciled (set to zero to disable waiting)")
addCommand(upgradeK8sCmd)
@@ -124,7 +120,7 @@ func upgradeKubernetes(ctx context.Context, c *client.Client) error {
commentsFlags |= encoder.CommentsExamples
}
policy, err := parseInventoryPolicy(upgradeK8sCmdFlags.inventoryPolicy)
policy, err := ssa.ParseInventoryPolicy(upgradeK8sCmdFlags.inventoryPolicy)
if err != nil {
return err
}
@@ -132,18 +128,9 @@ func upgradeKubernetes(ctx context.Context, c *client.Client) error {
upgradeOptions.InventoryPolicy = policy
upgradeOptions.EncoderOpt = encoder.WithComments(commentsFlags)
if upgradeOptions.ReconcileTimeout == 0 {
upgradeOptions.SkipManifestWait = true
}
return k8s.Upgrade(ctx, &state, upgradeOptions)
}
func parseInventoryPolicy(policy string) (inventory.Policy, error) {
switch policy {
case "MustMatch":
return inventory.PolicyMustMatch, nil
case "AdoptIfNoInventory":
return inventory.PolicyAdoptIfNoInventory, nil
case "AdoptAll":
return inventory.PolicyAdoptAll, nil
default:
return 0, fmt.Errorf("invalid inventory policy %q: must be one of 'MustMatch', 'AdoptIfNoInventory', or 'AdoptAll'", policy)
}
}
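The hand-rolled parser removed above now lives in go-kubernetes. A minimal usage sketch of the replacement, with the call and accepted values exactly as they appear in this diff:

// ssa.ParseInventoryPolicy replaces the local parseInventoryPolicy helper;
// it accepts "MustMatch", "AdoptIfNoInventory" or "AdoptAll"
policy, err := ssa.ParseInventoryPolicy(upgradeK8sCmdFlags.inventoryPolicy)
if err != nil {
	return err
}

upgradeOptions.InventoryPolicy = policy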

go.mod

@@ -147,7 +147,7 @@ require (
github.com/siderolabs/go-debug v0.6.2
github.com/siderolabs/go-kmsg v0.1.4
github.com/siderolabs/go-kubeconfig v0.1.1
github.com/siderolabs/go-kubernetes v0.2.32
github.com/siderolabs/go-kubernetes v0.2.33
github.com/siderolabs/go-loadbalancer v0.5.0
github.com/siderolabs/go-pcidb v0.3.2
github.com/siderolabs/go-pointer v1.0.1
@@ -191,7 +191,6 @@ require (
gopkg.in/typ.v4 v4.4.0
k8s.io/klog/v2 v2.130.1
kernel.org/pub/linux/libs/security/libcap/cap v1.2.77
sigs.k8s.io/cli-utils v0.37.3-0.20250918194211-77c836a69463
sigs.k8s.io/hydrophone v0.7.0
)
@@ -250,7 +249,6 @@ require (
github.com/evanphx/json-patch v5.9.11+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect
github.com/fatih/camelcase v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fluxcd/cli-utils v0.36.0-flux.15 // indirect
github.com/fluxcd/pkg/ssa v0.60.0 // indirect
@@ -329,7 +327,6 @@ require (
github.com/siderolabs/protoenc v0.2.4 // indirect
github.com/siderolabs/tcpproxy v0.1.0 // indirect
github.com/spf13/afero v1.14.0 // indirect
github.com/spyzhov/ajson v0.9.6 // indirect
github.com/u-root/uio v0.0.0-20240224005618-d2acac8f3701 // indirect
github.com/vbatts/tar-split v0.12.2 // indirect
github.com/vishvananda/netlink v1.3.1 // indirect
@@ -364,7 +361,6 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/cli-runtime v0.35.0 // indirect
k8s.io/component-helpers v0.35.0 // indirect
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
k8s.io/utils v0.0.0-20260108192941-914a6e750570 // indirect
kernel.org/pub/linux/libs/security/libcap/psx v1.2.77 // indirect

go.sum

@@ -215,8 +215,6 @@ github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjT
github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f h1:Wl78ApPPB2Wvf/TIe2xdyJxTlb6obmF18d8QdkxNDu4=
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f/go.mod h1:OSYXu++VVOHnXeitef/D8n/6y4QV8uLHSFXX4NeXMGc=
github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8=
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
@@ -642,8 +640,8 @@ github.com/siderolabs/go-kmsg v0.1.4 h1:RLAa90O9bWuhA3pXPAYAdrI+kzcqTshZASRA5yso
github.com/siderolabs/go-kmsg v0.1.4/go.mod h1:BLkt2N2DHT0wsFMz32lMw6vNEZL90c8ZnBjpIUoBb/M=
github.com/siderolabs/go-kubeconfig v0.1.1 h1:tZlgpelj/OqrcHVUwISPN0NRgObcflpH9WtE41mtQZ0=
github.com/siderolabs/go-kubeconfig v0.1.1/go.mod h1:QaGp4i9L95oDbcU7jDn30aw4gnREkb3O5otgxw8imOk=
github.com/siderolabs/go-kubernetes v0.2.32 h1:nReXnZNmCCVS9iymIFLYPAhVegaFgAnLI+48W8p9bC8=
github.com/siderolabs/go-kubernetes v0.2.32/go.mod h1:yDWt6+46mnekoOj6GIPSaPLMrxNSQKOJuOsHxvM5NL8=
github.com/siderolabs/go-kubernetes v0.2.33 h1:I5J1EW6McjQeTzQCaZ7Vc0ZTbwypsCburRshTkb6HS0=
github.com/siderolabs/go-kubernetes v0.2.33/go.mod h1:Ow/1iR+kJmMvKno9whL8WTRBoCPKPrHqxy1a/7SYacs=
github.com/siderolabs/go-loadbalancer v0.5.0 h1:0v7E6GrxoONyqwcmHiA+J0vIDPWbkTmevHGCFb4tjdc=
github.com/siderolabs/go-loadbalancer v0.5.0/go.mod h1:tRVouZ9i2R/TRbNUF9MqyBlV2wsjX0cxkYTjPXcI9P0=
github.com/siderolabs/go-pcidb v0.3.2 h1:18KMjsc+AO2r6/pl0KLBR9xOXO0ULLCXwmGhIukoAbw=
@@ -692,8 +690,6 @@ github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiT
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spyzhov/ajson v0.9.6 h1:iJRDaLa+GjhCDAt1yFtU/LKMtLtsNVKkxqlpvrHHlpQ=
github.com/spyzhov/ajson v0.9.6/go.mod h1:a6oSw0MMb7Z5aD2tPoPO+jq11ETKgXUr2XktHdT8Wt8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@@ -1056,8 +1052,6 @@ k8s.io/client-go v0.35.0 h1:IAW0ifFbfQQwQmga0UdoH0yvdqrbwMdq9vIFEhRpxBE=
k8s.io/client-go v0.35.0/go.mod h1:q2E5AAyqcbeLGPdoRB+Nxe3KYTfPce1Dnu1myQdqz9o=
k8s.io/component-base v0.35.0 h1:+yBrOhzri2S1BVqyVSvcM3PtPyx5GUxCK2tinZz1G94=
k8s.io/component-base v0.35.0/go.mod h1:85SCX4UCa6SCFt6p3IKAPej7jSnF3L8EbfSyMZayJR0=
k8s.io/component-helpers v0.35.0 h1:wcXv7HJRksgVjM4VlXJ1CNFBpyDHruRI99RrBtrJceA=
k8s.io/component-helpers v0.35.0/go.mod h1:ahX0m/LTYmu7fL3W8zYiIwnQ/5gT28Ex4o2pymF63Co=
k8s.io/cri-api v0.35.0 h1:fxLSKyJHqbyCSUsg1rW4DRpmjSEM/elZ1GXzYTSLoDQ=
k8s.io/cri-api v0.35.0/go.mod h1:Cnt29u/tYl1Se1cBRL30uSZ/oJ5TaIp4sZm1xDLvcMc=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
@@ -1082,8 +1076,6 @@ pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk=
pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY=
rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs=
sigs.k8s.io/cli-utils v0.37.3-0.20250918194211-77c836a69463 h1:cogPZ9yOTBY+1TxAXD7DnqQqTrN83iwNTdq08z3lFao=
sigs.k8s.io/cli-utils v0.37.3-0.20250918194211-77c836a69463/go.mod h1:u5LTcoijf7f18rMNL7PVNyJzoGEriT+tS57ZSVG3nc4=
sigs.k8s.io/controller-runtime v0.22.2 h1:cK2l8BGWsSWkXz09tcS4rJh95iOLney5eawcK5A33r4=
sigs.k8s.io/controller-runtime v0.22.2/go.mod h1:+QX1XUpTXN4mLoblf4tqr5CQcyHPAki2HLXqQMY6vh8=
sigs.k8s.io/hydrophone v0.7.0 h1:BKEb8m6mcVL6kFEZ4jUCk5VD81bqm2XPtNxFT52ifxc=


@@ -19,7 +19,8 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/siderolabs/gen/optional"
"github.com/siderolabs/gen/xslices"
gokube "github.com/siderolabs/go-kubernetes/kubernetes/manifests"
"github.com/siderolabs/go-kubernetes/kubernetes/ssa"
"github.com/siderolabs/go-kubernetes/kubernetes/ssa/object"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
@@ -28,18 +29,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/apis/actuation"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
k8sadapter "github.com/siderolabs/talos/internal/app/machined/pkg/adapters/k8s"
"github.com/siderolabs/talos/internal/pkg/etcd"
@@ -137,52 +134,7 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti
})
if len(manifests.Items) > 0 {
var (
kubeconfig *rest.Config
dyn dynamic.Interface
)
kubeconfig, err = clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return clientcmd.Load([]byte(secrets.LocalhostAdminKubeconfig))
})
if err != nil {
return fmt.Errorf("error loading kubeconfig: %w", err)
}
kubeconfig.WarningHandler = rest.NewWarningWriter(logging.NewWriter(logger, zapcore.WarnLevel), rest.WarningWriterOptions{
Deduplicate: true,
})
discoveryClient, err := discovery.NewDiscoveryClientForConfig(kubeconfig)
if err != nil {
return fmt.Errorf("error building discovery client: %w", err)
}
dc := memory.NewMemCacheClient(discoveryClient)
mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))
dyn, err = dynamic.NewForConfig(kubeconfig)
if err != nil {
return fmt.Errorf("error building dynamic client: %w", err)
}
if err = etcd.WithLock(ctx, constants.EtcdTalosManifestApplyMutex, logger, func() error {
inventoryClient, inv, err := getInventory(ctx, kubeconfig, mapper, dc)
if err != nil {
return err
}
applyErr := ctrl.apply(ctx, logger, mapper, dyn, manifests, inv)
// update inventory even if the apply process failed half way through
err = inventoryClient.CreateOrUpdate(ctx, inv, inventory.UpdateOptions{})
if err != nil {
err = fmt.Errorf("updating inventory failed: %w", err)
}
return errors.Join(applyErr, err)
}); err != nil {
if err = ctrl.applyManifests(ctx, logger, manifests, secrets); err != nil {
return err
}
}
@@ -203,6 +155,74 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti
}
}
func (ctrl *ManifestApplyController) applyManifests(
ctx context.Context,
logger *zap.Logger,
manifests resource.List,
secrets *secrets.KubernetesCertsSpec,
) error {
kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return clientcmd.Load([]byte(secrets.LocalhostAdminKubeconfig))
})
if err != nil {
return fmt.Errorf("error loading kubeconfig: %w", err)
}
kubeconfig.WarningHandler = rest.NewWarningWriter(logging.NewWriter(logger, zapcore.WarnLevel), rest.WarningWriterOptions{
Deduplicate: true,
})
httpClient, err := rest.HTTPClientFor(kubeconfig)
if err != nil {
return fmt.Errorf("error building HTTP client for kubeconfig: %w", err)
}
defer httpClient.CloseIdleConnections()
discoveryClient, err := discovery.NewDiscoveryClientForConfigAndClient(kubeconfig, httpClient)
if err != nil {
return fmt.Errorf("error building discovery client: %w", err)
}
dyn, err := dynamic.NewForConfigAndClient(kubeconfig, httpClient)
if err != nil {
return fmt.Errorf("error building dynamic client: %w", err)
}
dc := memory.NewMemCacheClient(discoveryClient)
mapper := restmapper.NewDeferredDiscoveryRESTMapper(dc)
k8sClient, err := kubernetes.NewForConfigAndClient(kubeconfig, httpClient)
if err != nil {
return err
}
if err = etcd.WithLock(ctx, constants.EtcdTalosManifestApplyMutex, logger, func() error {
inv, err := ssa.GetInventory(ctx, k8sClient, constants.KubernetesInventoryNamespace, constants.KubernetesBootstrapManifestsInventoryName)
if err != nil {
return fmt.Errorf("error getting inventory: %w", err)
}
inventoryContents := inv.Get()
inventoryContents, applyErr := ctrl.apply(ctx, logger, mapper, dyn, manifests, inventoryContents)
inv.Update(inventoryContents)
// update inventory even if the apply process failed half way through
err = inv.Write(ctx)
if err != nil {
err = fmt.Errorf("updating inventory failed: %w", err)
}
return errors.Join(applyErr, err)
}); err != nil {
return err
}
return nil
}
//nolint:gocyclo,cyclop
func (ctrl *ManifestApplyController) apply(
ctx context.Context,
@@ -210,8 +230,8 @@ func (ctrl *ManifestApplyController) apply(
mapper *restmapper.DeferredDiscoveryRESTMapper,
dyn dynamic.Interface,
manifests resource.List,
inv inventory.Inventory,
) error {
inv object.ObjMetadataSet,
) (object.ObjMetadataSet, error) {
// flatten list of objects to be applied
objects := xslices.FlatMap(manifests.Items, func(m resource.Resource) []*unstructured.Unstructured {
return k8sadapter.Manifest(m.(*k8s.Manifest)).Objects()
@@ -255,16 +275,19 @@ func (ctrl *ManifestApplyController) apply(
var multiErr *multierror.Error
for _, obj := range objects {
objMeta := object.UnstructuredToObjMetadata(obj)
// check if the resource is already in the inventory, if so, skip applying it
if inv.GetObjectRefs().Contains(objMeta) {
continue
}
gvk := obj.GroupVersionKind()
objName := fmt.Sprintf("%s/%s/%s/%s", gvk.Group, gvk.Version, gvk.Kind, obj.GetName())
objMeta, err := object.RuntimeToObjMeta(obj)
if err != nil {
return nil, fmt.Errorf("failed to retrieve object metadata of %q: %w", objName, err)
}
// check if the resource is already in the inventory, if so, skip applying it
if inv.Contains(objMeta) {
continue
}
mapping, err := mapper.RESTMapping(obj.GroupVersionKind().GroupKind(), obj.GroupVersionKind().Version)
if err != nil {
switch {
@@ -279,7 +302,7 @@ func (ctrl *ManifestApplyController) apply(
continue
default:
// connection errors, etc.; it makes no sense to continue with other manifests
return fmt.Errorf("error creating mapping for object %s: %w", objName, err)
return nil, fmt.Errorf("error creating mapping for object %s: %w", objName, err)
}
}
@@ -302,13 +325,13 @@ func (ctrl *ManifestApplyController) apply(
if err == nil {
// already exists,
// backfill the inventory if the resource is missing (to migrate to inventory-based apply)
inventoryAdd(inv, objMeta, obj, actuation.ActuationSucceeded)
inv = inv.Union(object.ObjMetadataSet{objMeta})
continue
}
if !apierrors.IsNotFound(err) {
return fmt.Errorf("error checking resource existence: %w", err)
return nil, fmt.Errorf("error checking resource existence: %w", err)
}
// Set inventory annotation.
@@ -317,13 +340,15 @@ func (ctrl *ManifestApplyController) apply(
annotations = make(map[string]string)
}
inventoryAnnotation, inventoryAnnotationSet := annotations["config.k8s.io/owning-inventory"]
inventoryAnnotation, inventoryAnnotationSet := annotations[ssa.InventoryAnnotationKey]
if inventoryAnnotationSet && inventoryAnnotation != constants.KubernetesBootstrapManifestsInventoryName {
return fmt.Errorf("unexpected foreign inventory annotation on %s ", objName)
multiErr = multierror.Append(multiErr, fmt.Errorf("unexpected foreign inventory annotation on %s ", objName))
continue
}
annotations["config.k8s.io/owning-inventory"] = constants.KubernetesBootstrapManifestsInventoryName
annotations[ssa.InventoryAnnotationKey] = constants.KubernetesBootstrapManifestsInventoryName
obj.SetAnnotations(annotations)
_, err = dr.Apply(ctx, obj.GetName(), obj, metav1.ApplyOptions{
@@ -340,67 +365,16 @@ func (ctrl *ManifestApplyController) apply(
multiErr = multierror.Append(multiErr, fmt.Errorf("error creating %s: %w", objName, err))
default:
// connection errors, etc.; it makes no sense to continue with other manifests
return fmt.Errorf("error creating %s: %w", objName, err)
return nil, fmt.Errorf("error creating %s: %w", objName, err)
}
} else {
logger.Sugar().Infof("created %s", objName)
inventoryAdd(inv, objMeta, obj, actuation.ActuationSucceeded)
inv = inv.Union(object.ObjMetadataSet{objMeta})
}
}
return multiErr.ErrorOrNil()
}
func getInventory(
ctx context.Context,
kubeconfig *rest.Config,
mapper *restmapper.DeferredDiscoveryRESTMapper,
dc discovery.CachedDiscoveryInterface,
) (inventory.Client, inventory.Inventory, error) {
clientGetter := gokube.K8sRESTClientGetter{
RestConfig: kubeconfig,
Mapper: mapper,
DiscoveryClient: dc,
}
factory := util.NewFactory(clientGetter)
inventoryClient, err := inventory.ConfigMapClientFactory{StatusEnabled: true}.NewClient(factory)
if err != nil {
return nil, nil, err
}
inventoryInfo := inventory.NewSingleObjectInfo(inventory.ID(
constants.KubernetesBootstrapManifestsInventoryName),
types.NamespacedName{
Namespace: constants.KubernetesInventoryNamespace,
Name: constants.KubernetesBootstrapManifestsInventoryName,
})
err = gokube.AssureInventory(ctx, inventoryClient, inventoryInfo)
if err != nil {
return nil, nil, err
}
inv, err := inventoryClient.Get(ctx, inventoryInfo, inventory.GetOptions{})
if err != nil {
return nil, nil, err
}
return inventoryClient, inv, err
}
func inventoryAdd(inv inventory.Inventory, objMeta object.ObjMetadata, obj *unstructured.Unstructured, actuationStatus actuation.ActuationStatus) {
inv.SetObjectRefs(slices.Concat(inv.GetObjectRefs(), object.ObjMetadataSet{objMeta}))
inv.SetObjectStatuses(slices.Concat(inv.GetObjectStatuses(), object.ObjectStatusSet{actuation.ObjectStatus{
ObjectReference: inventory.ObjectReferenceFromObjMetadata(objMeta),
Strategy: actuation.ActuationStrategyApply,
Actuation: actuationStatus,
Reconcile: actuation.ReconcileUnknown,
UID: obj.GetUID(),
Generation: obj.GetGeneration(),
}}))
return inv, multiErr.ErrorOrNil()
}
func isNamespace(gvk schema.GroupVersionKind) bool {

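Taken together, the controller now treats the inventory as a plain object.ObjMetadataSet behind a read-modify-write cycle. A condensed sketch of the pattern, with every call taken from the hunks above; the wrapper name syncInventory, the apply callback, and the parameter types are assumptions:

// read-modify-write of the bootstrap manifests inventory, as in applyManifests above
func syncInventory(ctx context.Context, k8sClient kubernetes.Interface,
	apply func(object.ObjMetadataSet) (object.ObjMetadataSet, error),
) error {
	inv, err := ssa.GetInventory(ctx, k8sClient,
		constants.KubernetesInventoryNamespace, constants.KubernetesBootstrapManifestsInventoryName)
	if err != nil {
		return fmt.Errorf("error getting inventory: %w", err)
	}

	contents := inv.Get() // object.ObjMetadataSet

	// the apply loop skips objects already tracked (contents.Contains(objMeta))
	// and records new ones via contents = contents.Union(object.ObjMetadataSet{objMeta})
	contents, applyErr := apply(contents)
	inv.Update(contents)

	// persist the inventory even if the apply process failed half way through
	if err := inv.Write(ctx); err != nil {
		applyErr = errors.Join(applyErr, fmt.Errorf("updating inventory failed: %w", err))
	}

	return applyErr
}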

@@ -12,13 +12,13 @@ import (
"encoding/pem"
"errors"
"fmt"
"net/http"
"slices"
"sync"
"time"
"github.com/siderolabs/crypto/x509"
"github.com/siderolabs/gen/xslices"
taloskubernetes "github.com/siderolabs/go-kubernetes/kubernetes"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
@@ -38,7 +38,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/connrotation"
"k8s.io/client-go/util/workqueue"
clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
@@ -99,7 +98,7 @@ type CRDController struct {
kubeClient kubernetes.Interface
dynamicClient dynamic.Interface
dialer *connrotation.Dialer
httpClient *http.Client
secretsSynced cache.InformerSynced
talosSAsSynced cache.InformerSynced
@@ -120,15 +119,17 @@ func NewCRDController(
allowedRoles []string,
logger *zap.Logger,
) (*CRDController, error) {
dialer := taloskubernetes.NewDialer()
kubeconfig.Dial = dialer.DialContext
kubeCli, err := kubernetes.NewForConfig(kubeconfig)
httpClient, err := rest.HTTPClientFor(kubeconfig)
if err != nil {
return nil, err
}
dynCli, err := dynamic.NewForConfig(kubeconfig)
kubeCli, err := kubernetes.NewForConfigAndClient(kubeconfig, httpClient)
if err != nil {
return nil, err
}
dynCli, err := dynamic.NewForConfigAndClient(kubeconfig, httpClient)
if err != nil {
return nil, err
}
@@ -159,7 +160,7 @@ func NewCRDController(
kubeInformerFactory: kubeInformerFactory,
kubeClient: kubeCli,
dynamicClient: dynCli,
dialer: dialer,
httpClient: httpClient,
dynamicLister: lister,
queue: workqueue.NewTypedRateLimitingQueue(
workqueue.DefaultTypedControllerRateLimiter[string](),
@@ -206,7 +207,7 @@ func (t *CRDController) Run(ctx context.Context, workers int) error {
defer func() {
t.queue.ShutDown()
t.dialer.CloseAll()
t.httpClient.CloseIdleConnections()
wg.Wait()
t.logger.Debug("all workers have shut down")

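The CRD controller above picks up the same client plumbing as the manifest controller: one *http.Client derived from the kubeconfig, shared by the typed and dynamic clients, and drained on shutdown in place of the removed connrotation dialer. A sketch of the pattern, assembled from the fragments in those hunks:

// build one shared HTTP client per kubeconfig
httpClient, err := rest.HTTPClientFor(kubeconfig)
if err != nil {
	return nil, err
}

// both clients reuse the same underlying connections
kubeCli, err := kubernetes.NewForConfigAndClient(kubeconfig, httpClient)
if err != nil {
	return nil, err
}

dynCli, err := dynamic.NewForConfigAndClient(kubeconfig, httpClient)
if err != nil {
	return nil, err
}

// later, in Run()'s shutdown path, replacing dialer.CloseAll():
httpClient.CloseIdleConnections()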

@@ -397,6 +397,8 @@ func (apiSuite *APISuite) ClearConnectionRefused(ctx context.Context, nodes ...s
numMasterNodes = 3
}
apiSuite.T().Log("run ClearConnectionRefused")
apiSuite.Require().NoError(retry.Constant(backoff.DefaultConfig.MaxDelay, retry.WithUnits(time.Second)).Retry(func() error {
for range numMasterNodes * 5 {
_, err := apiSuite.Client.Version(client.WithNodes(ctx, nodes...))
@@ -404,6 +406,8 @@ func (apiSuite *APISuite) ClearConnectionRefused(ctx context.Context, nodes ...s
continue
}
apiSuite.T().Log(err.Error())
if client.StatusCode(err) == codes.Unavailable || client.StatusCode(err) == codes.Canceled {
return retry.ExpectedError(err)
}


@@ -12,7 +12,6 @@ import (
"time"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/siderolabs/go-kubernetes/kubernetes/manifests"
"github.com/stretchr/testify/assert"
"go.yaml.in/yaml/v4"
appsv1 "k8s.io/api/apps/v1"
@@ -116,8 +115,8 @@ func (suite *ManifestsSuite) TestSync() {
clusterAccess,
true,
kubernetes.UpgradeOptions{
LogOutput: manifestSyncWriter{t: suite.T()},
SSApplyBehaviorOptions: manifests.DefaultSSApplyBehaviorOptions(),
LogOutput: manifestSyncWriter{t: suite.T()},
ReconcileTimeout: 30 * time.Second,
},
))


@@ -21,6 +21,7 @@ import (
"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-blockdevice/v2/encryption"
"github.com/siderolabs/go-kubernetes/kubernetes/ssa"
"github.com/siderolabs/go-kubernetes/kubernetes/upgrade"
"github.com/siderolabs/go-procfs/procfs"
"github.com/siderolabs/go-retry/retry"
@@ -429,6 +430,9 @@ func (suite *BaseSuite) upgradeKubernetes(fromVersion, toVersion string, skipKub
ProxyImage: constants.KubeProxyImage,
EncoderOpt: encoder.WithComments(encoder.CommentsAll),
InventoryPolicy: ssa.InventoryPolicyAdoptIfNoInventory,
ReconcileTimeout: 3 * time.Minute,
}
suite.Require().NoError(kubernetes.Upgrade(suite.ctx, suite.clusterAccess, options))


@@ -19,6 +19,8 @@ import (
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/xiter"
"github.com/siderolabs/go-kubernetes/kubernetes/manifests"
"github.com/siderolabs/go-kubernetes/kubernetes/ssa"
ssacli "github.com/siderolabs/go-kubernetes/kubernetes/ssa/cli"
"github.com/siderolabs/go-kubernetes/kubernetes/upgrade"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
@@ -504,31 +506,77 @@ func syncManifests(ctx context.Context, objects []*unstructured.Unstructured, cl
return manifests.SyncWithLog(ctx, objects, config, options.DryRun, options.Log)
}
//nolint:gocyclo
func syncManifestsSSA(ctx context.Context, objects []*unstructured.Unstructured, cluster UpgradeProvider, options UpgradeOptions) error {
config, err := cluster.K8sRestConfig(ctx)
if err != nil {
return err
}
return manifests.SyncAndDiffWithLogSSA(
ctx,
objects,
config,
manifests.SSAOptions{
FieldManagerName: constants.KubernetesFieldManagerName,
InventoryNamespace: constants.KubernetesInventoryNamespace,
InventoryName: constants.KubernetesBootstrapManifestsInventoryName,
SSApplyBehaviorOptions: manifests.SSApplyBehaviorOptions{
DryRun: options.DryRun,
InventoryPolicy: options.InventoryPolicy,
ReconcileTimeout: options.ReconcileTimeout,
PruneTimeout: options.PruneTimeout,
ForceConflicts: options.ForceConflicts,
NoPrune: options.NoPrune,
},
},
options.Log,
updatingManifestsLogline := "updating manifests"
if options.DryRun {
updatingManifestsLogline += " (dry run)"
}
options.Log("%s", updatingManifestsLogline)
manager, err := ssa.NewManager(ctx, config,
constants.KubernetesFieldManagerName,
constants.KubernetesInventoryNamespace,
constants.KubernetesBootstrapManifestsInventoryName,
)
if err != nil {
return fmt.Errorf("error creating SSA manager: %w", err)
}
defer manager.Close()
if options.DryRun {
// only do the diff in dry-run mode
changes, err := manager.Diff(ctx, objects, ssa.DiffOptions{
NoPrune: options.NoPrune,
InventoryPolicy: options.InventoryPolicy,
})
if err != nil {
return fmt.Errorf("error diffing manifests: %w", err)
}
for _, change := range changes {
options.Log(" < %s %s", change.Action, change.Subject)
if change.Diff != "" {
options.Log("%s", change.Diff)
}
}
return nil
}
changes, err := manager.Apply(ctx, objects, ssa.ApplyOptions{
InventoryPolicy: options.InventoryPolicy,
WaitTimeout: options.ReconcileTimeout,
NoPrune: options.NoPrune,
Force: options.ForceManifests,
})
if err != nil {
return fmt.Errorf("error applying manifests: %w", err)
}
ssacli.LogApplyResults(ctx, changes, manager, options.Log)
if options.SkipManifestWait {
options.Log("skipping waiting for manifest reconciliation")
return nil
}
waitOptions := ssa.WaitOptions{
Interval: 2 * time.Second,
Timeout: options.ReconcileTimeout,
FailFast: true,
}
return ssacli.Wait(ctx, changes, options.Log, manager, waitOptions)
}
//nolint:gocyclo


@@ -7,8 +7,9 @@ package kubernetes
import (
"fmt"
"io"
"time"
"github.com/siderolabs/go-kubernetes/kubernetes/manifests"
"github.com/siderolabs/go-kubernetes/kubernetes/ssa"
"github.com/siderolabs/go-kubernetes/kubernetes/upgrade"
"github.com/siderolabs/talos/pkg/machinery/config/encoder"
@@ -24,10 +25,9 @@ const (
// UpgradeOptions represents Kubernetes control plane upgrade settings.
type UpgradeOptions struct {
manifests.SSApplyBehaviorOptions
Path *upgrade.Path
DryRun bool
ControlPlaneEndpoint string
LogOutput io.Writer
PrePullImages bool
@@ -40,6 +40,12 @@ type UpgradeOptions struct {
SchedulerImage string
ProxyImage string
NoPrune bool
ForceManifests bool
ReconcileTimeout time.Duration
InventoryPolicy ssa.InventoryPolicy
SkipManifestWait bool
controlPlaneNodes []string
workerNodes []string
}
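For reference, the reshaped options are populated the way the integration test above does it; values are copied from that test and are illustrative only:

// constructing UpgradeOptions with the new SSA-related fields
options := kubernetes.UpgradeOptions{
	ProxyImage:       constants.KubeProxyImage,
	EncoderOpt:       encoder.WithComments(encoder.CommentsAll),
	InventoryPolicy:  ssa.InventoryPolicyAdoptIfNoInventory,
	ReconcileTimeout: 3 * time.Minute, // zero disables waiting (sets SkipManifestWait)
}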


@@ -3371,11 +3371,10 @@ talosctl upgrade-k8s [flags]
--from string the Kubernetes control plane version to upgrade from
-h, --help help for upgrade-k8s
--kubelet-image string kubelet image to use (default "ghcr.io/siderolabs/kubelet")
--manifests-force-conflicts overwrite the fields when applying even if the field manager differs
--manifests-force whether to recreate objects that contain immutable field changes
--manifests-inventory-policy string kubernetes SSA inventory policy (one of 'MustMatch', 'AdoptIfNoInventory' or 'AdoptAll') (default "AdoptIfNoInventory")
--manifests-no-prune whether pruning of previously applied objects should happen after apply
--manifests-prune-timeout duration how long to wait for resources to be fully deleted (set to zero to disable waiting) (default 3m0s)
--manifests-reconcile-timeout duration how long to wait for resources to be fully reconciled (set to zero to disable waiting) (default 3m0s)
--manifests-reconcile-timeout duration how long to wait for resources to be fully reconciled (set to zero to disable waiting) (default 5m0s)
-n, --nodes strings target the specified nodes
--pre-pull-images pre-pull images before upgrade (default true)
--proxy-image string kube-proxy image to use (default "registry.k8s.io/kube-proxy")
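
A typical invocation exercising the new flags might look like the following (illustrative; <version> is a placeholder):

talosctl upgrade-k8s --to <version> \
    --manifests-inventory-policy AdoptIfNoInventory \
    --manifests-reconcile-timeout 5m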