mirror of
https://github.com/siderolabs/talos.git
synced 2025-09-13 01:41:11 +02:00
feat: feed control plane endpoints on workers from cluster discovery
Fixes #4231 This allows much faster worker join on `apid` level without waiting for Kubernetes control plane to be up (and even if the Kubernetes control plane doesn't go up or the kubelet isn't up). Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
parent
431e4fb4b6
commit
12f7888b75
95
internal/app/machined/pkg/controllers/cluster/endpoint.go
Normal file
95
internal/app/machined/pkg/controllers/cluster/endpoint.go
Normal file
@ -0,0 +1,95 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"go.uber.org/zap"
|
||||
"inet.af/netaddr"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
|
||||
"github.com/talos-systems/talos/pkg/resources/cluster"
|
||||
"github.com/talos-systems/talos/pkg/resources/k8s"
|
||||
)
|
||||
|
||||
// EndpointController looks up control plane endpoints.
|
||||
type EndpointController struct{}
|
||||
|
||||
// Name implements controller.Controller interface.
|
||||
func (ctrl *EndpointController) Name() string {
|
||||
return "cluster.EndpointController"
|
||||
}
|
||||
|
||||
// Inputs implements controller.Controller interface.
|
||||
func (ctrl *EndpointController) Inputs() []controller.Input {
|
||||
return []controller.Input{
|
||||
{
|
||||
Namespace: cluster.NamespaceName,
|
||||
Type: cluster.MemberType,
|
||||
Kind: controller.InputWeak,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Outputs implements controller.Controller interface.
|
||||
func (ctrl *EndpointController) Outputs() []controller.Output {
|
||||
return []controller.Output{
|
||||
{
|
||||
Type: k8s.EndpointType,
|
||||
Kind: controller.OutputShared,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-r.EventCh():
|
||||
}
|
||||
|
||||
memberList, err := r.List(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.MemberType, "", resource.VersionUndefined))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing members: %w", err)
|
||||
}
|
||||
|
||||
var endpoints []netaddr.IP
|
||||
|
||||
for _, res := range memberList.Items {
|
||||
member := res.(*cluster.Member).TypedSpec()
|
||||
|
||||
if !(member.MachineType == machine.TypeControlPlane || member.MachineType == machine.TypeInit) {
|
||||
continue
|
||||
}
|
||||
|
||||
endpoints = append(endpoints, member.Addresses...)
|
||||
}
|
||||
|
||||
sort.Slice(endpoints, func(i, j int) bool { return endpoints[i].Compare(endpoints[j]) < 0 })
|
||||
|
||||
if err := r.Modify(ctx,
|
||||
k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneDiscoveredEndpointsID),
|
||||
func(r resource.Resource) error {
|
||||
if !reflect.DeepEqual(r.(*k8s.Endpoint).TypedSpec().Addresses, endpoints) {
|
||||
logger.Debug("updated controlplane endpoints", zap.Any("endpoints", endpoints))
|
||||
}
|
||||
|
||||
r.(*k8s.Endpoint).TypedSpec().Addresses = endpoints
|
||||
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
return fmt.Errorf("error updating endpoints: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,77 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package cluster_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/talos-systems/go-retry/retry"
|
||||
"inet.af/netaddr"
|
||||
|
||||
clusterctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/cluster"
|
||||
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
|
||||
"github.com/talos-systems/talos/pkg/resources/cluster"
|
||||
"github.com/talos-systems/talos/pkg/resources/k8s"
|
||||
)
|
||||
|
||||
type EndpointSuite struct {
|
||||
ClusterSuite
|
||||
}
|
||||
|
||||
func (suite *EndpointSuite) TestReconcileDefault() {
|
||||
suite.startRuntime()
|
||||
|
||||
suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.EndpointController{}))
|
||||
|
||||
member1 := cluster.NewMember(cluster.NamespaceName, "talos-default-master-1")
|
||||
*member1.TypedSpec() = cluster.MemberSpec{
|
||||
NodeID: "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC",
|
||||
Addresses: []netaddr.IP{netaddr.MustParseIP("172.20.0.2"), netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0")},
|
||||
Hostname: "talos-default-master-1",
|
||||
MachineType: machine.TypeControlPlane,
|
||||
OperatingSystem: "Talos (v1.0.0)",
|
||||
}
|
||||
|
||||
member2 := cluster.NewMember(cluster.NamespaceName, "talos-default-master-2")
|
||||
*member2.TypedSpec() = cluster.MemberSpec{
|
||||
NodeID: "9dwHNUViZlPlIervqX9Qo256RUhrfhgO0xBBnKcKl4F",
|
||||
Addresses: []netaddr.IP{netaddr.MustParseIP("172.20.0.3"), netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e1")},
|
||||
Hostname: "talos-default-master-2",
|
||||
MachineType: machine.TypeControlPlane,
|
||||
OperatingSystem: "Talos (v1.0.0)",
|
||||
}
|
||||
|
||||
member3 := cluster.NewMember(cluster.NamespaceName, "talos-default-worker-1")
|
||||
*member3.TypedSpec() = cluster.MemberSpec{
|
||||
NodeID: "xCnFFfxylOf9i5ynhAkt6ZbfcqaLDGKfIa3gwpuaxe7F",
|
||||
Addresses: []netaddr.IP{netaddr.MustParseIP("172.20.0.4")},
|
||||
Hostname: "talos-default-worker-1",
|
||||
MachineType: machine.TypeWorker,
|
||||
OperatingSystem: "Talos (v1.0.0)",
|
||||
}
|
||||
|
||||
for _, r := range []resource.Resource{member1, member2, member3} {
|
||||
suite.Require().NoError(suite.state.Create(suite.ctx, r))
|
||||
}
|
||||
|
||||
// control plane members should be translated to Endpoints
|
||||
suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
|
||||
suite.assertResource(*k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneDiscoveredEndpointsID).Metadata(), func(r resource.Resource) error {
|
||||
spec := r.(*k8s.Endpoint).TypedSpec()
|
||||
|
||||
suite.Assert().Equal(`["172.20.0.2" "172.20.0.3" "fd50:8d60:4238:6302:f857:23ff:fe21:d1e0" "fd50:8d60:4238:6302:f857:23ff:fe21:d1e1"]`, fmt.Sprintf("%q", spec.Addresses))
|
||||
|
||||
return nil
|
||||
}),
|
||||
))
|
||||
}
|
||||
|
||||
func TestEndpointSuite(t *testing.T) {
|
||||
suite.Run(t, new(EndpointSuite))
|
||||
}
|
@ -53,7 +53,7 @@ func (ctrl *EndpointController) Outputs() []controller.Output {
|
||||
return []controller.Output{
|
||||
{
|
||||
Type: k8s.EndpointType,
|
||||
Kind: controller.OutputExclusive,
|
||||
Kind: controller.OutputShared,
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -128,7 +128,7 @@ func (ctrl *EndpointController) watchEndpoints(ctx context.Context, r controller
|
||||
sort.Slice(addrs, func(i, j int) bool { return addrs[i].Compare(addrs[j]) < 0 })
|
||||
|
||||
if err := r.Modify(ctx,
|
||||
k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneEndpointsID),
|
||||
k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneAPIServerEndpointsID),
|
||||
func(r resource.Resource) error {
|
||||
if !reflect.DeepEqual(r.(*k8s.Endpoint).TypedSpec().Addresses, addrs) {
|
||||
logger.Debug("updated controlplane endpoints", zap.Any("endpoints", addrs))
|
||||
|
@ -165,7 +165,6 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime,
|
||||
inputs = append(inputs, controller.Input{
|
||||
Namespace: k8s.ControlPlaneNamespaceName,
|
||||
Type: k8s.EndpointType,
|
||||
ID: pointer.ToString(k8s.ControlPlaneEndpointsID),
|
||||
Kind: controller.InputWeak,
|
||||
})
|
||||
}
|
||||
@ -242,26 +241,23 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime,
|
||||
var endpointsStr []string
|
||||
|
||||
if !isControlplane {
|
||||
endpointResource, err := r.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.EndpointType, k8s.ControlPlaneEndpointsID, resource.VersionUndefined))
|
||||
endpointResources, err := r.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.EndpointType, "", resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("error getting endpoints resource: %w", err)
|
||||
return fmt.Errorf("error getting endpoints resources: %w", err)
|
||||
}
|
||||
|
||||
endpoints := endpointResource.(*k8s.Endpoint).TypedSpec()
|
||||
var endpointAddrs k8s.EndpointList
|
||||
|
||||
if len(endpoints.Addresses) == 0 {
|
||||
// merge all endpoints into a single list
|
||||
for _, res := range endpointResources.Items {
|
||||
endpointAddrs = endpointAddrs.Merge(res.(*k8s.Endpoint))
|
||||
}
|
||||
|
||||
if len(endpointAddrs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
endpointsStr = make([]string, 0, len(endpoints.Addresses))
|
||||
|
||||
for _, ip := range endpoints.Addresses {
|
||||
endpointsStr = append(endpointsStr, ip.String())
|
||||
}
|
||||
endpointsStr = endpointAddrs.Strings()
|
||||
}
|
||||
|
||||
if isControlplane {
|
||||
@ -368,6 +364,8 @@ func (ctrl *APIController) generateJoin(ctx context.Context, r controller.Runtim
|
||||
return fmt.Errorf("failed to generate API client CSR: %w", err)
|
||||
}
|
||||
|
||||
logger.Debug("sending CSR", zap.Strings("endpoints", endpointsStr))
|
||||
|
||||
// TODO: add keyusage: trustd should accept key usage as additional params
|
||||
_, clientCert.Crt, err = remoteGen.IdentityContext(ctx, clientCSR)
|
||||
if err != nil {
|
||||
|
@ -84,6 +84,7 @@ func (ctrl *Controller) Run(ctx context.Context) error {
|
||||
&cluster.AffiliateMergeController{},
|
||||
&cluster.ConfigController{},
|
||||
&cluster.DiscoveryServiceController{},
|
||||
&cluster.EndpointController{},
|
||||
&cluster.LocalAffiliateController{},
|
||||
&cluster.MemberController{},
|
||||
&cluster.KubernetesPullController{},
|
||||
|
@ -62,12 +62,12 @@ func (g *RemoteGenerator) IdentityContext(ctx context.Context, csr *x509.Certifi
|
||||
Csr: csr.X509CertificateRequestPEM,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
|
||||
err = retry.Exponential(5*time.Minute,
|
||||
retry.WithAttemptTimeout(30*time.Second),
|
||||
retry.WithUnits(5*time.Second),
|
||||
err = retry.Exponential(time.Minute,
|
||||
retry.WithAttemptTimeout(10*time.Second),
|
||||
retry.WithUnits(time.Second),
|
||||
retry.WithJitter(100*time.Millisecond),
|
||||
).RetryWithContext(ctx, func(ctx context.Context) error {
|
||||
var resp *securityapi.CertificateResponse
|
||||
|
@ -6,6 +6,7 @@ package k8s
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/resource/meta"
|
||||
@ -15,8 +16,11 @@ import (
|
||||
// EndpointType is type of Endpoint resource.
|
||||
const EndpointType = resource.Type("Endpoints.kubernetes.talos.dev")
|
||||
|
||||
// ControlPlaneEndpointsID is resource ID for controlplane Endpoint.
|
||||
const ControlPlaneEndpointsID = resource.ID("controlplane")
|
||||
// ControlPlaneAPIServerEndpointsID is resource ID for kube-apiserver based Endpoints.
|
||||
const ControlPlaneAPIServerEndpointsID = resource.ID("kube-apiserver")
|
||||
|
||||
// ControlPlaneDiscoveredEndpointsID is resource ID for cluster discovery based Endpoints.
|
||||
const ControlPlaneDiscoveredEndpointsID = resource.ID("discovery")
|
||||
|
||||
// Endpoint resource holds definition of rendered secrets.
|
||||
type Endpoint struct {
|
||||
@ -84,3 +88,34 @@ func (r *Endpoint) ResourceDefinition() meta.ResourceDefinitionSpec {
|
||||
func (r *Endpoint) TypedSpec() *EndpointSpec {
|
||||
return &r.spec
|
||||
}
|
||||
|
||||
// EndpointList is a flattened list of endpoints.
|
||||
type EndpointList []netaddr.IP
|
||||
|
||||
// Merge endpoints from multiple Endpoint resources into a single list.
|
||||
func (l EndpointList) Merge(endpoint *Endpoint) EndpointList {
|
||||
for _, ip := range endpoint.spec.Addresses {
|
||||
ip := ip
|
||||
|
||||
idx := sort.Search(len(l), func(i int) bool { return !l[i].Less(ip) })
|
||||
|
||||
if idx < len(l) && l[idx].Compare(ip) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
l = append(l[:idx], append([]netaddr.IP{ip}, l[idx:]...)...)
|
||||
}
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
// Strings returns a slice of formatted endpoints to string.
|
||||
func (l EndpointList) Strings() []string {
|
||||
res := make([]string, len(l))
|
||||
|
||||
for i := range l {
|
||||
res[i] = l[i].String()
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
37
pkg/resources/k8s/endpoint_test.go
Normal file
37
pkg/resources/k8s/endpoint_test.go
Normal file
@ -0,0 +1,37 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package k8s_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"inet.af/netaddr"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/resources/k8s"
|
||||
)
|
||||
|
||||
func TestEndpointList(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var l k8s.EndpointList
|
||||
|
||||
e1 := k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, "1")
|
||||
e1.TypedSpec().Addresses = []netaddr.IP{
|
||||
netaddr.MustParseIP("172.20.0.2"),
|
||||
netaddr.MustParseIP("172.20.0.3"),
|
||||
}
|
||||
|
||||
e2 := k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, "2")
|
||||
e2.TypedSpec().Addresses = []netaddr.IP{
|
||||
netaddr.MustParseIP("172.20.0.4"),
|
||||
netaddr.MustParseIP("172.20.0.3"),
|
||||
}
|
||||
|
||||
l = l.Merge(e1)
|
||||
l = l.Merge(e2)
|
||||
|
||||
assert.Equal(t, []string{"172.20.0.2", "172.20.0.3", "172.20.0.4"}, l.Strings())
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user