Andrey Smirnov 085c61b2ec
chore: add a special condition to check for kubeconfig readiness
The problem is that the kubelet kubeconfig gets created early, but the
actual client key and cert files are not written, so controllers spam
with scary errors that the config is not valid. This PR removes those
scary messages as we wait for the kubeconfig to be usable.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
2021-09-17 00:07:38 +03:00

145 lines
4.2 KiB
Go

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package cluster
import (
"context"
"fmt"
"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"
"github.com/talos-systems/talos/internal/pkg/discovery/registry"
"github.com/talos-systems/talos/pkg/conditions"
"github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/cluster"
"github.com/talos-systems/talos/pkg/resources/config"
)
// KubernetesPushController pushes Affiliate resource to the Kubernetes registry.
type KubernetesPushController struct {
localAffiliateID resource.ID
kubernetesClient *kubernetes.Client
}
// Name implements controller.Controller interface.
func (ctrl *KubernetesPushController) Name() string {
return "cluster.KubernetesPushController"
}
// Inputs implements controller.Controller interface.
func (ctrl *KubernetesPushController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: cluster.ConfigType,
ID: pointer.ToString(cluster.ConfigID),
Kind: controller.InputWeak,
},
{
Namespace: cluster.NamespaceName,
Type: cluster.IdentityType,
ID: pointer.ToString(cluster.LocalIdentity),
Kind: controller.InputWeak,
},
}
}
// Outputs implements controller.Controller interface.
func (ctrl *KubernetesPushController) Outputs() []controller.Output {
return nil
}
// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *KubernetesPushController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
defer func() {
if ctrl.kubernetesClient != nil {
ctrl.kubernetesClient.Close() //nolint:errcheck
}
ctrl.kubernetesClient = nil
}()
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
discoveryConfig, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, cluster.ConfigType, cluster.ConfigID, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting discovery config: %w", err)
}
continue
}
if !discoveryConfig.(*cluster.Config).TypedSpec().RegistryKubernetesEnabled {
continue
}
if err = conditions.WaitForKubeconfigReady(constants.KubeletKubeconfig).Wait(ctx); err != nil {
return err
}
identity, err := r.Get(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting local identity: %w", err)
}
continue
}
localAffiliateID := identity.(*cluster.Identity).TypedSpec().NodeID
if ctrl.localAffiliateID != localAffiliateID {
ctrl.localAffiliateID = localAffiliateID
if err = r.UpdateInputs(append(ctrl.Inputs(),
controller.Input{
Namespace: cluster.NamespaceName,
Type: cluster.AffiliateType,
ID: pointer.ToString(ctrl.localAffiliateID),
Kind: controller.InputWeak,
},
)); err != nil {
return err
}
}
affiliate, err := r.Get(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.AffiliateType, ctrl.localAffiliateID, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting local affiliate: %w", err)
}
continue
}
if ctrl.kubernetesClient == nil {
ctrl.kubernetesClient, err = kubernetes.NewClientFromKubeletKubeconfig()
if err != nil {
return fmt.Errorf("error building kubernetes client: %w", err)
}
}
if err = registry.NewKubernetes(ctrl.kubernetesClient).Push(ctx, affiliate.(*cluster.Affiliate)); err != nil {
// reset client connection
ctrl.kubernetesClient.Close() //nolint:errcheck
ctrl.kubernetesClient = nil
return fmt.Errorf("error pushing to Kubernetes registry: %w", err)
}
}
}
}