// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build integration_api

package api

import (
	"context"
	"fmt"
	"math/rand/v2"
	"net/netip"
	"strings"
	"time"

	"github.com/cosi-project/runtime/pkg/resource"
	"github.com/cosi-project/runtime/pkg/resource/rtestutils"
	"github.com/cosi-project/runtime/pkg/safe"
	"github.com/siderolabs/gen/maps"
	"github.com/siderolabs/gen/value"
	"github.com/siderolabs/gen/xslices"
	"github.com/stretchr/testify/assert"

	"github.com/siderolabs/talos/internal/integration/base"
	"github.com/siderolabs/talos/pkg/machinery/client"
	"github.com/siderolabs/talos/pkg/machinery/config/types/network"
	"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
	"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
)

// DiscoverySuite verifies Discovery API.
type DiscoverySuite struct {
	base.APISuite

	ctx       context.Context //nolint:containedctx
	ctxCancel context.CancelFunc
}

// SuiteName ...
func (suite *DiscoverySuite) SuiteName() string {
	return "api.DiscoverySuite"
}

// SetupTest ...
func (suite *DiscoverySuite) SetupTest() {
	// make sure API calls have a timeout
	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 15*time.Second)

	// check that the cluster has discovery enabled
	node := suite.RandomDiscoveredNodeInternalIP()
	suite.ClearConnectionRefused(suite.ctx, node)

	nodeCtx := client.WithNode(suite.ctx, node)
	provider, err := suite.ReadConfigFromNode(nodeCtx)
	suite.Require().NoError(err)

	if !provider.Cluster().Discovery().Enabled() {
		suite.T().Skip("cluster discovery is disabled")
	}
}

// TearDownTest ...
func (suite *DiscoverySuite) TearDownTest() {
	if suite.ctxCancel != nil {
		suite.ctxCancel()
	}
}

// TestMembers checks that `talosctl get members` matches expected cluster discovery.
//
//nolint:gocyclo
func (suite *DiscoverySuite) TestMembers() {
	nodes := suite.DiscoverNodes(suite.ctx).Nodes()

	expectedTalosVersion := fmt.Sprintf("Talos (%s)", suite.Version)

	for _, node := range nodes {
		nodeCtx := client.WithNode(suite.ctx, node.InternalIP.String())

		members := suite.getMembers(nodeCtx)

		suite.Assert().Len(members, len(nodes))

		// do a basic check against discovered nodes
		for _, expectedNode := range nodes {
			nodeAddresses := xslices.Map(expectedNode.IPs, func(t netip.Addr) string {
				return t.String()
			})

			found := false

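			// the member must advertise every address the test harness knows for
			// this node; compare address sets so that ordering doesn't matter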
|
			for _, member := range members {
				memberAddresses := xslices.Map(member.TypedSpec().Addresses, func(t netip.Addr) string {
					return t.String()
				})

				if maps.Contains(xslices.ToSet(memberAddresses), nodeAddresses) {
					found = true

					break
				}
			}

			suite.Assert().True(found, "addr %q", nodeAddresses)
		}

		// if cluster information is available, perform additional checks
		if suite.Cluster == nil {
			continue
		}

		memberByName := xslices.ToMap(members,
			func(member *cluster.Member) (string, *cluster.Member) {
				return member.Metadata().ID(), member
			},
		)

		memberByIP := make(map[netip.Addr]*cluster.Member)

		for _, member := range members {
			for _, addr := range member.TypedSpec().Addresses {
				memberByIP[addr] = member
			}
		}

		nodesInfo := suite.Cluster.Info().Nodes

		for _, nodeInfo := range nodesInfo {
			matchingMember := memberByName[nodeInfo.Name]

			var matchingMemberByIP *cluster.Member

			// stop at the first IP which maps to a known member; the original
			// unconditional break only ever consulted the first IP
			for _, nodeIP := range nodeInfo.IPs {
				if member, ok := memberByIP[nodeIP]; ok {
					matchingMemberByIP = member

					break
				}
			}

			// if hostnames are not set via DHCP, fall back to matching by IP
			if matchingMember == nil {
				matchingMember = matchingMemberByIP
			}

			suite.Require().NotNil(matchingMember)

			suite.Assert().Equal(nodeInfo.Type, matchingMember.TypedSpec().MachineType)
			suite.Assert().Equal(expectedTalosVersion, matchingMember.TypedSpec().OperatingSystem)

			for _, nodeIP := range nodeInfo.IPs {
				found := false

				for _, memberAddr := range matchingMember.TypedSpec().Addresses {
					if memberAddr == nodeIP {
						found = true

						break
					}
				}

				suite.Assert().True(found, "addr %s", nodeIP)
			}
		}
	}
}

// TestRegistries checks that all registries produce the same raw Affiliate data.
func (suite *DiscoverySuite) TestRegistries() {
	node := suite.RandomDiscoveredNodeInternalIP()
	suite.ClearConnectionRefused(suite.ctx, node)

	nodeCtx := client.WithNode(suite.ctx, node)
	provider, err := suite.ReadConfigFromNode(nodeCtx)
	suite.Require().NoError(err)

	var registries []string

	if provider.Cluster().Discovery().Registries().Kubernetes().Enabled() {
		registries = append(registries, "k8s/")
	}

	if provider.Cluster().Discovery().Registries().Service().Enabled() {
		registries = append(registries, "service/")
	}
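
	// raw affiliate IDs are prefixed with the registry name, e.g. "k8s/<node ID>"
	// or "service/<node ID>"; the per-registry lookups below rely on this prefix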

	nodes := suite.DiscoverNodeInternalIPs(suite.ctx)

	for _, node := range nodes {
		nodeCtx := client.WithNode(suite.ctx, node)

		members := suite.getMembers(nodeCtx)
		localIdentity := suite.getNodeIdentity(nodeCtx)

		// raw affiliates don't contain the local node
		expectedRawAffiliates := len(registries) * (len(members) - 1)
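		// e.g. with both registries enabled in a three-node cluster, each node
		// expects 2 * (3 - 1) = 4 raw affiliate entries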

		var rawAffiliates []*cluster.Affiliate

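		// discovery data propagates asynchronously, so poll for up to a minute
		// (30 attempts, 2 seconds apart) before asserting on the final count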
		for range 30 {
			rawAffiliates = suite.getAffiliates(nodeCtx, cluster.RawNamespaceName)

			if len(rawAffiliates) == expectedRawAffiliates {
				break
			}

			suite.T().Logf("waiting for cluster affiliates to be discovered: %d expected, %d found", expectedRawAffiliates, len(rawAffiliates))

			time.Sleep(2 * time.Second)
		}

		suite.Assert().Len(rawAffiliates, expectedRawAffiliates)

		rawAffiliatesByID := make(map[string]*cluster.Affiliate)

		for _, rawAffiliate := range rawAffiliates {
			rawAffiliatesByID[rawAffiliate.Metadata().ID()] = rawAffiliate
		}

		stripDomain := func(s string) string { return strings.Split(s, ".")[0] }

		// every member except for the local identity member should be discovered via each registry
		for _, member := range members {
			if member.TypedSpec().NodeID == localIdentity.TypedSpec().NodeID {
				continue
			}

			for _, registry := range registries {
				rawAffiliate := rawAffiliatesByID[registry+member.TypedSpec().NodeID]
				suite.Require().NotNil(rawAffiliate)

				// registries can be a bit inconsistent, e.g. whether they report the FQDN or just the hostname
				suite.Assert().Contains([]string{member.TypedSpec().Hostname, stripDomain(member.TypedSpec().Hostname)}, rawAffiliate.TypedSpec().Hostname)

				suite.Assert().Equal(member.TypedSpec().Addresses, rawAffiliate.TypedSpec().Addresses)
				suite.Assert().Equal(member.TypedSpec().OperatingSystem, rawAffiliate.TypedSpec().OperatingSystem)
				suite.Assert().Equal(member.TypedSpec().MachineType, rawAffiliate.TypedSpec().MachineType)
			}
		}
	}
}

// TestKubeSpanPeers verifies that KubeSpan peer specs are populated, and that peer statuses are available.
func (suite *DiscoverySuite) TestKubeSpanPeers() {
	if !suite.Capabilities().RunsTalosKernel {
		suite.T().Skip("not running Talos kernel")
	}

	// check that the cluster has KubeSpan enabled
	node := suite.RandomDiscoveredNodeInternalIP()
	suite.ClearConnectionRefused(suite.ctx, node)

	nodeCtx := client.WithNode(suite.ctx, node)
	provider, err := suite.ReadConfigFromNode(nodeCtx)
	suite.Require().NoError(err)

	if !provider.Machine().Network().KubeSpan().Enabled() {
		suite.T().Skip("KubeSpan is disabled")
	}

	nodes := suite.DiscoverNodeInternalIPs(suite.ctx)

	for _, node := range nodes {
		nodeCtx := client.WithNode(suite.ctx, node)

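		// every node should see each of the other nodes (but not itself) as a KubeSpan peer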
		rtestutils.AssertLength[*kubespan.PeerSpec](nodeCtx, suite.T(), suite.Client.COSI, len(nodes)-1)
		rtestutils.AssertLength[*kubespan.PeerStatus](nodeCtx, suite.T(), suite.Client.COSI, len(nodes)-1)

		rtestutils.AssertAll[*kubespan.PeerStatus](nodeCtx, suite.T(), suite.Client.COSI,
			func(status *kubespan.PeerStatus, asrt *assert.Assertions) {
				asrt.Equal(kubespan.PeerStateUp, status.TypedSpec().State)
				asrt.False(value.IsZero(status.TypedSpec().Endpoint))
				asrt.Greater(status.TypedSpec().ReceiveBytes, int64(0))
				asrt.Greater(status.TypedSpec().TransmitBytes, int64(0))
			})
	}
}

// TestKubeSpanExtraEndpoints verifies that KubeSpan peer specs are updated with extra endpoints.
func (suite *DiscoverySuite) TestKubeSpanExtraEndpoints() {
	if !suite.Capabilities().RunsTalosKernel {
		suite.T().Skip("not running Talos kernel")
	}

	// check that the cluster has KubeSpan enabled
	node := suite.RandomDiscoveredNodeInternalIP()
	suite.ClearConnectionRefused(suite.ctx, node)

	nodeCtx := client.WithNode(suite.ctx, node)
	provider, err := suite.ReadConfigFromNode(nodeCtx)
	suite.Require().NoError(err)

	if !provider.Machine().Network().KubeSpan().Enabled() {
		suite.T().Skip("KubeSpan is disabled")
	}

	nodes := suite.DiscoverNodeInternalIPs(suite.ctx)

	if len(nodes) < 2 {
		suite.T().Skip("need at least two nodes for this test")
	}

	// pick two distinct random nodes: one to patch, one to observe from
	perm := rand.Perm(len(nodes))

	checkNode := nodes[perm[0]]
	targetNode := nodes[perm[1]]

	mockEndpoint := netip.MustParseAddrPort("169.254.121.121:5820")
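	// a link-local address is used here, presumably so the mock endpoint never
	// collides with a real routable peer endpoint in the test cluster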

	// inject the extra endpoint into the target node
	cfgDocument := network.NewKubespanEndpointsV1Alpha1()
	cfgDocument.ExtraAnnouncedEndpointsConfig = []netip.AddrPort{mockEndpoint}

	suite.T().Logf("injecting extra endpoint %s to node %s", mockEndpoint, targetNode)
	suite.PatchMachineConfig(client.WithNode(suite.ctx, targetNode), cfgDocument)

	targetIdentity, err := safe.ReaderGetByID[*kubespan.Identity](client.WithNode(suite.ctx, targetNode), suite.Client.COSI, kubespan.LocalIdentity)
	suite.Require().NoError(err)

	suite.T().Logf("checking extra endpoint %s on node %s", mockEndpoint, checkNode)
	rtestutils.AssertResources(client.WithNode(suite.ctx, checkNode), suite.T(), suite.Client.COSI, []string{targetIdentity.TypedSpec().PublicKey},
		func(peer *kubespan.PeerSpec, asrt *assert.Assertions) {
			asrt.Contains(peer.TypedSpec().Endpoints, mockEndpoint)
		},
	)

	// the extra endpoint only disappears from the discovery service after a timeout, so we can't assert on its removal here
	suite.T().Logf("removing extra endpoint %s from node %s", mockEndpoint, targetNode)
	suite.RemoveMachineConfigDocuments(client.WithNode(suite.ctx, targetNode), cfgDocument.MetaKind)
}

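// getMembers returns the cluster members as seen by the node set in nodeCtx.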
func (suite *DiscoverySuite) getMembers(nodeCtx context.Context) []*cluster.Member {
	items, err := safe.StateListAll[*cluster.Member](nodeCtx, suite.Client.COSI)
	suite.Require().NoError(err)

	return safe.ToSlice(items, func(m *cluster.Member) *cluster.Member { return m })
}

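// getNodeIdentity returns the local cluster discovery identity of the node set in nodeCtx.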
func (suite *DiscoverySuite) getNodeIdentity(nodeCtx context.Context) *cluster.Identity {
	identity, err := safe.StateGet[*cluster.Identity](nodeCtx, suite.Client.COSI, resource.NewMetadata(cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity, resource.VersionUndefined))
	suite.Require().NoError(err)

	return identity
}

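// getAffiliates returns the affiliates from the given resource namespace (e.g. raw) as seen by the node set in nodeCtx.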
func (suite *DiscoverySuite) getAffiliates(nodeCtx context.Context, namespace resource.Namespace) []*cluster.Affiliate {
	var result []*cluster.Affiliate

	items, err := safe.StateList[*cluster.Affiliate](nodeCtx, suite.Client.COSI, resource.NewMetadata(namespace, cluster.AffiliateType, "", resource.VersionUndefined))
	suite.Require().NoError(err)

	items.ForEach(func(item *cluster.Affiliate) { result = append(result, item) })

	return result
}

func init() {
	allSuites = append(allSuites, new(DiscoverySuite))
}