Dmitry Sharshakov 653f838b09
feat: support multiple Docker clusters in talosctl cluster create
Dynamically map Kubernetes and Talos API ports to an available port on
the host, so every cluster gets its own unique set of ports.

As part of the changes, refactor the provision library and interfaces,
dropping the old awkward interfaces and replacing them with (hopefully)
much more descriptive names.

Signed-off-by: Dmitry Sharshakov <dmitry.sharshakov@siderolabs.com>
Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
2024-04-04 21:21:39 +04:00
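
The dynamic mapping described above can be done by letting the OS pick an ephemeral port. Below is a minimal, hypothetical Go sketch of that technique, not the actual talosctl implementation; the helper name freePort is made up for illustration:

package main

import (
	"fmt"
	"net"
)

// freePort asks the kernel for an unused TCP port by binding to port 0
// on the loopback interface and reading back the assigned address.
func freePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close() //nolint:errcheck

	return l.Addr().(*net.TCPAddr).Port, nil
}

func main() {
	port, err := freePort()
	if err != nil {
		panic(err)
	}

	fmt.Printf("map the cluster API port to host port %d\n", port)
}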


// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build integration

// Package provision provides integration tests which rely on provisioning a cluster per test.
package provision

import (
"context"
"encoding/json"
"fmt"
"net/netip"
"os"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-blockdevice/blockdevice/encryption"
"github.com/siderolabs/go-kubernetes/kubernetes/upgrade"
"github.com/siderolabs/go-retry/retry"
sideronet "github.com/siderolabs/net"
"github.com/stretchr/testify/suite"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"github.com/siderolabs/talos/internal/integration/base"
"github.com/siderolabs/talos/pkg/cluster/check"
"github.com/siderolabs/talos/pkg/cluster/kubernetes"
"github.com/siderolabs/talos/pkg/cluster/sonobuoy"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
talosclient "github.com/siderolabs/talos/pkg/machinery/client"
clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
"github.com/siderolabs/talos/pkg/machinery/config"
"github.com/siderolabs/talos/pkg/machinery/config/bundle"
"github.com/siderolabs/talos/pkg/machinery/config/encoder"
"github.com/siderolabs/talos/pkg/machinery/config/generate"
"github.com/siderolabs/talos/pkg/machinery/config/machine"
"github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/version"
"github.com/siderolabs/talos/pkg/provision"
"github.com/siderolabs/talos/pkg/provision/access"
"github.com/siderolabs/talos/pkg/provision/providers/qemu"
)
var allSuites []suite.TestingSuite
// GetAllSuites returns all the suites for provision tests.
//
// Depending on build tags, this might return different lists.
func GetAllSuites() []suite.TestingSuite {
return allSuites
}
// Settings for provision tests.
type Settings struct {
// CIDR to use for provisioned clusters
CIDR string
// Registry mirrors to push to Talos config, in format `host=endpoint`
RegistryMirrors base.StringList
// MTU for the network.
MTU int
// VM parameters
CPUs int64
MemMB int64
DiskGB uint64
// Node count for the tests
ControlplaneNodes int
WorkerNodes int
// Target installer image registry
TargetInstallImageRegistry string
// Current version of the cluster (built in the CI pass)
CurrentVersion string
// Custom CNI URL to use.
CustomCNIURL string
// Enable crashdump on failure.
CrashdumpEnabled bool
// CNI bundle for QEMU provisioner.
CNIBundleURL string
}
// DefaultSettings filled in by test runner.
var DefaultSettings = Settings{
CIDR: "172.21.0.0/24",
MTU: 1500,
CPUs: 4,
MemMB: 3 * 1024,
DiskGB: 12,
ControlplaneNodes: 3,
WorkerNodes: 1,
TargetInstallImageRegistry: "ghcr.io",
CNIBundleURL: fmt.Sprintf("https://github.com/siderolabs/talos/releases/download/%s/talosctl-cni-bundle-%s.tar.gz", trimVersion(version.Tag), constants.ArchVariable),
}
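// trimVersion reduces a git-describe style tag to its semantic version core.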
func trimVersion(version string) string {
// remove anything extra after semantic version core, `v0.3.2-1-gabcd` -> `v0.3.2`
return regexp.MustCompile(`(-\d+-g[0-9a-f]+)$`).ReplaceAllString(version, "")
}
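// defaultNameservers are used for the network of provisioned clusters.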
var defaultNameservers = []netip.Addr{netip.MustParseAddr("8.8.8.8"), netip.MustParseAddr("1.1.1.1")}
// BaseSuite provides base features for provision tests.
type BaseSuite struct {
suite.Suite
base.TalosSuite
provisioner provision.Provisioner
configBundle *bundle.Bundle
clusterAccess *access.Adapter
controlPlaneEndpoint string
//nolint:containedctx
ctx context.Context
ctxCancel context.CancelFunc
stateDir string
cniDir string
}
// SetupSuite ...
func (suite *BaseSuite) SetupSuite() {
// timeout for the whole test
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 50*time.Minute)
var err error
suite.provisioner, err = qemu.NewProvisioner(suite.ctx)
suite.Require().NoError(err)
}
// TearDownSuite ...
func (suite *BaseSuite) TearDownSuite() {
if suite.T().Failed() && DefaultSettings.CrashdumpEnabled && suite.Cluster != nil {
// for failed tests, produce crash dump for easier debugging,
// as cluster is going to be torn down below
suite.provisioner.CrashDump(suite.ctx, suite.Cluster, os.Stderr)
if suite.clusterAccess != nil {
suite.clusterAccess.CrashDump(suite.ctx, os.Stderr)
}
}
if suite.clusterAccess != nil {
suite.Assert().NoError(suite.clusterAccess.Close())
}
if suite.Cluster != nil {
suite.Assert().NoError(suite.provisioner.Destroy(suite.ctx, suite.Cluster))
}
suite.ctxCancel()
if suite.stateDir != "" {
suite.Assert().NoError(os.RemoveAll(suite.stateDir))
}
if suite.provisioner != nil {
suite.Assert().NoError(suite.provisioner.Close())
}
}
// waitForClusterHealth asserts cluster health after any change.
func (suite *BaseSuite) waitForClusterHealth() {
runs := 1
singleNodeCluster := len(suite.Cluster.Info().Nodes) == 1
if singleNodeCluster {
// run health check several times for single node clusters,
// as self-hosted control plane is not stable after reboot
runs = 3
}
for run := range runs {
if run > 0 {
time.Sleep(15 * time.Second)
}
checkCtx, checkCtxCancel := context.WithTimeout(suite.ctx, 15*time.Minute)
defer checkCtxCancel()
suite.Require().NoError(
check.Wait(
checkCtx,
suite.clusterAccess,
check.DefaultClusterChecks(),
check.StderrReporter(),
),
)
}
}
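// untaint removes the control-plane taint from the named node via a strategic merge patch, so that workloads can be scheduled on it.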
func (suite *BaseSuite) untaint(name string) {
client, err := suite.clusterAccess.K8sClient(suite.ctx)
suite.Require().NoError(err)
n, err := client.CoreV1().Nodes().Get(suite.ctx, name, metav1.GetOptions{})
suite.Require().NoError(err)
oldData, err := json.Marshal(n)
suite.Require().NoError(err)
k := 0
for _, taint := range n.Spec.Taints {
if taint.Key != constants.LabelNodeRoleControlPlane {
n.Spec.Taints[k] = taint
k++
}
}
n.Spec.Taints = n.Spec.Taints[:k]
newData, err := json.Marshal(n)
suite.Require().NoError(err)
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
suite.Require().NoError(err)
_, err = client.CoreV1().Nodes().Patch(
suite.ctx,
n.Name,
types.StrategicMergePatchType,
patchBytes,
metav1.PatchOptions{},
)
suite.Require().NoError(err)
}
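// assertSameVersionCluster verifies that every node in the cluster reports the expected Talos version tag.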
func (suite *BaseSuite) assertSameVersionCluster(client *talosclient.Client, expectedVersion string) {
nodes := xslices.Map(suite.Cluster.Info().Nodes, func(node provision.NodeInfo) string { return node.IPs[0].String() })
ctx := talosclient.WithNodes(suite.ctx, nodes...)
var v *machineapi.VersionResponse
err := retry.Constant(
time.Minute,
).Retry(
func() error {
var e error
v, e = client.Version(ctx)
return retry.ExpectedError(e)
},
)
suite.Require().NoError(err)
suite.Require().Len(v.Messages, len(nodes))
for _, version := range v.Messages {
suite.Assert().Equal(expectedVersion, version.Version.Tag)
}
}
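// readVersion returns the Talos version tag reported by the node targeted via nodeCtx.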
func (suite *BaseSuite) readVersion(nodeCtx context.Context, client *talosclient.Client) (
version string,
err error,
) {
var v *machineapi.VersionResponse
v, err = client.Version(nodeCtx)
if err != nil {
return
}
version = v.Messages[0].Version.Tag
return
}
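// upgradeOptions describes the parameters of a single node upgrade.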
type upgradeOptions struct {
TargetInstallerImage string
UpgradePreserve bool
UpgradeStage bool
TargetVersion string
}
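// upgradeNode upgrades a single node via the machine API, follows the upgrade events until the node reboots,
// and waits for it to come back with the target version.
//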
//nolint:gocyclo
func (suite *BaseSuite) upgradeNode(client *talosclient.Client, node provision.NodeInfo, options upgradeOptions) {
suite.T().Logf("upgrading node %s", node.IPs[0])
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
nodeCtx := talosclient.WithNodes(ctx, node.IPs[0].String())
var (
resp *machineapi.UpgradeResponse
err error
)
err = retry.Constant(time.Minute, retry.WithUnits(10*time.Second)).Retry(
func() error {
resp, err = client.Upgrade(
nodeCtx,
options.TargetInstallerImage,
options.UpgradePreserve,
options.UpgradeStage,
false,
)
if err != nil {
if strings.Contains(err.Error(), "leader changed") {
return retry.ExpectedError(err)
}
if strings.Contains(err.Error(), "failed to acquire upgrade lock") {
return retry.ExpectedError(err)
}
return err
}
return nil
},
)
suite.Require().NoError(err)
suite.Require().Equal("Upgrade request received", resp.Messages[0].Ack)
actorID := resp.Messages[0].ActorId
eventCh := make(chan talosclient.EventResult)
// watch for events
suite.Require().NoError(client.EventsWatchV2(nodeCtx, eventCh, talosclient.WithActorID(actorID), talosclient.WithTailEvents(-1)))
waitTimer := time.NewTimer(5 * time.Minute)
defer waitTimer.Stop()
waitLoop:
for {
select {
case ev := <-eventCh:
suite.Require().NoError(ev.Error)
switch msg := ev.Event.Payload.(type) {
case *machineapi.SequenceEvent:
if msg.Error != nil {
suite.FailNow("upgrade failed", "%s: %s", msg.Error.Message, msg.Error.Code)
}
case *machineapi.PhaseEvent:
if msg.Action == machineapi.PhaseEvent_START && msg.Phase == "kexec" {
// about to be rebooted
break waitLoop
}
if msg.Action == machineapi.PhaseEvent_STOP {
suite.T().Logf("upgrade phase %q finished", msg.Phase)
}
}
case <-waitTimer.C:
suite.FailNow("timeout waiting for upgrade to finish")
case <-ctx.Done():
suite.FailNow("context canceled")
}
}
// wait for the apid to be shut down
time.Sleep(10 * time.Second)
// wait for the version to be equal to target version
suite.Require().NoError(
retry.Constant(10 * time.Minute).Retry(
func() error {
var version string
version, err = suite.readVersion(nodeCtx, client)
if err != nil {
// API might be unresponsive during upgrade
return retry.ExpectedError(err)
}
if version != options.TargetVersion {
// upgrade not finished yet
return retry.ExpectedErrorf(
"node %q version doesn't match expected: expected %q, got %q",
node.IPs[0].String(),
options.TargetVersion,
version,
)
}
return nil
},
),
)
suite.waitForClusterHealth()
}
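// upgradeKubernetes upgrades the Kubernetes control plane components (and optionally the kubelet) from fromVersion to toVersion.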
func (suite *BaseSuite) upgradeKubernetes(fromVersion, toVersion string, skipKubeletUpgrade bool) {
if fromVersion == toVersion {
suite.T().Logf("skipping Kubernetes upgrade, as versions are equal %q -> %q", fromVersion, toVersion)
return
}
suite.T().Logf("upgrading Kubernetes: %q -> %q", fromVersion, toVersion)
path, err := upgrade.NewPath(fromVersion, toVersion)
suite.Require().NoError(err)
options := kubernetes.UpgradeOptions{
Path: path,
ControlPlaneEndpoint: suite.controlPlaneEndpoint,
UpgradeKubelet: !skipKubeletUpgrade,
PrePullImages: true,
KubeletImage: constants.KubeletImage,
APIServerImage: constants.KubernetesAPIServerImage,
ControllerManagerImage: constants.KubernetesControllerManagerImage,
SchedulerImage: constants.KubernetesSchedulerImage,
ProxyImage: constants.KubeProxyImage,
EncoderOpt: encoder.WithComments(encoder.CommentsAll),
}
suite.Require().NoError(kubernetes.Upgrade(suite.ctx, suite.clusterAccess, options))
}
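// clusterOptions describes the source cluster provisioned by setupCluster.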
type clusterOptions struct {
ClusterName string
ControlplaneNodes int
WorkerNodes int
SourceKernelPath string
SourceInitramfsPath string
SourceInstallerImage string
SourceVersion string
SourceK8sVersion string
WithEncryption bool
}
// setupCluster provisions source clusters and waits for health.
func (suite *BaseSuite) setupCluster(options clusterOptions) {
defaultStateDir, err := clientconfig.GetTalosDirectory()
suite.Require().NoError(err)
suite.stateDir = filepath.Join(defaultStateDir, "clusters")
suite.cniDir = filepath.Join(defaultStateDir, "cni")
cidr, err := netip.ParsePrefix(DefaultSettings.CIDR)
suite.Require().NoError(err)
var gatewayIP netip.Addr
gatewayIP, err = sideronet.NthIPInNetwork(cidr, 1)
suite.Require().NoError(err)
ips := make([]netip.Addr, options.ControlplaneNodes+options.WorkerNodes)
for i := range ips {
ips[i], err = sideronet.NthIPInNetwork(cidr, i+2)
suite.Require().NoError(err)
}
suite.T().Logf("initializing provisioner with cluster name %q, state directory %q", options.ClusterName, suite.stateDir)
request := provision.ClusterRequest{
Name: options.ClusterName,
Network: provision.NetworkRequest{
Name: options.ClusterName,
CIDRs: []netip.Prefix{cidr},
GatewayAddrs: []netip.Addr{gatewayIP},
MTU: DefaultSettings.MTU,
Nameservers: defaultNameservers,
CNI: provision.CNIConfig{
BinPath: []string{filepath.Join(suite.cniDir, "bin")},
ConfDir: filepath.Join(suite.cniDir, "conf.d"),
CacheDir: filepath.Join(suite.cniDir, "cache"),
BundleURL: DefaultSettings.CNIBundleURL,
},
},
KernelPath: options.SourceKernelPath,
InitramfsPath: options.SourceInitramfsPath,
SelfExecutable: suite.TalosctlPath,
StateDirectory: suite.stateDir,
}
suite.controlPlaneEndpoint = suite.provisioner.GetExternalKubernetesControlPlaneEndpoint(request.Network, constants.DefaultControlPlanePort)
genOptions := suite.provisioner.GenOptions(request.Network)
for _, registryMirror := range DefaultSettings.RegistryMirrors {
parts := strings.SplitN(registryMirror, "=", 2)
suite.Require().Len(parts, 2)
genOptions = append(genOptions, generate.WithRegistryMirror(parts[0], parts[1]))
}
controlplaneEndpoints := make([]string, options.ControlplaneNodes)
for i := range controlplaneEndpoints {
controlplaneEndpoints[i] = ips[i].String()
}
if DefaultSettings.CustomCNIURL != "" {
genOptions = append(
genOptions, generate.WithClusterCNIConfig(
&v1alpha1.CNIConfig{
CNIName: constants.CustomCNI,
CNIUrls: []string{DefaultSettings.CustomCNIURL},
},
),
)
}
if options.WithEncryption {
genOptions = append(
genOptions, generate.WithSystemDiskEncryption(
&v1alpha1.SystemDiskEncryptionConfig{
StatePartition: &v1alpha1.EncryptionConfig{
EncryptionProvider: encryption.LUKS2,
EncryptionKeys: []*v1alpha1.EncryptionKey{
{
KeyNodeID: &v1alpha1.EncryptionKeyNodeID{},
KeySlot: 0,
},
},
},
EphemeralPartition: &v1alpha1.EncryptionConfig{
EncryptionProvider: encryption.LUKS2,
EncryptionKeys: []*v1alpha1.EncryptionKey{
{
KeyNodeID: &v1alpha1.EncryptionKeyNodeID{},
KeySlot: 0,
},
},
},
},
),
)
}
versionContract, err := config.ParseContractFromVersion(options.SourceVersion)
suite.Require().NoError(err)
suite.configBundle, err = bundle.NewBundle(
bundle.WithInputOptions(
&bundle.InputOptions{
ClusterName: options.ClusterName,
Endpoint: suite.controlPlaneEndpoint,
KubeVersion: options.SourceK8sVersion,
GenOptions: append(
genOptions,
generate.WithEndpointList(controlplaneEndpoints),
generate.WithInstallImage(options.SourceInstallerImage),
generate.WithDNSDomain("cluster.local"),
generate.WithVersionContract(versionContract),
),
},
),
)
suite.Require().NoError(err)
for i := range options.ControlplaneNodes {
request.Nodes = append(
request.Nodes,
provision.NodeRequest{
Name: fmt.Sprintf("control-plane-%d", i+1),
Type: machine.TypeControlPlane,
IPs: []netip.Addr{ips[i]},
Memory: DefaultSettings.MemMB * 1024 * 1024,
NanoCPUs: DefaultSettings.CPUs * 1000 * 1000 * 1000,
Disks: []*provision.Disk{
{
Size: DefaultSettings.DiskGB * 1024 * 1024 * 1024,
},
},
Config: suite.configBundle.ControlPlane(),
},
)
}
for i := 1; i <= options.WorkerNodes; i++ {
request.Nodes = append(
request.Nodes,
provision.NodeRequest{
Name: fmt.Sprintf("worker-%d", i),
Type: machine.TypeWorker,
IPs: []netip.Addr{ips[options.ControlplaneNodes+i-1]},
Memory: DefaultSettings.MemMB * 1024 * 1024,
NanoCPUs: DefaultSettings.CPUs * 1000 * 1000 * 1000,
Disks: []*provision.Disk{
{
Size: DefaultSettings.DiskGB * 1024 * 1024 * 1024,
},
},
Config: suite.configBundle.Worker(),
},
)
}
suite.Cluster, err = suite.provisioner.Create(
suite.ctx, request,
provision.WithBootlader(true),
provision.WithUEFI(true),
provision.WithTalosConfig(suite.configBundle.TalosConfig()),
)
suite.Require().NoError(err)
c, err := clientconfig.Open("")
suite.Require().NoError(err)
c.Merge(suite.configBundle.TalosConfig())
suite.Require().NoError(c.Save(""))
suite.clusterAccess = access.NewAdapter(suite.Cluster, provision.WithTalosConfig(suite.configBundle.TalosConfig()))
suite.Require().NoError(suite.clusterAccess.Bootstrap(suite.ctx, os.Stdout))
suite.waitForClusterHealth()
}
// runE2E runs e2e test on the cluster.
func (suite *BaseSuite) runE2E(k8sVersion string) {
options := sonobuoy.DefaultOptions()
options.KubernetesVersion = k8sVersion
suite.Assert().NoError(sonobuoy.Run(suite.ctx, suite.clusterAccess, options))
}