diff --git a/internal/app/machined/pkg/controllers/config/k8s_control_plane.go b/internal/app/machined/pkg/controllers/config/k8s_control_plane.go index 807e73918..8fe978240 100644 --- a/internal/app/machined/pkg/controllers/config/k8s_control_plane.go +++ b/internal/app/machined/pkg/controllers/config/k8s_control_plane.go @@ -22,6 +22,7 @@ import ( "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" "github.com/talos-systems/talos/pkg/machinery/generic/slices" + "github.com/talos-systems/talos/pkg/machinery/nethelpers" "github.com/talos-systems/talos/pkg/machinery/resources/config" "github.com/talos-systems/talos/pkg/machinery/resources/k8s" ) @@ -164,7 +165,7 @@ func (ctrl *K8sControlPlaneController) manageAPIServerConfig(ctx context.Context Image: cfgProvider.Cluster().APIServer().Image(), CloudProvider: cloudProvider, ControlPlaneEndpoint: cfgProvider.Cluster().Endpoint().String(), - EtcdServers: []string{"https://127.0.0.1:2379"}, + EtcdServers: []string{fmt.Sprintf("https://%s", nethelpers.JoinHostPort("localhost", constants.EtcdClientPort))}, LocalPort: cfgProvider.Cluster().LocalAPIServerPort(), ServiceCIDRs: cfgProvider.Cluster().Network().ServiceCIDRs(), ExtraArgs: cfgProvider.Cluster().APIServer().ExtraArgs(), diff --git a/internal/app/machined/pkg/controllers/etcd/pki.go b/internal/app/machined/pkg/controllers/etcd/pki.go index 08f2ed074..835415c01 100644 --- a/internal/app/machined/pkg/controllers/etcd/pki.go +++ b/internal/app/machined/pkg/controllers/etcd/pki.go @@ -92,11 +92,11 @@ func (ctrl *PKIController) Run(ctx context.Context, r controller.Runtime, logger return err } - if err = os.WriteFile(constants.KubernetesEtcdCACert, rootScrts.TypedSpec().EtcdCA.Crt, 0o400); err != nil { + if err = os.WriteFile(constants.EtcdCACert, rootScrts.TypedSpec().EtcdCA.Crt, 0o400); err != nil { return fmt.Errorf("failed to write CA certificate: %w", err) } - if err = os.WriteFile(constants.KubernetesEtcdCAKey, rootScrts.TypedSpec().EtcdCA.Key, 0o400); err != nil { + if err = os.WriteFile(constants.EtcdCAKey, rootScrts.TypedSpec().EtcdCA.Key, 0o400); err != nil { return fmt.Errorf("failed to write CA key: %w", err) } @@ -109,18 +109,18 @@ func (ctrl *PKIController) Run(ctx context.Context, r controller.Runtime, logger }{ { getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.Etcd }, - keyPath: constants.KubernetesEtcdKey, - certPath: constants.KubernetesEtcdCert, + keyPath: constants.EtcdKey, + certPath: constants.EtcdCert, }, { getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.EtcdPeer }, - keyPath: constants.KubernetesEtcdPeerKey, - certPath: constants.KubernetesEtcdPeerCert, + keyPath: constants.EtcdPeerKey, + certPath: constants.EtcdPeerCert, }, { getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.EtcdAdmin }, - keyPath: constants.KubernetesEtcdAdminKey, - certPath: constants.KubernetesEtcdAdminCert, + keyPath: constants.EtcdAdminKey, + certPath: constants.EtcdAdminCert, }, } { if err = os.WriteFile(keypair.keyPath, keypair.getter().Key, 0o400); err != nil { diff --git a/internal/app/machined/pkg/system/services/etcd.go b/internal/app/machined/pkg/system/services/etcd.go index de0e65e03..af6f756ee 100644 --- a/internal/app/machined/pkg/system/services/etcd.go +++ b/internal/app/machined/pkg/system/services/etcd.go @@ -305,7 +305,7 @@ func buildInitialCluster(ctx context.Context, r runtime.Runtime, name, ip string retry.WithErrorLogging(true), ).RetryWithContext(ctx, func(ctx context.Context) error { var ( - peerAddrs = []string{"https://" + net.FormatAddress(ip) + ":2380"} + peerAddrs = []string{"https://" + nethelpers.JoinHostPort(ip, +constants.EtcdPeerPort)} resp *clientv3.MemberListResponse ) @@ -397,16 +397,16 @@ func (e *Etcd) argsForInit(ctx context.Context, r runtime.Runtime) error { "auto-tls": "false", "peer-auto-tls": "false", "data-dir": constants.EtcdDataPath, - "listen-peer-urls": "https://" + net.FormatAddress(listenAddress) + ":2380", - "listen-client-urls": "https://" + net.FormatAddress(listenAddress) + ":2379", + "listen-peer-urls": "https://" + nethelpers.JoinHostPort(listenAddress, constants.EtcdPeerPort), + "listen-client-urls": "https://" + nethelpers.JoinHostPort(listenAddress, constants.EtcdClientPort), "client-cert-auth": "true", - "cert-file": constants.KubernetesEtcdCert, - "key-file": constants.KubernetesEtcdKey, - "trusted-ca-file": constants.KubernetesEtcdCACert, + "cert-file": constants.EtcdCert, + "key-file": constants.EtcdKey, + "trusted-ca-file": constants.EtcdCACert, "peer-client-cert-auth": "true", - "peer-cert-file": constants.KubernetesEtcdPeerCert, - "peer-key-file": constants.KubernetesEtcdPeerKey, - "peer-trusted-ca-file": constants.KubernetesEtcdCACert, + "peer-cert-file": constants.EtcdPeerCert, + "peer-key-file": constants.EtcdPeerKey, + "peer-trusted-ca-file": constants.EtcdCACert, "experimental-initial-corrupt-check": "true", } @@ -427,7 +427,7 @@ func (e *Etcd) argsForInit(ctx context.Context, r runtime.Runtime) error { } if ok { - initialCluster := fmt.Sprintf("%s=https://%s:2380", hostname, net.FormatAddress(primaryAddr)) + initialCluster := fmt.Sprintf("%s=https://%s", hostname, nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)) if upgraded { denyListArgs.Set("initial-cluster-state", "existing") @@ -446,13 +446,13 @@ func (e *Etcd) argsForInit(ctx context.Context, r runtime.Runtime) error { if !extraArgs.Contains("initial-advertise-peer-urls") { denyListArgs.Set("initial-advertise-peer-urls", - fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), 2380)), + fmt.Sprintf("https://%s", nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)), ) } if !extraArgs.Contains("advertise-client-urls") { denyListArgs.Set("advertise-client-urls", - fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), 2379)), + fmt.Sprintf("https://%s", nethelpers.JoinHostPort(primaryAddr, constants.EtcdClientPort)), ) } @@ -487,16 +487,16 @@ func (e *Etcd) argsForControlPlane(ctx context.Context, r runtime.Runtime) error "auto-tls": "false", "peer-auto-tls": "false", "data-dir": constants.EtcdDataPath, - "listen-peer-urls": "https://" + net.FormatAddress(listenAddress) + ":2380", - "listen-client-urls": "https://" + net.FormatAddress(listenAddress) + ":2379", + "listen-peer-urls": "https://" + nethelpers.JoinHostPort(listenAddress, constants.EtcdPeerPort), + "listen-client-urls": "https://" + nethelpers.JoinHostPort(listenAddress, constants.EtcdClientPort), "client-cert-auth": "true", - "cert-file": constants.KubernetesEtcdCert, - "key-file": constants.KubernetesEtcdKey, - "trusted-ca-file": constants.KubernetesEtcdCACert, + "cert-file": constants.EtcdCert, + "key-file": constants.EtcdKey, + "trusted-ca-file": constants.EtcdCACert, "peer-client-cert-auth": "true", - "peer-cert-file": constants.KubernetesEtcdPeerCert, - "peer-key-file": constants.KubernetesEtcdPeerKey, - "peer-trusted-ca-file": constants.KubernetesEtcdCACert, + "peer-cert-file": constants.EtcdPeerCert, + "peer-key-file": constants.EtcdPeerKey, + "peer-trusted-ca-file": constants.EtcdCACert, "experimental-initial-corrupt-check": "true", } @@ -530,7 +530,7 @@ func (e *Etcd) argsForControlPlane(ctx context.Context, r runtime.Runtime) error var initialCluster string if e.Bootstrap { - initialCluster = fmt.Sprintf("%s=https://%s:2380", hostname, net.FormatAddress(primaryAddr)) + initialCluster = fmt.Sprintf("%s=https://%s", hostname, nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)) } else { initialCluster, e.learnerMemberID, err = buildInitialCluster(ctx, r, hostname, primaryAddr) if err != nil { @@ -543,14 +543,14 @@ func (e *Etcd) argsForControlPlane(ctx context.Context, r runtime.Runtime) error if !extraArgs.Contains("initial-advertise-peer-urls") { denyListArgs.Set("initial-advertise-peer-urls", - fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), 2380)), + fmt.Sprintf("https://%s", nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)), ) } } if !extraArgs.Contains("advertise-client-urls") { denyListArgs.Set("advertise-client-urls", - fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), 2379)), + fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), constants.EtcdClientPort)), ) } @@ -581,9 +581,9 @@ func (e *Etcd) recoverFromSnapshot(hostname, primaryAddr string) error { Name: hostname, OutputDataDir: constants.EtcdDataPath, - PeerURLs: []string{"https://" + net.FormatAddress(primaryAddr) + ":2380"}, + PeerURLs: []string{"https://" + nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)}, - InitialCluster: fmt.Sprintf("%s=https://%s:2380", hostname, net.FormatAddress(primaryAddr)), + InitialCluster: fmt.Sprintf("%s=https://%s", hostname, nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)), SkipHashCheck: e.RecoverSkipHashCheck, }); err != nil { @@ -600,24 +600,60 @@ func (e *Etcd) recoverFromSnapshot(hostname, primaryAddr string) error { func promoteMember(ctx context.Context, r runtime.Runtime, memberID uint64) error { // try to promote a member until it succeeds (call might fail until the member catches up with the leader) // promote member call will fail until member catches up with the master + // + // iterate over all endpoints until we find the one which works + // if we stick with the default behavior, we might hit the member being promoted, and that will never + // promote itself. + idx := 0 + return retry.Constant(10*time.Minute, retry.WithUnits(15*time.Second), retry.WithJitter(time.Second), retry.WithErrorLogging(true), ).RetryWithContext(ctx, func(ctx context.Context) error { - client, err := etcd.NewClientFromControlPlaneIPsNoDiscovery(ctx, r.State().V1Alpha2().Resources()) + endpoints, err := etcd.GetEndpoints(ctx, r.State().V1Alpha2().Resources()) if err != nil { return retry.ExpectedError(err) } - defer client.Close() //nolint:errcheck + if len(endpoints) == 0 { + return retry.ExpectedErrorf("no endpoints") + } - _, err = client.MemberPromote(ctx, memberID) + // try to iterate all available endpoints in the time available for an attempt + for i := 0; i < len(endpoints); i++ { + select { + case <-ctx.Done(): + return retry.ExpectedError(ctx.Err()) + default: + } + + endpoint := endpoints[idx%len(endpoints)] + idx++ + + err = attemptPromote(ctx, endpoint, memberID) + if err == nil { + return nil + } + } return retry.ExpectedError(err) }) } +func attemptPromote(ctx context.Context, endpoint string, memberID uint64) error { + client, err := etcd.NewClient([]string{endpoint}) + if err != nil { + return err + } + + defer client.Close() //nolint:errcheck + + _, err = client.MemberPromote(ctx, memberID) + + return err +} + // IsDirEmpty checks if a directory is empty or not. func IsDirEmpty(name string) (bool, error) { f, err := os.Open(name) diff --git a/internal/pkg/etcd/endpoints.go b/internal/pkg/etcd/endpoints.go new file mode 100644 index 000000000..f714b5ab7 --- /dev/null +++ b/internal/pkg/etcd/endpoints.go @@ -0,0 +1,58 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package etcd + +import ( + "context" + "fmt" + + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/safe" + "github.com/cosi-project/runtime/pkg/state" + + "github.com/talos-systems/talos/pkg/machinery/constants" + "github.com/talos-systems/talos/pkg/machinery/nethelpers" + "github.com/talos-systems/talos/pkg/machinery/resources/k8s" +) + +// GetEndpoints returns expected endpoints of etcd cluster members. +// +// It is not guaranteed that etcd is running on each listed endpoint. +func GetEndpoints(ctx context.Context, resources state.State) ([]string, error) { + return getEndpoints(ctx, resources, "") +} + +func getEndpoints(ctx context.Context, resources state.State, ignoreEndpointID string) ([]string, error) { + endpointResources, err := safe.StateList[*k8s.Endpoint](ctx, resources, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.EndpointType, "", resource.VersionUndefined)) + if err != nil { + return nil, fmt.Errorf("error getting endpoints resources: %w", err) + } + + iter := safe.IteratorFromList(endpointResources) + + var endpointAddrs k8s.EndpointList + + // merge all endpoints into a single list + for iter.Next() { + if iter.Value().Metadata().ID() == ignoreEndpointID { + continue + } + + endpointAddrs = endpointAddrs.Merge(iter.Value()) + } + + if len(endpointAddrs) == 0 { + return nil, fmt.Errorf("no controlplane endpoints discovered yet") + } + + endpoints := endpointAddrs.Strings() + + // Etcd expects host:port format. + for i := 0; i < len(endpoints); i++ { + endpoints[i] = nethelpers.JoinHostPort(endpoints[i], constants.EtcdClientPort) + } + + return endpoints, nil +} diff --git a/internal/pkg/etcd/etcd.go b/internal/pkg/etcd/etcd.go index e207c3367..5c2461387 100644 --- a/internal/pkg/etcd/etcd.go +++ b/internal/pkg/etcd/etcd.go @@ -13,9 +13,7 @@ import ( "os" "time" - "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/state" - "github.com/talos-systems/net" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/transport" @@ -27,6 +25,7 @@ import ( "github.com/talos-systems/talos/pkg/machinery/config" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" + "github.com/talos-systems/talos/pkg/machinery/nethelpers" "github.com/talos-systems/talos/pkg/machinery/resources/k8s" ) @@ -42,9 +41,9 @@ type Client struct { // a list of endpoints. func NewClient(endpoints []string) (client *Client, err error) { tlsInfo := transport.TLSInfo{ - CertFile: constants.KubernetesEtcdAdminCert, - KeyFile: constants.KubernetesEtcdAdminKey, - TrustedCAFile: constants.KubernetesEtcdCACert, + CertFile: constants.EtcdAdminCert, + KeyFile: constants.EtcdAdminKey, + TrustedCAFile: constants.EtcdCACert, } tlsConfig, err := tlsInfo.ClientConfig() @@ -68,7 +67,7 @@ func NewClient(endpoints []string) (client *Client, err error) { // NewLocalClient initializes and returns etcd client configured to talk to localhost endpoint. func NewLocalClient() (client *Client, err error) { - return NewClient([]string{"127.0.0.1:2379"}) + return NewClient([]string{nethelpers.JoinHostPort("localhost", constants.EtcdClientPort)}) } // NewClientFromControlPlaneIPs initializes and returns an etcd client @@ -86,31 +85,9 @@ func NewClientFromControlPlaneIPsNoDiscovery(ctx context.Context, resources stat } func newClientFromControlPlaneIPs(ctx context.Context, resources state.State, ignoreEndpointID string) (client *Client, err error) { - endpointResources, err := resources.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.EndpointType, "", resource.VersionUndefined)) + endpoints, err := getEndpoints(ctx, resources, ignoreEndpointID) if err != nil { - return nil, fmt.Errorf("error getting endpoints resources: %w", err) - } - - var endpointAddrs k8s.EndpointList - - // merge all endpoints into a single list - for _, res := range endpointResources.Items { - if res.Metadata().ID() == ignoreEndpointID { - continue - } - - endpointAddrs = endpointAddrs.Merge(res.(*k8s.Endpoint)) - } - - if len(endpointAddrs) == 0 { - return nil, fmt.Errorf("no controlplane endpoints discovered yet") - } - - endpoints := endpointAddrs.Strings() - - // Etcd expects host:port format. - for i := 0; i < len(endpoints); i++ { - endpoints[i] = net.FormatAddress(endpoints[i]) + ":2379" + return nil, err } // Shuffle endpoints to establish random order on each call to avoid patterns based on sorted IP list. diff --git a/pkg/machinery/constants/constants.go b/pkg/machinery/constants/constants.go index e3c7593a6..e5005ec28 100644 --- a/pkg/machinery/constants/constants.go +++ b/pkg/machinery/constants/constants.go @@ -166,32 +166,35 @@ const ( // KubernetesCACert is the path to the root CA certificate. KubernetesCACert = DefaultCertificatesDir + "/" + "ca.crt" - // KubernetesEtcdCACert is the path to the etcd CA certificate. - KubernetesEtcdCACert = EtcdPKIPath + "/" + "ca.crt" + // EtcdCACert is the path to the etcd CA certificate. + EtcdCACert = EtcdPKIPath + "/" + "ca.crt" - // KubernetesEtcdCAKey is the path to the etcd CA private key. - KubernetesEtcdCAKey = EtcdPKIPath + "/" + "ca.key" + // EtcdCAKey is the path to the etcd CA private key. + EtcdCAKey = EtcdPKIPath + "/" + "ca.key" - // KubernetesEtcdCert is the path to the etcd server certificate. - KubernetesEtcdCert = EtcdPKIPath + "/" + "server.crt" + // EtcdCert is the path to the etcd server certificate. + EtcdCert = EtcdPKIPath + "/" + "server.crt" - // KubernetesEtcdKey is the path to the etcd server private key. - KubernetesEtcdKey = EtcdPKIPath + "/" + "server.key" + // EtcdKey is the path to the etcd server private key. + EtcdKey = EtcdPKIPath + "/" + "server.key" - // KubernetesEtcdPeerCert is the path to the etcd peer certificate. - KubernetesEtcdPeerCert = EtcdPKIPath + "/" + "peer.crt" + // EtcdPeerCert is the path to the etcd peer certificate. + EtcdPeerCert = EtcdPKIPath + "/" + "peer.crt" - // KubernetesEtcdPeerKey is the path to the etcd peer private key. - KubernetesEtcdPeerKey = EtcdPKIPath + "/" + "peer.key" + // EtcdPeerKey is the path to the etcd peer private key. + EtcdPeerKey = EtcdPKIPath + "/" + "peer.key" - // KubernetesEtcdAdminCert is the path to the talos client certificate. - KubernetesEtcdAdminCert = EtcdPKIPath + "/" + "admin.crt" + // EtcdAdminCert is the path to the talos client certificate. + EtcdAdminCert = EtcdPKIPath + "/" + "admin.crt" - // KubernetesEtcdAdminKey is the path to the talos client private key. - KubernetesEtcdAdminKey = EtcdPKIPath + "/" + "admin.key" + // EtcdAdminKey is the path to the talos client private key. + EtcdAdminKey = EtcdPKIPath + "/" + "admin.key" - // KubernetesEtcdListenClientPort defines the port etcd listen on for client traffic. - KubernetesEtcdListenClientPort = "2379" + // EtcdClientPort defines the port etcd listen on for client traffic. + EtcdClientPort = 2379 + + // EtcdPeerPort defines the port etcd listens on for peer traffic. + EtcdPeerPort = 2380 // KubernetesAdminCertCommonName defines CN property of Kubernetes admin certificate. KubernetesAdminCertCommonName = "admin"