From b6b78e7fef3f6ef0c566e1815d1e28f16f868c93 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Fri, 22 Oct 2021 12:14:25 +0300 Subject: [PATCH] test: add cluster discovery integration tests This verifies that members match cluster state and that both cluster registries work in sync producing same discovery data. Fixes #4191 Signed-off-by: Andrey Smirnov --- internal/integration/api/apply-config.go | 67 +--- internal/integration/api/discovery.go | 379 ++++++++++++++++++++++ internal/integration/base/api.go | 56 ++++ pkg/provision/providers/docker/reflect.go | 3 +- pkg/resources/kubespan/peer_state.go | 18 + 5 files changed, 461 insertions(+), 62 deletions(-) create mode 100644 internal/integration/api/discovery.go diff --git a/internal/integration/api/apply-config.go b/internal/integration/api/apply-config.go index bdbcd2fa0..2aa88b893 100644 --- a/internal/integration/api/apply-config.go +++ b/internal/integration/api/apply-config.go @@ -8,12 +8,8 @@ package api import ( - "bytes" "context" - "fmt" - "io" "sort" - "sync" "testing" "time" @@ -23,7 +19,6 @@ import ( machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine" "github.com/talos-systems/talos/pkg/machinery/client" "github.com/talos-systems/talos/pkg/machinery/config" - "github.com/talos-systems/talos/pkg/machinery/config/configloader" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" @@ -95,7 +90,7 @@ func (suite *ApplyConfigSuite) TestApply() { nodeCtx := client.WithNodes(suite.ctx, node) - provider, err := suite.readConfigFromNode(nodeCtx) + provider, err := suite.ReadConfigFromNode(nodeCtx) suite.Assert().Nilf(err, "failed to read existing config from node %q: %w", node, err) cfg, ok := provider.(*v1alpha1.Config) @@ -126,7 +121,7 @@ func (suite *ApplyConfigSuite) TestApply() { var newProvider config.Provider 
suite.Require().Nilf(retry.Constant(time.Minute, retry.WithUnits(time.Second)).Retry(func() error { - newProvider, err = suite.readConfigFromNode(nodeCtx) + newProvider, err = suite.ReadConfigFromNode(nodeCtx) if err != nil { return retry.ExpectedError(err) } @@ -149,7 +144,7 @@ func (suite *ApplyConfigSuite) TestApplyOnReboot() { nodeCtx := client.WithNodes(suite.ctx, node) - provider, err := suite.readConfigFromNode(nodeCtx) + provider, err := suite.ReadConfigFromNode(nodeCtx) suite.Require().NoError(err, "failed to read existing config from node %q", node) cfg, ok := provider.(*v1alpha1.Config) @@ -173,7 +168,7 @@ func (suite *ApplyConfigSuite) TestApplyOnReboot() { // Verify configuration change var newProvider config.Provider - newProvider, err = suite.readConfigFromNode(nodeCtx) + newProvider, err = suite.ReadConfigFromNode(nodeCtx) suite.Require().NoError(err, "failed to read updated configuration from node %q: %w", node) @@ -208,7 +203,7 @@ func (suite *ApplyConfigSuite) TestApplyConfigRotateEncryptionSecrets() { suite.ClearConnectionRefused(suite.ctx, node) nodeCtx := client.WithNodes(suite.ctx, node) - provider, err := suite.readConfigFromNode(nodeCtx) + provider, err := suite.ReadConfigFromNode(nodeCtx) suite.Assert().NoError(err) @@ -294,7 +289,7 @@ func (suite *ApplyConfigSuite) TestApplyConfigRotateEncryptionSecrets() { var newProvider config.Provider suite.Require().Nilf(retry.Constant(time.Minute, retry.WithUnits(time.Second)).Retry(func() error { - newProvider, err = suite.readConfigFromNode(nodeCtx) + newProvider, err = suite.ReadConfigFromNode(nodeCtx) if err != nil { return retry.ExpectedError(err) } @@ -325,56 +320,6 @@ func (suite *ApplyConfigSuite) TestApplyConfigRotateEncryptionSecrets() { } } -func (suite *ApplyConfigSuite) readConfigFromNode(nodeCtx context.Context) (config.Provider, error) { - // Load the current node machine config - cfgData := new(bytes.Buffer) - - reader, errCh, err := suite.Client.Read(nodeCtx, constants.ConfigPath) - 
if err != nil { - return nil, fmt.Errorf("error creating reader: %w", err) - } - defer reader.Close() //nolint:errcheck - - if err = copyFromReaderWithErrChan(cfgData, reader, errCh); err != nil { - return nil, fmt.Errorf("error reading: %w", err) - } - - provider, err := configloader.NewFromBytes(cfgData.Bytes()) - if err != nil { - return nil, fmt.Errorf("failed to parse: %w", err) - } - - return provider, nil -} - -func copyFromReaderWithErrChan(out io.Writer, in io.Reader, errCh <-chan error) (err error) { - var wg sync.WaitGroup - - var chanErr error - - wg.Add(1) - - go func() { - defer wg.Done() - - // StreamReader is only singly-buffered, so we need to process any errors as we get them. - for chanErr = range errCh { - } - }() - - defer func() { - wg.Wait() - - if err == nil { - err = chanErr - } - }() - - _, err = io.Copy(out, in) - - return err -} - func init() { allSuites = append(allSuites, new(ApplyConfigSuite)) } diff --git a/internal/integration/api/discovery.go b/internal/integration/api/discovery.go new file mode 100644 index 000000000..de85bc445 --- /dev/null +++ b/internal/integration/api/discovery.go @@ -0,0 +1,379 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +//go:build integration_api +// +build integration_api + +package api + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/cosi-project/runtime/pkg/resource" + "gopkg.in/yaml.v3" + "inet.af/netaddr" + + "github.com/talos-systems/talos/internal/integration/base" + "github.com/talos-systems/talos/pkg/machinery/client" + "github.com/talos-systems/talos/pkg/resources/cluster" + "github.com/talos-systems/talos/pkg/resources/kubespan" +) + +// DiscoverySuite verifies Discovery API. +type DiscoverySuite struct { + base.APISuite + + ctx context.Context + ctxCancel context.CancelFunc +} + +// SuiteName ... 
+func (suite *DiscoverySuite) SuiteName() string { + return "api.DiscoverySuite" +} + +// SetupTest ... +func (suite *DiscoverySuite) SetupTest() { + // make sure API calls have timeout + suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 15*time.Second) + + // check that cluster has discovery enabled + node := suite.RandomDiscoveredNode() + suite.ClearConnectionRefused(suite.ctx, node) + + nodeCtx := client.WithNodes(suite.ctx, node) + provider, err := suite.ReadConfigFromNode(nodeCtx) + suite.Require().NoError(err) + + if !provider.Cluster().Discovery().Enabled() { + suite.T().Skip("cluster discovery is disabled") + } +} + +// TearDownTest ... +func (suite *DiscoverySuite) TearDownTest() { + if suite.ctxCancel != nil { + suite.ctxCancel() + } +} + +// TestMembers checks that `talosctl get members` matches expected cluster discovery. +// +//nolint:gocyclo +func (suite *DiscoverySuite) TestMembers() { + nodes := suite.DiscoverNodes() + expectedTalosVersion := fmt.Sprintf("Talos (%s)", suite.Version) + + for _, node := range nodes.Nodes() { + nodeCtx := client.WithNodes(suite.ctx, node) + + members := suite.getMembers(nodeCtx) + + suite.Assert().Len(members, len(nodes.Nodes())) + + // do basic check against discovered nodes + for _, expectedNode := range nodes.Nodes() { + addr, err := netaddr.ParseIP(expectedNode) + suite.Require().NoError(err) + + found := false + + for _, member := range members { + for _, memberAddr := range member.TypedSpec().Addresses { + if memberAddr.Compare(addr) == 0 { + found = true + + break + } + } + + if found { + break + } + } + + suite.Assert().True(found, "addr %s", addr) + } + + // if cluster information is available, perform additional checks + if suite.Cluster == nil { + continue + } + + memberByID := make(map[string]*cluster.Member) + + for _, member := range members { + memberByID[member.Metadata().ID()] = member + } + + nodesInfo := suite.Cluster.Info().Nodes + + for _, nodeInfo := range nodesInfo { 
matchingMember := memberByID[nodeInfo.Name] + suite.Require().NotNil(matchingMember) + + suite.Assert().Equal(nodeInfo.Type, matchingMember.TypedSpec().MachineType) + suite.Assert().Equal(expectedTalosVersion, matchingMember.TypedSpec().OperatingSystem) + suite.Assert().Equal(nodeInfo.Name, matchingMember.TypedSpec().Hostname) + + for _, nodeIPStd := range nodeInfo.IPs { + nodeIP, ok := netaddr.FromStdIP(nodeIPStd) + suite.Assert().True(ok) + + found := false + + for _, memberAddr := range matchingMember.TypedSpec().Addresses { + if memberAddr.Compare(nodeIP) == 0 { + found = true + + break + } + } + + suite.Assert().True(found, "addr %s", nodeIP) + } + } + } +} + +// TestRegistries checks that all registries produce same raw Affiliate data. +func (suite *DiscoverySuite) TestRegistries() { + registries := []string{"k8s/", "service/"} + + nodes := suite.DiscoverNodes() + + for _, node := range nodes.Nodes() { + nodeCtx := client.WithNodes(suite.ctx, node) + + members := suite.getMembers(nodeCtx) + localIdentity := suite.getNodeIdentity(nodeCtx) + rawAffiliates := suite.getAffiliates(nodeCtx, cluster.RawNamespaceName) + + // raw affiliates don't contain the local node + suite.Assert().Len(rawAffiliates, len(registries)*(len(members)-1)) + + rawAffiliatesByID := make(map[string]*cluster.Affiliate) + + for _, rawAffiliate := range rawAffiliates { + rawAffiliatesByID[rawAffiliate.Metadata().ID()] = rawAffiliate + } + + // every member except for local identity member should be discovered via each registry + for _, member := range members { + if member.TypedSpec().NodeID == localIdentity.TypedSpec().NodeID { + continue + } + + for _, registry := range registries { + rawAffiliate := rawAffiliatesByID[registry+member.TypedSpec().NodeID] + suite.Require().NotNil(rawAffiliate) + + suite.Assert().Equal(member.TypedSpec().Hostname, rawAffiliate.TypedSpec().Hostname) + suite.Assert().Equal(member.TypedSpec().Addresses, rawAffiliate.TypedSpec().Addresses) + 
suite.Assert().Equal(member.TypedSpec().OperatingSystem, rawAffiliate.TypedSpec().OperatingSystem) + suite.Assert().Equal(member.TypedSpec().MachineType, rawAffiliate.TypedSpec().MachineType) + } + } + } +} + +// TestKubeSpanPeers verifies that KubeSpan peer specs are populated, and that peer statuses are available. +func (suite *DiscoverySuite) TestKubeSpanPeers() { + if !suite.Capabilities().RunsTalosKernel { + suite.T().Skip("not running Talos kernel") + } + + // check that cluster has KubeSpan enabled + node := suite.RandomDiscoveredNode() + suite.ClearConnectionRefused(suite.ctx, node) + + nodeCtx := client.WithNodes(suite.ctx, node) + provider, err := suite.ReadConfigFromNode(nodeCtx) + suite.Require().NoError(err) + + if !provider.Machine().Network().KubeSpan().Enabled() { + suite.T().Skip("KubeSpan is disabled") + } + + nodes := suite.DiscoverNodes().Nodes() + + for _, node := range nodes { + nodeCtx := client.WithNodes(suite.ctx, node) + + peerSpecs := suite.getKubeSpanPeerSpecs(nodeCtx) + suite.Assert().Len(peerSpecs, len(nodes)-1) + + peerStatuses := suite.getKubeSpanPeerStatuses(nodeCtx) + suite.Assert().Len(peerStatuses, len(nodes)-1) + + for _, status := range peerStatuses { + suite.Assert().Equal(kubespan.PeerStateUp, status.TypedSpec().State) + suite.Assert().False(status.TypedSpec().Endpoint.IsZero()) + suite.Assert().Greater(status.TypedSpec().ReceiveBytes, int64(0)) + suite.Assert().Greater(status.TypedSpec().TransmitBytes, int64(0)) + } + } +} + +//nolint:dupl +func (suite *DiscoverySuite) getMembers(nodeCtx context.Context) []*cluster.Member { + var result []*cluster.Member + + memberList, err := suite.Client.Resources.List(nodeCtx, cluster.NamespaceName, cluster.MemberType) + suite.Require().NoError(err) + + for { + resp, err := memberList.Recv() + if err == io.EOF { + break + } + + suite.Require().NoError(err) + + if resp.Resource == nil { + continue + } + + // TODO: this is hackery to decode back into Member resource + // this should be 
fixed once we introduce protobuf properly + b, err := yaml.Marshal(resp.Resource.Spec()) + suite.Require().NoError(err) + + member := cluster.NewMember(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID()) + + suite.Require().NoError(yaml.Unmarshal(b, member.TypedSpec())) + + result = append(result, member) + } + + return result +} + +func (suite *DiscoverySuite) getNodeIdentity(nodeCtx context.Context) *cluster.Identity { + list, err := suite.Client.Resources.Get(nodeCtx, cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity) + suite.Require().NoError(err) + suite.Require().Len(list, 1) + + resp := list[0] + + // TODO: this is hackery to decode back into Identity resource + // this should be fixed once we introduce protobuf properly + b, err := yaml.Marshal(resp.Resource.Spec()) + suite.Require().NoError(err) + + identity := cluster.NewIdentity(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID()) + + suite.Require().NoError(yaml.Unmarshal(b, identity.TypedSpec())) + + return identity +} + +//nolint:dupl +func (suite *DiscoverySuite) getAffiliates(nodeCtx context.Context, namespace resource.Namespace) []*cluster.Affiliate { + var result []*cluster.Affiliate + + affiliateList, err := suite.Client.Resources.List(nodeCtx, namespace, cluster.AffiliateType) + suite.Require().NoError(err) + + for { + resp, err := affiliateList.Recv() + if err == io.EOF { + break + } + + suite.Require().NoError(err) + + if resp.Resource == nil { + continue + } + + // TODO: this is hackery to decode back into Affiliate resource + // this should be fixed once we introduce protobuf properly + b, err := yaml.Marshal(resp.Resource.Spec()) + suite.Require().NoError(err) + + affiliate := cluster.NewAffiliate(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID()) + + suite.Require().NoError(yaml.Unmarshal(b, affiliate.TypedSpec())) + + result = append(result, affiliate) + } + + return result +} + +//nolint:dupl +func (suite *DiscoverySuite) 
getKubeSpanPeerSpecs(nodeCtx context.Context) []*kubespan.PeerSpec { + var result []*kubespan.PeerSpec + + peerList, err := suite.Client.Resources.List(nodeCtx, kubespan.NamespaceName, kubespan.PeerSpecType) + suite.Require().NoError(err) + + for { + resp, err := peerList.Recv() + if err == io.EOF { + break + } + + suite.Require().NoError(err) + + if resp.Resource == nil { + continue + } + + // TODO: this is hackery to decode back into KubeSpanPeerSpec resource + // this should be fixed once we introduce protobuf properly + b, err := yaml.Marshal(resp.Resource.Spec()) + suite.Require().NoError(err) + + peerSpec := kubespan.NewPeerSpec(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID()) + + suite.Require().NoError(yaml.Unmarshal(b, peerSpec.TypedSpec())) + + result = append(result, peerSpec) + } + + return result +} + +//nolint:dupl +func (suite *DiscoverySuite) getKubeSpanPeerStatuses(nodeCtx context.Context) []*kubespan.PeerStatus { + var result []*kubespan.PeerStatus + + peerList, err := suite.Client.Resources.List(nodeCtx, kubespan.NamespaceName, kubespan.PeerStatusType) + suite.Require().NoError(err) + + for { + resp, err := peerList.Recv() + if err == io.EOF { + break + } + + suite.Require().NoError(err) + + if resp.Resource == nil { + continue + } + + // TODO: this is hackery to decode back into KubeSpanPeerStatus resource + // this should be fixed once we introduce protobuf properly + b, err := yaml.Marshal(resp.Resource.Spec()) + suite.Require().NoError(err) + + peerStatus := kubespan.NewPeerStatus(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID()) + + suite.Require().NoError(yaml.Unmarshal(b, peerStatus.TypedSpec())) + + result = append(result, peerStatus) + } + + return result +} + +func init() { + allSuites = append(allSuites, new(DiscoverySuite)) +} diff --git a/internal/integration/base/api.go b/internal/integration/base/api.go index 3ddec920b..cd42e067f 100644 --- a/internal/integration/base/api.go +++ 
b/internal/integration/base/api.go @@ -8,6 +8,7 @@ package base import ( + "bytes" "context" "crypto/sha256" "encoding/hex" @@ -16,6 +17,7 @@ import ( "io/ioutil" "math/rand" "strings" + "sync" "time" "github.com/stretchr/testify/suite" @@ -28,7 +30,10 @@ import ( machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine" "github.com/talos-systems/talos/pkg/machinery/client" clientconfig "github.com/talos-systems/talos/pkg/machinery/client/config" + "github.com/talos-systems/talos/pkg/machinery/config" + "github.com/talos-systems/talos/pkg/machinery/config/configloader" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" + "github.com/talos-systems/talos/pkg/machinery/constants" "github.com/talos-systems/talos/pkg/provision" "github.com/talos-systems/talos/pkg/provision/access" ) @@ -323,6 +328,57 @@ func (apiSuite *APISuite) HashKubeletCert(ctx context.Context, node string) (str return hex.EncodeToString(hash.Sum(nil)), reader.Close() } +// ReadConfigFromNode reads machine configuration from the node. 
+func (apiSuite *APISuite) ReadConfigFromNode(nodeCtx context.Context) (config.Provider, error) { + // Load the current node machine config + cfgData := new(bytes.Buffer) + + reader, errCh, err := apiSuite.Client.Read(nodeCtx, constants.ConfigPath) + if err != nil { + return nil, fmt.Errorf("error creating reader: %w", err) + } + defer reader.Close() //nolint:errcheck + + if err = copyFromReaderWithErrChan(cfgData, reader, errCh); err != nil { + return nil, fmt.Errorf("error reading: %w", err) + } + + provider, err := configloader.NewFromBytes(cfgData.Bytes()) + if err != nil { + return nil, fmt.Errorf("failed to parse: %w", err) + } + + return provider, nil +} + +func copyFromReaderWithErrChan(out io.Writer, in io.Reader, errCh <-chan error) (err error) { + var wg sync.WaitGroup + + var chanErr error + + wg.Add(1) + + go func() { + defer wg.Done() + + // StreamReader is only singly-buffered, so we need to process any errors as we get them. + for chanErr = range errCh { + } + }() + + defer func() { + wg.Wait() + + if err == nil { + err = chanErr + } + }() + + _, err = io.Copy(out, in) + + return err +} + // TearDownSuite closes Talos API client. 
func (apiSuite *APISuite) TearDownSuite() { if apiSuite.Client != nil { diff --git a/pkg/provision/providers/docker/reflect.go b/pkg/provision/providers/docker/reflect.go index 0ab1bf141..7b2a8b744 100644 --- a/pkg/provision/providers/docker/reflect.go +++ b/pkg/provision/providers/docker/reflect.go @@ -8,6 +8,7 @@ import ( "context" "net" "strconv" + "strings" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/provision" @@ -63,7 +64,7 @@ func (p *provisioner) Reflect(ctx context.Context, clusterName, stateDirectory s res.clusterInfo.Nodes = append(res.clusterInfo.Nodes, provision.NodeInfo{ ID: node.ID, - Name: node.Names[0], + Name: strings.TrimLeft(node.Names[0], "/"), Type: t, IPs: []net.IP{net.ParseIP(node.NetworkSettings.Networks[res.clusterInfo.Network.Name].IPAddress)}, diff --git a/pkg/resources/kubespan/peer_state.go b/pkg/resources/kubespan/peer_state.go index fa99d66c6..1342871e4 100644 --- a/pkg/resources/kubespan/peer_state.go +++ b/pkg/resources/kubespan/peer_state.go @@ -4,6 +4,8 @@ package kubespan +import "fmt" + //go:generate stringer -type=PeerState -linecomment // PeerState is KubeSpan peer current state. @@ -14,6 +16,22 @@ func (v PeerState) MarshalText() ([]byte, error) { return []byte(v.String()), nil } +// UnmarshalText implements encoding.TextUnmarshaler. +func (v *PeerState) UnmarshalText(b []byte) error { + switch string(b) { + case "unknown": + *v = PeerStateUnknown + case "up": + *v = PeerStateUp + case "down": + *v = PeerStateDown + default: + return fmt.Errorf("unsupported value for PeerState: %q", string(b)) + } + + return nil +} + // PeerState constants. const ( PeerStateUnknown PeerState = iota // unknown