// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package controllers

import (
	"context"
	"errors"
	"fmt"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/tools/reference"
	"k8s.io/utils/pointer"
	capiv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/cluster-api/util"
	"sigs.k8s.io/cluster-api/util/patch"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	infrav1 "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
	"github.com/talos-systems/sidero/app/caps-controller-manager/pkg/constants"
	metalv1alpha1 "github.com/talos-systems/sidero/app/sidero-controller-manager/api/v1alpha1"
)

// ErrNoServersInServerClass is returned when a ServerClass has no servers available
// for allocation.
var ErrNoServersInServerClass = errors.New("no servers available in serverclass")

// MetalMachineReconciler reconciles a MetalMachine object.
type MetalMachineReconciler struct {
	client.Client
	Log      logr.Logger
	Scheme   *runtime.Scheme
	Recorder record.EventRecorder
}

// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=metalmachines,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=metalmachines/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=metal.sidero.dev,resources=serverclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups=metal.sidero.dev,resources=serverclasses/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=metal.sidero.dev,resources=servers,verbs=get;list;watch
// +kubebuilder:rbac:groups=metal.sidero.dev,resources=servers/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch

// Reconcile brings a MetalMachine in line with its desired state: it allocates a server
// for the machine if needed and propagates the provider ID to the workload cluster node.
func (r *MetalMachineReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, err error) {
	ctx := context.Background()

	logger := r.Log.WithValues("metalmachine", req.NamespacedName)

	// Fetch the metalMachine instance.
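	// A NotFound error is swallowed on purpose: if the MetalMachine is already gone,
	// there is nothing left to reconcile.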
	metalMachine := &infrav1.MetalMachine{}

	err = r.Get(ctx, req.NamespacedName, metalMachine)
	if apierrors.IsNotFound(err) {
		return ctrl.Result{}, nil
	}

	if err != nil {
		return ctrl.Result{}, err
	}

	machine, err := util.GetOwnerMachine(ctx, r.Client, metalMachine.ObjectMeta)
	if err != nil {
		r.Log.Error(err, "Failed to get machine")

		return ctrl.Result{RequeueAfter: constants.DefaultRequeueAfter}, nil
	}

	if machine == nil {
		logger.Info("No ownerref for metalmachine")

		return ctrl.Result{RequeueAfter: constants.DefaultRequeueAfter}, nil
	}

	logger = logger.WithName(fmt.Sprintf("machine=%s", machine.Name))

	cluster, err := util.GetClusterFromMetadata(ctx, r.Client, machine.ObjectMeta)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("no cluster label or cluster does not exist: %w", err)
	}

	logger = logger.WithName(fmt.Sprintf("cluster=%s", cluster.Name))

	if !cluster.Status.InfrastructureReady {
		logger.Info("Cluster infrastructure is not ready", "cluster", cluster.Name)

		return ctrl.Result{RequeueAfter: constants.DefaultRequeueAfter}, nil
	}

	if machine.Spec.Bootstrap.DataSecretName == nil {
		logger.Info("Bootstrap secret is not available yet")

		return ctrl.Result{RequeueAfter: constants.DefaultRequeueAfter}, nil
	}

	// Initialize the patch helper.
	patchHelper, err := patch.NewHelper(metalMachine, r)
	if err != nil {
		return ctrl.Result{}, err
	}

	// Always attempt to patch the MetalMachine object and status after each reconciliation.
	defer func() {
		if e := patchHelper.Patch(ctx, metalMachine); e != nil {
			logger.Error(e, "failed to patch metalMachine")

			if err == nil {
				err = e
			}
		}
	}()

	// Handle deleted machines.
	if !metalMachine.ObjectMeta.DeletionTimestamp.IsZero() {
		logger.Info("deleting metalmachine")

		return r.reconcileDelete(ctx, metalMachine)
	}

	controllerutil.AddFinalizer(metalMachine, infrav1.MachineFinalizer)

	// If a server ref is already provided, the server binding controller is going to
	// reconcile the matching server binding; if the ref is missing, a server needs to
	// be picked here.
	if metalMachine.Spec.ServerRef == nil {
		if metalMachine.Spec.ServerClassRef == nil {
			return ctrl.Result{}, fmt.Errorf("either a server or serverclass ref must be supplied")
		}

		serverResource, err := r.fetchServerFromClass(ctx, logger, metalMachine.Spec.ServerClassRef, metalMachine)
		if err != nil {
			if errors.Is(err, ErrNoServersInServerClass) {
				return ctrl.Result{RequeueAfter: constants.DefaultRequeueAfter}, nil
			}

			return ctrl.Result{}, err
		}

		metalMachine.Spec.ServerRef = &corev1.ObjectReference{
			Kind: serverResource.Kind,
			Name: serverResource.Name,
		}
	}

	// Set the providerID, as it's required by upstream CAPI for the machine lifecycle.
	metalMachine.Spec.ProviderID = pointer.StringPtr(fmt.Sprintf("%s://%s", constants.ProviderID, metalMachine.Spec.ServerRef.Name))

	err = r.patchProviderID(ctx, cluster, metalMachine)
	if err != nil {
		logger.Info("Failed to set provider ID", "error", err)

		return ctrl.Result{RequeueAfter: constants.DefaultRequeueAfter}, nil
	}

	metalMachine.Status.Ready = true

	return ctrl.Result{}, nil
}

// reconcileDelete releases the server allocated to the MetalMachine by deleting its
// ServerBinding, then removes the finalizer.
func (r *MetalMachineReconciler) reconcileDelete(ctx context.Context, metalMachine *infrav1.MetalMachine) (ctrl.Result, error) {
	if metalMachine.Spec.ServerRef != nil {
		var serverBinding infrav1.ServerBinding

		err := r.Get(ctx, types.NamespacedName{Namespace: metalMachine.Spec.ServerRef.Namespace, Name: metalMachine.Spec.ServerRef.Name}, &serverBinding)
		if err == nil {
			// Requeue so the finalizer is only removed once the binding is gone.
			return ctrl.Result{Requeue: true}, r.Delete(ctx, &serverBinding)
		}

		if !apierrors.IsNotFound(err) {
			return ctrl.Result{}, err
		}
	}

	metalMachine.Spec.ServerRef = nil
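	// With the server reference released and the binding gone, drop the finalizer so
	// the API server can finish deleting the MetalMachine.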
	controllerutil.RemoveFinalizer(metalMachine, infrav1.MachineFinalizer)

	return ctrl.Result{}, nil
}

// SetupWithManager registers the controller with the manager and sets up the field
// index used to look up ServerBindings by the MetalMachine they reference.
func (r *MetalMachineReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
	if err := mgr.GetFieldIndexer().IndexField(&infrav1.ServerBinding{}, infrav1.ServerBindingMetalMachineRefField,
		func(rawObj runtime.Object) []string {
			serverBinding := rawObj.(*infrav1.ServerBinding)

			return []string{serverBinding.Spec.MetalMachineRef.Name}
		}); err != nil {
		return err
	}

	return ctrl.NewControllerManagedBy(mgr).
		WithOptions(options).
		For(&infrav1.MetalMachine{}).
		Complete(r)
}

// fetchServerFromClass returns the server already bound to the MetalMachine if one
// exists, or allocates a free, clean server from the given ServerClass.
func (r *MetalMachineReconciler) fetchServerFromClass(ctx context.Context, logger logr.Logger, classRef *corev1.ObjectReference, metalMachine *infrav1.MetalMachine) (*metalv1alpha1.Server, error) {
	// First, check if there is already an existing serverBinding for this metalMachine.
	var serverBindingList infrav1.ServerBindingList

	if err := r.List(ctx, &serverBindingList, client.MatchingFields(fields.Set{infrav1.ServerBindingMetalMachineRefField: metalMachine.Name})); err != nil {
		return nil, err
	}

	for _, serverBinding := range serverBindingList.Items {
		if serverBinding.Spec.MetalMachineRef.Namespace == metalMachine.Namespace && serverBinding.Spec.MetalMachineRef.Name == metalMachine.Name {
			// Found an existing serverBinding for this metalMachine; the binding shares
			// its name with the server it binds.
			var server metalv1alpha1.Server

			if err := r.Get(ctx, types.NamespacedName{Namespace: serverBinding.Namespace, Name: serverBinding.Name}, &server); err != nil {
				return nil, err
			}

			logger.Info("reconciled missing server ref", "metalmachine", metalMachine.Name, "server", server.Name)

			return &server, nil
		}
	}

	// Grab the server class and validate that we have servers available.
	serverClassResource, err := r.fetchServerClass(ctx, classRef)
	if err != nil {
		return nil, err
	}

	if len(serverClassResource.Status.ServersAvailable) == 0 {
		return nil, ErrNoServersInServerClass
	}

	// Fetch a server from the available list.
	// NB: we added this loop to double-check that an available server isn't "in use" because
	// we saw raciness between server selection and it being removed from the ServersAvailable list.
	for _, availServer := range serverClassResource.Status.ServersAvailable {
		serverObj := &metalv1alpha1.Server{}

		namespacedName := types.NamespacedName{
			Namespace: "",
			Name:      availServer,
		}

		if err := r.Get(ctx, namespacedName, serverObj); err != nil {
			return nil, err
		}

		if serverObj.Status.InUse {
			continue
		}

		if !serverObj.Status.IsClean {
			continue
		}

		if err := r.createServerBinding(ctx, serverClassResource, serverObj, metalMachine); err != nil {
			// The server we picked was claimed by another metalMachine before we finished;
			// move on to the next one.
			if apierrors.IsAlreadyExists(err) {
				continue
			}

			return nil, err
		}

		logger.Info("allocated new server", "metalmachine", metalMachine.Name, "server", serverObj.Name, "serverclass", serverClassResource.Name)

		return serverObj, nil
	}

	return nil, ErrNoServersInServerClass
}

// patchProviderID sets spec.providerID on the workload cluster node backed by the
// allocated server, so that upstream CAPI can match the Machine to its Node.
func (r *MetalMachineReconciler) patchProviderID(ctx context.Context, cluster *capiv1.Cluster, metalMachine *infrav1.MetalMachine) error {
	kubeconfigSecret := &corev1.Secret{}

	err := r.Client.Get(ctx,
		types.NamespacedName{
			Namespace: cluster.Namespace,
			Name:      cluster.Name + "-kubeconfig",
		},
		kubeconfigSecret,
	)
	if err != nil {
		return err
	}

	config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigSecret.Data["value"])
	if err != nil {
		return err
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}

	label := fmt.Sprintf("metal.sidero.dev/uuid=%s", metalMachine.Spec.ServerRef.Name)

	r.Log.Info("Searching for node", "label", label)

	nodes, err := clientset.CoreV1().Nodes().List(
		metav1.ListOptions{
			LabelSelector: label,
		},
	)
	if err != nil {
		return err
	}

	if len(nodes.Items) == 0 {
		return fmt.Errorf("no matching nodes found")
	}

	if len(nodes.Items) > 1 {
		return fmt.Errorf("multiple nodes found with same uuid label")
	}

	providerID := fmt.Sprintf("%s://%s", constants.ProviderID, metalMachine.Spec.ServerRef.Name)

	r.Log.Info("Setting provider ID", "id", providerID)

	for _, node := range nodes.Items {
		node := node

		if node.Spec.ProviderID == providerID {
			continue
		}

		node.Spec.ProviderID = providerID

		_, err = clientset.CoreV1().Nodes().Update(&node)
		if err != nil {
			return err
		}
	}

	return nil
}

// createServerBinding updates a server to mark it as "in use" via a ServerBinding resource.
func (r *MetalMachineReconciler) createServerBinding(ctx context.Context, serverClass *metalv1alpha1.ServerClass, serverObj *metalv1alpha1.Server, metalMachine *infrav1.MetalMachine) error {
	serverRef, err := reference.GetReference(r.Scheme, serverObj)
	if err != nil {
		return err
	}

	var serverBinding infrav1.ServerBinding

	serverBinding.Namespace = serverObj.Namespace
	serverBinding.Name = serverObj.Name
	serverBinding.Labels = make(map[string]string)
	serverBinding.Spec.MetalMachineRef = corev1.ObjectReference{
		Kind:      metalMachine.Kind,
		UID:       metalMachine.UID,
		Namespace: metalMachine.Namespace,
		Name:      metalMachine.Name,
	}

	if serverClass != nil {
		serverBinding.Spec.ServerClassRef = &corev1.ObjectReference{
			Kind: serverClass.Kind,
			Name: serverClass.Name,
		}
	}

	for label, value := range metalMachine.Labels {
		serverBinding.Labels[label] = value
	}

	err = r.Create(ctx, &serverBinding)
	if err == nil {
		r.Recorder.Event(serverRef, corev1.EventTypeNormal, "Server Allocation",
			fmt.Sprintf("Server was allocated via serverclass %q for metal machine %q.", serverClass.Name, metalMachine.Name))
	}

	return err
}

// fetchServerClass retrieves the ServerClass pointed to by classRef.
func (r *MetalMachineReconciler) fetchServerClass(ctx context.Context, classRef *corev1.ObjectReference) (*metalv1alpha1.ServerClass, error) {
	serverClassResource := &metalv1alpha1.ServerClass{}

	namespacedName := types.NamespacedName{
		Namespace: classRef.Namespace,
		Name:      classRef.Name,
	}

	if err := r.Get(ctx, namespacedName, serverClassResource); err != nil {
		return nil, err
	}

	return serverClassResource, nil
}
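// The reconciler is registered with a controller-runtime manager at startup. A minimal
// wiring sketch (hypothetical main.go excerpt; the mgr and setupLog variables are
// assumptions, not part of this package):
//
//	if err := (&controllers.MetalMachineReconciler{
//		Client:   mgr.GetClient(),
//		Log:      ctrl.Log.WithName("controllers").WithName("MetalMachine"),
//		Scheme:   mgr.GetScheme(),
//		Recorder: mgr.GetEventRecorderFor("metalmachine-controller"),
//	}).SetupWithManager(mgr, controller.Options{}); err != nil {
//		setupLog.Error(err, "unable to create controller", "controller", "MetalMachine")
//		os.Exit(1)
//	}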