talos/internal/integration/api/apply-config.go
Andrey Smirnov 9a32e34cb1 feat: implement apply configuration without reboot
This allows config to be written to disk without being applied
immediately.

Small refactoring to extract common code paths.

At first, I tried to implement this via the sequencer, but looks like
it's too hard to get it right, as sequencer lacks context and config to
be written is not applied to the runtime.

Fixes #2828

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
2020-11-23 12:42:44 -08:00

234 lines
6.3 KiB
Go

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// +build integration_api
package api
import (
"bytes"
"context"
"fmt"
"io"
"sort"
"sync"
"testing"
"time"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/integration/base"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/machinery/config/configloader"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
)
// Sysctl to use for testing config changes.
// This sysctl should:
//
// - default to a non-Go-null value in the kernel
//
// - not be defined by the default Talos configuration
//
// - be generally harmless
//
const applyConfigTestSysctl = "net.ipv6.conf.all.accept_ra_mtu"
const applyConfigTestSysctlVal = "1"
const applyConfigNoRebootTestSysctl = "fs.file-max"
const applyConfigNoRebootTestSysctlVal = "500000"
const assertRebootedRebootTimeout = 10 * time.Minute
// ApplyConfigSuite ...
type ApplyConfigSuite struct {
base.K8sSuite
ctx context.Context
ctxCancel context.CancelFunc
}
// SuiteName ...
func (suite *ApplyConfigSuite) SuiteName() string {
return "api.ApplyConfigSuite"
}
// SetupTest ...
func (suite *ApplyConfigSuite) SetupTest() {
if testing.Short() {
suite.T().Skip("skipping in short mode")
}
// make sure we abort at some point in time, but give enough room for Recovers
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 30*time.Minute)
}
// TearDownTest ...
func (suite *ApplyConfigSuite) TearDownTest() {
if suite.ctxCancel != nil {
suite.ctxCancel()
}
}
// TestApply verifies the apply config API.
func (suite *ApplyConfigSuite) TestApply() {
if !suite.Capabilities().SupportsReboot {
suite.T().Skip("cluster doesn't support reboot")
}
nodes := suite.DiscoverNodes().NodesByType(machine.TypeJoin)
suite.Require().NotEmpty(nodes)
suite.WaitForBootDone(suite.ctx)
sort.Strings(nodes)
node := nodes[0]
nodeCtx := client.WithNodes(suite.ctx, node)
provider, err := suite.readConfigFromNode(nodeCtx)
suite.Assert().Nilf(err, "failed to read existing config from node %q: %w", node, err)
provider.Machine().Sysctls()[applyConfigTestSysctl] = applyConfigTestSysctlVal
cfgDataOut, err := provider.Bytes()
suite.Assert().Nilf(err, "failed to marshal updated machine config data (node %q): %w", node, err)
suite.AssertRebooted(suite.ctx, node, func(nodeCtx context.Context) error {
_, err = suite.Client.ApplyConfiguration(nodeCtx, &machineapi.ApplyConfigurationRequest{
Data: cfgDataOut,
})
if err != nil {
// It is expected that the connection will EOF here, so just log the error
suite.Assert().Nilf("failed to apply configuration (node %q): %w", node, err)
}
return nil
}, assertRebootedRebootTimeout)
// Verify configuration change
var newProvider config.Provider
suite.Require().Nilf(retry.Constant(time.Minute, retry.WithUnits(time.Second)).Retry(func() error {
newProvider, err = suite.readConfigFromNode(nodeCtx)
if err != nil {
return retry.ExpectedError(err)
}
return nil
}), "failed to read updated configuration from node %q: %w", node, err)
suite.Assert().Equal(
newProvider.Machine().Sysctls()[applyConfigTestSysctl],
applyConfigTestSysctlVal,
)
}
// TestApplyNoReboot verifies the apply config API without reboot.
func (suite *ApplyConfigSuite) TestApplyNoReboot() {
suite.WaitForBootDone(suite.ctx)
node := suite.RandomDiscoveredNode()
nodeCtx := client.WithNodes(suite.ctx, node)
provider, err := suite.readConfigFromNode(nodeCtx)
suite.Require().NoError(err, "failed to read existing config from node %q", node)
provider.Machine().Sysctls()[applyConfigNoRebootTestSysctl] = applyConfigNoRebootTestSysctlVal
cfgDataOut, err := provider.Bytes()
suite.Require().NoError(err, "failed to marshal updated machine config data (node %q)", node)
_, err = suite.Client.ApplyConfiguration(nodeCtx, &machineapi.ApplyConfigurationRequest{
NoReboot: true,
Data: cfgDataOut,
})
suite.Require().NoError(err, "failed to apply deferred configuration (node %q): %w", node)
// Verify configuration change
var newProvider config.Provider
newProvider, err = suite.readConfigFromNode(nodeCtx)
suite.Require().NoError(err, "failed to read updated configuration from node %q: %w", node)
suite.Assert().Equal(
newProvider.Machine().Sysctls()[applyConfigNoRebootTestSysctl],
applyConfigNoRebootTestSysctlVal,
)
// revert back
delete(provider.Machine().Sysctls(), applyConfigNoRebootTestSysctl)
cfgDataOut, err = provider.Bytes()
suite.Require().NoError(err, "failed to marshal updated machine config data (node %q)", node)
_, err = suite.Client.ApplyConfiguration(nodeCtx, &machineapi.ApplyConfigurationRequest{
NoReboot: true,
Data: cfgDataOut,
})
suite.Require().NoError(err, "failed to apply deferred configuration (node %q): %w", node)
}
func (suite *ApplyConfigSuite) readConfigFromNode(nodeCtx context.Context) (config.Provider, error) {
// Load the current node machine config
cfgData := new(bytes.Buffer)
reader, errCh, err := suite.Client.Read(nodeCtx, constants.ConfigPath)
if err != nil {
return nil, fmt.Errorf("error creating reader: %w", err)
}
defer reader.Close() //nolint: errcheck
if err = copyFromReaderWithErrChan(cfgData, reader, errCh); err != nil {
return nil, fmt.Errorf("error reading: %w", err)
}
provider, err := configloader.NewFromBytes(cfgData.Bytes())
if err != nil {
return nil, fmt.Errorf("failed to parse: %w", err)
}
return provider, nil
}
func copyFromReaderWithErrChan(out io.Writer, in io.Reader, errCh <-chan error) error {
var wg sync.WaitGroup
var chanErr error
wg.Add(1)
go func() {
defer wg.Done()
// StreamReader is only singly-buffered, so we need to process any errors as we get them.
for chanErr = range errCh {
}
}()
defer wg.Wait()
_, err := io.Copy(out, in)
if err != nil {
return err
}
if chanErr != nil {
return chanErr
}
return nil
}
func init() {
allSuites = append(allSuites, new(ApplyConfigSuite))
}