talos/internal/integration/api/discovery.go
Dmitry Sharshakov 653f838b09
feat: support multiple Docker clusters in talosctl cluster create
Dynamically map Kubernetes and Talos API ports to an available port on
the host, so every cluster gets its own unique set of ports.

As part of the changes, refactor the provision library and interfaces,
dropping old weird interfaces and replacing them with (hopefully) much more
descriptive names.

Signed-off-by: Dmitry Sharshakov <dmitry.sharshakov@siderolabs.com>
Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
2024-04-04 21:21:39 +04:00

316 lines
9.0 KiB
Go

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//go:build integration_api
package api
import (
"context"
"fmt"
"net/netip"
"strings"
"time"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/gen/maps"
"github.com/siderolabs/gen/value"
"github.com/siderolabs/gen/xslices"
"github.com/stretchr/testify/assert"
"github.com/siderolabs/talos/internal/integration/base"
"github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
)
// DiscoverySuite verifies Discovery API.
//
// It embeds base.APISuite and carries a per-test context: SetupTest creates it
// and TearDownTest cancels it.
type DiscoverySuite struct {
	base.APISuite

	// ctx bounds the API calls issued by a single test (created in SetupTest).
	ctx context.Context //nolint:containedctx

	// ctxCancel releases ctx; invoked from TearDownTest.
	ctxCancel context.CancelFunc
}
// SuiteName returns the identifier of this test suite.
func (suite *DiscoverySuite) SuiteName() string {
	const name = "api.DiscoverySuite"

	return name
}
// SetupTest creates a fresh 15-second context for the upcoming test and skips
// the test when cluster discovery is disabled in the machine config of a
// randomly chosen discovered node.
func (suite *DiscoverySuite) SetupTest() {
	// make sure API calls have timeout
	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 15*time.Second)

	// check that cluster has discovery enabled
	target := suite.RandomDiscoveredNodeInternalIP()
	suite.ClearConnectionRefused(suite.ctx, target)

	cfg, err := suite.ReadConfigFromNode(client.WithNode(suite.ctx, target))
	suite.Require().NoError(err)

	if discoveryEnabled := cfg.Cluster().Discovery().Enabled(); !discoveryEnabled {
		suite.T().Skip("cluster discovery is disabled")
	}
}
// TearDownTest cancels the per-test context created by SetupTest, if any.
func (suite *DiscoverySuite) TearDownTest() {
	if cancel := suite.ctxCancel; cancel != nil {
		cancel()
	}
}
// TestMembers checks that `talosctl get members` matches expected cluster discovery.
//
// Every discovered node is queried for its Member resources. The list must hold
// one member per discovered node, and for every discovered node some member must
// report all of that node's addresses. When provisioner information is available
// (suite.Cluster != nil), every provisioned node is also matched to a member (by
// name, falling back to any of its IPs). The member's machine type, Talos version
// and addresses are then verified.
//
//nolint:gocyclo
func (suite *DiscoverySuite) TestMembers() {
	nodes := suite.DiscoverNodes(suite.ctx).Nodes()

	expectedTalosVersion := fmt.Sprintf("Talos (%s)", suite.Version)

	for _, node := range nodes {
		nodeCtx := client.WithNode(suite.ctx, node.InternalIP.String())

		members := suite.getMembers(nodeCtx)

		suite.Assert().Len(members, len(nodes))

		// do basic check against discovered nodes: some member must report
		// every address of the expected node
		for _, expectedNode := range nodes {
			nodeAddresses := xslices.Map(expectedNode.IPs, func(t netip.Addr) string {
				return t.String()
			})

			found := false

			for _, member := range members {
				memberAddresses := xslices.Map(member.TypedSpec().Addresses, func(t netip.Addr) string {
					return t.String()
				})

				if maps.Contains(xslices.ToSet(memberAddresses), nodeAddresses) {
					found = true

					break
				}
			}

			suite.Assert().True(found, "addr %q", nodeAddresses)
		}

		// if cluster information is available, perform additional checks
		if suite.Cluster == nil {
			continue
		}

		memberByName := xslices.ToMap(members,
			func(member *cluster.Member) (string, *cluster.Member) {
				return member.Metadata().ID(), member
			},
		)

		memberByIP := make(map[netip.Addr]*cluster.Member)

		for _, member := range members {
			for _, addr := range member.TypedSpec().Addresses {
				memberByIP[addr] = member
			}
		}

		for _, nodeInfo := range suite.Cluster.Info().Nodes {
			matchingMember := memberByName[nodeInfo.Name]

			// if hostnames are not set via DHCP, use match by IP: pick the member
			// owning the first of the node's IPs that any member reports (the
			// previous code only ever looked at the very first IP)
			if matchingMember == nil {
				for _, nodeIP := range nodeInfo.IPs {
					if member, ok := memberByIP[nodeIP]; ok {
						matchingMember = member

						break
					}
				}
			}

			suite.Require().NotNil(matchingMember, "no member found for node %q (IPs %v)", nodeInfo.Name, nodeInfo.IPs)

			suite.Assert().Equal(nodeInfo.Type, matchingMember.TypedSpec().MachineType)
			suite.Assert().Equal(expectedTalosVersion, matchingMember.TypedSpec().OperatingSystem)

			for _, nodeIP := range nodeInfo.IPs {
				found := false

				for _, memberAddr := range matchingMember.TypedSpec().Addresses {
					if memberAddr.Compare(nodeIP) == 0 {
						found = true

						break
					}
				}

				suite.Assert().True(found, "addr %s", nodeIP)
			}
		}
	}
}
// TestRegistries checks that all registries produce same raw Affiliate data.
//
// The set of enabled registries (Kubernetes and/or Service) is read from a
// random node's config. Each discovered node must then eventually expose one
// raw affiliate per (enabled registry, non-local member) pair. Every such raw
// affiliate is compared against the corresponding Member resource.
func (suite *DiscoverySuite) TestRegistries() {
	configNode := suite.RandomDiscoveredNodeInternalIP()
	suite.ClearConnectionRefused(suite.ctx, configNode)

	provider, err := suite.ReadConfigFromNode(client.WithNode(suite.ctx, configNode))
	suite.Require().NoError(err)

	var registries []string

	if provider.Cluster().Discovery().Registries().Kubernetes().Enabled() {
		registries = append(registries, "k8s/")
	}

	if provider.Cluster().Discovery().Registries().Service().Enabled() {
		registries = append(registries, "service/")
	}

	// registries can be a bit inconsistent, e.g. whether they report fqdn or just hostname,
	// so the short hostname (everything before the first dot) is also accepted
	stripDomain := func(s string) string {
		host, _, _ := strings.Cut(s, ".")

		return host
	}

	nodes := suite.DiscoverNodeInternalIPs(suite.ctx)

	for _, node := range nodes {
		nodeCtx := client.WithNode(suite.ctx, node)

		members := suite.getMembers(nodeCtx)
		localIdentity := suite.getNodeIdentity(nodeCtx)

		// raw affiliates don't contain the local node
		expectedRawAffiliates := len(registries) * (len(members) - 1)

		var rawAffiliates []*cluster.Affiliate

	waitLoop:
		for range 30 {
			rawAffiliates = suite.getAffiliates(nodeCtx, cluster.RawNamespaceName)

			if len(rawAffiliates) == expectedRawAffiliates {
				break
			}

			suite.T().Logf("waiting for cluster affiliates to be discovered: %d expected, %d found", expectedRawAffiliates, len(rawAffiliates))

			// stop polling once the test context expires: further API calls would only
			// fail with a context error instead of the meaningful assertions below
			select {
			case <-nodeCtx.Done():
				break waitLoop
			case <-time.After(2 * time.Second):
			}
		}

		suite.Assert().Len(rawAffiliates, expectedRawAffiliates)

		rawAffiliatesByID := make(map[string]*cluster.Affiliate, len(rawAffiliates))

		for _, rawAffiliate := range rawAffiliates {
			rawAffiliatesByID[rawAffiliate.Metadata().ID()] = rawAffiliate
		}

		// every member except for local identity member should be discovered via each registry
		for _, member := range members {
			memberSpec := member.TypedSpec()

			if memberSpec.NodeID == localIdentity.TypedSpec().NodeID {
				continue
			}

			for _, registry := range registries {
				affiliateID := registry + memberSpec.NodeID

				rawAffiliate := rawAffiliatesByID[affiliateID]
				suite.Require().NotNil(rawAffiliate, "raw affiliate %q not found on node %s", affiliateID, node)

				rawSpec := rawAffiliate.TypedSpec()

				suite.Assert().Contains([]string{memberSpec.Hostname, stripDomain(memberSpec.Hostname)}, rawSpec.Hostname)
				suite.Assert().Equal(memberSpec.Addresses, rawSpec.Addresses)
				suite.Assert().Equal(memberSpec.OperatingSystem, rawSpec.OperatingSystem)
				suite.Assert().Equal(memberSpec.MachineType, rawSpec.MachineType)
			}
		}
	}
}
// TestKubeSpanPeers verifies that KubeSpan peer specs are populated, and that peer statuses are available.
//
// The test is skipped when not running the Talos kernel or when KubeSpan is
// disabled in a random node's config. Otherwise every discovered node must have
// one PeerSpec and one PeerStatus per other node. Each peer status must be up,
// with a non-zero endpoint and non-zero traffic counters.
func (suite *DiscoverySuite) TestKubeSpanPeers() {
	if !suite.Capabilities().RunsTalosKernel {
		suite.T().Skip("not running Talos kernel")
	}

	// check that cluster has KubeSpan enabled
	probe := suite.RandomDiscoveredNodeInternalIP()
	suite.ClearConnectionRefused(suite.ctx, probe)

	cfg, err := suite.ReadConfigFromNode(client.WithNode(suite.ctx, probe))
	suite.Require().NoError(err)

	if !cfg.Machine().Network().KubeSpan().Enabled() {
		suite.T().Skip("KubeSpan is disabled")
	}

	nodeIPs := suite.DiscoverNodeInternalIPs(suite.ctx)

	// every node is expected to peer with all the other nodes
	expectedPeers := len(nodeIPs) - 1

	peerIsHealthy := func(status *kubespan.PeerStatus, asrt *assert.Assertions) {
		spec := status.TypedSpec()

		asrt.Equal(kubespan.PeerStateUp, spec.State)
		asrt.False(value.IsZero(spec.Endpoint))
		asrt.Greater(spec.ReceiveBytes, int64(0))
		asrt.Greater(spec.TransmitBytes, int64(0))
	}

	for _, ip := range nodeIPs {
		ctx := client.WithNode(suite.ctx, ip)

		rtestutils.AssertLength[*kubespan.PeerSpec](ctx, suite.T(), suite.Client.COSI, expectedPeers)
		rtestutils.AssertLength[*kubespan.PeerStatus](ctx, suite.T(), suite.Client.COSI, expectedPeers)
		rtestutils.AssertAll[*kubespan.PeerStatus](ctx, suite.T(), suite.Client.COSI, peerIsHealthy)
	}
}
// getMembers lists every cluster.Member resource visible through nodeCtx.
// It fails the test immediately if the API call returns an error.
func (suite *DiscoverySuite) getMembers(nodeCtx context.Context) []*cluster.Member {
	items, err := safe.StateListAll[*cluster.Member](nodeCtx, suite.Client.COSI)
	suite.Require().NoError(err)

	var members []*cluster.Member

	items.ForEach(func(member *cluster.Member) { members = append(members, member) })

	return members
}
// getNodeIdentity reads the cluster.Identity resource with ID cluster.LocalIdentity
// through nodeCtx. It fails the test immediately if it cannot be fetched.
func (suite *DiscoverySuite) getNodeIdentity(nodeCtx context.Context) *cluster.Identity {
	md := resource.NewMetadata(cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity, resource.VersionUndefined)

	identity, err := safe.StateGet[*cluster.Identity](nodeCtx, suite.Client.COSI, md)
	suite.Require().NoError(err)

	return identity
}
// getAffiliates lists the cluster.Affiliate resources in the given namespace
// through nodeCtx. It fails the test immediately on API errors.
func (suite *DiscoverySuite) getAffiliates(nodeCtx context.Context, namespace resource.Namespace) []*cluster.Affiliate {
	kind := resource.NewMetadata(namespace, cluster.AffiliateType, "", resource.VersionUndefined)

	items, err := safe.StateList[*cluster.Affiliate](nodeCtx, suite.Client.COSI, kind)
	suite.Require().NoError(err)

	var affiliates []*cluster.Affiliate

	for it := items.Iterator(); it.Next(); {
		affiliates = append(affiliates, it.Value())
	}

	return affiliates
}
// init appends DiscoverySuite to allSuites (a package-level list declared
// elsewhere in this package).
func init() {
	allSuites = append(allSuites, &DiscoverySuite{})
}