fix: volume discovery improvements

Use shared locks, discover more partitions, and make some other small changes.

Re-enable the flaky test.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
This commit is contained in:
Andrey Smirnov 2024-05-29 18:03:16 +04:00
parent 80ca8ff713
commit 7c9a14383e
No known key found for this signature in database
GPG Key ID: FE042E3D4085A811
7 changed files with 126 additions and 31 deletions

2
go.mod
View File

@@ -128,7 +128,7 @@ require (
github.com/siderolabs/gen v0.5.0
github.com/siderolabs/go-api-signature v0.3.2
github.com/siderolabs/go-blockdevice v0.4.7
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa
github.com/siderolabs/go-circular v0.2.0
github.com/siderolabs/go-cmd v0.1.1
github.com/siderolabs/go-copy v0.1.0

4
go.sum
View File

@@ -661,8 +661,8 @@ github.com/siderolabs/go-api-signature v0.3.2 h1:blqrZF1GM7TWgq7mY7CsR+yQ93u6az0
github.com/siderolabs/go-api-signature v0.3.2/go.mod h1:punhUOaXa7LELYBRCUhfgUGH6ieVz68GrP98apCKXj8=
github.com/siderolabs/go-blockdevice v0.4.7 h1:2bk4WpEEflGxjrNwp57ye24Pr+cYgAiAeNMWiQOuWbQ=
github.com/siderolabs/go-blockdevice v0.4.7/go.mod h1:4PeOuk71pReJj1JQEXDE7kIIQJPVe8a+HZQa+qjxSEA=
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192 h1:16/cHDGhTUDBtfIftOkuHWhJcQdpa/FwwWPcTq4aOxc=
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192/go.mod h1:UBbbc+L7hU0UggOQeKCA+Qp3ImGkSeaLfVOiCbxRxEI=
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa h1:OjQLrcis/GuqaqxnIw2dxp4ZzT/zk5p1GI3NxcMxkrA=
github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa/go.mod h1:5GnL7VLNp5/vgiwYP74fi6KuTUfqGcRxQxtto2tzD+I=
github.com/siderolabs/go-circular v0.2.0 h1:Xca8zrjF/YsujLbwDSojkKzJe7ngetnpuIJn8N78DJI=
github.com/siderolabs/go-circular v0.2.0/go.mod h1:rrYCwHLYWmxqrmZP+LjYtwB2a55lxzQi0Ztu1VpWZSc=
github.com/siderolabs/go-cmd v0.1.1 h1:nTouZUSxLeiiEe7hFexSVvaTsY/3O8k1s08BxPRrsps=

View File

@@ -6,6 +6,7 @@ package block
import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
@@ -74,11 +75,17 @@ func (ctrl *DiscoveryController) Run(ctx context.Context, r controller.Runtime,
continue
}
if err := ctrl.rescan(ctx, r, logger, maps.Keys(nextRescan)); err != nil {
return fmt.Errorf("failed to rescan devices: %w", err)
}
logger.Debug("rescanning devices", zap.Strings("devices", maps.Keys(nextRescan)))
nextRescan = map[string]int{}
if nextRescanBatch, err := ctrl.rescan(ctx, r, logger, maps.Keys(nextRescan)); err != nil {
return fmt.Errorf("failed to rescan devices: %w", err)
} else {
nextRescan = map[string]int{}
for id := range nextRescanBatch {
nextRescan[id] = lastObservedGenerations[id]
}
}
case <-r.EventCh():
devices, err := safe.ReaderListAll[*block.Device](ctx, r)
if err != nil {
@@ -125,10 +132,11 @@ func (ctrl *DiscoveryController) Run(ctx context.Context, r controller.Runtime,
}
}
//nolint:gocyclo
func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtime, logger *zap.Logger, ids []string) error {
//nolint:gocyclo,cyclop
func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtime, logger *zap.Logger, ids []string) (map[string]struct{}, error) {
failedIDs := map[string]struct{}{}
touchedIDs := map[string]struct{}{}
nextRescan := map[string]struct{}{}
for _, id := range ids {
device, err := safe.ReaderGetByID[*block.Device](ctx, r, id)
@@ -139,18 +147,27 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
continue
}
return fmt.Errorf("failed to get device: %w", err)
return nil, fmt.Errorf("failed to get device: %w", err)
}
info, err := blkid.ProbePath(filepath.Join("/dev", id))
info, err := blkid.ProbePath(filepath.Join("/dev", id), blkid.WithProbeLogger(logger.With(zap.String("device", id))))
if err != nil {
logger.Debug("failed to probe device", zap.String("id", id), zap.Error(err))
if errors.Is(err, blkid.ErrFailedLock) {
// failed to lock the blockdevice, retry later
logger.Debug("failed to lock device, retrying later", zap.String("id", id))
failedIDs[id] = struct{}{}
nextRescan[id] = struct{}{}
} else {
logger.Debug("failed to probe device", zap.String("id", id), zap.Error(err))
failedIDs[id] = struct{}{}
}
continue
}
logger.Debug("probed device", zap.String("id", id), zap.Any("info", info))
if err = safe.WriterModify(ctx, r, block.NewDiscoveredVolume(block.NamespaceName, id), func(dv *block.DiscoveredVolume) error {
dv.TypedSpec().Type = device.TypedSpec().Type
dv.TypedSpec().DevicePath = device.TypedSpec().DevicePath
@@ -164,7 +181,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
return nil
}); err != nil {
return fmt.Errorf("failed to write discovered volume: %w", err)
return nil, fmt.Errorf("failed to write discovered volume: %w", err)
}
touchedIDs[id] = struct{}{}
@@ -178,6 +195,11 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
dv.TypedSpec().Parent = id
dv.TypedSpec().Size = nested.ProbedSize
if dv.TypedSpec().Size == 0 {
dv.TypedSpec().Size = nested.PartitionSize
}
dv.TypedSpec().SectorSize = info.SectorSize
dv.TypedSpec().IOSize = info.IOSize
@@ -205,7 +227,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
return nil
}); err != nil {
return fmt.Errorf("failed to write discovered volume: %w", err)
return nil, fmt.Errorf("failed to write discovered volume: %w", err)
}
touchedIDs[partID] = struct{}{}
@@ -215,7 +237,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
// clean up discovered volumes
discoveredVolumes, err := safe.ReaderListAll[*block.DiscoveredVolume](ctx, r)
if err != nil {
return fmt.Errorf("failed to list discovered volumes: %w", err)
return nil, fmt.Errorf("failed to list discovered volumes: %w", err)
}
for iterator := discoveredVolumes.Iterator(); iterator.Next(); {
@@ -238,12 +260,12 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim
if isFailed || parentTouched {
// if the probe failed, or if the parent was touched, while this device was not, remove it
if err = r.Destroy(ctx, dv.Metadata()); err != nil {
return fmt.Errorf("failed to destroy discovered volume: %w", err)
return nil, fmt.Errorf("failed to destroy discovered volume: %w", err)
}
}
}
return nil
return nextRescan, nil
}
func (ctrl *DiscoveryController) fillDiscoveredVolumeFromInfo(dv *block.DiscoveredVolume, info blkid.ProbeResult) {

View File

@@ -44,13 +44,18 @@ func (suite *VolumesSuite) TearDownTest() {
// TestDiscoveredVolumes verifies that standard Talos partitions are discovered.
func (suite *VolumesSuite) TestDiscoveredVolumes() {
suite.T().Skip("skipping test, as it's flaky (going to address it later)")
if !suite.Capabilities().SupportsVolumes {
suite.T().Skip("cluster doesn't support volumes")
}
node := suite.RandomDiscoveredNodeInternalIP()
for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) {
suite.Run(node, func() {
suite.testDiscoveredVolumes(node)
})
}
}
func (suite *VolumesSuite) testDiscoveredVolumes(node string) {
ctx := client.WithNode(suite.ctx, node)
volumes, err := safe.StateListAll[*block.DiscoveredVolume](ctx, suite.Client.COSI)
@@ -59,7 +64,9 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() {
expectedVolumes := map[string]struct {
Name string
}{
"META": {},
"META": {
Name: "talosmeta",
},
"STATE": {
Name: "xfs",
},
@@ -71,12 +78,12 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() {
for iterator := volumes.Iterator(); iterator.Next(); {
dv := iterator.Value()
suite.T().Logf("Volume: %s %s %s %s", dv.Metadata().ID(), dv.TypedSpec().Name, dv.TypedSpec().PartitionLabel, dv.TypedSpec().Label)
suite.T().Logf("volume: %s %s %s %s", dv.Metadata().ID(), dv.TypedSpec().Name, dv.TypedSpec().PartitionLabel, dv.TypedSpec().Label)
partitionLabel := dv.TypedSpec().PartitionLabel
filesystemLabel := dv.TypedSpec().Label
// this is encrypted partition, skip it, we should see another device with actual filesystem
// this is encrypted partition, skip it, we should see another device with the actual filesystem
if dv.TypedSpec().Name == "luks" {
continue
}
@@ -95,12 +102,16 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() {
}
}
suite.Assert().Equal(expected.Name, dv.TypedSpec().Name)
suite.Assert().Equal(expected.Name, dv.TypedSpec().Name, "node: ", node)
delete(expectedVolumes, id)
}
suite.Assert().Empty(expectedVolumes)
suite.Assert().Empty(expectedVolumes, "node: ", node)
if suite.T().Failed() {
suite.DumpLogs(suite.ctx, node, "controller-runtime", "block.")
}
}
func init() {

View File

@@ -32,6 +32,7 @@ import (
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/pkg/cluster"
"github.com/siderolabs/talos/pkg/cluster/check"
"github.com/siderolabs/talos/pkg/machinery/api/common"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
"github.com/siderolabs/talos/pkg/machinery/client"
clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
@@ -748,6 +749,34 @@ waitLoop:
}
}
// DumpLogs dumps a set of logs from the node.
func (apiSuite *APISuite) DumpLogs(ctx context.Context, node string, service, pattern string) {
nodeCtx := client.WithNode(ctx, node)
logsStream, err := apiSuite.Client.Logs(
nodeCtx,
constants.SystemContainerdNamespace,
common.ContainerDriver_CONTAINERD,
service,
false,
-1,
)
apiSuite.Require().NoError(err)
logReader, err := client.ReadStream(logsStream)
apiSuite.Require().NoError(err)
defer logReader.Close() //nolint:errcheck
scanner := bufio.NewScanner(logReader)
for scanner.Scan() {
if pattern == "" || strings.Contains(scanner.Text(), pattern) {
apiSuite.T().Logf("%s (%s): %s", node, service, scanner.Text())
}
}
}
// TearDownSuite closes Talos API client.
func (apiSuite *APISuite) TearDownSuite() {
if apiSuite.Client != nil {

View File

@@ -228,6 +228,27 @@ func (k8sSuite *K8sSuite) WaitForPodToBeRunning(ctx context.Context, timeout tim
}
}
// LogPodLogs logs the logs of the pod with the given namespace and name.
func (k8sSuite *K8sSuite) LogPodLogs(ctx context.Context, namespace, podName string) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
req := k8sSuite.Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{})
readCloser, err := req.Stream(ctx)
if err != nil {
k8sSuite.T().Logf("failed to get pod logs: %s", err)
}
defer readCloser.Close() //nolint:errcheck
scanner := bufio.NewScanner(readCloser)
for scanner.Scan() {
k8sSuite.T().Logf("%s/%s: %s", namespace, podName, scanner.Text())
}
}
// WaitForPodToBeDeleted waits for the pod with the given namespace and name to be deleted.
func (k8sSuite *K8sSuite) WaitForPodToBeDeleted(ctx context.Context, timeout time.Duration, namespace, podName string) error {
ctx, cancel := context.WithTimeout(ctx, timeout)

View File

@@ -157,7 +157,13 @@ func (suite *TinkSuite) TestDeploy() {
cpCfgBytes, err := cpCfg.Bytes()
suite.Require().NoError(err)
suite.waitForEndpointReady(talosEndpoint)
readyErr := suite.waitForEndpointReady(talosEndpoint)
if readyErr != nil {
suite.LogPodLogs(ctx, namespace, ss+"-0")
}
suite.Require().NoError(readyErr)
insecureClient, err := client.New(ctx,
client.WithEndpoints(talosEndpoint),
@@ -182,7 +188,13 @@ func (suite *TinkSuite) TestDeploy() {
suite.T().Logf("talosconfig = %s", string(ensure.Value(talosconfig.Bytes())))
suite.waitForEndpointReady(talosEndpoint)
readyErr = suite.waitForEndpointReady(talosEndpoint)
if readyErr != nil {
suite.LogPodLogs(ctx, namespace, ss+"-0")
}
suite.Require().NoError(readyErr)
talosClient, err := client.New(ctx,
client.WithConfigContext(talosconfig.Contexts[talosconfig.Context]),
@@ -246,8 +258,8 @@ func (access *tinkClusterAccess) NodesByType(typ machine.Type) []cluster.NodeInf
}
}
func (suite *TinkSuite) waitForEndpointReady(endpoint string) {
suite.Require().NoError(retry.Constant(30*time.Second, retry.WithUnits(10*time.Millisecond)).Retry(func() error {
func (suite *TinkSuite) waitForEndpointReady(endpoint string) error {
return retry.Constant(30*time.Second, retry.WithUnits(10*time.Millisecond)).Retry(func() error {
c, err := tls.Dial("tcp", endpoint,
&tls.Config{
InsecureSkipVerify: true,
@@ -259,7 +271,7 @@ func (suite *TinkSuite) waitForEndpointReady(endpoint string) {
}
return retry.ExpectedError(err)
}))
})
}
func (suite *TinkSuite) getTinkManifests(namespace, serviceName, ssName, talosImage string) []unstructured.Unstructured {