talos/internal/integration/api/volumes.go
Commit cb7ca17bba by Andrey Smirnov, 2025-07-22 14:32:31 +04:00

feat: implement ExistingVolumeConfig

Fixes #11153

The idea is to allow mounting existing partitions on a Talos machine so
that they are available for workloads.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
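As a rough illustration (not part of this commit), an ExistingVolumeConfig document can be assembled with the blockcfg/cel helpers this test file imports; the volume name and selector expression below are made up:

// Illustrative sketch only; "extra-data" and the selector expression are hypothetical.
doc := blockcfg.NewExistingVolumeConfigV1Alpha1()
doc.MetaName = "extra-data"
doc.VolumeDiscoverySpec.VolumeSelectorConfig.Match = cel.MustExpression(
cel.ParseBooleanExpression("volume.partition_label == 'extra-data'", celenv.VolumeLocator()),
)
doc.MountSpec.MountReadOnly = pointer.To(true) // optional: mount the volume read-only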

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build integration_api

package api

import (
"context"
"fmt"
"math/rand"
"path/filepath"
"slices"
"strconv"
"strings"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/google/uuid"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-pointer"
"github.com/stretchr/testify/assert"

"github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
"github.com/siderolabs/talos/pkg/machinery/api/storage"
"github.com/siderolabs/talos/pkg/machinery/cel"
"github.com/siderolabs/talos/pkg/machinery/cel/celenv"
"github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/config/machine"
blockcfg "github.com/siderolabs/talos/pkg/machinery/config/types/block"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/resources/block"
)

// VolumesSuite ...
type VolumesSuite struct {
base.K8sSuite
ctx context.Context //nolint:containedctx
ctxCancel context.CancelFunc
}
// SuiteName ...
func (suite *VolumesSuite) SuiteName() string {
return "api.VolumesSuite"
}
// SetupTest ...
func (suite *VolumesSuite) SetupTest() {
if !suite.Capabilities().SupportsVolumes {
suite.T().Skip("cluster doesn't support volumes")
}
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 2*time.Minute)
}
// TearDownTest ...
func (suite *VolumesSuite) TearDownTest() {
if suite.ctxCancel != nil {
suite.ctxCancel()
}
}
// TestDiscoveredVolumes verifies that standard Talos partitions are discovered.
func (suite *VolumesSuite) TestDiscoveredVolumes() {
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
suite.testDiscoveredVolumes(node)
})
}
}
func (suite *VolumesSuite) testDiscoveredVolumes(node string) {
ctx := client.WithNode(suite.ctx, node)
volumes, err := safe.StateListAll[*block.DiscoveredVolume](ctx, suite.Client.COSI)
suite.Require().NoError(err)
expectedVolumes := map[string]struct {
Names []string
}{
"META": {
Names: []string{"talosmeta", ""}, // if META was never written, it will not be detected
},
"STATE": {
Names: []string{"xfs"},
},
"EPHEMERAL": {
Names: []string{"xfs", ""},
},
}
for dv := range volumes.All() {
suite.T().Logf("volume: %s %s %s %s", dv.Metadata().ID(), dv.TypedSpec().Name, dv.TypedSpec().PartitionLabel, dv.TypedSpec().Label)
partitionLabel := dv.TypedSpec().PartitionLabel
filesystemLabel := dv.TypedSpec().Label
// this is an encrypted partition, skip it; we should see another device with the actual filesystem
if dv.TypedSpec().Name == "luks" {
continue
}
// match either by partition or filesystem label
id := partitionLabel
expected, ok := expectedVolumes[id]
if !ok {
id = filesystemLabel
expected, ok = expectedVolumes[id]
if !ok {
continue
}
}
suite.Assert().Contains(expected.Names, dv.TypedSpec().Name, "node: %s", node)
delete(expectedVolumes, id)
}
suite.Assert().Empty(expectedVolumes, "node: %s", node)
if suite.T().Failed() {
suite.DumpLogs(suite.ctx, node, "controller-runtime", "block.")
}
}
// TestSystemDisk verifies that the Talos system disk is discovered.
func (suite *VolumesSuite) TestSystemDisk() {
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
ctx := client.WithNode(suite.ctx, node)
systemDisk, err := safe.StateGetByID[*block.SystemDisk](ctx, suite.Client.COSI, block.SystemDiskID)
suite.Require().NoError(err)
suite.Assert().NotEmpty(systemDisk.TypedSpec().DiskID)
suite.T().Logf("system disk: %s", systemDisk.TypedSpec().DiskID)
})
}
}
// TestDisks verifies that Talos discovers disks.
func (suite *VolumesSuite) TestDisks() {
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
ctx := client.WithNode(suite.ctx, node)
disks, err := safe.StateListAll[*block.Disk](ctx, suite.Client.COSI)
suite.Require().NoError(err)
// there should be at least two disks - loop0 for Talos squashfs and a system disk
suite.Assert().Greater(disks.Len(), 1)
var diskNames []string
for disk := range disks.All() {
if disk.TypedSpec().Readonly {
continue
}
if !disk.TypedSpec().CDROM {
suite.Assert().NotEmpty(disk.TypedSpec().Size, "disk: %s", disk.Metadata().ID())
}
suite.Assert().NotEmpty(disk.TypedSpec().Symlinks, "disk: %s", disk.Metadata().ID())
suite.Assert().NotEmpty(disk.TypedSpec().IOSize, "disk: %s", disk.Metadata().ID())
suite.Assert().NotEmpty(disk.TypedSpec().SectorSize, "disk: %s", disk.Metadata().ID())
if suite.Cluster != nil {
// running on our own provider, the transport should always be detected
if disk.TypedSpec().BusPath == "/virtual" {
suite.Assert().Empty(disk.TypedSpec().Transport, "disk: %s", disk.Metadata().ID())
} else {
suite.Assert().NotEmpty(disk.TypedSpec().Transport, "disk: %s", disk.Metadata().ID())
}
}
if strings.HasPrefix(disk.Metadata().ID(), "dm-") {
// devicemapper disks should have secondaries
suite.Assert().NotEmpty(disk.TypedSpec().SecondaryDisks, "disk: %s", disk.Metadata().ID())
suite.T().Logf("disk: %s secondaries: %v", disk.Metadata().ID(), disk.TypedSpec().SecondaryDisks)
}
diskNames = append(diskNames, disk.Metadata().ID())
}
suite.T().Logf("disks: %v", diskNames)
})
}
}
// TestLVMActivation verifies that LVM volume group is activated after reboot.
func (suite *VolumesSuite) TestLVMActivation() {
if suite.SelinuxEnforcing {
suite.T().Skip("skipping tests with nsenter in SELinux enforcing mode")
}
if testing.Short() {
suite.T().Skip("skipping test in short mode.")
}
if suite.Cluster == nil || suite.Cluster.Provisioner() != base.ProvisionerQEMU {
suite.T().Skip("skipping test for non-qemu provisioner")
}
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
suite.Require().NoError(err)
nodeName := k8sNode.Name
suite.T().Logf("creating LVM volume group on node %s/%s", node, nodeName)
userDisks := suite.UserDisks(suite.ctx, node)
if len(userDisks) < 2 {
suite.T().Skipf("skipping test, not enough user disks available on node %s/%s: %q", node, nodeName, userDisks)
}
userDisksJoined := strings.Join(userDisks[:2], " ")
podDef, err := suite.NewPrivilegedPod("pv-create")
suite.Require().NoError(err)
podDef = podDef.WithNodeName(nodeName)
suite.Require().NoError(podDef.Create(suite.ctx, 5*time.Minute))
defer podDef.Delete(suite.ctx) //nolint:errcheck
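// the LVM commands below run via nsenter into PID 1's mount namespace, so they operate on the host's view of the devices rather than the pod's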
stdout, _, err := podDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- vgcreate vg0 %s", userDisksJoined),
)
suite.Require().NoError(err)
suite.Require().Contains(stdout, "Volume group \"vg0\" successfully created")
stdout, _, err = podDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- lvcreate --mirrors=1 --type=raid1 --nosync -n lv0 -L 1G vg0",
)
suite.Require().NoError(err)
suite.Require().Contains(stdout, "Logical volume \"lv0\" created.")
stdout, _, err = podDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- lvcreate -n lv1 -L 1G vg0",
)
suite.Require().NoError(err)
suite.Require().Contains(stdout, "Logical volume \"lv1\" created.")
defer func() {
suite.T().Logf("removing LVM volumes %s/%s", node, nodeName)
deletePodDef, err := suite.NewPrivilegedPod("pv-destroy")
suite.Require().NoError(err)
deletePodDef = deletePodDef.WithNodeName(nodeName)
suite.Require().NoError(deletePodDef.Create(suite.ctx, 5*time.Minute))
defer deletePodDef.Delete(suite.ctx) //nolint:errcheck
if _, _, err := deletePodDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- vgremove --yes vg0",
); err != nil {
suite.T().Logf("failed to remove pv vg0: %v", err)
}
if _, _, err := deletePodDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- pvremove --yes %s", userDisksJoined),
); err != nil {
suite.T().Logf("failed to remove pv backed by volumes %s: %v", userDisksJoined, err)
}
}()
suite.T().Logf("rebooting node %s/%s", node, nodeName)
// now reboot the node and make sure the LVM volume group is re-activated afterwards
suite.AssertRebooted(
suite.ctx, node, func(nodeCtx context.Context) error {
return base.IgnoreGRPCUnavailable(suite.Client.Reboot(nodeCtx))
}, 5*time.Minute,
suite.CleanupFailedPods,
)
suite.T().Logf("verifying LVM activation %s/%s", node, nodeName)
suite.Require().Eventually(func() bool {
return suite.lvmVolumeExists(node)
}, 5*time.Second, 1*time.Second, "LVM volume group was not activated after reboot")
}
func (suite *VolumesSuite) lvmVolumeExists(node string) bool {
ctx := client.WithNode(suite.ctx, node)
disks, err := safe.StateListAll[*block.Disk](ctx, suite.Client.COSI)
suite.Require().NoError(err)
var lvmVolumeCount int
for disk := range disks.All() {
if strings.HasPrefix(disk.TypedSpec().DevPath, "/dev/dm") {
lvmVolumeCount++
}
}
// the test creates a volume group with two logical volumes: one raid1-mirrored (which exposes the LV itself
// plus two rimage and two rmeta device-mapper devices) and one linear, so at least 6 device-mapper devices are expected
return lvmVolumeCount >= 6
}
// TestSymlinks verifies that Talos can update disk symlinks on the fly.
func (suite *VolumesSuite) TestSymlinks() {
if suite.SelinuxEnforcing {
suite.T().Skip("skipping tests with nsenter in SELinux enforcing mode")
}
if testing.Short() {
suite.T().Skip("skipping test in short mode.")
}
if suite.Cluster == nil || suite.Cluster.Provisioner() != base.ProvisionerQEMU {
suite.T().Skip("skipping test for non-qemu provisioner")
}
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
suite.Require().NoError(err)
nodeName := k8sNode.Name
userDisks := suite.UserDisks(suite.ctx, node)
if len(userDisks) < 1 {
suite.T().Skipf("skipping test, not enough user disks available on node %s/%s: %q", node, nodeName, userDisks)
}
userDisk := userDisks[0]
userDiskName := filepath.Base(userDisk)
suite.T().Logf("performing a symlink test %s on %s/%s", userDisk, node, nodeName)
podDef, err := suite.NewPrivilegedPod("xfs-format")
suite.Require().NoError(err)
podDef = podDef.WithNodeName(nodeName)
suite.Require().NoError(podDef.Create(suite.ctx, 5*time.Minute))
defer podDef.Delete(suite.ctx) //nolint:errcheck
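// format the disk with a known filesystem UUID so the expected /dev/disk/by-uuid symlink can be computed up front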
fsUUID := uuid.New().String()
_, _, err = podDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- mkfs.xfs -m uuid=%s %s", fsUUID, userDisk),
)
suite.Require().NoError(err)
expectedSymlink := "/dev/disk/by-uuid/" + fsUUID
// Talos should report a symlink to the disk via FS UUID
_, err = suite.Client.COSI.WatchFor(client.WithNode(suite.ctx, node), block.NewDisk(block.NamespaceName, userDiskName).Metadata(),
state.WithCondition(func(r resource.Resource) (bool, error) {
disk, ok := r.(*block.Disk)
if !ok {
return false, fmt.Errorf("unexpected resource type: %T", r)
}
return slices.Index(disk.TypedSpec().Symlinks, expectedSymlink) != -1, nil
}),
)
suite.Require().NoError(err)
suite.T().Logf("wiping user disk %s on %s/%s", userDisk, node, nodeName)
suite.Require().NoError(suite.Client.BlockDeviceWipe(client.WithNode(suite.ctx, node), &storage.BlockDeviceWipeRequest{
Devices: []*storage.BlockDeviceWipeDescriptor{
{
Device: userDiskName,
Method: storage.BlockDeviceWipeDescriptor_FAST,
},
},
}))
// Talos should remove a symlink to the disk
_, err = suite.Client.COSI.WatchFor(client.WithNode(suite.ctx, node), block.NewDisk(block.NamespaceName, userDiskName).Metadata(),
state.WithCondition(func(r resource.Resource) (bool, error) {
disk, ok := r.(*block.Disk)
if !ok {
return false, fmt.Errorf("unexpected resource type: %T", r)
}
return slices.Index(disk.TypedSpec().Symlinks, expectedSymlink) == -1, nil
}),
)
suite.Require().NoError(err)
}
// TestUserVolumesStatus verifies that user volumes configured in the machine config were provisioned successfully.
func (suite *VolumesSuite) TestUserVolumesStatus() {
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
ctx := client.WithNode(suite.ctx, node)
userVolumeIDs := rtestutils.ResourceIDs[*block.VolumeStatus](ctx, suite.T(), suite.Client.COSI, state.WithLabelQuery(resource.LabelExists(block.UserVolumeLabel)))
// check that the volumes are ready
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI,
userVolumeIDs,
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
},
)
if len(userVolumeIDs) > 0 {
suite.T().Logf("found %d user volumes", len(userVolumeIDs))
}
// check that the volumes are mounted
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI,
userVolumeIDs,
func(vs *block.MountStatus, _ *assert.Assertions) {},
)
})
}
}
// TestVolumesStatus verifies that all volumes are either ready or missing.
func (suite *VolumesSuite) TestVolumesStatus() {
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
ctx := client.WithNode(suite.ctx, node)
rtestutils.AssertAll(ctx, suite.T(), suite.Client.COSI,
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Contains([]block.VolumePhase{block.VolumePhaseReady, block.VolumePhaseMissing}, vs.TypedSpec().Phase)
},
)
})
}
}
// TestUserVolumes performs a series of operations on user volumes: creating, destroying, verifying, etc.
func (suite *VolumesSuite) TestUserVolumes() {
if testing.Short() {
suite.T().Skip("skipping test in short mode.")
}
if suite.Cluster == nil || suite.Cluster.Provisioner() != base.ProvisionerQEMU {
suite.T().Skip("skipping test for non-qemu provisioner")
}
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
suite.Require().NoError(err)
nodeName := k8sNode.Name
suite.T().Logf("verifying user volumes on node %s/%s", node, nodeName)
userDisks := suite.UserDisks(suite.ctx, node)
if len(userDisks) < 1 {
suite.T().Skipf("skipping test, not enough user disks available on node %s/%s: %q", node, nodeName, userDisks)
}
ctx := client.WithNode(suite.ctx, node)
disk, err := safe.StateGetByID[*block.Disk](ctx, suite.Client.COSI, filepath.Base(userDisks[0]))
suite.Require().NoError(err)
volumeName := fmt.Sprintf("%04x", rand.Int31()) + "-"
const numVolumes = 3
volumeIDs := make([]string, numVolumes)
for i := range numVolumes {
volumeIDs[i] = volumeName + strconv.Itoa(i)
}
userVolumeIDs := xslices.Map(volumeIDs, func(volumeID string) string { return constants.UserVolumePrefix + volumeID })
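// each user volume config selects the same user disk by matching one of its stable symlinks with a CEL disk selector expression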
configDocs := xslices.Map(volumeIDs, func(volumeID string) any {
doc := blockcfg.NewUserVolumeConfigV1Alpha1()
doc.MetaName = volumeID
doc.ProvisioningSpec.DiskSelectorSpec.Match = cel.MustExpression(
cel.ParseBooleanExpression(fmt.Sprintf("'%s' in disk.symlinks", disk.TypedSpec().Symlinks[0]), celenv.DiskLocator()),
)
doc.ProvisioningSpec.ProvisioningMinSize = blockcfg.MustByteSize("100MiB")
doc.ProvisioningSpec.ProvisioningMaxSize = blockcfg.MustByteSize("1GiB")
return doc
})
// create user volumes
suite.PatchMachineConfig(ctx, configDocs...)
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, userVolumeIDs,
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
},
)
// check that the volumes are mounted
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, userVolumeIDs,
func(vs *block.MountStatus, _ *assert.Assertions) {})
// create a pod using user volumes
podDef, err := suite.NewPod("user-volume-test")
suite.Require().NoError(err)
// using a subdirectory here to test that the hostPath mount is properly propagated into the kubelet
podDef = podDef.WithNodeName(nodeName).
WithNamespace("kube-system").
WithHostVolumeMount(filepath.Join(constants.UserVolumeMountPoint, volumeIDs[0], "data"), "/mnt/data")
suite.Require().NoError(podDef.Create(suite.ctx, 1*time.Minute))
_, _, err = podDef.Exec(suite.ctx, "mkdir -p /mnt/data/test")
suite.Require().NoError(err)
suite.Require().NoError(podDef.Delete(suite.ctx))
// verify that the directory exists
expectedPath := filepath.Join(constants.UserVolumeMountPoint, volumeIDs[0], "data", "test")
stream, err := suite.Client.LS(ctx, &machineapi.ListRequest{
Root: expectedPath,
Types: []machineapi.ListRequest_Type{machineapi.ListRequest_DIRECTORY},
})
suite.Require().NoError(err)
suite.Require().NoError(helpers.ReadGRPCStream(stream, func(info *machineapi.FileInfo, _ string, _ bool) error {
suite.T().Logf("found %s on node %s", info.Name, node)
suite.Require().Equal(expectedPath, info.Name, "expected %s to exist", expectedPath)
return nil
}))
// verify that volume labels are set properly
expectedLabels := xslices.ToSet(userVolumeIDs)
dvs, err := safe.StateListAll[*block.DiscoveredVolume](ctx, suite.Client.COSI)
suite.Require().NoError(err)
for dv := range dvs.All() {
delete(expectedLabels, dv.TypedSpec().PartitionLabel)
}
suite.Require().Empty(expectedLabels, "expected labels %v to be set on discovered volumes", expectedLabels)
// now, remove one of the volumes, wipe the partition and re-create the volume
vs, err := safe.ReaderGetByID[*block.VolumeStatus](ctx, suite.Client.COSI, userVolumeIDs[0])
suite.Require().NoError(err)
suite.RemoveMachineConfigDocumentsByName(ctx, blockcfg.UserVolumeConfigKind, volumeIDs[0])
rtestutils.AssertNoResource[*block.VolumeStatus](ctx, suite.T(), suite.Client.COSI, userVolumeIDs[0])
suite.Require().EventuallyWithT(func(collect *assert.CollectT) {
// a little retry loop, as the device might be considered busy for a little while after unmounting
asrt := assert.New(collect)
asrt.NoError(suite.Client.BlockDeviceWipe(ctx, &storage.BlockDeviceWipeRequest{
Devices: []*storage.BlockDeviceWipeDescriptor{
{
Device: filepath.Base(vs.TypedSpec().Location),
Method: storage.BlockDeviceWipeDescriptor_FAST,
DropPartition: true,
},
},
}))
}, time.Minute, time.Second, "failed to wipe partition %s", vs.TypedSpec().Location)
// wait for the discovered volume to disappear
rtestutils.AssertNoResource[*block.DiscoveredVolume](ctx, suite.T(), suite.Client.COSI, filepath.Base(vs.TypedSpec().Location))
// re-create the volume with project quota support
configDocs[0].(*blockcfg.UserVolumeConfigV1Alpha1).FilesystemSpec.ProjectQuotaSupportConfig = pointer.To(true)
suite.PatchMachineConfig(ctx, configDocs[0])
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, userVolumeIDs,
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
},
)
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, userVolumeIDs,
func(vs *block.MountStatus, asrt *assert.Assertions) {
if vs.Metadata().ID() == userVolumeIDs[0] {
// check that the project quota support is enabled
asrt.True(vs.TypedSpec().ProjectQuotaSupport, "project quota support should be enabled for %s", vs.Metadata().ID())
} else {
// check that the project quota support is disabled
asrt.False(vs.TypedSpec().ProjectQuotaSupport, "project quota support should be disabled for %s", vs.Metadata().ID())
}
})
// clean up
suite.RemoveMachineConfigDocumentsByName(ctx, blockcfg.UserVolumeConfigKind, volumeIDs...)
for _, userVolumeID := range userVolumeIDs {
rtestutils.AssertNoResource[*block.VolumeStatus](ctx, suite.T(), suite.Client.COSI, userVolumeID)
}
suite.Require().NoError(suite.Client.BlockDeviceWipe(ctx, &storage.BlockDeviceWipeRequest{
Devices: []*storage.BlockDeviceWipeDescriptor{
{
Device: filepath.Base(userDisks[0]),
Method: storage.BlockDeviceWipeDescriptor_FAST,
},
},
}))
// wait for the discovered volume to reflect the wiped status
rtestutils.AssertResource(ctx, suite.T(), suite.Client.COSI, filepath.Base(userDisks[0]),
func(dv *block.DiscoveredVolume, asrt *assert.Assertions) {
asrt.Empty(dv.TypedSpec().Name, "expected discovered volume %s to be wiped", dv.Metadata().ID())
})
}
// TestRawVolumes performs a series of operations on raw volumes: creating, destroying, etc.
func (suite *VolumesSuite) TestRawVolumes() {
if testing.Short() {
suite.T().Skip("skipping test in short mode.")
}
if suite.Cluster == nil || suite.Cluster.Provisioner() != base.ProvisionerQEMU {
suite.T().Skip("skipping test for non-qemu provisioner")
}
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
suite.Require().NoError(err)
nodeName := k8sNode.Name
suite.T().Logf("verifying raw volumes on node %s/%s", node, nodeName)
userDisks := suite.UserDisks(suite.ctx, node)
if len(userDisks) < 1 {
suite.T().Skipf("skipping test, not enough user disks available on node %s/%s: %q", node, nodeName, userDisks)
}
ctx := client.WithNode(suite.ctx, node)
disk, err := safe.StateGetByID[*block.Disk](ctx, suite.Client.COSI, filepath.Base(userDisks[0]))
suite.Require().NoError(err)
volumeName := fmt.Sprintf("%04x", rand.Int31()) + "-"
const numVolumes = 2
volumeIDs := make([]string, numVolumes)
for i := range numVolumes {
volumeIDs[i] = volumeName + strconv.Itoa(i)
}
rawVolumeIDs := xslices.Map(volumeIDs, func(volumeID string) string { return constants.RawVolumePrefix + volumeID })
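// raw volumes are provisioned as bare partitions (no filesystem is created); the target disk is selected with the same CEL disk selector approach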
configDocs := xslices.Map(volumeIDs, func(volumeID string) any {
doc := blockcfg.NewRawVolumeConfigV1Alpha1()
doc.MetaName = volumeID
doc.ProvisioningSpec.DiskSelectorSpec.Match = cel.MustExpression(
cel.ParseBooleanExpression(fmt.Sprintf("'%s' in disk.symlinks", disk.TypedSpec().Symlinks[0]), celenv.DiskLocator()),
)
doc.ProvisioningSpec.ProvisioningMinSize = blockcfg.MustByteSize("100MiB")
doc.ProvisioningSpec.ProvisioningMaxSize = blockcfg.MustByteSize("500MiB")
return doc
})
// create raw volumes
suite.PatchMachineConfig(ctx, configDocs...)
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, rawVolumeIDs,
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
},
)
// verify that volume labels are set properly
expectedLabels := xslices.ToSet(rawVolumeIDs)
dvs, err := safe.StateListAll[*block.DiscoveredVolume](ctx, suite.Client.COSI)
suite.Require().NoError(err)
for dv := range dvs.All() {
delete(expectedLabels, dv.TypedSpec().PartitionLabel)
}
suite.Require().Empty(expectedLabels, "expected labels %v to be set on discovered volumes", expectedLabels)
// now, remove one of the volumes, wipe the partition and re-create the volume
vs, err := safe.ReaderGetByID[*block.VolumeStatus](ctx, suite.Client.COSI, rawVolumeIDs[0])
suite.Require().NoError(err)
suite.RemoveMachineConfigDocumentsByName(ctx, blockcfg.RawVolumeConfigKind, volumeIDs[0])
rtestutils.AssertNoResource[*block.VolumeStatus](ctx, suite.T(), suite.Client.COSI, rawVolumeIDs[0])
suite.Require().EventuallyWithT(func(collect *assert.CollectT) {
// a little retry loop, as the device might be considered busy for a little while after unmounting
asrt := assert.New(collect)
asrt.NoError(suite.Client.BlockDeviceWipe(ctx, &storage.BlockDeviceWipeRequest{
Devices: []*storage.BlockDeviceWipeDescriptor{
{
Device: filepath.Base(vs.TypedSpec().Location),
Method: storage.BlockDeviceWipeDescriptor_FAST,
DropPartition: true,
},
},
}))
}, time.Minute, time.Second, "failed to wipe partition %s", vs.TypedSpec().Location)
// wait for the discovered volume to disappear
rtestutils.AssertNoResource[*block.DiscoveredVolume](ctx, suite.T(), suite.Client.COSI, filepath.Base(vs.TypedSpec().Location))
// re-create the volume
suite.PatchMachineConfig(ctx, configDocs[0])
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, rawVolumeIDs,
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
},
)
// clean up
suite.RemoveMachineConfigDocumentsByName(ctx, blockcfg.RawVolumeConfigKind, volumeIDs...)
for _, rawVolumeID := range rawVolumeIDs {
rtestutils.AssertNoResource[*block.VolumeStatus](ctx, suite.T(), suite.Client.COSI, rawVolumeID)
}
suite.Require().NoError(suite.Client.BlockDeviceWipe(ctx, &storage.BlockDeviceWipeRequest{
Devices: []*storage.BlockDeviceWipeDescriptor{
{
Device: filepath.Base(userDisks[0]),
Method: storage.BlockDeviceWipeDescriptor_FAST,
},
},
}))
// wait for the discovered volume to reflect the wiped status
rtestutils.AssertResource(ctx, suite.T(), suite.Client.COSI, filepath.Base(userDisks[0]),
func(dv *block.DiscoveredVolume, asrt *assert.Assertions) {
asrt.Empty(dv.TypedSpec().Name, "expected discovered volume %s to be wiped", dv.Metadata().ID())
})
}
// TestExistingVolumes performs a series of operations on existing volumes: mount/unmount, etc.
func (suite *VolumesSuite) TestExistingVolumes() {
if testing.Short() {
suite.T().Skip("skipping test in short mode.")
}
if suite.Cluster == nil || suite.Cluster.Provisioner() != base.ProvisionerQEMU {
suite.T().Skip("skipping test for non-qemu provisioner")
}
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
suite.Require().NoError(err)
nodeName := k8sNode.Name
suite.T().Logf("verifying existing volumes on node %s/%s", node, nodeName)
userDisks := suite.UserDisks(suite.ctx, node)
if len(userDisks) < 1 {
suite.T().Skipf("skipping test, not enough user disks available on node %s/%s: %q", node, nodeName, userDisks)
}
ctx := client.WithNode(suite.ctx, node)
disk, err := safe.StateGetByID[*block.Disk](ctx, suite.Client.COSI, filepath.Base(userDisks[0]))
suite.Require().NoError(err)
volumeID := fmt.Sprintf("%04x", rand.Int31())
existingVolumeID := constants.ExistingVolumePrefix + volumeID
// first, create a user volume config so that a partition with a filesystem gets provisioned on the user disk
userVolumeID := constants.UserVolumePrefix + volumeID
userVolumeDoc := blockcfg.NewUserVolumeConfigV1Alpha1()
userVolumeDoc.MetaName = volumeID
userVolumeDoc.ProvisioningSpec.DiskSelectorSpec.Match = cel.MustExpression(
cel.ParseBooleanExpression(fmt.Sprintf("'%s' in disk.symlinks", disk.TypedSpec().Symlinks[0]), celenv.DiskLocator()),
)
userVolumeDoc.ProvisioningSpec.ProvisioningMinSize = blockcfg.MustByteSize("100MiB")
userVolumeDoc.ProvisioningSpec.ProvisioningMaxSize = blockcfg.MustByteSize("1GiB")
// create the user volume
suite.PatchMachineConfig(ctx, userVolumeDoc)
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, []resource.ID{userVolumeID},
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
},
)
// now destroy a user volume
suite.RemoveMachineConfigDocumentsByName(ctx, blockcfg.UserVolumeConfigKind, volumeID)
// wait for the user volume to be removed
rtestutils.AssertNoResource[*block.VolumeStatus](ctx, suite.T(), suite.Client.COSI, userVolumeID)
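// removing the config releases the volume, but the partition (labeled with the user volume ID) and its filesystem remain on disk, ready to be picked up by an ExistingVolumeConfig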
// now, recreate the volume as an existing volume
existingVolumeDoc := blockcfg.NewExistingVolumeConfigV1Alpha1()
existingVolumeDoc.MetaName = volumeID
existingVolumeDoc.VolumeDiscoverySpec.VolumeSelectorConfig.Match = cel.MustExpression(
cel.ParseBooleanExpression(
fmt.Sprintf("volume.partition_label == '%s' && '%s' in disk.symlinks", userVolumeID, disk.TypedSpec().Symlinks[0]),
celenv.VolumeLocator(),
),
)
// create existing volume
suite.PatchMachineConfig(ctx, existingVolumeDoc)
// wait for the existing volume to be discovered
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, []resource.ID{existingVolumeID},
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
asrt.Equal(userDisks[0], vs.TypedSpec().ParentLocation)
},
)
// check that the volume is mounted
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, []resource.ID{existingVolumeID},
func(vs *block.MountStatus, asrt *assert.Assertions) {
asrt.False(vs.TypedSpec().ReadOnly)
})
// create a pod using existing volumes
podDef, err := suite.NewPod("existing-volume-test")
suite.Require().NoError(err)
// using a subdirectory here to test that the hostPath mount is properly propagated into the kubelet
podDef = podDef.WithNodeName(nodeName).
WithNamespace("kube-system").
WithHostVolumeMount(filepath.Join(constants.UserVolumeMountPoint, volumeID, "data"), "/mnt/data")
suite.Require().NoError(podDef.Create(suite.ctx, 1*time.Minute))
_, _, err = podDef.Exec(suite.ctx, "mkdir -p /mnt/data/test")
suite.Require().NoError(err)
suite.Require().NoError(podDef.Delete(suite.ctx))
// verify that the directory exists
expectedPath := filepath.Join(constants.UserVolumeMountPoint, volumeID, "data", "test")
stream, err := suite.Client.LS(ctx, &machineapi.ListRequest{
Root: expectedPath,
Types: []machineapi.ListRequest_Type{machineapi.ListRequest_DIRECTORY},
})
suite.Require().NoError(err)
suite.Require().NoError(helpers.ReadGRPCStream(stream, func(info *machineapi.FileInfo, _ string, _ bool) error {
suite.T().Logf("found %s on node %s", info.Name, node)
suite.Require().Equal(expectedPath, info.Name, "expected %s to exist", expectedPath)
return nil
}))
// now, re-mount the existing volume as read-only
existingVolumeDoc.MountSpec.MountReadOnly = pointer.To(true)
suite.PatchMachineConfig(ctx, existingVolumeDoc)
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, []resource.ID{existingVolumeID},
func(vs *block.MountStatus, asrt *assert.Assertions) {
asrt.True(vs.TypedSpec().ReadOnly)
})
// clean up
suite.RemoveMachineConfigDocumentsByName(ctx, blockcfg.ExistingVolumeConfigKind, volumeID)
rtestutils.AssertNoResource[*block.VolumeStatus](ctx, suite.T(), suite.Client.COSI, existingVolumeID)
suite.Require().NoError(suite.Client.BlockDeviceWipe(ctx, &storage.BlockDeviceWipeRequest{
Devices: []*storage.BlockDeviceWipeDescriptor{
{
Device: filepath.Base(userDisks[0]),
Method: storage.BlockDeviceWipeDescriptor_FAST,
},
},
}))
// wait for the discovered volume to reflect the wiped status
rtestutils.AssertResource(ctx, suite.T(), suite.Client.COSI, filepath.Base(userDisks[0]),
func(dv *block.DiscoveredVolume, asrt *assert.Assertions) {
asrt.Empty(dv.TypedSpec().Name, "expected discovered volume %s to be wiped", dv.Metadata().ID())
})
}
// TestSwapStatus verifies that all swap volumes are successfully enabled.
func (suite *VolumesSuite) TestSwapStatus() {
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
ctx := client.WithNode(suite.ctx, node)
swapVolumes, err := safe.StateListAll[*block.VolumeConfig](ctx, suite.Client.COSI, state.WithLabelQuery(resource.LabelExists(block.SwapVolumeLabel)))
suite.Require().NoError(err)
if swapVolumes.Len() == 0 {
suite.T().Skipf("skipping test, no swap volumes found on node %s", node)
}
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI,
xslices.Map(slices.Collect(swapVolumes.All()), func(sv *block.VolumeConfig) string {
return sv.Metadata().ID()
}),
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
},
)
swapVolumesStatus, err := safe.StateListAll[*block.VolumeStatus](ctx, suite.Client.COSI, state.WithLabelQuery(resource.LabelExists(block.SwapVolumeLabel)))
suite.Require().NoError(err)
deviceNames := xslices.Map(slices.Collect(swapVolumesStatus.All()), func(sv *block.VolumeStatus) string {
return sv.TypedSpec().MountLocation
})
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI,
deviceNames,
func(vs *block.SwapStatus, asrt *assert.Assertions) {},
)
suite.T().Logf("found swap volumes (%q) on node %s", deviceNames, node)
})
}
}
// TestSwapOnOff performs a series of operations on a swap volume: creating, destroying, enabling, disabling, etc.
func (suite *VolumesSuite) TestSwapOnOff() {
if testing.Short() {
suite.T().Skip("skipping test in short mode.")
}
if suite.Cluster == nil || suite.Cluster.Provisioner() != base.ProvisionerQEMU {
suite.T().Skip("skipping test for non-qemu provisioner")
}
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
suite.Require().NoError(err)
nodeName := k8sNode.Name
suite.T().Logf("verifying swap on node %s/%s", node, nodeName)
userDisks := suite.UserDisks(suite.ctx, node)
if len(userDisks) < 1 {
suite.T().Skipf("skipping test, not enough user disks available on node %s/%s: %q", node, nodeName, userDisks)
}
ctx := client.WithNode(suite.ctx, node)
disk, err := safe.StateGetByID[*block.Disk](ctx, suite.Client.COSI, filepath.Base(userDisks[0]))
suite.Require().NoError(err)
volumeName := fmt.Sprintf("%04x", rand.Int31())
doc := blockcfg.NewSwapVolumeConfigV1Alpha1()
doc.MetaName = volumeName
doc.ProvisioningSpec.DiskSelectorSpec.Match = cel.MustExpression(
cel.ParseBooleanExpression(fmt.Sprintf("'%s' in disk.symlinks", disk.TypedSpec().Symlinks[0]), celenv.DiskLocator()),
)
doc.EncryptionSpec = blockcfg.EncryptionSpec{
EncryptionProvider: block.EncryptionProviderLUKS2,
EncryptionKeys: []blockcfg.EncryptionKey{
{
KeySlot: 0,
KeyStatic: &blockcfg.EncryptionKeyStatic{
KeyData: "secretswap",
},
},
},
}
doc.ProvisioningSpec.ProvisioningMinSize = blockcfg.MustByteSize("100MiB")
doc.ProvisioningSpec.ProvisioningMaxSize = blockcfg.MustByteSize("500MiB")
// create the swap volume
suite.PatchMachineConfig(ctx, doc)
swapVolumeID := constants.SwapVolumePrefix + doc.MetaName
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, []string{swapVolumeID},
func(vs *block.VolumeStatus, asrt *assert.Assertions) {
asrt.Equal(block.VolumePhaseReady, vs.TypedSpec().Phase)
},
)
// check that the volumes are mounted
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, []string{swapVolumeID},
func(vs *block.MountStatus, _ *assert.Assertions) {})
// check that the swap is enabled
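// SwapStatus resources are keyed by the backing device path, so resolve it from the volume status mount location first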
volumeStatus, err := safe.ReaderGetByID[*block.VolumeStatus](ctx, suite.Client.COSI, swapVolumeID)
suite.Require().NoError(err)
rtestutils.AssertResources(ctx, suite.T(), suite.Client.COSI, []string{volumeStatus.TypedSpec().MountLocation},
func(vs *block.SwapStatus, asrt *assert.Assertions) {})
// clean up
suite.RemoveMachineConfigDocumentsByName(ctx, blockcfg.SwapVolumeConfigKind, volumeName)
rtestutils.AssertNoResource[*block.VolumeStatus](ctx, suite.T(), suite.Client.COSI, swapVolumeID)
rtestutils.AssertNoResource[*block.SwapStatus](ctx, suite.T(), suite.Client.COSI, volumeStatus.TypedSpec().MountLocation)
suite.Require().NoError(suite.Client.BlockDeviceWipe(ctx, &storage.BlockDeviceWipeRequest{
Devices: []*storage.BlockDeviceWipeDescriptor{
{
Device: filepath.Base(userDisks[0]),
Method: storage.BlockDeviceWipeDescriptor_FAST,
},
},
}))
}
// TestZswapStatus verifies that all zswap-enabled machines have zswap running.
func (suite *VolumesSuite) TestZswapStatus() {
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
ctx := client.WithNode(suite.ctx, node)
cfg, err := suite.ReadConfigFromNode(ctx)
suite.Require().NoError(err)
if cfg.ZswapConfig() == nil {
suite.T().Skipf("skipping test, zswap is not enabled on node %s", node)
}
rtestutils.AssertResource(ctx, suite.T(), suite.Client.COSI,
block.ZswapStatusID,
func(vs *block.ZswapStatus, asrt *assert.Assertions) {
suite.T().Logf("zswap total size %s, stored pages %d", vs.TypedSpec().TotalSizeHuman, vs.TypedSpec().StoredPages)
},
)
})
}
}
func init() {
allSuites = append(allSuites, new(VolumesSuite))
}