feat: add a tool and package to convert self-hosted CP to static pods

This is required to upgrade from Talos 0.8.x to 0.9.x. After the cluster
is fully upgraded, control plane is still self-hosted (as it was
bootstrapped with bootkube).

Tool `talosctl convert-k8s` (and library behind it) performs the upgrade
to self-hosted version.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2021-02-09 22:48:18 +03:00 committed by talos-bot
parent 3a78bfcecd
commit 7751920dba
50 changed files with 2970 additions and 1748 deletions

View File

@ -354,9 +354,9 @@ local integration_provision_tests_prepare = Step("provision-tests-prepare", priv
local integration_provision_tests_track_0 = Step("provision-tests-track-0", privileged=true, depends_on=[integration_provision_tests_prepare], environment={"REGISTRY": local_registry});
local integration_provision_tests_track_1 = Step("provision-tests-track-1", privileged=true, depends_on=[integration_provision_tests_prepare], environment={"REGISTRY": local_registry});
local integration_cilium = Step("e2e-cilium-1.8.5", target="e2e-qemu", privileged=true, depends_on=[load_artifacts], environment={
local integration_cilium = Step("e2e-cilium-1.9.4", target="e2e-qemu", privileged=true, depends_on=[load_artifacts], environment={
"SHORT_INTEGRATION_TEST": "yes",
"CUSTOM_CNI_URL": "https://raw.githubusercontent.com/cilium/cilium/v1.8.5/install/kubernetes/quick-install.yaml",
"CUSTOM_CNI_URL": "https://raw.githubusercontent.com/cilium/cilium/v1.9.4/install/kubernetes/quick-install.yaml",
"REGISTRY": local_registry,
"CLUSTER_CIDR": 2,
});

View File

@ -24,7 +24,7 @@ CLUSTERCTL_URL ?= https://github.com/kubernetes-sigs/cluster-api/releases/downlo
SONOBUOY_VERSION ?= 0.19.0
SONOBUOY_URL ?= https://github.com/heptio/sonobuoy/releases/download/v$(SONOBUOY_VERSION)/sonobuoy_$(SONOBUOY_VERSION)_$(OPERATING_SYSTEM)_amd64.tar.gz
TESTPKGS ?= github.com/talos-systems/talos/...
RELEASES ?= v0.7.1 v0.8.0
RELEASES ?= v0.7.1 v0.8.3
SHORT_INTEGRATION_TEST ?=
CUSTOM_CNI_URL ?=

View File

@ -47,6 +47,8 @@ service MachineService {
rpc Rollback(RollbackRequest) returns (RollbackResponse);
rpc Reset(ResetRequest) returns (ResetResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
rpc RemoveBootkubeInitializedKey(google.protobuf.Empty)
returns (RemoveBootkubeInitializedKeyResponse);
rpc ServiceList(google.protobuf.Empty) returns (ServiceListResponse);
rpc ServiceRestart(ServiceRestartRequest) returns (ServiceRestartResponse);
rpc ServiceStart(ServiceStartRequest) returns (ServiceStartResponse);
@ -64,6 +66,7 @@ service MachineService {
message ApplyConfigurationRequest {
bytes data = 1;
bool on_reboot = 2;
bool immediate = 3;
}
// ApplyConfigurationResponse describes the response to a configuration request.
@ -836,3 +839,7 @@ message GenerateConfiguration {
message GenerateConfigurationResponse {
repeated GenerateConfiguration messages = 1;
}
// RemoveBootkubeInitializedKeyResponse describes the response to a RemoveBootkubeInitializedKey request.
message RemoveBootkubeInitializedKey { common.Metadata metadata = 1; }
message RemoveBootkubeInitializedKeyResponse { repeated RemoveBootkubeInitializedKey messages = 1; }

View File

@ -24,6 +24,7 @@ var applyConfigCmdFlags struct {
insecure bool
interactive bool
onReboot bool
immediate bool
}
// applyConfigCmd represents the applyConfiguration command.
@ -134,8 +135,9 @@ var applyConfigCmd = &cobra.Command{
}
if _, err := c.ApplyConfiguration(ctx, &machineapi.ApplyConfigurationRequest{
Data: cfgBytes,
OnReboot: applyConfigCmdFlags.onReboot,
Data: cfgBytes,
OnReboot: applyConfigCmdFlags.onReboot,
Immediate: applyConfigCmdFlags.immediate,
}); err != nil {
return fmt.Errorf("error applying new configuration: %s", err)
}
@ -151,6 +153,7 @@ func init() {
applyConfigCmd.Flags().StringSliceVar(&applyConfigCmdFlags.certFingerprints, "cert-fingerprint", nil, "list of server certificate fingeprints to accept (defaults to no check)")
applyConfigCmd.Flags().BoolVar(&applyConfigCmdFlags.interactive, "interactive", false, "apply the config using text based interactive mode")
applyConfigCmd.Flags().BoolVar(&applyConfigCmdFlags.onReboot, "on-reboot", false, "apply the config on reboot")
applyConfigCmd.Flags().BoolVar(&applyConfigCmdFlags.immediate, "immediate", false, "apply the config immediately (without a reboot)")
// deprecated, to be removed in 0.10
applyConfigCmd.Flags().BoolVar(&applyConfigCmdFlags.onReboot, "no-reboot", false, "apply the config only after the reboot")

View File

@ -0,0 +1,57 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package talos
import (
"context"
"github.com/spf13/cobra"
"github.com/talos-systems/talos/pkg/cluster"
k8s "github.com/talos-systems/talos/pkg/cluster/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/client"
)
// convertK8sCmd represents the convert-k8s command.
var convertK8sCmd = &cobra.Command{
Use: "convert-k8s",
Short: "Convert Kubernetes control plane from self-hosted (bootkube) to Talos-managed (static pods).",
Long: `Command converts control plane bootstrapped on Talos <= 0.8 to Talos-managed control plane (Talos >= 0.9).
As part of the conversion process tool reads existing configuration of the control plane, updates
Talos node configuration to reflect changes made since the boostrap time. Once config is updated,
tool releases static pods and deletes self-hosted DaemonSets.`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return WithClient(convertKubernetes)
},
}
var convertOptions k8s.ConvertOptions
func init() {
convertK8sCmd.Flags().StringVar(&convertOptions.ControlPlaneEndpoint, "endpoint", "", "the cluster control plane endpoint")
convertK8sCmd.Flags().BoolVar(&convertOptions.ForceYes, "force", false, "skip prompts, assume yes")
addCommand(convertK8sCmd)
}
func convertKubernetes(ctx context.Context, c *client.Client) error {
clientProvider := &cluster.ConfigClientProvider{
DefaultClient: c,
}
defer clientProvider.Close() //nolint: errcheck
state := struct {
cluster.ClientProvider
cluster.K8sProvider
}{
ClientProvider: clientProvider,
K8sProvider: &cluster.KubernetesClient{
ClientProvider: clientProvider,
ForceEndpoint: convertOptions.ControlPlaneEndpoint,
},
}
return k8s.ConvertToStaticPods(ctx, &state, convertOptions)
}

View File

@ -6,6 +6,7 @@ package talos
import (
"context"
"fmt"
"github.com/spf13/cobra"
@ -45,13 +46,24 @@ func upgradeKubernetes(ctx context.Context, c *client.Client) error {
defer clientProvider.Close() //nolint: errcheck
state := struct {
cluster.ClientProvider
cluster.K8sProvider
}{
ClientProvider: clientProvider,
K8sProvider: &cluster.KubernetesClient{
ClientProvider: clientProvider,
ForceEndpoint: healthCmdFlags.forceEndpoint,
ForceEndpoint: upgradeOptions.ControlPlaneEndpoint,
},
}
return k8s.Upgrade(ctx, &state, upgradeOptions)
selfHosted, err := k8s.IsSelfHostedControlPlane(ctx, &state, Nodes[0])
if err != nil {
return fmt.Errorf("error checking self-hosted status: %w", err)
}
if selfHosted {
return k8s.UpgradeSelfHosted(ctx, &state, upgradeOptions)
}
return k8s.UpgradeTalosManaged(ctx, &state, upgradeOptions)
}

View File

@ -32,6 +32,7 @@ import (
"github.com/prometheus/procfs"
"github.com/rs/xid"
"github.com/talos-systems/go-blockdevice/blockdevice/partition/gpt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
@ -103,14 +104,33 @@ func (s *Server) Register(obj *grpc.Server) {
}
// ApplyConfiguration implements machine.MachineService.
func (s *Server) ApplyConfiguration(ctx context.Context, in *machine.ApplyConfigurationRequest) (reply *machine.ApplyConfigurationResponse, err error) {
if !in.OnReboot {
if err = s.Controller.Runtime().SetConfig(in.GetData()); err != nil {
//
//nolint: gocyclo
func (s *Server) ApplyConfiguration(ctx context.Context, in *machine.ApplyConfigurationRequest) (*machine.ApplyConfigurationResponse, error) {
log.Printf("apply config request: immediate %v, on reboot %v", in.Immediate, in.OnReboot)
switch {
// --immediate
case in.Immediate:
if err := s.Controller.Runtime().CanApplyImmediate(in.GetData()); err != nil {
return nil, err
}
if err := s.Controller.Runtime().SetConfig(in.GetData()); err != nil {
return nil, err
}
if err := ioutil.WriteFile(constants.ConfigPath, in.GetData(), 0o600); err != nil {
return nil, err
}
// default (no flags)
case !in.OnReboot:
if err := s.Controller.Runtime().SetConfig(in.GetData()); err != nil {
return nil, err
}
go func() {
if err = s.Controller.Run(runtime.SequenceApplyConfiguration, in); err != nil {
if err := s.Controller.Run(runtime.SequenceApplyConfiguration, in); err != nil {
if !runtime.IsRebootError(err) {
log.Println("apply configuration failed:", err)
}
@ -120,7 +140,8 @@ func (s *Server) ApplyConfiguration(ctx context.Context, in *machine.ApplyConfig
}
}
}()
} else {
// --no-reboot
case in.OnReboot:
cfg, err := s.Controller.Runtime().ValidateConfig(in.GetData())
if err != nil {
return nil, err
@ -143,13 +164,11 @@ func (s *Server) ApplyConfiguration(ctx context.Context, in *machine.ApplyConfig
}
}
reply = &machine.ApplyConfigurationResponse{
return &machine.ApplyConfigurationResponse{
Messages: []*machine.ApplyConfiguration{
{},
},
}
return reply, nil
}, nil
}
// GenerateConfiguration implements the machine.MachineServer interface.
@ -1722,6 +1741,33 @@ func (s *Server) EtcdForfeitLeadership(ctx context.Context, in *machine.EtcdForf
return reply, nil
}
// RemoveBootkubeInitializedKey implements machine.MachineService.
//
// Temporary API only used when converting from self-hosted to Talos-managed control plane.
// This API can be removed once the conversion process is no longer needed (Talos 0.11?).
func (s *Server) RemoveBootkubeInitializedKey(ctx context.Context, in *empty.Empty) (*machine.RemoveBootkubeInitializedKeyResponse, error) {
client, err := etcd.NewLocalClient()
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
// nolint: errcheck
defer client.Close()
ctx = clientv3.WithRequireLeader(ctx)
_, err = client.Delete(ctx, constants.InitializedKey)
if err != nil {
return nil, fmt.Errorf("error deleting initialized key: %w", err)
}
return &machine.RemoveBootkubeInitializedKeyResponse{
Messages: []*machine.RemoveBootkubeInitializedKey{
{},
},
}, nil
}
func upgradeMutex(c *etcd.Client) (*concurrency.Mutex, error) {
sess, err := concurrency.NewSession(c.Client,
concurrency.WithTTL(MinimumEtcdUpgradeLeaseLockSeconds),

View File

@ -16,11 +16,11 @@ import (
"github.com/talos-systems/os-runtime/pkg/resource"
"github.com/talos-systems/os-runtime/pkg/state"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/config"
"github.com/talos-systems/talos/pkg/images"
talosconfig "github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/config"
)
// K8sControlPlaneController manages config.K8sControlPlane based on configuration.

View File

@ -14,8 +14,8 @@ import (
"github.com/talos-systems/os-runtime/pkg/resource"
"github.com/talos-systems/os-runtime/pkg/state"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/config"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/resources/config"
)
// MachineTypeController manages config.MachineType based on configuration.

View File

@ -19,9 +19,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/config"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
)
// ControlPlaneStaticPodController manages k8s.StaticPod based on control plane configuration.

View File

@ -20,9 +20,9 @@ import (
"github.com/talos-systems/os-runtime/pkg/resource"
"github.com/talos-systems/os-runtime/pkg/state"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/config"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/k8s"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/v1alpha1"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// ExtraManifestController renders manifests based on templates and config/secrets.

View File

@ -22,11 +22,11 @@ import (
"github.com/talos-systems/os-runtime/pkg/state"
"gopkg.in/yaml.v3"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/k8s"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/secrets"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/v1alpha1"
"github.com/talos-systems/talos/pkg/kubernetes/kubelet"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/secrets"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// KubeletStaticPodController renders static pod definitions and manages k8s.StaticPodStatus.

View File

@ -16,9 +16,9 @@ import (
"github.com/talos-systems/os-runtime/pkg/resource"
"github.com/talos-systems/os-runtime/pkg/state"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/config"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/k8s"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/secrets"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/secrets"
)
// ManifestController renders manifests based on templates and config/secrets.

View File

@ -28,11 +28,11 @@ import (
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/k8s"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/secrets"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/v1alpha1"
"github.com/talos-systems/talos/internal/pkg/etcd"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/secrets"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// ManifestApplyController applies manifests via control plane endpoint.

View File

@ -19,9 +19,9 @@ import (
"github.com/talos-systems/os-runtime/pkg/resource"
"github.com/talos-systems/os-runtime/pkg/state"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/k8s"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/secrets"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/secrets"
)
// RenderSecretsStaticPodController manages k8s.SecretsReady and renders secrets from secrets.Kubernetes.

View File

@ -18,14 +18,14 @@ import (
"github.com/talos-systems/os-runtime/pkg/resource"
"github.com/talos-systems/os-runtime/pkg/state"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/config"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/secrets"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/v1alpha1"
"github.com/talos-systems/talos/internal/pkg/etcd"
"github.com/talos-systems/talos/internal/pkg/kubeconfig"
talosconfig "github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/secrets"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// KubernetesController manages secrets.Kubernetes based on configuration.

View File

@ -15,10 +15,10 @@ import (
"github.com/talos-systems/os-runtime/pkg/state"
"go.etcd.io/etcd/clientv3"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/v1alpha1"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/pkg/etcd"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// BootstrapStatusController manages v1alpha1.Service based on services subsystem state.

View File

@ -13,9 +13,9 @@ import (
"github.com/talos-systems/os-runtime/pkg/resource"
"github.com/talos-systems/os-runtime/pkg/state"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/v1alpha1"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// ServiceController manages v1alpha1.Service based on services subsystem state.

View File

@ -14,9 +14,9 @@ import (
"github.com/talos-systems/os-runtime/pkg/resource"
"github.com/talos-systems/os-runtime/pkg/state"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/config"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/v1alpha1"
v1alpha1runtime "github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// TimeSyncController manages v1alpha1.TimeSync based on configuration and service 'timed' status.

View File

@ -13,6 +13,7 @@ type Runtime interface {
Config() config.Provider
ValidateConfig([]byte) (config.Provider, error)
SetConfig([]byte) error
CanApplyImmediate([]byte) error
State() State
Events() EventStream
Logging() LoggingManager

View File

@ -8,11 +8,13 @@ import (
"fmt"
"log"
"os"
"reflect"
"syscall"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/machinery/config/configloader"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
)
// Runtime implements the Runtime interface.
@ -64,6 +66,45 @@ func (r *Runtime) SetConfig(b []byte) error {
return r.s.V1Alpha2().SetConfig(cfg)
}
// CanApplyImmediate implements the Runtime interface.
func (r *Runtime) CanApplyImmediate(b []byte) error {
cfg, err := r.ValidateConfig(b)
if err != nil {
return err
}
// serialize and load back current config to remove any changes made
// to the config in-memory while the node was running
currentBytes, err := r.Config().Bytes()
if err != nil {
return fmt.Errorf("error serializing current config: %w", err)
}
currentConfigProvider, err := configloader.NewFromBytes(currentBytes)
if err != nil {
return fmt.Errorf("error loading current config: %w", err)
}
currentConfig, ok := currentConfigProvider.(*v1alpha1.Config)
if !ok {
return fmt.Errorf("current config is not v1alpha1")
}
newConfig, ok := cfg.(*v1alpha1.Config)
if !ok {
return fmt.Errorf("new config is not v1alpha1")
}
// the only allowed config change to be applied immediately for now is cluster config
newConfig.ClusterConfig = currentConfig.ClusterConfig
if !reflect.DeepEqual(currentConfig, newConfig) {
return fmt.Errorf("this config change can't be applied in immediate mode")
}
return nil
}
// State implements the Runtime interface.
func (r *Runtime) State() runtime.State {
return r.s

View File

@ -13,11 +13,11 @@ import (
"github.com/talos-systems/os-runtime/pkg/state/impl/namespaced"
"github.com/talos-systems/os-runtime/pkg/state/registry"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/config"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/k8s"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/secrets"
"github.com/talos-systems/talos/internal/app/machined/pkg/resources/v1alpha1"
talosconfig "github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/secrets"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// State implements runtime.V1alpha2State interface.

View File

@ -45,6 +45,7 @@ import (
"github.com/talos-systems/talos/pkg/provision/providers/qemu"
)
//nolint: maligned
type upgradeSpec struct {
ShortName string
@ -53,10 +54,12 @@ type upgradeSpec struct {
SourceInstallerImage string
SourceVersion string
SourceK8sVersion string
SourceSelfHosted bool
TargetInstallerImage string
TargetVersion string
TargetK8sVersion string
TargetSelfHosted bool
MasterNodes int
WorkerNodes int
@ -67,11 +70,11 @@ type upgradeSpec struct {
const (
previousRelease = "v0.7.1"
stableRelease = "v0.8.0"
stableRelease = "v0.8.3"
previousK8sVersion = "1.19.4"
stableK8sVersion = "1.20.1"
currentK8sVersion = "1.20.2"
currentK8sVersion = "1.20.2" //nolint: deadcode,varcheck
)
var (
@ -99,10 +102,12 @@ func upgradeBetweenTwoLastReleases() upgradeSpec {
SourceInstallerImage: fmt.Sprintf("%s:%s", "ghcr.io/talos-systems/installer", previousRelease),
SourceVersion: previousRelease,
SourceK8sVersion: previousK8sVersion,
SourceSelfHosted: true,
TargetInstallerImage: fmt.Sprintf("%s:%s", "ghcr.io/talos-systems/installer", stableRelease),
TargetVersion: stableRelease,
TargetK8sVersion: stableK8sVersion,
TargetSelfHosted: true,
MasterNodes: DefaultSettings.MasterNodes,
WorkerNodes: DefaultSettings.WorkerNodes,
@ -119,10 +124,12 @@ func upgradeStableReleaseToCurrent() upgradeSpec {
SourceInstallerImage: fmt.Sprintf("%s:%s", "ghcr.io/talos-systems/installer", stableRelease),
SourceVersion: stableRelease,
SourceK8sVersion: stableK8sVersion,
SourceSelfHosted: true,
TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
TargetK8sVersion: currentK8sVersion,
TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion,
TargetSelfHosted: false,
MasterNodes: DefaultSettings.MasterNodes,
WorkerNodes: DefaultSettings.WorkerNodes,
@ -132,17 +139,19 @@ func upgradeStableReleaseToCurrent() upgradeSpec {
// upgradeSingeNodePreserve upgrade last release of Talos to the current version of Talos for single-node cluster with preserve.
func upgradeSingeNodePreserve() upgradeSpec {
return upgradeSpec{
ShortName: fmt.Sprintf("preserve-%s-%s", stableRelease, DefaultSettings.CurrentVersion),
ShortName: fmt.Sprintf("prsrv-%s-%s", stableRelease, DefaultSettings.CurrentVersion),
SourceKernelPath: helpers.ArtifactPath(filepath.Join(trimVersion(stableRelease), constants.KernelAsset)),
SourceInitramfsPath: helpers.ArtifactPath(filepath.Join(trimVersion(stableRelease), constants.InitramfsAsset)),
SourceInstallerImage: fmt.Sprintf("%s:%s", "ghcr.io/talos-systems/installer", stableRelease),
SourceVersion: stableRelease,
SourceK8sVersion: stableK8sVersion,
SourceSelfHosted: true,
TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
TargetK8sVersion: stableK8sVersion, // TODO: looks like single-node can't upgrade k8s
TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion
TargetSelfHosted: false,
MasterNodes: 1,
WorkerNodes: 0,
@ -153,17 +162,19 @@ func upgradeSingeNodePreserve() upgradeSpec {
// upgradeSingeNodeStage upgrade last release of Talos to the current version of Talos for single-node cluster with preserve and stage.
func upgradeSingeNodeStage() upgradeSpec {
return upgradeSpec{
ShortName: fmt.Sprintf("preserve-stage-%s-%s", stableRelease, DefaultSettings.CurrentVersion),
ShortName: fmt.Sprintf("prsrv-stg-%s-%s", stableRelease, DefaultSettings.CurrentVersion),
SourceKernelPath: helpers.ArtifactPath(filepath.Join(trimVersion(stableRelease), constants.KernelAsset)),
SourceInitramfsPath: helpers.ArtifactPath(filepath.Join(trimVersion(stableRelease), constants.InitramfsAsset)),
SourceInstallerImage: fmt.Sprintf("%s:%s", "ghcr.io/talos-systems/installer", stableRelease),
SourceVersion: stableRelease,
SourceK8sVersion: stableK8sVersion,
SourceSelfHosted: true,
TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
TargetK8sVersion: stableK8sVersion,
TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion
TargetSelfHosted: false,
MasterNodes: 1,
WorkerNodes: 0,
@ -470,7 +481,23 @@ func (suite *UpgradeSuite) upgradeNode(client *talosclient.Client, node provisio
nodeCtx := talosclient.WithNodes(suite.ctx, node.IPs[0].String())
resp, err := client.Upgrade(nodeCtx, suite.spec.TargetInstallerImage, suite.spec.UpgradePreserve, suite.spec.UpgradeStage, false)
var (
resp *machineapi.UpgradeResponse
err error
)
err = retry.Constant(time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error {
resp, err = client.Upgrade(nodeCtx, suite.spec.TargetInstallerImage, suite.spec.UpgradePreserve, suite.spec.UpgradeStage, false)
if err != nil {
if strings.Contains(err.Error(), "leader changed") {
return retry.ExpectedError(err)
}
return retry.UnexpectedError(err)
}
return nil
})
err = base.IgnoreGRPCUnavailable(err)
suite.Require().NoError(err)
@ -503,6 +530,27 @@ func (suite *UpgradeSuite) upgradeNode(client *talosclient.Client, node provisio
suite.waitForClusterHealth()
}
func (suite *UpgradeSuite) convertSelfHosted(fromSelfHosted, toSelfHosted bool) {
if fromSelfHosted == toSelfHosted {
suite.T().Logf("skipping control plane conversion, as self hosted is %v -> %v", fromSelfHosted, toSelfHosted)
return
}
if toSelfHosted {
suite.Require().FailNow("conversion to self-hosted is not supported")
}
suite.T().Logf("converting Kubernetes to static pods")
options := kubernetes.ConvertOptions{
ControlPlaneEndpoint: suite.controlPlaneEndpoint,
ForceYes: true,
}
suite.Require().NoError(kubernetes.ConvertToStaticPods(suite.ctx, suite.clusterAccess, options))
}
func (suite *UpgradeSuite) upgradeKubernetes(fromVersion, toVersion string) {
if fromVersion == toVersion {
suite.T().Logf("skipping Kubernetes upgrade, as versions are equal %q -> %q", fromVersion, toVersion)
@ -512,12 +560,18 @@ func (suite *UpgradeSuite) upgradeKubernetes(fromVersion, toVersion string) {
suite.T().Logf("upgrading Kubernetes: %q -> %q", fromVersion, toVersion)
suite.Require().NoError(kubernetes.Upgrade(suite.ctx, suite.clusterAccess, kubernetes.UpgradeOptions{
options := kubernetes.UpgradeOptions{
FromVersion: fromVersion,
ToVersion: toVersion,
ControlPlaneEndpoint: suite.controlPlaneEndpoint,
}))
}
if suite.spec.TargetSelfHosted {
suite.Require().NoError(kubernetes.UpgradeSelfHosted(suite.ctx, suite.clusterAccess, options))
} else {
suite.Require().NoError(kubernetes.UpgradeTalosManaged(suite.ctx, suite.clusterAccess, options))
}
}
func (suite *UpgradeSuite) untaint(name string) {
@ -561,9 +615,6 @@ func (suite *UpgradeSuite) TestRolling() {
// verify initial cluster version
suite.assertSameVersionCluster(client, suite.spec.SourceVersion)
// upgrade Kubernetes if required
suite.upgradeKubernetes(suite.spec.SourceK8sVersion, suite.spec.TargetK8sVersion)
// upgrade master nodes
for _, node := range suite.Cluster.Info().Nodes {
if node.Type == machine.TypeInit || node.Type == machine.TypeControlPlane {
@ -581,6 +632,12 @@ func (suite *UpgradeSuite) TestRolling() {
// verify final cluster version
suite.assertSameVersionCluster(client, suite.spec.TargetVersion)
// convert to static pods if required
suite.convertSelfHosted(suite.spec.SourceSelfHosted, suite.spec.TargetSelfHosted)
// upgrade Kubernetes if required
suite.upgradeKubernetes(suite.spec.SourceK8sVersion, suite.spec.TargetK8sVersion)
// run e2e test
suite.runE2E(suite.spec.TargetK8sVersion)
}

View File

@ -12,6 +12,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
k8s "github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
)
@ -32,6 +33,7 @@ type K8sProvider interface {
Kubeconfig(ctx context.Context) ([]byte, error)
K8sRestConfig(ctx context.Context) (*rest.Config, error)
K8sClient(ctx context.Context) (*kubernetes.Clientset, error)
K8sHelper(ctx context.Context) (*k8s.Client, error)
}
// CrashDumper captures Talos cluster state to the specified writer for debugging.

View File

@ -35,3 +35,17 @@ func (k *KubernetesFromKubeletClient) K8sClient(ctx context.Context) (*k8s.Clien
return k.clientset, nil
}
// K8sHelper returns wrapper around K8sClient.
func (k *KubernetesFromKubeletClient) K8sHelper(ctx context.Context) (*kubernetes.Client, error) {
if k.KubeHelper != nil {
return k.KubeHelper, nil
}
_, err := k.K8sClient(ctx)
if err != nil {
return nil, err
}
return k.KubeHelper, nil
}

View File

@ -95,3 +95,17 @@ func (k *KubernetesClient) K8sClient(ctx context.Context) (*kubernetes.Clientset
return k.clientset, nil
}
// K8sHelper returns wrapper around K8sClient.
func (k *KubernetesClient) K8sHelper(ctx context.Context) (*k8s.Client, error) {
if k.KubeHelper != nil {
return k.KubeHelper, nil
}
_, err := k.K8sClient(ctx)
if err != nil {
return nil, err
}
return k.KubeHelper, nil
}

View File

@ -0,0 +1,673 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package kubernetes
import (
"bufio"
"context"
"fmt"
"io"
"log"
"os"
"strings"
"time"
"github.com/AlekSi/pointer"
"github.com/talos-systems/crypto/x509"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/os-runtime/pkg/resource"
"google.golang.org/protobuf/types/known/emptypb"
"gopkg.in/yaml.v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/talos-systems/talos/pkg/cluster"
"github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/machinery/config/configloader"
v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/generate"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)
// ConvertOptions are options for convert tasks.
type ConvertOptions struct {
ControlPlaneEndpoint string
ForceYes bool
masterNodes []string
}
// ConvertProvider are the cluster interfaces required by converter.
type ConvertProvider interface {
cluster.ClientProvider
cluster.K8sProvider
}
// ConvertToStaticPods the self-hosted Kubernetes control plane to Talos-managed static pods-based control plane.
//
//nolint: gocyclo
func ConvertToStaticPods(ctx context.Context, cluster ConvertProvider, options ConvertOptions) error {
var err error
k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return fmt.Errorf("error building kubernetes client: %w", err)
}
options.masterNodes, err = k8sClient.MasterIPs(ctx)
if err != nil {
return fmt.Errorf("error fetching master nodes: %w", err)
}
if len(options.masterNodes) == 0 {
return fmt.Errorf("no master nodes discovered")
}
fmt.Printf("discovered master nodes %q\n", options.masterNodes)
selfHosted, err := IsSelfHostedControlPlane(ctx, cluster, options.masterNodes[0])
if err != nil {
return fmt.Errorf("error checkign self-hosted control plane status: %w", err)
}
fmt.Printf("current self-hosted status: %v\n", selfHosted)
if selfHosted {
if err = updateNodeConfig(ctx, cluster, &options); err != nil {
return err
}
if err = waitResourcesReady(ctx, cluster, &options); err != nil {
return err
}
fmt.Println("Talos generated control plane static pod definitions and bootstrap manifests, please verify them with commands:")
fmt.Printf("\ttalosctl -n <master node IP> get %s\n", k8s.StaticPodType)
fmt.Printf("\ttalosctl -n <master node IP> get %s\n", k8s.ManifestType)
fmt.Println()
fmt.Println("bootstrap manifests will only be applied for missing resources, existing resources will not be updated")
if !options.ForceYes {
var yes bool
yes, err = askYesNo("confirm disabling pod-checkpointer to proceed with control plane update")
if err != nil {
return err
}
if !yes {
return fmt.Errorf("aborted")
}
}
if err = disablePodCheckpointer(ctx, cluster); err != nil {
return err
}
if !options.ForceYes {
var yes bool
yes, err = askYesNo("confirm applying static pod definitions and manifests")
if err != nil {
return err
}
if !yes {
return fmt.Errorf("aborted")
}
}
if err = removeInitializedKey(ctx, cluster, options.masterNodes[0]); err != nil {
return err
}
}
for _, ds := range []string{kubeAPIServer, kubeControllerManager, kubeScheduler} {
// API server won't be ready as it can't bind to the port
if err = waitForStaticPods(ctx, cluster, &options, ds, ds != kubeAPIServer); err != nil {
return err
}
}
for _, ds := range []string{kubeAPIServer, kubeControllerManager, kubeScheduler} {
if err = deleteDaemonset(ctx, cluster, ds, false); err != nil {
return err
}
if err = waitForStaticPods(ctx, cluster, &options, ds, true); err != nil {
return err
}
}
fmt.Println("conversion process completed successfully")
return nil
}
// IsSelfHostedControlPlane returns true if cluster is still running bootkube self-hosted control plane.
func IsSelfHostedControlPlane(ctx context.Context, cluster cluster.ClientProvider, node string) (bool, error) {
c, err := cluster.Client()
if err != nil {
return false, fmt.Errorf("error building Talos API client: %w", err)
}
ctx = client.WithNodes(ctx, node)
resources, err := c.Resources.Get(ctx, v1alpha1.NamespaceName, v1alpha1.BootstrapStatusType, v1alpha1.BootstrapStatusID)
if err != nil {
return false, fmt.Errorf("error fetching bootstrapStatus resource: %w", err)
}
if len(resources) != 1 {
return false, fmt.Errorf("expected 1 instance of bootstrapStatus resource, got %d", len(resources))
}
r := resources[0]
return r.Resource.(*resource.Any).Value().(map[string]interface{})["selfHostedControlPlane"].(bool), nil
}
// updateNodeConfig reads self-hosted settings and secrets from K8s and stores them back to node configs.
//
//nolint: gocyclo
func updateNodeConfig(ctx context.Context, cluster ConvertProvider, options *ConvertOptions) error {
fmt.Println("gathering control plane configuration")
k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return fmt.Errorf("error building kubernetes client: %w", err)
}
type NodeConfigPath struct {
ServiceAccount *x509.PEMEncodedKey
AggregatorCA *x509.PEMEncodedCertificateAndKey
KubeAPIServerImage string
KubeControllerManagerImage string
KubeSchedulerImage string
}
var patch NodeConfigPath
secret, err := k8sClient.CoreV1().Secrets(namespace).Get(ctx, kubeControllerManager, v1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching kube-controller-manager secret: %w", err)
}
patch.ServiceAccount = &x509.PEMEncodedKey{}
patch.ServiceAccount.Key = secret.Data["service-account.key"]
if patch.ServiceAccount.Key == nil {
return fmt.Errorf("service-account.key missing")
}
fmt.Println("aggregator CA key can't be recovered from bootkube-boostrapped control plane, generating new CA")
aggregatorCA, err := generate.NewAggregatorCA(time.Now())
if err != nil {
return fmt.Errorf("error generating aggregator CA: %w", err)
}
patch.AggregatorCA = x509.NewCertificateAndKeyFromCertificateAuthority(aggregatorCA)
for _, name := range []string{kubeAPIServer, kubeControllerManager, kubeScheduler} {
var ds *appsv1.DaemonSet
ds, err = k8sClient.AppsV1().DaemonSets(namespace).Get(ctx, name, v1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching %q daemonset: %w", name, err)
}
image := ds.Spec.Template.Spec.Containers[0].Image
switch name {
case kubeAPIServer:
patch.KubeAPIServerImage = image
case kubeControllerManager:
patch.KubeControllerManagerImage = image
case kubeScheduler:
patch.KubeSchedulerImage = image
}
}
for _, node := range options.masterNodes {
fmt.Printf("patching master node %q configuration\n", node)
if err = patchNodeConfig(ctx, cluster, node, func(config *v1alpha1config.Config) error {
if config.ClusterConfig == nil {
config.ClusterConfig = &v1alpha1config.ClusterConfig{}
}
config.ClusterConfig.ClusterServiceAccount = patch.ServiceAccount
config.ClusterConfig.ClusterAggregatorCA = patch.AggregatorCA
if config.ClusterConfig.APIServerConfig == nil {
config.ClusterConfig.APIServerConfig = &v1alpha1config.APIServerConfig{}
}
config.ClusterConfig.APIServerConfig.ContainerImage = patch.KubeAPIServerImage
if config.ClusterConfig.ControllerManagerConfig == nil {
config.ClusterConfig.ControllerManagerConfig = &v1alpha1config.ControllerManagerConfig{}
}
config.ClusterConfig.ControllerManagerConfig.ContainerImage = patch.KubeControllerManagerImage
if config.ClusterConfig.SchedulerConfig == nil {
config.ClusterConfig.SchedulerConfig = &v1alpha1config.SchedulerConfig{}
}
config.ClusterConfig.SchedulerConfig.ContainerImage = patch.KubeSchedulerImage
return nil
}); err != nil {
return fmt.Errorf("error patching node %q config: %w", node, err)
}
}
return nil
}
// patchNodeConfig updates node configuration by means of patch function.
//
//nolint: gocyclo
func patchNodeConfig(ctx context.Context, cluster ConvertProvider, node string, patchFunc func(config *v1alpha1config.Config) error) error {
c, err := cluster.Client()
if err != nil {
return fmt.Errorf("error building Talos API client: %w", err)
}
ctx = client.WithNodes(ctx, node)
resources, err := c.Resources.Get(ctx, config.NamespaceName, config.V1Alpha1Type, config.V1Alpha1ID)
if err != nil {
return fmt.Errorf("error fetching config resource: %w", err)
}
if len(resources) != 1 {
return fmt.Errorf("expected 1 instance of config resource, got %d", len(resources))
}
r := resources[0]
yamlConfig, err := yaml.Marshal(r.Resource.Spec())
if err != nil {
return fmt.Errorf("error getting YAML config: %w", err)
}
config, err := configloader.NewFromBytes(yamlConfig)
if err != nil {
return fmt.Errorf("error loading config: %w", err)
}
cfg, ok := config.(*v1alpha1config.Config)
if !ok {
return fmt.Errorf("config is not v1alpha1 config")
}
if !cfg.Persist() {
return fmt.Errorf("config persistence is disabled, patching is not supported")
}
if err = patchFunc(cfg); err != nil {
return fmt.Errorf("error patching config: %w", err)
}
cfgBytes, err := cfg.Bytes()
if err != nil {
return fmt.Errorf("error serializing config: %w", err)
}
_, err = c.ApplyConfiguration(ctx, &machine.ApplyConfigurationRequest{
Data: cfgBytes,
Immediate: true,
})
if err != nil {
return fmt.Errorf("error applying config: %w", err)
}
return nil
}
// waitResourcesReady waits for manifests and static pod definitions to be generated.
//
//nolint: gocyclo
func waitResourcesReady(ctx context.Context, cluster ConvertProvider, options *ConvertOptions) error {
c, err := cluster.Client()
if err != nil {
return fmt.Errorf("error building Talos API client: %w", err)
}
ctx = client.WithNodes(ctx, options.masterNodes...)
fmt.Println("waiting for static pod definitions to be generated")
if err := retry.Constant(3*time.Minute, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
listClient, err := c.Resources.List(ctx, k8s.ControlPlaneNamespaceName, k8s.StaticPodType)
if err != nil {
return retry.UnexpectedError(fmt.Errorf("error listing static pod resources: %w", err))
}
count := 0
for {
resp, err := listClient.Recv()
if err == io.EOF {
break
}
if err != nil {
return retry.UnexpectedError(fmt.Errorf("error list listing static pods resources: %w", err))
}
if resp.Resource != nil {
count++
}
}
if count != len(options.masterNodes)*3 {
return retry.ExpectedError(fmt.Errorf("expected %d static pods, found %d", len(options.masterNodes)*3, count))
}
return nil
}); err != nil {
return fmt.Errorf("error waiting for static pods to be generated: %w", err)
}
fmt.Println("waiting for manifests to be generated")
if err := retry.Constant(3*time.Minute, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
listClient, err := c.Resources.List(ctx, k8s.ControlPlaneNamespaceName, k8s.ManifestType)
if err != nil {
return retry.UnexpectedError(fmt.Errorf("error listing static pod resources: %w", err))
}
nodes := make(map[string]struct{})
for _, node := range options.masterNodes {
nodes[node] = struct{}{}
}
for {
resp, err := listClient.Recv()
if err == io.EOF {
break
}
if err != nil {
return retry.UnexpectedError(fmt.Errorf("error list listing static pods resources: %w", err))
}
if resp.Resource != nil {
delete(nodes, resp.Metadata.GetHostname())
}
}
if len(nodes) > 0 {
return retry.ExpectedError(fmt.Errorf("some nodes don't have manifests generated: %v", nodes))
}
return nil
}); err != nil {
return fmt.Errorf("error waiting for manifests to be generated: %w", err)
}
return nil
}
// removeInitializedKey removes bootkube boostrap initialized key releasing static pods and manifests.
func removeInitializedKey(ctx context.Context, cluster cluster.ClientProvider, node string) error {
c, err := cluster.Client()
if err != nil {
return fmt.Errorf("error building Talos API client: %w", err)
}
ctx = client.WithNodes(ctx, node)
fmt.Println("removing self-hosted initialized key")
_, err = c.MachineClient.RemoveBootkubeInitializedKey(ctx, &emptypb.Empty{})
if err != nil {
return fmt.Errorf("error removing self-hosted iniitialized key: %w", err)
}
return nil
}
// waitForStaticPods waits for static pods to be present in the API server.
//
//nolint: gocyclo
func waitForStaticPods(ctx context.Context, cluster ConvertProvider, options *ConvertOptions, k8sApp string, checkReady bool) error {
fmt.Printf("waiting for static pods for %q to be present in the API server state\n", k8sApp)
k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return fmt.Errorf("error building kubernetes client: %w", err)
}
return retry.Constant(3*time.Minute, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{
LabelSelector: fmt.Sprintf("k8s-app = %s", k8sApp),
})
if err != nil {
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) {
return retry.ExpectedError(err)
}
return retry.UnexpectedError(err)
}
count := 0
for _, pod := range pods.Items {
if pod.Annotations[constants.AnnotationStaticPodSecretsVersion] == "" {
continue
}
staticPod := false
for _, ref := range pod.OwnerReferences {
if ref.Kind == "Node" {
staticPod = true
break
}
}
if !staticPod {
continue
}
if checkReady {
ready := false
for _, condition := range pod.Status.Conditions {
if condition.Type != "Ready" {
continue
}
if condition.Status == "True" {
ready = true
break
}
}
if !ready {
continue
}
}
count++
}
if count != len(options.masterNodes) {
return retry.ExpectedError(fmt.Errorf("found only %d static pods for %q, expecting %d pods", count, k8sApp, len(options.masterNodes)))
}
return nil
})
}
// disablePodCheckpointer disables pod checkpointer and takes daemonsets out of pod-checkpointer control.
func disablePodCheckpointer(ctx context.Context, cluster ConvertProvider) error {
k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return fmt.Errorf("error building kubernetes client: %w", err)
}
fmt.Println("disabling pod-checkpointer")
if err = deleteDaemonset(ctx, cluster, "pod-checkpointer", false); err != nil {
return fmt.Errorf("error deleting pod-checkpointer: %w", err)
}
// pod-checkpointer should clean up checkpoints after itself
fmt.Println("checking for active pod checkpoints")
return retry.Constant(7*time.Minute, retry.WithUnits(20*time.Second), retry.WithErrorLogging(true)).Retry(func() error {
var checkpoints []string
checkpoints, err = getActiveCheckpoints(ctx, k8sClient)
if err != nil {
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) {
return retry.ExpectedError(err)
}
return retry.UnexpectedError(err)
}
if len(checkpoints) > 0 {
return retry.ExpectedError(fmt.Errorf("found %d active pod checkpoints: %v", len(checkpoints), checkpoints))
}
return nil
})
}
func getActiveCheckpoints(ctx context.Context, k8sClient *kubernetes.Client) ([]string, error) {
pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error listing pods: %w", err)
}
checkpoints := []string{}
pendingCheckpoints := []string{}
for _, pod := range pods.Items {
if _, exists := pod.Annotations[checkpointedPodAnnotation]; exists {
if pod.Status.Phase == corev1.PodPending {
pendingCheckpoints = append(pendingCheckpoints, pod.Name)
}
checkpoints = append(checkpoints, pod.Name)
}
}
if len(pendingCheckpoints) == len(checkpoints) && len(checkpoints) > 0 {
log.Printf("deleting pending checkpoints %v", pendingCheckpoints)
for _, name := range pendingCheckpoints {
if err = k8sClient.CoreV1().Pods(namespace).Delete(ctx, name, v1.DeleteOptions{
GracePeriodSeconds: pointer.ToInt64(0),
}); err != nil {
return nil, fmt.Errorf("error deleting pod: %w", err)
}
}
}
return checkpoints, nil
}
// deleteDaemonset deletes daemonset and waits for all the pods to be removed.
//
//nolint: gocyclo
func deleteDaemonset(ctx context.Context, cluster ConvertProvider, k8sApp string, anyPod bool) error {
fmt.Printf("deleting daemonset %q\n", k8sApp)
k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return fmt.Errorf("error building kubernetes client: %w", err)
}
if err = retry.Constant(time.Minute, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
err = k8sClient.AppsV1().DaemonSets(namespace).Delete(ctx, k8sApp, v1.DeleteOptions{})
if err != nil {
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) {
return retry.ExpectedError(err)
}
if apierrors.IsNotFound(err) {
return nil
}
return retry.UnexpectedError(err)
}
return nil
}); err != nil {
return fmt.Errorf("error deleting daemonset %q: %w", k8sApp, err)
}
return retry.Constant(3*time.Minute, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{
LabelSelector: fmt.Sprintf("k8s-app = %s", k8sApp),
})
if err != nil {
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) {
return retry.ExpectedError(err)
}
return retry.UnexpectedError(err)
}
count := 0
for _, pod := range pods.Items {
if anyPod {
count++
continue
}
for _, ref := range pod.OwnerReferences {
if ref.Kind == "DaemonSet" {
count++
}
}
}
if count > 0 {
return retry.ExpectedError(fmt.Errorf("still %d pods found for %q", count, k8sApp))
}
return nil
})
}
func askYesNo(prompt string) (bool, error) {
reader := bufio.NewReader(os.Stdin)
for {
fmt.Printf("%s [yes/no]: ", prompt)
response, err := reader.ReadString('\n')
if err != nil {
return false, err
}
switch strings.ToLower(strings.TrimSpace(response)) {
case "yes", "y":
return true, nil
case "no", "n":
return false, nil
}
}
}

View File

@ -0,0 +1,350 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package kubernetes
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/talos-systems/go-retry/retry"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
"github.com/talos-systems/talos/pkg/cluster"
"github.com/talos-systems/talos/pkg/machinery/constants"
)
// UpgradeSelfHosted the Kubernetes control plane.
//
//nolint: gocyclo
func UpgradeSelfHosted(ctx context.Context, cluster cluster.K8sProvider, options UpgradeOptions) error {
switch {
case strings.HasPrefix(options.FromVersion, "1.18.") && strings.HasPrefix(options.ToVersion, "1.19."):
return hyperkubeUpgrade(ctx, cluster, options)
case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.19."):
return hyperkubeUpgrade(ctx, cluster, options)
case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.20."):
options.extraUpdaters = append(options.extraUpdaters, addControlPlaneToleration())
options.podCheckpointerExtraUpdaters = append(options.podCheckpointerExtraUpdaters, addControlPlaneToleration())
serviceAccountUpdater, err := kubeAPIServerServiceAccountPatch(options)
if err != nil {
return err
}
options.extraUpdaters = append(options.extraUpdaters, serviceAccountUpdater)
if err = serviceAccountSecretsUpdate(ctx, cluster); err != nil {
return err
}
return hyperkubeUpgrade(ctx, cluster, options)
case strings.HasPrefix(options.FromVersion, "1.20.") && strings.HasPrefix(options.ToVersion, "1.20."):
return hyperkubeUpgrade(ctx, cluster, options)
default:
return fmt.Errorf("unsupported upgrade from %q to %q", options.FromVersion, options.ToVersion)
}
}
// hyperkubeUpgrade upgrades from hyperkube-based to distroless images in 1.19.
func hyperkubeUpgrade(ctx context.Context, cluster cluster.K8sProvider, options UpgradeOptions) error {
clientset, err := cluster.K8sClient(ctx)
if err != nil {
return fmt.Errorf("error building K8s client: %w", err)
}
if err = podCheckpointerGracePeriod(ctx, clientset, "0m"); err != nil {
return fmt.Errorf("error setting pod-checkpointer grace period: %w", err)
}
graceTimeout := 5 * time.Minute
fmt.Printf("sleeping %s to let the pod-checkpointer self-checkpoint be updated\n", graceTimeout.String())
time.Sleep(graceTimeout)
daemonsets := []string{kubeAPIServer, kubeControllerManager, kubeScheduler, kubeProxy}
for _, ds := range daemonsets {
if err = hyperkubeUpgradeDs(ctx, clientset, ds, options); err != nil {
return fmt.Errorf("failed updating daemonset %q: %w", ds, err)
}
}
if err = podCheckpointerGracePeriod(ctx, clientset, graceTimeout.String(), options.podCheckpointerExtraUpdaters...); err != nil {
return fmt.Errorf("error setting pod-checkpointer grace period: %w", err)
}
return nil
}
//nolint: gocyclo
func updateDaemonset(ctx context.Context, clientset *kubernetes.Clientset, ds string, updateFunc func(daemonset *appsv1.DaemonSet) error) error {
daemonset, err := clientset.AppsV1().DaemonSets(namespace).Get(ctx, ds, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching daemonset: %w", err)
}
oldData, err := json.Marshal(daemonset)
if err != nil {
return fmt.Errorf("error marshaling deployment: %w", err)
}
if err = updateFunc(daemonset); err != nil {
return err
}
newData, err := json.Marshal(daemonset)
if err != nil {
return fmt.Errorf("error marshaling new deployment: %w", err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, appsv1.DaemonSet{})
if err != nil {
return fmt.Errorf("failed to create two way merge patch: %w", err)
}
_, err = clientset.AppsV1().DaemonSets(namespace).Patch(ctx, daemonset.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{
FieldManager: "talos",
})
if err != nil {
return fmt.Errorf("error patching deployment: %w", err)
}
// give k8s some time
time.Sleep(10 * time.Second)
return retry.Constant(5*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error {
daemonset, err = clientset.AppsV1().DaemonSets(namespace).Get(ctx, ds, metav1.GetOptions{})
if err != nil {
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) {
return retry.ExpectedError(err)
}
return retry.UnexpectedError(fmt.Errorf("error fetching daemonset: %w", err))
}
if daemonset.Status.UpdatedNumberScheduled != daemonset.Status.DesiredNumberScheduled {
return retry.ExpectedError(fmt.Errorf("expected current number up-to-date for %s to be %d, got %d", ds, daemonset.Status.UpdatedNumberScheduled, daemonset.Status.CurrentNumberScheduled))
}
if daemonset.Status.CurrentNumberScheduled != daemonset.Status.DesiredNumberScheduled {
return retry.ExpectedError(fmt.Errorf("expected current number scheduled for %s to be %d, got %d", ds, daemonset.Status.DesiredNumberScheduled, daemonset.Status.CurrentNumberScheduled))
}
if daemonset.Status.NumberAvailable != daemonset.Status.DesiredNumberScheduled {
return retry.ExpectedError(fmt.Errorf("expected number available for %s to be %d, got %d", ds, daemonset.Status.DesiredNumberScheduled, daemonset.Status.NumberAvailable))
}
if daemonset.Status.NumberReady != daemonset.Status.DesiredNumberScheduled {
return retry.ExpectedError(fmt.Errorf("expected number ready for %s to be %d, got %d", ds, daemonset.Status.DesiredNumberScheduled, daemonset.Status.NumberReady))
}
return nil
})
}
func podCheckpointerGracePeriod(ctx context.Context, clientset *kubernetes.Clientset, gracePeriod string, extraUpdaters ...daemonsetUpdater) error {
fmt.Printf("updating pod-checkpointer grace period to %q\n", gracePeriod)
return updateDaemonset(ctx, clientset, "pod-checkpointer", func(daemonset *appsv1.DaemonSet) error {
if len(daemonset.Spec.Template.Spec.Containers) != 1 {
return fmt.Errorf("unexpected number of containers: %d", len(daemonset.Spec.Template.Spec.Containers))
}
args := daemonset.Spec.Template.Spec.Containers[0].Command
for i := range args {
if strings.HasPrefix(args[i], "--checkpoint-grace-period=") {
args[i] = fmt.Sprintf("--checkpoint-grace-period=%s", gracePeriod)
}
}
for _, updater := range extraUpdaters {
if err := updater("pod-checkpointer", daemonset); err != nil {
return err
}
}
return nil
})
}
//nolint: gocyclo
func hyperkubeUpgradeDs(ctx context.Context, clientset *kubernetes.Clientset, ds string, options UpgradeOptions) error {
if ds == kubeAPIServer {
fmt.Printf("temporarily taking %q out of pod-checkpointer control\n", ds)
if err := updateDaemonset(ctx, clientset, ds, func(daemonset *appsv1.DaemonSet) error {
delete(daemonset.Spec.Template.Annotations, checkpointerAnnotation)
return nil
}); err != nil {
return err
}
}
fmt.Printf("updating daemonset %q to version %q\n", ds, options.ToVersion)
return updateDaemonset(ctx, clientset, ds, func(daemonset *appsv1.DaemonSet) error {
if len(daemonset.Spec.Template.Spec.Containers) != 1 {
return fmt.Errorf("unexpected number of containers: %d", len(daemonset.Spec.Template.Spec.Containers))
}
args := daemonset.Spec.Template.Spec.Containers[0].Command
if args[0] == "./hyperkube" || args[0] == "/hyperkube" {
args[0] = "/go-runner"
args[1] = fmt.Sprintf("/usr/local/bin/%s", ds)
if ds == kubeProxy {
daemonset.Spec.Template.Spec.Containers[0].Command = daemonset.Spec.Template.Spec.Containers[0].Command[1:]
}
}
switch ds {
case kubeAPIServer:
daemonset.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:v%s", constants.KubernetesAPIServerImage, options.ToVersion)
case kubeControllerManager:
daemonset.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:v%s", constants.KubernetesControllerManagerImage, options.ToVersion)
case kubeScheduler:
daemonset.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:v%s", constants.KubernetesSchedulerImage, options.ToVersion)
case kubeProxy:
daemonset.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:v%s", constants.KubernetesProxyImage, options.ToVersion)
default:
return fmt.Errorf("failed to build new image spec")
}
if ds == kubeAPIServer {
if daemonset.Spec.Template.Annotations == nil {
daemonset.Spec.Template.Annotations = make(map[string]string)
}
daemonset.Spec.Template.Annotations[checkpointerAnnotation] = "true"
}
for _, updater := range options.extraUpdaters {
if err := updater(ds, daemonset); err != nil {
return err
}
}
return nil
})
}
func serviceAccountSecretsUpdate(ctx context.Context, cluster cluster.K8sProvider) error {
const serviceAccountKey = "service-account.key"
clientset, err := cluster.K8sClient(ctx)
if err != nil {
return fmt.Errorf("error building K8s client: %w", err)
}
apiServerSecrets, err := clientset.CoreV1().Secrets(namespace).Get(ctx, kubeAPIServer, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching kube-apiserver secrets: %w", err)
}
if _, ok := apiServerSecrets.Data[serviceAccountKey]; ok {
return nil
}
controllerManagerSecrets, err := clientset.CoreV1().Secrets(namespace).Get(ctx, kubeControllerManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching kube-controller-manager secrets: %w", err)
}
if _, ok := controllerManagerSecrets.Data[serviceAccountKey]; !ok {
return fmt.Errorf("kube-controller-manager secrets missing %q secret", serviceAccountKey)
}
apiServerSecrets.Data[serviceAccountKey] = controllerManagerSecrets.Data[serviceAccountKey]
_, err = clientset.CoreV1().Secrets(namespace).Update(ctx, apiServerSecrets, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating kube-apiserver secrets: %w", err)
}
fmt.Printf("patched kube-apiserver secrets for %q\n", serviceAccountKey)
return nil
}
func addControlPlaneToleration() daemonsetUpdater {
return func(ds string, daemonset *appsv1.DaemonSet) error {
if ds == kubeProxy {
return nil
}
tolerationFound := false
for _, toleration := range daemonset.Spec.Template.Spec.Tolerations {
if toleration.Key == constants.LabelNodeRoleControlPlane {
tolerationFound = true
break
}
}
if tolerationFound {
return nil
}
daemonset.Spec.Template.Spec.Tolerations = append(daemonset.Spec.Template.Spec.Tolerations, corev1.Toleration{
Key: constants.LabelNodeRoleControlPlane,
Operator: corev1.TolerationOpExists,
Effect: corev1.TaintEffectNoSchedule,
})
return nil
}
}
func kubeAPIServerServiceAccountPatch(options UpgradeOptions) (daemonsetUpdater, error) {
if options.ControlPlaneEndpoint == "" {
return nil, fmt.Errorf("control plane endpoint is required for service account patch")
}
return func(ds string, daemonset *appsv1.DaemonSet) error {
if ds != kubeAPIServer {
return nil
}
argExists := func(argName string) bool {
prefix := fmt.Sprintf("--%s=", argName)
for _, arg := range daemonset.Spec.Template.Spec.Containers[0].Command {
if strings.HasPrefix(arg, prefix) {
return true
}
}
return false
}
if !argExists("api-audiences") {
daemonset.Spec.Template.Spec.Containers[0].Command = append(daemonset.Spec.Template.Spec.Containers[0].Command,
fmt.Sprintf("--api-audiences=%s", options.ControlPlaneEndpoint))
}
if !argExists("service-account-issuer") {
daemonset.Spec.Template.Spec.Containers[0].Command = append(daemonset.Spec.Template.Spec.Containers[0].Command,
fmt.Sprintf("--service-account-issuer=%s", options.ControlPlaneEndpoint))
}
if !argExists("service-account-signing-key") {
daemonset.Spec.Template.Spec.Containers[0].Command = append(daemonset.Spec.Template.Spec.Containers[0].Command,
"--service-account-signing-key-file=/etc/kubernetes/secrets/service-account.key")
}
return nil
}, nil
}

View File

@ -0,0 +1,23 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package kubernetes
import (
"context"
"fmt"
"github.com/talos-systems/talos/pkg/cluster"
)
// UpgradeProvider are the cluster interfaces required by upgrade process.
type UpgradeProvider interface {
cluster.ClientProvider
cluster.K8sProvider
}
// UpgradeTalosManaged the Kubernetes control plane.
func UpgradeTalosManaged(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions) error {
return fmt.Errorf("not implemented yet")
}

View File

@ -5,28 +5,13 @@
package kubernetes
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/talos-systems/go-retry/retry"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
"github.com/talos-systems/talos/pkg/cluster"
"github.com/talos-systems/talos/pkg/machinery/constants"
)
const (
namespace = "kube-system"
checkpointerAnnotation = "checkpointer.alpha.coreos.com/checkpoint"
namespace = "kube-system"
checkpointerAnnotation = "checkpointer.alpha.coreos.com/checkpoint"
checkpointedPodAnnotation = "checkpointer.alpha.coreos.com/checkpoint-of"
kubeAPIServer = "kube-apiserver"
kubeControllerManager = "kube-controller-manager"
@ -46,328 +31,3 @@ type UpgradeOptions struct {
}
type daemonsetUpdater func(ds string, daemonset *appsv1.DaemonSet) error
// Upgrade the Kubernetes control plane.
//
//nolint: gocyclo
func Upgrade(ctx context.Context, cluster cluster.K8sProvider, options UpgradeOptions) error {
switch {
case strings.HasPrefix(options.FromVersion, "1.18.") && strings.HasPrefix(options.ToVersion, "1.19."):
return hyperkubeUpgrade(ctx, cluster, options)
case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.19."):
return hyperkubeUpgrade(ctx, cluster, options)
case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.20."):
options.extraUpdaters = append(options.extraUpdaters, addControlPlaneToleration())
options.podCheckpointerExtraUpdaters = append(options.podCheckpointerExtraUpdaters, addControlPlaneToleration())
serviceAccountUpdater, err := kubeAPIServerServiceAccountPatch(options)
if err != nil {
return err
}
options.extraUpdaters = append(options.extraUpdaters, serviceAccountUpdater)
if err = serviceAccountSecretsUpdate(ctx, cluster); err != nil {
return err
}
return hyperkubeUpgrade(ctx, cluster, options)
case strings.HasPrefix(options.FromVersion, "1.20.") && strings.HasPrefix(options.ToVersion, "1.20."):
return hyperkubeUpgrade(ctx, cluster, options)
default:
return fmt.Errorf("unsupported upgrade from %q to %q", options.FromVersion, options.ToVersion)
}
}
// hyperkubeUpgrade upgrades from hyperkube-based to distroless images in 1.19.
func hyperkubeUpgrade(ctx context.Context, cluster cluster.K8sProvider, options UpgradeOptions) error {
clientset, err := cluster.K8sClient(ctx)
if err != nil {
return fmt.Errorf("error building K8s client: %w", err)
}
if err = podCheckpointerGracePeriod(ctx, clientset, "0m"); err != nil {
return fmt.Errorf("error setting pod-checkpointer grace period: %w", err)
}
graceTimeout := 5 * time.Minute
fmt.Printf("sleeping %s to let the pod-checkpointer self-checkpoint be updated\n", graceTimeout.String())
time.Sleep(graceTimeout)
daemonsets := []string{kubeAPIServer, kubeControllerManager, kubeScheduler, kubeProxy}
for _, ds := range daemonsets {
if err = hyperkubeUpgradeDs(ctx, clientset, ds, options); err != nil {
return fmt.Errorf("failed updating daemonset %q: %w", ds, err)
}
}
if err = podCheckpointerGracePeriod(ctx, clientset, graceTimeout.String(), options.podCheckpointerExtraUpdaters...); err != nil {
return fmt.Errorf("error setting pod-checkpointer grace period: %w", err)
}
return nil
}
//nolint: gocyclo
func updateDaemonset(ctx context.Context, clientset *kubernetes.Clientset, ds string, updateFunc func(daemonset *appsv1.DaemonSet) error) error {
daemonset, err := clientset.AppsV1().DaemonSets(namespace).Get(ctx, ds, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching daemonset: %w", err)
}
oldData, err := json.Marshal(daemonset)
if err != nil {
return fmt.Errorf("error marshaling deployment: %w", err)
}
if err = updateFunc(daemonset); err != nil {
return err
}
newData, err := json.Marshal(daemonset)
if err != nil {
return fmt.Errorf("error marshaling new deployment: %w", err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, appsv1.DaemonSet{})
if err != nil {
return fmt.Errorf("failed to create two way merge patch: %w", err)
}
_, err = clientset.AppsV1().DaemonSets(namespace).Patch(ctx, daemonset.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{
FieldManager: "talos",
})
if err != nil {
return fmt.Errorf("error patching deployment: %w", err)
}
// give k8s some time
time.Sleep(10 * time.Second)
return retry.Constant(5*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error {
daemonset, err = clientset.AppsV1().DaemonSets(namespace).Get(ctx, ds, metav1.GetOptions{})
if err != nil {
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) {
return retry.ExpectedError(err)
}
return retry.UnexpectedError(fmt.Errorf("error fetching daemonset: %w", err))
}
if daemonset.Status.UpdatedNumberScheduled != daemonset.Status.DesiredNumberScheduled {
return retry.ExpectedError(fmt.Errorf("expected current number up-to-date for %s to be %d, got %d", ds, daemonset.Status.UpdatedNumberScheduled, daemonset.Status.CurrentNumberScheduled))
}
if daemonset.Status.CurrentNumberScheduled != daemonset.Status.DesiredNumberScheduled {
return retry.ExpectedError(fmt.Errorf("expected current number scheduled for %s to be %d, got %d", ds, daemonset.Status.DesiredNumberScheduled, daemonset.Status.CurrentNumberScheduled))
}
if daemonset.Status.NumberAvailable != daemonset.Status.DesiredNumberScheduled {
return retry.ExpectedError(fmt.Errorf("expected number available for %s to be %d, got %d", ds, daemonset.Status.DesiredNumberScheduled, daemonset.Status.NumberAvailable))
}
if daemonset.Status.NumberReady != daemonset.Status.DesiredNumberScheduled {
return retry.ExpectedError(fmt.Errorf("expected number ready for %s to be %d, got %d", ds, daemonset.Status.DesiredNumberScheduled, daemonset.Status.NumberReady))
}
return nil
})
}
func podCheckpointerGracePeriod(ctx context.Context, clientset *kubernetes.Clientset, gracePeriod string, extraUpdaters ...daemonsetUpdater) error {
fmt.Printf("updating pod-checkpointer grace period to %q\n", gracePeriod)
return updateDaemonset(ctx, clientset, "pod-checkpointer", func(daemonset *appsv1.DaemonSet) error {
if len(daemonset.Spec.Template.Spec.Containers) != 1 {
return fmt.Errorf("unexpected number of containers: %d", len(daemonset.Spec.Template.Spec.Containers))
}
args := daemonset.Spec.Template.Spec.Containers[0].Command
for i := range args {
if strings.HasPrefix(args[i], "--checkpoint-grace-period=") {
args[i] = fmt.Sprintf("--checkpoint-grace-period=%s", gracePeriod)
}
}
for _, updater := range extraUpdaters {
if err := updater("pod-checkpointer", daemonset); err != nil {
return err
}
}
return nil
})
}
//nolint: gocyclo
func hyperkubeUpgradeDs(ctx context.Context, clientset *kubernetes.Clientset, ds string, options UpgradeOptions) error {
if ds == kubeAPIServer {
fmt.Printf("temporarily taking %q out of pod-checkpointer control\n", ds)
if err := updateDaemonset(ctx, clientset, ds, func(daemonset *appsv1.DaemonSet) error {
delete(daemonset.Spec.Template.Annotations, checkpointerAnnotation)
return nil
}); err != nil {
return err
}
}
fmt.Printf("updating daemonset %q to version %q\n", ds, options.ToVersion)
return updateDaemonset(ctx, clientset, ds, func(daemonset *appsv1.DaemonSet) error {
if len(daemonset.Spec.Template.Spec.Containers) != 1 {
return fmt.Errorf("unexpected number of containers: %d", len(daemonset.Spec.Template.Spec.Containers))
}
args := daemonset.Spec.Template.Spec.Containers[0].Command
if args[0] == "./hyperkube" || args[0] == "/hyperkube" {
args[0] = "/go-runner"
args[1] = fmt.Sprintf("/usr/local/bin/%s", ds)
if ds == kubeProxy {
daemonset.Spec.Template.Spec.Containers[0].Command = daemonset.Spec.Template.Spec.Containers[0].Command[1:]
}
}
switch ds {
case kubeAPIServer:
daemonset.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:v%s", constants.KubernetesAPIServerImage, options.ToVersion)
case kubeControllerManager:
daemonset.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:v%s", constants.KubernetesControllerManagerImage, options.ToVersion)
case kubeScheduler:
daemonset.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:v%s", constants.KubernetesSchedulerImage, options.ToVersion)
case kubeProxy:
daemonset.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:v%s", constants.KubernetesProxyImage, options.ToVersion)
default:
return fmt.Errorf("failed to build new image spec")
}
if ds == kubeAPIServer {
if daemonset.Spec.Template.Annotations == nil {
daemonset.Spec.Template.Annotations = make(map[string]string)
}
daemonset.Spec.Template.Annotations[checkpointerAnnotation] = "true"
}
for _, updater := range options.extraUpdaters {
if err := updater(ds, daemonset); err != nil {
return err
}
}
return nil
})
}
func serviceAccountSecretsUpdate(ctx context.Context, cluster cluster.K8sProvider) error {
const serviceAccountKey = "service-account.key"
clientset, err := cluster.K8sClient(ctx)
if err != nil {
return fmt.Errorf("error building K8s client: %w", err)
}
apiServerSecrets, err := clientset.CoreV1().Secrets(namespace).Get(ctx, kubeAPIServer, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching kube-apiserver secrets: %w", err)
}
if _, ok := apiServerSecrets.Data[serviceAccountKey]; ok {
return nil
}
controllerManagerSecrets, err := clientset.CoreV1().Secrets(namespace).Get(ctx, kubeControllerManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching kube-controller-manager secrets: %w", err)
}
if _, ok := controllerManagerSecrets.Data[serviceAccountKey]; !ok {
return fmt.Errorf("kube-controller-manager secrets missing %q secret", serviceAccountKey)
}
apiServerSecrets.Data[serviceAccountKey] = controllerManagerSecrets.Data[serviceAccountKey]
_, err = clientset.CoreV1().Secrets(namespace).Update(ctx, apiServerSecrets, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating kube-apiserver secrets: %w", err)
}
fmt.Printf("patched kube-apiserver secrets for %q\n", serviceAccountKey)
return nil
}
func addControlPlaneToleration() daemonsetUpdater {
return func(ds string, daemonset *appsv1.DaemonSet) error {
if ds == kubeProxy {
return nil
}
tolerationFound := false
for _, toleration := range daemonset.Spec.Template.Spec.Tolerations {
if toleration.Key == constants.LabelNodeRoleControlPlane {
tolerationFound = true
break
}
}
if tolerationFound {
return nil
}
daemonset.Spec.Template.Spec.Tolerations = append(daemonset.Spec.Template.Spec.Tolerations, corev1.Toleration{
Key: constants.LabelNodeRoleControlPlane,
Operator: corev1.TolerationOpExists,
Effect: corev1.TaintEffectNoSchedule,
})
return nil
}
}
func kubeAPIServerServiceAccountPatch(options UpgradeOptions) (daemonsetUpdater, error) {
if options.ControlPlaneEndpoint == "" {
return nil, fmt.Errorf("control plane endpoint is required for service account patch")
}
return func(ds string, daemonset *appsv1.DaemonSet) error {
if ds != kubeAPIServer {
return nil
}
argExists := func(argName string) bool {
prefix := fmt.Sprintf("--%s=", argName)
for _, arg := range daemonset.Spec.Template.Spec.Containers[0].Command {
if strings.HasPrefix(arg, prefix) {
return true
}
}
return false
}
if !argExists("api-audiences") {
daemonset.Spec.Template.Spec.Containers[0].Command = append(daemonset.Spec.Template.Spec.Containers[0].Command,
fmt.Sprintf("--api-audiences=%s", options.ControlPlaneEndpoint))
}
if !argExists("service-account-issuer") {
daemonset.Spec.Template.Spec.Containers[0].Command = append(daemonset.Spec.Template.Spec.Containers[0].Command,
fmt.Sprintf("--service-account-issuer=%s", options.ControlPlaneEndpoint))
}
if !argExists("service-account-signing-key") {
daemonset.Spec.Template.Spec.Containers[0].Command = append(daemonset.Spec.Template.Spec.Containers[0].Command,
"--service-account-signing-key-file=/etc/kubernetes/secrets/service-account.key")
}
return nil
}, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -110,6 +110,8 @@ description: Talos gRPC API reference.
- [Recover](#machine.Recover)
- [RecoverRequest](#machine.RecoverRequest)
- [RecoverResponse](#machine.RecoverResponse)
- [RemoveBootkubeInitializedKey](#machine.RemoveBootkubeInitializedKey)
- [RemoveBootkubeInitializedKeyResponse](#machine.RemoveBootkubeInitializedKeyResponse)
- [Reset](#machine.Reset)
- [ResetPartitionSpec](#machine.ResetPartitionSpec)
- [ResetRequest](#machine.ResetRequest)
@ -616,6 +618,7 @@ node.
| ----- | ---- | ----- | ----------- |
| data | [bytes](#bytes) | | |
| on_reboot | [bool](#bool) | | |
| immediate | [bool](#bool) | | |
@ -1855,6 +1858,36 @@ The recover message containing the recover status.
<a name="machine.RemoveBootkubeInitializedKey"></a>
### RemoveBootkubeInitializedKey
RemoveBootkubeInitializedKeyResponse describes the response to a RemoveBootkubeInitializedKey request.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| metadata | [common.Metadata](#common.Metadata) | | |
<a name="machine.RemoveBootkubeInitializedKeyResponse"></a>
### RemoveBootkubeInitializedKeyResponse
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| messages | [RemoveBootkubeInitializedKey](#machine.RemoveBootkubeInitializedKey) | repeated | |
<a name="machine.Reset"></a>
### Reset
@ -2785,6 +2818,7 @@ The machine service definition.
| Rollback | [RollbackRequest](#machine.RollbackRequest) | [RollbackResponse](#machine.RollbackResponse) | |
| Reset | [ResetRequest](#machine.ResetRequest) | [ResetResponse](#machine.ResetResponse) | |
| Recover | [RecoverRequest](#machine.RecoverRequest) | [RecoverResponse](#machine.RecoverResponse) | |
| RemoveBootkubeInitializedKey | [.google.protobuf.Empty](#google.protobuf.Empty) | [RemoveBootkubeInitializedKeyResponse](#machine.RemoveBootkubeInitializedKeyResponse) | |
| ServiceList | [.google.protobuf.Empty](#google.protobuf.Empty) | [ServiceListResponse](#machine.ServiceListResponse) | |
| ServiceRestart | [ServiceRestartRequest](#machine.ServiceRestartRequest) | [ServiceRestartResponse](#machine.ServiceRestartResponse) | |
| ServiceStart | [ServiceStartRequest](#machine.ServiceStartRequest) | [ServiceStartResponse](#machine.ServiceStartResponse) | |

View File

@ -19,6 +19,7 @@ talosctl apply-config [flags]
--cert-fingerprint strings list of server certificate fingeprints to accept (defaults to no check)
-f, --file string the filename of the updated configuration
-h, --help help for apply-config
--immediate apply the config immediately (without a reboot)
-i, --insecure apply the config using the insecure (encrypted with no auth) maintenance service
--interactive apply the config using text based interactive mode
--on-reboot apply the config on reboot
@ -521,6 +522,42 @@ talosctl containers [flags]
* [talosctl](#talosctl) - A CLI for out-of-band management of Kubernetes nodes created by Talos
## talosctl convert-k8s
Convert Kubernetes control plane from self-hosted (bootkube) to Talos-managed (static pods).
### Synopsis
Command converts control plane bootstrapped on Talos <= 0.8 to Talos-managed control plane (Talos >= 0.9).
As part of the conversion process tool reads existing configuration of the control plane, updates
Talos node configuration to reflect changes made since the boostrap time. Once config is updated,
tool releases static pods and deletes self-hosted DaemonSets.
```
talosctl convert-k8s [flags]
```
### Options
```
--endpoint string the cluster control plane endpoint
--force skip prompts, assume yes
-h, --help help for convert-k8s
```
### Options inherited from parent commands
```
--context string Context to be used in command
-e, --endpoints strings override default endpoints in Talos configuration
-n, --nodes strings target the specified nodes
--talosconfig string The path to the Talos configuration file (default "/home/user/.talos/config")
```
### SEE ALSO
* [talosctl](#talosctl) - A CLI for out-of-band management of Kubernetes nodes created by Talos
## talosctl copy
Copy data out from the node
@ -1873,6 +1910,7 @@ A CLI for out-of-band management of Kubernetes nodes created by Talos
* [talosctl completion](#talosctl-completion) - Output shell completion code for the specified shell (bash or zsh)
* [talosctl config](#talosctl-config) - Manage the client configuration
* [talosctl containers](#talosctl-containers) - List containers
* [talosctl convert-k8s](#talosctl-convert-k8s) - Convert Kubernetes control plane from self-hosted (bootkube) to Talos-managed (static pods).
* [talosctl copy](#talosctl-copy) - Copy data out from the node
* [talosctl crashdump](#talosctl-crashdump) - Dump debug information about the cluster
* [talosctl dashboard](#talosctl-dashboard) - Cluster dashboard with real-time metrics