diff --git a/internal/app/machined/pkg/controllers/cluster/endpoint.go b/internal/app/machined/pkg/controllers/cluster/endpoint.go new file mode 100644 index 000000000..e0fcd0bc3 --- /dev/null +++ b/internal/app/machined/pkg/controllers/cluster/endpoint.go @@ -0,0 +1,95 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package cluster + +import ( + "context" + "fmt" + "reflect" + "sort" + + "github.com/cosi-project/runtime/pkg/controller" + "github.com/cosi-project/runtime/pkg/resource" + "go.uber.org/zap" + "inet.af/netaddr" + + "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" + "github.com/talos-systems/talos/pkg/resources/cluster" + "github.com/talos-systems/talos/pkg/resources/k8s" +) + +// EndpointController looks up control plane endpoints. +type EndpointController struct{} + +// Name implements controller.Controller interface. +func (ctrl *EndpointController) Name() string { + return "cluster.EndpointController" +} + +// Inputs implements controller.Controller interface. +func (ctrl *EndpointController) Inputs() []controller.Input { + return []controller.Input{ + { + Namespace: cluster.NamespaceName, + Type: cluster.MemberType, + Kind: controller.InputWeak, + }, + } +} + +// Outputs implements controller.Controller interface. +func (ctrl *EndpointController) Outputs() []controller.Output { + return []controller.Output{ + { + Type: k8s.EndpointType, + Kind: controller.OutputShared, + }, + } +} + +// Run implements controller.Controller interface. +func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { + for { + select { + case <-ctx.Done(): + return nil + case <-r.EventCh(): + } + + memberList, err := r.List(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.MemberType, "", resource.VersionUndefined)) + if err != nil { + return fmt.Errorf("error listing members: %w", err) + } + + var endpoints []netaddr.IP + + for _, res := range memberList.Items { + member := res.(*cluster.Member).TypedSpec() + + if !(member.MachineType == machine.TypeControlPlane || member.MachineType == machine.TypeInit) { + continue + } + + endpoints = append(endpoints, member.Addresses...) + } + + sort.Slice(endpoints, func(i, j int) bool { return endpoints[i].Compare(endpoints[j]) < 0 }) + + if err := r.Modify(ctx, + k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneDiscoveredEndpointsID), + func(r resource.Resource) error { + if !reflect.DeepEqual(r.(*k8s.Endpoint).TypedSpec().Addresses, endpoints) { + logger.Debug("updated controlplane endpoints", zap.Any("endpoints", endpoints)) + } + + r.(*k8s.Endpoint).TypedSpec().Addresses = endpoints + + return nil + }, + ); err != nil { + return fmt.Errorf("error updating endpoints: %w", err) + } + } +} diff --git a/internal/app/machined/pkg/controllers/cluster/endpoint_test.go b/internal/app/machined/pkg/controllers/cluster/endpoint_test.go new file mode 100644 index 000000000..5b4f836e0 --- /dev/null +++ b/internal/app/machined/pkg/controllers/cluster/endpoint_test.go @@ -0,0 +1,77 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package cluster_test + +import ( + "fmt" + "testing" + "time" + + "github.com/cosi-project/runtime/pkg/resource" + "github.com/stretchr/testify/suite" + "github.com/talos-systems/go-retry/retry" + "inet.af/netaddr" + + clusterctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/cluster" + "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" + "github.com/talos-systems/talos/pkg/resources/cluster" + "github.com/talos-systems/talos/pkg/resources/k8s" +) + +type EndpointSuite struct { + ClusterSuite +} + +func (suite *EndpointSuite) TestReconcileDefault() { + suite.startRuntime() + + suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.EndpointController{})) + + member1 := cluster.NewMember(cluster.NamespaceName, "talos-default-master-1") + *member1.TypedSpec() = cluster.MemberSpec{ + NodeID: "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC", + Addresses: []netaddr.IP{netaddr.MustParseIP("172.20.0.2"), netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0")}, + Hostname: "talos-default-master-1", + MachineType: machine.TypeControlPlane, + OperatingSystem: "Talos (v1.0.0)", + } + + member2 := cluster.NewMember(cluster.NamespaceName, "talos-default-master-2") + *member2.TypedSpec() = cluster.MemberSpec{ + NodeID: "9dwHNUViZlPlIervqX9Qo256RUhrfhgO0xBBnKcKl4F", + Addresses: []netaddr.IP{netaddr.MustParseIP("172.20.0.3"), netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e1")}, + Hostname: "talos-default-master-2", + MachineType: machine.TypeControlPlane, + OperatingSystem: "Talos (v1.0.0)", + } + + member3 := cluster.NewMember(cluster.NamespaceName, "talos-default-worker-1") + *member3.TypedSpec() = cluster.MemberSpec{ + NodeID: "xCnFFfxylOf9i5ynhAkt6ZbfcqaLDGKfIa3gwpuaxe7F", + Addresses: []netaddr.IP{netaddr.MustParseIP("172.20.0.4")}, + Hostname: "talos-default-worker-1", + MachineType: machine.TypeWorker, + OperatingSystem: "Talos (v1.0.0)", + } + + for _, r := range []resource.Resource{member1, member2, member3} { + suite.Require().NoError(suite.state.Create(suite.ctx, r)) + } + + // control plane members should be translated to Endpoints + suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( + suite.assertResource(*k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneDiscoveredEndpointsID).Metadata(), func(r resource.Resource) error { + spec := r.(*k8s.Endpoint).TypedSpec() + + suite.Assert().Equal(`["172.20.0.2" "172.20.0.3" "fd50:8d60:4238:6302:f857:23ff:fe21:d1e0" "fd50:8d60:4238:6302:f857:23ff:fe21:d1e1"]`, fmt.Sprintf("%q", spec.Addresses)) + + return nil + }), + )) +} + +func TestEndpointSuite(t *testing.T) { + suite.Run(t, new(EndpointSuite)) +} diff --git a/internal/app/machined/pkg/controllers/k8s/endpoint.go b/internal/app/machined/pkg/controllers/k8s/endpoint.go index 466cbcb17..17f8dbdad 100644 --- a/internal/app/machined/pkg/controllers/k8s/endpoint.go +++ b/internal/app/machined/pkg/controllers/k8s/endpoint.go @@ -53,7 +53,7 @@ func (ctrl *EndpointController) Outputs() []controller.Output { return []controller.Output{ { Type: k8s.EndpointType, - Kind: controller.OutputExclusive, + Kind: controller.OutputShared, }, } } @@ -128,7 +128,7 @@ func (ctrl *EndpointController) watchEndpoints(ctx context.Context, r controller sort.Slice(addrs, func(i, j int) bool { return addrs[i].Compare(addrs[j]) < 0 }) if err := r.Modify(ctx, - k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneEndpointsID), + k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneAPIServerEndpointsID), func(r resource.Resource) error { if !reflect.DeepEqual(r.(*k8s.Endpoint).TypedSpec().Addresses, addrs) { logger.Debug("updated controlplane endpoints", zap.Any("endpoints", addrs)) diff --git a/internal/app/machined/pkg/controllers/secrets/api.go b/internal/app/machined/pkg/controllers/secrets/api.go index 79d474b5c..3c85bfd9c 100644 --- a/internal/app/machined/pkg/controllers/secrets/api.go +++ b/internal/app/machined/pkg/controllers/secrets/api.go @@ -165,7 +165,6 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime, inputs = append(inputs, controller.Input{ Namespace: k8s.ControlPlaneNamespaceName, Type: k8s.EndpointType, - ID: pointer.ToString(k8s.ControlPlaneEndpointsID), Kind: controller.InputWeak, }) } @@ -242,26 +241,23 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime, var endpointsStr []string if !isControlplane { - endpointResource, err := r.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.EndpointType, k8s.ControlPlaneEndpointsID, resource.VersionUndefined)) + endpointResources, err := r.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.EndpointType, "", resource.VersionUndefined)) if err != nil { - if state.IsNotFoundError(err) { - continue - } - - return fmt.Errorf("error getting endpoints resource: %w", err) + return fmt.Errorf("error getting endpoints resources: %w", err) } - endpoints := endpointResource.(*k8s.Endpoint).TypedSpec() + var endpointAddrs k8s.EndpointList - if len(endpoints.Addresses) == 0 { + // merge all endpoints into a single list + for _, res := range endpointResources.Items { + endpointAddrs = endpointAddrs.Merge(res.(*k8s.Endpoint)) + } + + if len(endpointAddrs) == 0 { continue } - endpointsStr = make([]string, 0, len(endpoints.Addresses)) - - for _, ip := range endpoints.Addresses { - endpointsStr = append(endpointsStr, ip.String()) - } + endpointsStr = endpointAddrs.Strings() } if isControlplane { @@ -368,6 +364,8 @@ func (ctrl *APIController) generateJoin(ctx context.Context, r controller.Runtim return fmt.Errorf("failed to generate API client CSR: %w", err) } + logger.Debug("sending CSR", zap.Strings("endpoints", endpointsStr)) + // TODO: add keyusage: trustd should accept key usage as additional params _, clientCert.Crt, err = remoteGen.IdentityContext(ctx, clientCSR) if err != nil { diff --git a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go index 73a532263..bcc0310f8 100644 --- a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go +++ b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go @@ -84,6 +84,7 @@ func (ctrl *Controller) Run(ctx context.Context) error { &cluster.AffiliateMergeController{}, &cluster.ConfigController{}, &cluster.DiscoveryServiceController{}, + &cluster.EndpointController{}, &cluster.LocalAffiliateController{}, &cluster.MemberController{}, &cluster.KubernetesPullController{}, diff --git a/pkg/grpc/gen/remote.go b/pkg/grpc/gen/remote.go index 8ea3c4b81..9685f4777 100644 --- a/pkg/grpc/gen/remote.go +++ b/pkg/grpc/gen/remote.go @@ -62,12 +62,12 @@ func (g *RemoteGenerator) IdentityContext(ctx context.Context, csr *x509.Certifi Csr: csr.X509CertificateRequestPEM, } - ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() - err = retry.Exponential(5*time.Minute, - retry.WithAttemptTimeout(30*time.Second), - retry.WithUnits(5*time.Second), + err = retry.Exponential(time.Minute, + retry.WithAttemptTimeout(10*time.Second), + retry.WithUnits(time.Second), retry.WithJitter(100*time.Millisecond), ).RetryWithContext(ctx, func(ctx context.Context) error { var resp *securityapi.CertificateResponse diff --git a/pkg/resources/k8s/endpoint.go b/pkg/resources/k8s/endpoint.go index 6f161173c..43d0231f0 100644 --- a/pkg/resources/k8s/endpoint.go +++ b/pkg/resources/k8s/endpoint.go @@ -6,6 +6,7 @@ package k8s import ( "fmt" + "sort" "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/resource/meta" @@ -15,8 +16,11 @@ import ( // EndpointType is type of Endpoint resource. const EndpointType = resource.Type("Endpoints.kubernetes.talos.dev") -// ControlPlaneEndpointsID is resource ID for controlplane Endpoint. -const ControlPlaneEndpointsID = resource.ID("controlplane") +// ControlPlaneAPIServerEndpointsID is resource ID for kube-apiserver based Endpoints. +const ControlPlaneAPIServerEndpointsID = resource.ID("kube-apiserver") + +// ControlPlaneDiscoveredEndpointsID is resource ID for cluster discovery based Endpoints. +const ControlPlaneDiscoveredEndpointsID = resource.ID("discovery") // Endpoint resource holds definition of rendered secrets. type Endpoint struct { @@ -84,3 +88,34 @@ func (r *Endpoint) ResourceDefinition() meta.ResourceDefinitionSpec { func (r *Endpoint) TypedSpec() *EndpointSpec { return &r.spec } + +// EndpointList is a flattened list of endpoints. +type EndpointList []netaddr.IP + +// Merge endpoints from multiple Endpoint resources into a single list. +func (l EndpointList) Merge(endpoint *Endpoint) EndpointList { + for _, ip := range endpoint.spec.Addresses { + ip := ip + + idx := sort.Search(len(l), func(i int) bool { return !l[i].Less(ip) }) + + if idx < len(l) && l[idx].Compare(ip) == 0 { + continue + } + + l = append(l[:idx], append([]netaddr.IP{ip}, l[idx:]...)...) + } + + return l +} + +// Strings returns a slice of formatted endpoints to string. +func (l EndpointList) Strings() []string { + res := make([]string, len(l)) + + for i := range l { + res[i] = l[i].String() + } + + return res +} diff --git a/pkg/resources/k8s/endpoint_test.go b/pkg/resources/k8s/endpoint_test.go new file mode 100644 index 000000000..2cbdb93e2 --- /dev/null +++ b/pkg/resources/k8s/endpoint_test.go @@ -0,0 +1,37 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package k8s_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "inet.af/netaddr" + + "github.com/talos-systems/talos/pkg/resources/k8s" +) + +func TestEndpointList(t *testing.T) { + t.Parallel() + + var l k8s.EndpointList + + e1 := k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, "1") + e1.TypedSpec().Addresses = []netaddr.IP{ + netaddr.MustParseIP("172.20.0.2"), + netaddr.MustParseIP("172.20.0.3"), + } + + e2 := k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, "2") + e2.TypedSpec().Addresses = []netaddr.IP{ + netaddr.MustParseIP("172.20.0.4"), + netaddr.MustParseIP("172.20.0.3"), + } + + l = l.Merge(e1) + l = l.Merge(e2) + + assert.Equal(t, []string{"172.20.0.2", "172.20.0.3", "172.20.0.4"}, l.Strings()) +}