From 7c9a14383ee034b05cb9bd1ff49f8078cbbf5e66 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Wed, 29 May 2024 18:03:16 +0400 Subject: [PATCH] fix: volume discovery improvements Use shared locks, discover more partitions, some other small changes. Re-enable the flaky test. Signed-off-by: Andrey Smirnov --- go.mod | 2 +- go.sum | 4 +- .../pkg/controllers/block/discovery.go | 52 +++++++++++++------ internal/integration/api/volumes.go | 27 +++++++--- internal/integration/base/api.go | 29 +++++++++++ internal/integration/base/k8s.go | 21 ++++++++ internal/integration/k8s/tink.go | 22 ++++++-- 7 files changed, 126 insertions(+), 31 deletions(-) diff --git a/go.mod b/go.mod index a61e31c01..7fa214c4a 100644 --- a/go.mod +++ b/go.mod @@ -128,7 +128,7 @@ require ( github.com/siderolabs/gen v0.5.0 github.com/siderolabs/go-api-signature v0.3.2 github.com/siderolabs/go-blockdevice v0.4.7 - github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192 + github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa github.com/siderolabs/go-circular v0.2.0 github.com/siderolabs/go-cmd v0.1.1 github.com/siderolabs/go-copy v0.1.0 diff --git a/go.sum b/go.sum index d66e6d76f..d4174c156 100644 --- a/go.sum +++ b/go.sum @@ -661,8 +661,8 @@ github.com/siderolabs/go-api-signature v0.3.2 h1:blqrZF1GM7TWgq7mY7CsR+yQ93u6az0 github.com/siderolabs/go-api-signature v0.3.2/go.mod h1:punhUOaXa7LELYBRCUhfgUGH6ieVz68GrP98apCKXj8= github.com/siderolabs/go-blockdevice v0.4.7 h1:2bk4WpEEflGxjrNwp57ye24Pr+cYgAiAeNMWiQOuWbQ= github.com/siderolabs/go-blockdevice v0.4.7/go.mod h1:4PeOuk71pReJj1JQEXDE7kIIQJPVe8a+HZQa+qjxSEA= -github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192 h1:16/cHDGhTUDBtfIftOkuHWhJcQdpa/FwwWPcTq4aOxc= -github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240405165836-3265299b0192/go.mod h1:UBbbc+L7hU0UggOQeKCA+Qp3ImGkSeaLfVOiCbxRxEI= +github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa h1:OjQLrcis/GuqaqxnIw2dxp4ZzT/zk5p1GI3NxcMxkrA= +github.com/siderolabs/go-blockdevice/v2 v2.0.0-20240604163945-81b69bf28eaa/go.mod h1:5GnL7VLNp5/vgiwYP74fi6KuTUfqGcRxQxtto2tzD+I= github.com/siderolabs/go-circular v0.2.0 h1:Xca8zrjF/YsujLbwDSojkKzJe7ngetnpuIJn8N78DJI= github.com/siderolabs/go-circular v0.2.0/go.mod h1:rrYCwHLYWmxqrmZP+LjYtwB2a55lxzQi0Ztu1VpWZSc= github.com/siderolabs/go-cmd v0.1.1 h1:nTouZUSxLeiiEe7hFexSVvaTsY/3O8k1s08BxPRrsps= diff --git a/internal/app/machined/pkg/controllers/block/discovery.go b/internal/app/machined/pkg/controllers/block/discovery.go index 9a791da5f..9244411ff 100644 --- a/internal/app/machined/pkg/controllers/block/discovery.go +++ b/internal/app/machined/pkg/controllers/block/discovery.go @@ -6,6 +6,7 @@ package block import ( "context" + "errors" "fmt" "path/filepath" "strconv" @@ -74,11 +75,17 @@ func (ctrl *DiscoveryController) Run(ctx context.Context, r controller.Runtime, continue } - if err := ctrl.rescan(ctx, r, logger, maps.Keys(nextRescan)); err != nil { - return fmt.Errorf("failed to rescan devices: %w", err) - } + logger.Debug("rescanning devices", zap.Strings("devices", maps.Keys(nextRescan))) - nextRescan = map[string]int{} + if nextRescanBatch, err := ctrl.rescan(ctx, r, logger, maps.Keys(nextRescan)); err != nil { + return fmt.Errorf("failed to rescan devices: %w", err) + } else { + nextRescan = map[string]int{} + + for id := range nextRescanBatch { + nextRescan[id] = lastObservedGenerations[id] + } + } case <-r.EventCh(): devices, err := safe.ReaderListAll[*block.Device](ctx, r) if err != nil { @@ -125,10 +132,11 @@ func (ctrl *DiscoveryController) Run(ctx context.Context, r controller.Runtime, } } -//nolint:gocyclo -func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtime, logger *zap.Logger, ids []string) error { +//nolint:gocyclo,cyclop +func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtime, logger *zap.Logger, ids []string) (map[string]struct{}, error) { failedIDs := map[string]struct{}{} touchedIDs := map[string]struct{}{} + nextRescan := map[string]struct{}{} for _, id := range ids { device, err := safe.ReaderGetByID[*block.Device](ctx, r, id) @@ -139,18 +147,27 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim continue } - return fmt.Errorf("failed to get device: %w", err) + return nil, fmt.Errorf("failed to get device: %w", err) } - info, err := blkid.ProbePath(filepath.Join("/dev", id)) + info, err := blkid.ProbePath(filepath.Join("/dev", id), blkid.WithProbeLogger(logger.With(zap.String("device", id)))) if err != nil { - logger.Debug("failed to probe device", zap.String("id", id), zap.Error(err)) + if errors.Is(err, blkid.ErrFailedLock) { + // failed to lock the blockdevice, retry later + logger.Debug("failed to lock device, retrying later", zap.String("id", id)) - failedIDs[id] = struct{}{} + nextRescan[id] = struct{}{} + } else { + logger.Debug("failed to probe device", zap.String("id", id), zap.Error(err)) + + failedIDs[id] = struct{}{} + } continue } + logger.Debug("probed device", zap.String("id", id), zap.Any("info", info)) + if err = safe.WriterModify(ctx, r, block.NewDiscoveredVolume(block.NamespaceName, id), func(dv *block.DiscoveredVolume) error { dv.TypedSpec().Type = device.TypedSpec().Type dv.TypedSpec().DevicePath = device.TypedSpec().DevicePath @@ -164,7 +181,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim return nil }); err != nil { - return fmt.Errorf("failed to write discovered volume: %w", err) + return nil, fmt.Errorf("failed to write discovered volume: %w", err) } touchedIDs[id] = struct{}{} @@ -178,6 +195,11 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim dv.TypedSpec().Parent = id dv.TypedSpec().Size = nested.ProbedSize + + if dv.TypedSpec().Size == 0 { + dv.TypedSpec().Size = nested.PartitionSize + } + dv.TypedSpec().SectorSize = info.SectorSize dv.TypedSpec().IOSize = info.IOSize @@ -205,7 +227,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim return nil }); err != nil { - return fmt.Errorf("failed to write discovered volume: %w", err) + return nil, fmt.Errorf("failed to write discovered volume: %w", err) } touchedIDs[partID] = struct{}{} @@ -215,7 +237,7 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim // clean up discovered volumes discoveredVolumes, err := safe.ReaderListAll[*block.DiscoveredVolume](ctx, r) if err != nil { - return fmt.Errorf("failed to list discovered volumes: %w", err) + return nil, fmt.Errorf("failed to list discovered volumes: %w", err) } for iterator := discoveredVolumes.Iterator(); iterator.Next(); { @@ -238,12 +260,12 @@ func (ctrl *DiscoveryController) rescan(ctx context.Context, r controller.Runtim if isFailed || parentTouched { // if the probe failed, or if the parent was touched, while this device was not, remove it if err = r.Destroy(ctx, dv.Metadata()); err != nil { - return fmt.Errorf("failed to destroy discovered volume: %w", err) + return nil, fmt.Errorf("failed to destroy discovered volume: %w", err) } } } - return nil + return nextRescan, nil } func (ctrl *DiscoveryController) fillDiscoveredVolumeFromInfo(dv *block.DiscoveredVolume, info blkid.ProbeResult) { diff --git a/internal/integration/api/volumes.go b/internal/integration/api/volumes.go index 271d38cae..f7bde26b1 100644 --- a/internal/integration/api/volumes.go +++ b/internal/integration/api/volumes.go @@ -44,13 +44,18 @@ func (suite *VolumesSuite) TearDownTest() { // TestDiscoveredVolumes verifies that standard Talos partitions are discovered. func (suite *VolumesSuite) TestDiscoveredVolumes() { - suite.T().Skip("skipping test, as it's flaky (going to address it later)") - if !suite.Capabilities().SupportsVolumes { suite.T().Skip("cluster doesn't support volumes") } - node := suite.RandomDiscoveredNodeInternalIP() + for _, node := range suite.DiscoverNodeInternalIPs(suite.ctx) { + suite.Run(node, func() { + suite.testDiscoveredVolumes(node) + }) + } +} + +func (suite *VolumesSuite) testDiscoveredVolumes(node string) { ctx := client.WithNode(suite.ctx, node) volumes, err := safe.StateListAll[*block.DiscoveredVolume](ctx, suite.Client.COSI) @@ -59,7 +64,9 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() { expectedVolumes := map[string]struct { Name string }{ - "META": {}, + "META": { + Name: "talosmeta", + }, "STATE": { Name: "xfs", }, @@ -71,12 +78,12 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() { for iterator := volumes.Iterator(); iterator.Next(); { dv := iterator.Value() - suite.T().Logf("Volume: %s %s %s %s", dv.Metadata().ID(), dv.TypedSpec().Name, dv.TypedSpec().PartitionLabel, dv.TypedSpec().Label) + suite.T().Logf("volume: %s %s %s %s", dv.Metadata().ID(), dv.TypedSpec().Name, dv.TypedSpec().PartitionLabel, dv.TypedSpec().Label) partitionLabel := dv.TypedSpec().PartitionLabel filesystemLabel := dv.TypedSpec().Label - // this is encrypted partition, skip it, we should see another device with actual filesystem + // this is encrypted partition, skip it, we should see another device with the actual filesystem if dv.TypedSpec().Name == "luks" { continue } @@ -95,12 +102,16 @@ func (suite *VolumesSuite) TestDiscoveredVolumes() { } } - suite.Assert().Equal(expected.Name, dv.TypedSpec().Name) + suite.Assert().Equal(expected.Name, dv.TypedSpec().Name, "node: ", node) delete(expectedVolumes, id) } - suite.Assert().Empty(expectedVolumes) + suite.Assert().Empty(expectedVolumes, "node: ", node) + + if suite.T().Failed() { + suite.DumpLogs(suite.ctx, node, "controller-runtime", "block.") + } } func init() { diff --git a/internal/integration/base/api.go b/internal/integration/base/api.go index 284834416..f0aff79e1 100644 --- a/internal/integration/base/api.go +++ b/internal/integration/base/api.go @@ -32,6 +32,7 @@ import ( "github.com/siderolabs/talos/internal/app/machined/pkg/runtime" "github.com/siderolabs/talos/pkg/cluster" "github.com/siderolabs/talos/pkg/cluster/check" + "github.com/siderolabs/talos/pkg/machinery/api/common" machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine" "github.com/siderolabs/talos/pkg/machinery/client" clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config" @@ -748,6 +749,34 @@ waitLoop: } } +// DumpLogs dumps a set of logs from the node. +func (apiSuite *APISuite) DumpLogs(ctx context.Context, node string, service, pattern string) { + nodeCtx := client.WithNode(ctx, node) + + logsStream, err := apiSuite.Client.Logs( + nodeCtx, + constants.SystemContainerdNamespace, + common.ContainerDriver_CONTAINERD, + service, + false, + -1, + ) + apiSuite.Require().NoError(err) + + logReader, err := client.ReadStream(logsStream) + apiSuite.Require().NoError(err) + + defer logReader.Close() //nolint:errcheck + + scanner := bufio.NewScanner(logReader) + + for scanner.Scan() { + if pattern == "" || strings.Contains(scanner.Text(), pattern) { + apiSuite.T().Logf("%s (%s): %s", node, service, scanner.Text()) + } + } +} + // TearDownSuite closes Talos API client. func (apiSuite *APISuite) TearDownSuite() { if apiSuite.Client != nil { diff --git a/internal/integration/base/k8s.go b/internal/integration/base/k8s.go index f7876662c..e65b00eb9 100644 --- a/internal/integration/base/k8s.go +++ b/internal/integration/base/k8s.go @@ -228,6 +228,27 @@ func (k8sSuite *K8sSuite) WaitForPodToBeRunning(ctx context.Context, timeout tim } } +// LogPodLogs logs the logs of the pod with the given namespace and name. +func (k8sSuite *K8sSuite) LogPodLogs(ctx context.Context, namespace, podName string) { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + req := k8sSuite.Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}) + + readCloser, err := req.Stream(ctx) + if err != nil { + k8sSuite.T().Logf("failed to get pod logs: %s", err) + } + + defer readCloser.Close() //nolint:errcheck + + scanner := bufio.NewScanner(readCloser) + + for scanner.Scan() { + k8sSuite.T().Logf("%s/%s: %s", namespace, podName, scanner.Text()) + } +} + // WaitForPodToBeDeleted waits for the pod with the given namespace and name to be deleted. func (k8sSuite *K8sSuite) WaitForPodToBeDeleted(ctx context.Context, timeout time.Duration, namespace, podName string) error { ctx, cancel := context.WithTimeout(ctx, timeout) diff --git a/internal/integration/k8s/tink.go b/internal/integration/k8s/tink.go index 1c0aa1e5e..790dfcd22 100644 --- a/internal/integration/k8s/tink.go +++ b/internal/integration/k8s/tink.go @@ -157,7 +157,13 @@ func (suite *TinkSuite) TestDeploy() { cpCfgBytes, err := cpCfg.Bytes() suite.Require().NoError(err) - suite.waitForEndpointReady(talosEndpoint) + readyErr := suite.waitForEndpointReady(talosEndpoint) + + if readyErr != nil { + suite.LogPodLogs(ctx, namespace, ss+"-0") + } + + suite.Require().NoError(readyErr) insecureClient, err := client.New(ctx, client.WithEndpoints(talosEndpoint), @@ -182,7 +188,13 @@ func (suite *TinkSuite) TestDeploy() { suite.T().Logf("talosconfig = %s", string(ensure.Value(talosconfig.Bytes()))) - suite.waitForEndpointReady(talosEndpoint) + readyErr = suite.waitForEndpointReady(talosEndpoint) + + if readyErr != nil { + suite.LogPodLogs(ctx, namespace, ss+"-0") + } + + suite.Require().NoError(readyErr) talosClient, err := client.New(ctx, client.WithConfigContext(talosconfig.Contexts[talosconfig.Context]), @@ -246,8 +258,8 @@ func (access *tinkClusterAccess) NodesByType(typ machine.Type) []cluster.NodeInf } } -func (suite *TinkSuite) waitForEndpointReady(endpoint string) { - suite.Require().NoError(retry.Constant(30*time.Second, retry.WithUnits(10*time.Millisecond)).Retry(func() error { +func (suite *TinkSuite) waitForEndpointReady(endpoint string) error { + return retry.Constant(30*time.Second, retry.WithUnits(10*time.Millisecond)).Retry(func() error { c, err := tls.Dial("tcp", endpoint, &tls.Config{ InsecureSkipVerify: true, @@ -259,7 +271,7 @@ func (suite *TinkSuite) waitForEndpointReady(endpoint string) { } return retry.ExpectedError(err) - })) + }) } func (suite *TinkSuite) getTinkManifests(namespace, serviceName, ssName, talosImage string) []unstructured.Unstructured {