mirror of
https://github.com/siderolabs/talos.git
synced 2025-12-07 18:41:33 +01:00
chore: cleanup code
- Replace unsafe resource interface calls with type-safe versions. - Remove unused parameter names. - Minor changes. Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
This commit is contained in:
parent
61d363e1d0
commit
cedabeddf7
2
go.mod
2
go.mod
@ -141,7 +141,7 @@ require (
|
||||
github.com/siderolabs/crypto v0.5.0
|
||||
github.com/siderolabs/discovery-api v0.1.4
|
||||
github.com/siderolabs/discovery-client v0.1.10
|
||||
github.com/siderolabs/gen v0.6.1
|
||||
github.com/siderolabs/gen v0.7.0
|
||||
github.com/siderolabs/go-api-signature v0.3.6
|
||||
github.com/siderolabs/go-blockdevice v0.4.8
|
||||
github.com/siderolabs/go-blockdevice/v2 v2.0.3
|
||||
|
||||
4
go.sum
4
go.sum
@ -638,8 +638,8 @@ github.com/siderolabs/discovery-api v0.1.4 h1:2fMEFSMiWaD1zDiBDY5md8VxItvL1rDQRS
|
||||
github.com/siderolabs/discovery-api v0.1.4/go.mod h1:kaBy+G42v2xd/uAF/NIe383sjNTBE2AhxPTyi9SZI0s=
|
||||
github.com/siderolabs/discovery-client v0.1.10 h1:bTAvFLiISSzVXyYL1cIgAz8cPYd9ZfvhxwdebgtxARA=
|
||||
github.com/siderolabs/discovery-client v0.1.10/go.mod h1:Ew1z07eyJwqNwum84IKYH4S649KEKK5WUmRW49HlXS8=
|
||||
github.com/siderolabs/gen v0.6.1 h1:Mex6Q41Tlw3e+4cGvlju2x4UwULD5WMo/D82n7IxV0Y=
|
||||
github.com/siderolabs/gen v0.6.1/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
|
||||
github.com/siderolabs/gen v0.7.0 h1:uHAt3WD0dof28NHFuguWBbDokaXQraR/HyVxCLw2QCU=
|
||||
github.com/siderolabs/gen v0.7.0/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
|
||||
github.com/siderolabs/go-api-signature v0.3.6 h1:wDIsXbpl7Oa/FXvxB6uz4VL9INA9fmr3EbmjEZYFJrU=
|
||||
github.com/siderolabs/go-api-signature v0.3.6/go.mod h1:hoH13AfunHflxbXfh+NoploqV13ZTDfQ1mQJWNVSW9U=
|
||||
github.com/siderolabs/go-blockdevice v0.4.8 h1:KfdWvIx0Jft5YVuCsFIJFwjWEF1oqtzkgX9PeU9cX4c=
|
||||
|
||||
@ -16,7 +16,7 @@ require (
|
||||
github.com/blang/semver/v4 v4.0.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/klauspost/compress v1.17.11
|
||||
github.com/siderolabs/gen v0.6.1
|
||||
github.com/siderolabs/gen v0.7.0
|
||||
github.com/siderolabs/go-retry v0.3.3
|
||||
github.com/spf13/pflag v1.0.5
|
||||
golang.org/x/sync v0.8.0
|
||||
|
||||
@ -179,8 +179,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
|
||||
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
|
||||
github.com/siderolabs/gen v0.6.1 h1:Mex6Q41Tlw3e+4cGvlju2x4UwULD5WMo/D82n7IxV0Y=
|
||||
github.com/siderolabs/gen v0.6.1/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
|
||||
github.com/siderolabs/gen v0.7.0 h1:uHAt3WD0dof28NHFuguWBbDokaXQraR/HyVxCLw2QCU=
|
||||
github.com/siderolabs/gen v0.7.0/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
|
||||
github.com/siderolabs/go-retry v0.3.3 h1:zKV+S1vumtO72E6sYsLlmIdV/G/GcYSBLiEx/c9oCEg=
|
||||
github.com/siderolabs/go-retry v0.3.3/go.mod h1:Ff/VGc7v7un4uQg3DybgrmOWHEmJ8BzZds/XNn/BqMI=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
|
||||
@ -11,7 +11,7 @@ require (
|
||||
github.com/invopop/jsonschema v0.12.0
|
||||
github.com/microcosm-cc/bluemonday v1.0.27
|
||||
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
|
||||
github.com/siderolabs/gen v0.6.1
|
||||
github.com/siderolabs/gen v0.7.0
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.8
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
mvdan.cc/gofumpt v0.7.0
|
||||
|
||||
@ -31,8 +31,8 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU
|
||||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
|
||||
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
|
||||
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
|
||||
github.com/siderolabs/gen v0.6.1 h1:Mex6Q41Tlw3e+4cGvlju2x4UwULD5WMo/D82n7IxV0Y=
|
||||
github.com/siderolabs/gen v0.6.1/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
|
||||
github.com/siderolabs/gen v0.7.0 h1:uHAt3WD0dof28NHFuguWBbDokaXQraR/HyVxCLw2QCU=
|
||||
github.com/siderolabs/gen v0.7.0/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/unix4ever/yaml v0.0.0-20220527175918-f17b0f05cf2c h1:Vn6nVVu9MdOYvXPkJP83iX5jVIfvxFC9v9xIKb+DlaQ=
|
||||
|
||||
@ -59,11 +59,11 @@ func (ctrl *LVMActivationController) Outputs() []controller.Output {
|
||||
//nolint:gocyclo
|
||||
func (ctrl *LVMActivationController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
if ctrl.seenVolumes == nil {
|
||||
ctrl.seenVolumes = make(map[string]struct{})
|
||||
ctrl.seenVolumes = map[string]struct{}{}
|
||||
}
|
||||
|
||||
if ctrl.activatedVGs == nil {
|
||||
ctrl.activatedVGs = make(map[string]struct{})
|
||||
ctrl.activatedVGs = map[string]struct{}{}
|
||||
}
|
||||
|
||||
for {
|
||||
|
||||
@ -49,7 +49,7 @@ func (ctrl *SystemDiskController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *SystemDiskController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *SystemDiskController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-r.EventCh():
|
||||
|
||||
@ -67,7 +67,7 @@ func partitionIdxMatch(devicePath string, partitionIdx int) cel.Expression {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *UserDiskConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *UserDiskConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-r.EventCh():
|
||||
|
||||
@ -132,7 +132,7 @@ func (ctrl *VolumeConfigController) convertEncryption(in cfg.Encryption, out *bl
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *VolumeConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *VolumeConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-r.EventCh():
|
||||
|
||||
@ -60,7 +60,7 @@ func (ctrl *KubernetesPushController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *KubernetesPushController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KubernetesPushController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
defer func() {
|
||||
if ctrl.kubernetesClient != nil {
|
||||
ctrl.kubernetesClient.Close() //nolint:errcheck
|
||||
|
||||
@ -113,7 +113,7 @@ func (ctrl *LocalAffiliateController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *LocalAffiliateController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *LocalAffiliateController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
"go.uber.org/zap"
|
||||
@ -99,8 +100,8 @@ func (ctrl *NodeIdentityController) Run(ctx context.Context, r controller.Runtim
|
||||
return fmt.Errorf("error caching node identity: %w", err)
|
||||
}
|
||||
|
||||
if err := r.Modify(ctx, cluster.NewIdentity(cluster.NamespaceName, cluster.LocalIdentity), func(r resource.Resource) error {
|
||||
*r.(*cluster.Identity).TypedSpec() = localIdentity
|
||||
if err := safe.WriterModify(ctx, r, cluster.NewIdentity(cluster.NamespaceName, cluster.LocalIdentity), func(r *cluster.Identity) error {
|
||||
*r.TypedSpec() = localIdentity
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
@ -108,12 +109,12 @@ func (ctrl *NodeIdentityController) Run(ctx context.Context, r controller.Runtim
|
||||
}
|
||||
|
||||
// generate `/etc/machine-id` from node identity
|
||||
if err := r.Modify(ctx, files.NewEtcFileSpec(files.NamespaceName, "machine-id"),
|
||||
func(r resource.Resource) error {
|
||||
if err := safe.WriterModify(ctx, r, files.NewEtcFileSpec(files.NamespaceName, "machine-id"),
|
||||
func(r *files.EtcFileSpec) error {
|
||||
var err error
|
||||
|
||||
r.(*files.EtcFileSpec).TypedSpec().Contents, err = clusteradapter.IdentitySpec(&localIdentity).ConvertMachineID()
|
||||
r.(*files.EtcFileSpec).TypedSpec().Mode = 0o444
|
||||
r.TypedSpec().Contents, err = clusteradapter.IdentitySpec(&localIdentity).ConvertMachineID()
|
||||
r.TypedSpec().Mode = 0o444
|
||||
|
||||
return err
|
||||
}); err != nil {
|
||||
|
||||
@ -9,7 +9,6 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
@ -50,7 +49,7 @@ func (ctrl *MachineTypeController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *MachineTypeController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *MachineTypeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -69,8 +68,8 @@ func (ctrl *MachineTypeController) Run(ctx context.Context, r controller.Runtime
|
||||
machineType = cfg.Config().Machine().Type()
|
||||
}
|
||||
|
||||
if err = r.Modify(ctx, config.NewMachineType(), func(r resource.Resource) error {
|
||||
r.(*config.MachineType).SetMachineType(machineType)
|
||||
if err = safe.WriterModify(ctx, r, config.NewMachineType(), func(r *config.MachineType) error {
|
||||
r.SetMachineType(machineType)
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
||||
@ -49,7 +49,7 @@ func (ctrl *SeccompProfileController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.StatsController interface.
|
||||
func (ctrl *SeccompProfileController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *SeccompProfileController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -51,7 +51,7 @@ func (ctrl *SeccompProfileFileController) Outputs() []controller.Output {
|
||||
// Run implements controller.StatsController interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *SeccompProfileFileController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *SeccompProfileFileController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
// initially, wait for /var to be mounted
|
||||
if err := r.UpdateInputs([]controller.Input{
|
||||
{
|
||||
|
||||
@ -57,7 +57,7 @@ func (ctrl *MemberController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *MemberController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *MemberController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -42,7 +42,7 @@ type MemberSuite struct {
|
||||
|
||||
func (suite *MemberSuite) assertEtcdMember(member *etcd.Member) func() error {
|
||||
return func() error {
|
||||
r, err := suite.State().Get(suite.Ctx(), member.Metadata())
|
||||
r, err := ctest.Get[*etcd.Member](suite, member.Metadata())
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
return retry.ExpectedError(err)
|
||||
@ -51,7 +51,7 @@ func (suite *MemberSuite) assertEtcdMember(member *etcd.Member) func() error {
|
||||
return err
|
||||
}
|
||||
|
||||
spec := r.(*etcd.Member).TypedSpec()
|
||||
spec := r.TypedSpec()
|
||||
expectedSpec := member.TypedSpec()
|
||||
|
||||
suite.Require().Equal(expectedSpec.MemberID, spec.MemberID)
|
||||
|
||||
@ -62,7 +62,7 @@ func (ctrl *PKIController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *PKIController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *PKIController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -67,7 +67,7 @@ func (ctrl *SpecController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *SpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *SpecController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -11,7 +11,7 @@ import (
|
||||
"sort"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/siderolabs/talos/internal/pkg/toml"
|
||||
@ -52,7 +52,7 @@ func (ctrl *CRIConfigPartsController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *CRIConfigPartsController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *CRIConfigPartsController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
if ctrl.CRIConfdPath == "" {
|
||||
ctrl.CRIConfdPath = constants.CRIConfdPath
|
||||
}
|
||||
@ -77,9 +77,9 @@ func (ctrl *CRIConfigPartsController) Run(ctx context.Context, r controller.Runt
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.Modify(ctx, files.NewEtcFileSpec(files.NamespaceName, constants.CRIConfig),
|
||||
func(r resource.Resource) error {
|
||||
spec := r.(*files.EtcFileSpec).TypedSpec()
|
||||
if err := safe.WriterModify(ctx, r, files.NewEtcFileSpec(files.NamespaceName, constants.CRIConfig),
|
||||
func(r *files.EtcFileSpec) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
spec.Contents = out
|
||||
spec.Mode = 0o600
|
||||
|
||||
@ -13,7 +13,6 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
@ -62,7 +61,7 @@ func (ctrl *CRIRegistryConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *CRIRegistryConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *CRIRegistryConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
basePath := filepath.Join(constants.CRIConfdPath, "hosts")
|
||||
shadowPath := filepath.Join(constants.SystemPath, basePath)
|
||||
|
||||
@ -113,9 +112,9 @@ func (ctrl *CRIRegistryConfigController) Run(ctx context.Context, r controller.R
|
||||
criHosts = &containerd.HostsConfig{}
|
||||
}
|
||||
|
||||
if err := r.Modify(ctx, files.NewEtcFileSpec(files.NamespaceName, constants.CRIRegistryConfigPart),
|
||||
func(r resource.Resource) error {
|
||||
spec := r.(*files.EtcFileSpec).TypedSpec()
|
||||
if err := safe.WriterModify(ctx, r, files.NewEtcFileSpec(files.NamespaceName, constants.CRIRegistryConfigPart),
|
||||
func(r *files.EtcFileSpec) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
spec.Contents = criRegistryContents
|
||||
spec.Mode = 0o600
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
@ -72,13 +73,13 @@ func (ctrl *EtcFileController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
case <-r.EventCh():
|
||||
}
|
||||
|
||||
list, err := r.List(ctx, resource.NewMetadata(files.NamespaceName, files.EtcFileSpecType, "", resource.VersionUndefined))
|
||||
list, err := safe.ReaderList[*files.EtcFileSpec](ctx, r, resource.NewMetadata(files.NamespaceName, files.EtcFileSpecType, "", resource.VersionUndefined))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing specs: %w", err)
|
||||
}
|
||||
|
||||
// add finalizers for all live resources
|
||||
for _, res := range list.Items {
|
||||
for res := range list.All() {
|
||||
if res.Metadata().Phase() != resource.PhaseRunning {
|
||||
continue
|
||||
}
|
||||
@ -90,8 +91,7 @@ func (ctrl *EtcFileController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
|
||||
touchedIDs := make(map[resource.ID]struct{})
|
||||
|
||||
for _, item := range list.Items {
|
||||
spec := item.(*files.EtcFileSpec) //nolint:errcheck,forcetypeassert
|
||||
for spec := range list.All() {
|
||||
filename := spec.Metadata().ID()
|
||||
_, mountExists := ctrl.bindMounts[filename]
|
||||
|
||||
@ -137,8 +137,8 @@ func (ctrl *EtcFileController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
return fmt.Errorf("error updating %q: %w", dst, err)
|
||||
}
|
||||
|
||||
if err = r.Modify(ctx, files.NewEtcFileStatus(files.NamespaceName, filename), func(r resource.Resource) error {
|
||||
r.(*files.EtcFileStatus).TypedSpec().SpecVersion = spec.Metadata().Version().String()
|
||||
if err = safe.WriterModify(ctx, r, files.NewEtcFileStatus(files.NamespaceName, filename), func(r *files.EtcFileStatus) error {
|
||||
r.TypedSpec().SpecVersion = spec.Metadata().Version().String()
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
@ -150,12 +150,12 @@ func (ctrl *EtcFileController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
}
|
||||
|
||||
// list statuses for cleanup
|
||||
list, err = r.List(ctx, resource.NewMetadata(files.NamespaceName, files.EtcFileStatusType, "", resource.VersionUndefined))
|
||||
statuses, err := safe.ReaderList[*files.EtcFileStatus](ctx, r, resource.NewMetadata(files.NamespaceName, files.EtcFileStatusType, "", resource.VersionUndefined))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing resources: %w", err)
|
||||
}
|
||||
|
||||
for _, res := range list.Items {
|
||||
for res := range statuses.All() {
|
||||
if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
|
||||
if err = r.Destroy(ctx, res.Metadata()); err != nil {
|
||||
return fmt.Errorf("error cleaning up specs: %w", err)
|
||||
|
||||
@ -54,7 +54,7 @@ func (ctrl *AddressFilterController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *AddressFilterController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *AddressFilterController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -87,7 +87,7 @@ func (ctrl *KubeletSpecController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -74,7 +74,7 @@ func (ctrl *KubeletStaticPodController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *KubeletStaticPodController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KubeletStaticPodController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
var kubeletClient *kubelet.Client
|
||||
|
||||
refreshTicker := time.NewTicker(15 * time.Second) // refresh kubelet pods status every 15 seconds
|
||||
|
||||
@ -64,7 +64,7 @@ func (ctrl *ManifestController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *ManifestController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *ManifestController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -58,7 +58,7 @@ func (ctrl *NodeAnnotationSpecController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *NodeAnnotationSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *NodeAnnotationSpecController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -51,7 +51,7 @@ func (ctrl *NodeCordonedSpecController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *NodeCordonedSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *NodeCordonedSpecController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -59,7 +59,7 @@ func (ctrl *NodeLabelSpecController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *NodeLabelSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *NodeLabelSpecController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -59,7 +59,7 @@ func (ctrl *NodenameController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *NodenameController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *NodenameController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -68,7 +68,7 @@ func (ctrl *RenderConfigsStaticPodController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *RenderConfigsStaticPodController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *RenderConfigsStaticPodController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -83,7 +83,7 @@ func (ctrl *RenderSecretsStaticPodController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *RenderSecretsStaticPodController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *RenderSecretsStaticPodController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -54,7 +54,7 @@ func (ctrl *StaticEndpointController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *StaticEndpointController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *StaticEndpointController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -54,7 +54,7 @@ func (ctrl *StaticPodConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *StaticPodConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *StaticPodConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -63,7 +63,7 @@ func (ctrl *EndpointController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
"go.uber.org/zap"
|
||||
@ -73,7 +74,7 @@ func (ctrl *IdentityController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *IdentityController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *IdentityController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
if ctrl.StatePath == "" {
|
||||
ctrl.StatePath = constants.StateMountPoint
|
||||
}
|
||||
@ -92,19 +93,19 @@ func (ctrl *IdentityController) Run(ctx context.Context, r controller.Runtime, l
|
||||
return fmt.Errorf("error reading mount status: %w", err)
|
||||
}
|
||||
|
||||
cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, kubespan.ConfigType, kubespan.ConfigID, resource.VersionUndefined))
|
||||
cfg, err := safe.ReaderGet[*kubespan.Config](ctx, r, resource.NewMetadata(config.NamespaceName, kubespan.ConfigType, kubespan.ConfigID, resource.VersionUndefined))
|
||||
if err != nil && !state.IsNotFoundError(err) {
|
||||
return fmt.Errorf("error getting kubespan configuration: %w", err)
|
||||
}
|
||||
|
||||
firstMAC, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.HardwareAddrType, network.FirstHardwareAddr, resource.VersionUndefined))
|
||||
firstMAC, err := safe.ReaderGet[*network.HardwareAddr](ctx, r, resource.NewMetadata(network.NamespaceName, network.HardwareAddrType, network.FirstHardwareAddr, resource.VersionUndefined))
|
||||
if err != nil && !state.IsNotFoundError(err) {
|
||||
return fmt.Errorf("error getting first MAC address: %w", err)
|
||||
}
|
||||
|
||||
touchedIDs := make(map[resource.ID]struct{})
|
||||
|
||||
if cfg != nil && firstMAC != nil && cfg.(*kubespan.Config).TypedSpec().Enabled {
|
||||
if cfg != nil && firstMAC != nil && cfg.TypedSpec().Enabled {
|
||||
var localIdentity kubespan.IdentitySpec
|
||||
|
||||
if err = controllers.LoadOrNewFromFile(filepath.Join(ctrl.StatePath, constants.KubeSpanIdentityFilename), &localIdentity, func(v any) error {
|
||||
@ -113,15 +114,15 @@ func (ctrl *IdentityController) Run(ctx context.Context, r controller.Runtime, l
|
||||
return fmt.Errorf("error caching kubespan identity: %w", err)
|
||||
}
|
||||
|
||||
kubespanCfg := cfg.(*kubespan.Config).TypedSpec()
|
||||
mac := firstMAC.(*network.HardwareAddr).TypedSpec()
|
||||
kubespanCfg := cfg.TypedSpec()
|
||||
mac := firstMAC.TypedSpec()
|
||||
|
||||
if err = kubespanadapter.IdentitySpec(&localIdentity).UpdateAddress(kubespanCfg.ClusterID, net.HardwareAddr(mac.HardwareAddr)); err != nil {
|
||||
return fmt.Errorf("error updating KubeSpan address: %w", err)
|
||||
}
|
||||
|
||||
if err = r.Modify(ctx, kubespan.NewIdentity(kubespan.NamespaceName, kubespan.LocalIdentity), func(res resource.Resource) error {
|
||||
*res.(*kubespan.Identity).TypedSpec() = localIdentity
|
||||
if err = safe.WriterModify(ctx, r, kubespan.NewIdentity(kubespan.NamespaceName, kubespan.LocalIdentity), func(res *kubespan.Identity) error {
|
||||
*res.TypedSpec() = localIdentity
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
||||
@ -163,12 +163,12 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
case <-tickerC:
|
||||
}
|
||||
|
||||
cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, kubespan.ConfigType, kubespan.ConfigID, resource.VersionUndefined))
|
||||
cfg, err := safe.ReaderGet[*kubespan.Config](ctx, r, resource.NewMetadata(config.NamespaceName, kubespan.ConfigType, kubespan.ConfigID, resource.VersionUndefined))
|
||||
if err != nil && !state.IsNotFoundError(err) {
|
||||
return fmt.Errorf("error getting kubespan configuration: %w", err)
|
||||
}
|
||||
|
||||
if cfg == nil || !cfg.(*kubespan.Config).TypedSpec().Enabled {
|
||||
if cfg == nil || !cfg.TypedSpec().Enabled {
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
|
||||
@ -203,9 +203,9 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
tickerC = ticker.C
|
||||
}
|
||||
|
||||
cfgSpec := cfg.(*kubespan.Config).TypedSpec()
|
||||
cfgSpec := cfg.TypedSpec()
|
||||
|
||||
localIdentity, err := r.Get(ctx, resource.NewMetadata(kubespan.NamespaceName, kubespan.IdentityType, kubespan.LocalIdentity, resource.VersionUndefined))
|
||||
localIdentity, err := safe.ReaderGet[*kubespan.Identity](ctx, r, resource.NewMetadata(kubespan.NamespaceName, kubespan.IdentityType, kubespan.LocalIdentity, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
@ -214,7 +214,7 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
return fmt.Errorf("error getting local KubeSpan identity: %w", err)
|
||||
}
|
||||
|
||||
localSpec := localIdentity.(*kubespan.Identity).TypedSpec()
|
||||
localSpec := localIdentity.TypedSpec()
|
||||
|
||||
// fetch PeerSpecs and PeerStatuses and sync them
|
||||
peerSpecList, err := r.List(ctx, resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, "", resource.VersionUndefined))
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/value"
|
||||
"github.com/siderolabs/go-procfs/procfs"
|
||||
@ -162,11 +163,12 @@ func (ctrl *AddressConfigController) apply(ctx context.Context, r controller.Run
|
||||
for _, address := range addresses {
|
||||
id := network.LayeredID(address.ConfigLayer, network.AddressID(address.LinkName, address.Address))
|
||||
|
||||
if err := r.Modify(
|
||||
if err := safe.WriterModify(
|
||||
ctx,
|
||||
r,
|
||||
network.NewAddressSpec(network.ConfigNamespaceName, id),
|
||||
func(r resource.Resource) error {
|
||||
*r.(*network.AddressSpec).TypedSpec() = address
|
||||
func(r *network.AddressSpec) error {
|
||||
*r.TypedSpec() = address
|
||||
|
||||
return nil
|
||||
},
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
"go.uber.org/zap"
|
||||
@ -57,7 +58,7 @@ func (ctrl *AddressEventController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *AddressEventController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *AddressEventController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
ticker := time.NewTicker(time.Minute * 10)
|
||||
|
||||
defer ticker.Stop()
|
||||
@ -72,8 +73,9 @@ func (ctrl *AddressEventController) Run(ctx context.Context, r controller.Runtim
|
||||
|
||||
var addresses []string
|
||||
|
||||
nodeAddr, err := r.Get(
|
||||
nodeAddr, err := safe.ReaderGet[*network.NodeAddress](
|
||||
ctx,
|
||||
r,
|
||||
resource.NewMetadata(
|
||||
network.NamespaceName,
|
||||
network.NodeAddressType,
|
||||
@ -87,7 +89,7 @@ func (ctrl *AddressEventController) Run(ctx context.Context, r controller.Runtim
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
for _, addr := range nodeAddr.(*network.NodeAddress).TypedSpec().Addresses {
|
||||
for _, addr := range nodeAddr.TypedSpec().Addresses {
|
||||
addresses = append(
|
||||
addresses,
|
||||
addr.Addr().String(),
|
||||
@ -97,13 +99,13 @@ func (ctrl *AddressEventController) Run(ctx context.Context, r controller.Runtim
|
||||
|
||||
var hostname string
|
||||
|
||||
hostnameStatus, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.HostnameStatusType, network.HostnameID, resource.VersionUndefined))
|
||||
hostnameStatus, err := safe.ReaderGet[*network.HostnameStatus](ctx, r, resource.NewMetadata(network.NamespaceName, network.HostnameStatusType, network.HostnameID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if !state.IsNotFoundError(err) {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
hostname = hostnameStatus.(*network.HostnameStatus).TypedSpec().Hostname
|
||||
hostname = hostnameStatus.TypedSpec().Hostname
|
||||
}
|
||||
|
||||
ctrl.V1Alpha1Events.Publish(ctx, &machine.AddressEvent{
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -56,7 +57,7 @@ func (ctrl *AddressMergeController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *AddressMergeController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *AddressMergeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -89,9 +90,7 @@ func (ctrl *AddressMergeController) Run(ctx context.Context, r controller.Runtim
|
||||
conflictsDetected := 0
|
||||
|
||||
for id, address := range addresses {
|
||||
if err = r.Modify(ctx, network.NewAddressSpec(network.NamespaceName, id), func(res resource.Resource) error {
|
||||
addr := res.(*network.AddressSpec) //nolint:errcheck,forcetypeassert
|
||||
|
||||
if err = safe.WriterModify(ctx, r, network.NewAddressSpec(network.NamespaceName, id), func(addr *network.AddressSpec) error {
|
||||
*addr.TypedSpec() = *address.TypedSpec()
|
||||
|
||||
return nil
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/jsimonetti/rtnetlink/v2"
|
||||
"github.com/mdlayher/arp"
|
||||
"go.uber.org/zap"
|
||||
@ -76,13 +77,13 @@ func (ctrl *AddressSpecController) Run(ctx context.Context, r controller.Runtime
|
||||
}
|
||||
|
||||
// list source network configuration resources
|
||||
list, err := r.List(ctx, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "", resource.VersionUndefined))
|
||||
list, err := safe.ReaderList[*network.AddressSpec](ctx, r, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "", resource.VersionUndefined))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing source network addresses: %w", err)
|
||||
}
|
||||
|
||||
// add finalizers for all live resources
|
||||
for _, res := range list.Items {
|
||||
for res := range list.All() {
|
||||
if res.Metadata().Phase() != resource.PhaseRunning {
|
||||
continue
|
||||
}
|
||||
@ -105,9 +106,7 @@ func (ctrl *AddressSpecController) Run(ctx context.Context, r controller.Runtime
|
||||
}
|
||||
|
||||
// loop over addresses and make reconcile decision
|
||||
for _, res := range list.Items {
|
||||
address := res.(*network.AddressSpec) //nolint:forcetypeassert,errcheck
|
||||
|
||||
for address := range list.All() {
|
||||
if err = ctrl.syncAddress(ctx, r, logger, conn, links, addrs, address); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/jsimonetti/rtnetlink/v2"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sys/unix"
|
||||
@ -46,7 +47,7 @@ func (ctrl *AddressStatusController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *AddressStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *AddressStatusController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK|unix.RTMGRP_IPV4_IFADDR|unix.RTMGRP_IPV6_IFADDR)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -98,8 +99,8 @@ func (ctrl *AddressStatusController) Run(ctx context.Context, r controller.Runti
|
||||
ipPrefix := netip.PrefixFrom(ipAddr, int(addr.PrefixLength))
|
||||
id := network.AddressID(linkLookup[addr.Index], ipPrefix)
|
||||
|
||||
if err = r.Modify(ctx, network.NewAddressStatus(network.NamespaceName, id), func(r resource.Resource) error {
|
||||
status := r.(*network.AddressStatus).TypedSpec()
|
||||
if err = safe.WriterModify(ctx, r, network.NewAddressStatus(network.NamespaceName, id), func(r *network.AddressStatus) error {
|
||||
status := r.TypedSpec()
|
||||
|
||||
status.Address = ipPrefix
|
||||
status.Local, _ = netip.AddrFromSlice(addr.Attributes.Local)
|
||||
|
||||
@ -163,11 +163,12 @@ func (ctrl *DNSResolveCacheController) Run(ctx context.Context, r controller.Run
|
||||
}
|
||||
|
||||
prxs := xiter.Map(
|
||||
upstreams.All(),
|
||||
// We are using iterator here to preserve finalizer on
|
||||
func(upstream *network.DNSUpstream) *proxy.Proxy {
|
||||
return upstream.TypedSpec().Value.Conn.Proxy().(*proxy.Proxy)
|
||||
})
|
||||
},
|
||||
upstreams.All(),
|
||||
)
|
||||
|
||||
if ctrl.handler.SetProxy(prxs) {
|
||||
ctrl.Logger.Info("updated dns server nameservers", zap.Array("addrs", addrsArr(upstreams)))
|
||||
|
||||
@ -8,9 +8,11 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"iter"
|
||||
"net/netip"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
|
||||
@ -19,6 +21,7 @@ import (
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
"github.com/siderolabs/gen/value"
|
||||
"github.com/siderolabs/gen/xiter"
|
||||
"go.uber.org/zap"
|
||||
|
||||
efiles "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/files"
|
||||
@ -89,7 +92,7 @@ func (ctrl *EtcFileController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *EtcFileController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *EtcFileController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -161,7 +164,7 @@ func (ctrl *EtcFileController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
dnsServers = []netip.Addr{hostDNSCfg.TypedSpec().ServiceHostDNSAddress}
|
||||
}
|
||||
|
||||
conf := renderResolvConf(dnsServers, hostnameStatusSpec, cfgProvider)
|
||||
conf := renderResolvConf(slices.All(dnsServers), hostnameStatusSpec, cfgProvider)
|
||||
|
||||
if err = os.MkdirAll(filepath.Dir(ctrl.PodResolvConfPath), 0o755); err != nil {
|
||||
return fmt.Errorf("error creating pod resolv.conf dir: %w", err)
|
||||
@ -189,18 +192,18 @@ func (ctrl *EtcFileController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
}
|
||||
}
|
||||
|
||||
var localDNS = []netip.Addr{netip.MustParseAddr("127.0.0.53")}
|
||||
var localDNS = xiter.Single2(0, netip.MustParseAddr("127.0.0.53"))
|
||||
|
||||
func pickNameservers(hostDNSCfg *network.HostDNSConfig, resolverStatus *network.ResolverStatus) []netip.Addr {
|
||||
func pickNameservers(hostDNSCfg *network.HostDNSConfig, resolverStatus *network.ResolverStatus) iter.Seq2[int, netip.Addr] {
|
||||
if hostDNSCfg.TypedSpec().Enabled {
|
||||
// local dns resolve cache enabled, route host dns requests to 127.0.0.1
|
||||
return localDNS
|
||||
}
|
||||
|
||||
return resolverStatus.TypedSpec().DNSServers
|
||||
return slices.All(resolverStatus.TypedSpec().DNSServers)
|
||||
}
|
||||
|
||||
func renderResolvConf(nameservers []netip.Addr, hostnameStatus *network.HostnameStatusSpec, cfgProvider talosconfig.Config) []byte {
|
||||
func renderResolvConf(nameservers iter.Seq2[int, netip.Addr], hostnameStatus *network.HostnameStatusSpec, cfgProvider talosconfig.Config) []byte {
|
||||
var buf bytes.Buffer
|
||||
|
||||
for i, ns := range nameservers {
|
||||
@ -229,9 +232,7 @@ func (ctrl *EtcFileController) renderHosts(hostnameStatus *network.HostnameStatu
|
||||
|
||||
tabW := tabwriter.NewWriter(&buf, 0, 0, 1, ' ', 0)
|
||||
|
||||
write := func(s string) {
|
||||
tabW.Write([]byte(s)) //nolint:errcheck
|
||||
}
|
||||
write := func(s string) { tabW.Write([]byte(s)) } //nolint:errcheck
|
||||
|
||||
write("127.0.0.1\tlocalhost\n")
|
||||
|
||||
|
||||
@ -127,9 +127,9 @@ func (suite *EtcFileConfigSuite) SetupTest() {
|
||||
suite.hostDNSConfig.TypedSpec().Enabled = true
|
||||
suite.hostDNSConfig.TypedSpec().ListenAddresses = []netip.AddrPort{
|
||||
netip.MustParseAddrPort("127.0.0.53:53"),
|
||||
netip.MustParseAddrPort("10.96.0.9:53"),
|
||||
netip.MustParseAddrPort("169.254.116.108:53"),
|
||||
}
|
||||
suite.hostDNSConfig.TypedSpec().ServiceHostDNSAddress = netip.MustParseAddr("10.96.0.9")
|
||||
suite.hostDNSConfig.TypedSpec().ServiceHostDNSAddress = netip.MustParseAddr("169.254.116.108")
|
||||
}
|
||||
|
||||
func (suite *EtcFileConfigSuite) startRuntime() {
|
||||
@ -228,7 +228,7 @@ func (suite *EtcFileConfigSuite) TestComplete() {
|
||||
etcFileContents{
|
||||
hosts: "127.0.0.1 localhost\n33.11.22.44 foo.example.com foo\n::1 localhost ip6-localhost ip6-loopback\nff02::1 ip6-allnodes\nff02::2 ip6-allrouters\n10.0.0.1 a b\n10.0.0.2 c d\n", //nolint:lll
|
||||
resolvConf: "nameserver 127.0.0.53\n\nsearch example.com\n",
|
||||
resolvGlobalConf: "nameserver 10.96.0.9\n\nsearch example.com\n",
|
||||
resolvGlobalConf: "nameserver 169.254.116.108\n\nsearch example.com\n",
|
||||
},
|
||||
)
|
||||
}
|
||||
@ -239,7 +239,7 @@ func (suite *EtcFileConfigSuite) TestNoExtraHosts() {
|
||||
etcFileContents{
|
||||
hosts: "127.0.0.1 localhost\n33.11.22.44 foo.example.com foo\n::1 localhost ip6-localhost ip6-loopback\nff02::1 ip6-allnodes\nff02::2 ip6-allrouters\n",
|
||||
resolvConf: "nameserver 127.0.0.53\n\nsearch example.com\n",
|
||||
resolvGlobalConf: "nameserver 10.96.0.9\n\nsearch example.com\n",
|
||||
resolvGlobalConf: "nameserver 169.254.116.108\n\nsearch example.com\n",
|
||||
},
|
||||
)
|
||||
}
|
||||
@ -262,7 +262,7 @@ func (suite *EtcFileConfigSuite) TestNoSearchDomain() {
|
||||
etcFileContents{
|
||||
hosts: "127.0.0.1 localhost\n33.11.22.44 foo.example.com foo\n::1 localhost ip6-localhost ip6-loopback\nff02::1 ip6-allnodes\nff02::2 ip6-allrouters\n",
|
||||
resolvConf: "nameserver 127.0.0.53\n",
|
||||
resolvGlobalConf: "nameserver 10.96.0.9\n",
|
||||
resolvGlobalConf: "nameserver 169.254.116.108\n",
|
||||
},
|
||||
)
|
||||
}
|
||||
@ -275,7 +275,7 @@ func (suite *EtcFileConfigSuite) TestNoDomainname() {
|
||||
etcFileContents{
|
||||
hosts: "127.0.0.1 localhost\n33.11.22.44 foo\n::1 localhost ip6-localhost ip6-loopback\nff02::1 ip6-allnodes\nff02::2 ip6-allrouters\n",
|
||||
resolvConf: "nameserver 127.0.0.53\n",
|
||||
resolvGlobalConf: "nameserver 10.96.0.9\n",
|
||||
resolvGlobalConf: "nameserver 169.254.116.108\n",
|
||||
},
|
||||
)
|
||||
}
|
||||
@ -286,7 +286,7 @@ func (suite *EtcFileConfigSuite) TestOnlyResolvers() {
|
||||
etcFileContents{
|
||||
hosts: "",
|
||||
resolvConf: "nameserver 127.0.0.53\n",
|
||||
resolvGlobalConf: "nameserver 10.96.0.9\n",
|
||||
resolvGlobalConf: "nameserver 169.254.116.108\n",
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/siderolabs/talos/pkg/machinery/resources/network"
|
||||
@ -47,7 +48,7 @@ func (ctrl *HardwareAddrController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *HardwareAddrController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *HardwareAddrController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -80,8 +81,8 @@ func (ctrl *HardwareAddrController) Run(ctx context.Context, r controller.Runtim
|
||||
continue
|
||||
}
|
||||
|
||||
if err = r.Modify(ctx, network.NewHardwareAddr(network.NamespaceName, network.FirstHardwareAddr), func(r resource.Resource) error {
|
||||
spec := r.(*network.HardwareAddr).TypedSpec()
|
||||
if err = safe.WriterModify(ctx, r, network.NewHardwareAddr(network.NamespaceName, network.FirstHardwareAddr), func(r *network.HardwareAddr) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
spec.HardwareAddr = link.TypedSpec().HardwareAddr
|
||||
spec.Name = link.Metadata().ID()
|
||||
|
||||
@ -98,13 +98,13 @@ func (ctrl *HostnameConfigController) Run(ctx context.Context, r controller.Runt
|
||||
// defaults
|
||||
var defaultAddr *network.NodeAddress
|
||||
|
||||
addrs, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.NodeAddressType, network.NodeAddressDefaultID, resource.VersionUndefined))
|
||||
addrs, err := safe.ReaderGet[*network.NodeAddress](ctx, r, resource.NewMetadata(network.NamespaceName, network.NodeAddressType, network.NodeAddressDefaultID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if !state.IsNotFoundError(err) {
|
||||
return fmt.Errorf("error getting config: %w", err)
|
||||
}
|
||||
} else {
|
||||
defaultAddr = addrs.(*network.NodeAddress) //nolint:errcheck,forcetypeassert
|
||||
defaultAddr = addrs
|
||||
}
|
||||
|
||||
// parse kernel cmdline for the default gateway
|
||||
@ -122,9 +122,9 @@ func (ctrl *HostnameConfigController) Run(ctx context.Context, r controller.Runt
|
||||
}
|
||||
|
||||
if cfgProvider.Machine().Features().StableHostnameEnabled() {
|
||||
var identity resource.Resource
|
||||
var identity *cluster.Identity
|
||||
|
||||
identity, err = r.Get(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity, resource.VersionUndefined))
|
||||
identity, err = safe.ReaderGet[*cluster.Identity](ctx, r, resource.NewMetadata(cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if !state.IsNotFoundError(err) {
|
||||
return fmt.Errorf("error getting local identity: %w", err)
|
||||
@ -133,7 +133,7 @@ func (ctrl *HostnameConfigController) Run(ctx context.Context, r controller.Runt
|
||||
continue
|
||||
}
|
||||
|
||||
nodeID := identity.(*cluster.Identity).TypedSpec().NodeID
|
||||
nodeID := identity.TypedSpec().NodeID
|
||||
|
||||
stableHostname := ctrl.getStableDefault(nodeID)
|
||||
specs = append(specs, *stableHostname)
|
||||
@ -183,11 +183,12 @@ func (ctrl *HostnameConfigController) apply(ctx context.Context, r controller.Ru
|
||||
for _, spec := range specs {
|
||||
id := network.LayeredID(spec.ConfigLayer, network.HostnameID)
|
||||
|
||||
if err := r.Modify(
|
||||
if err := safe.WriterModify(
|
||||
ctx,
|
||||
r,
|
||||
network.NewHostnameSpec(network.ConfigNamespaceName, id),
|
||||
func(r resource.Resource) error {
|
||||
*r.(*network.HostnameSpec).TypedSpec() = spec
|
||||
func(r *network.HostnameSpec) error {
|
||||
*r.TypedSpec() = spec
|
||||
|
||||
return nil
|
||||
},
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -54,7 +55,7 @@ func (ctrl *HostnameMergeController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *HostnameMergeController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *HostnameMergeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -83,9 +84,7 @@ func (ctrl *HostnameMergeController) Run(ctx context.Context, r controller.Runti
|
||||
}
|
||||
|
||||
if final.Hostname != "" {
|
||||
if err = r.Modify(ctx, network.NewHostnameSpec(network.NamespaceName, network.HostnameID), func(res resource.Resource) error {
|
||||
spec := res.(*network.HostnameSpec) //nolint:errcheck,forcetypeassert
|
||||
|
||||
if err = safe.WriterModify(ctx, r, network.NewHostnameSpec(network.NamespaceName, network.HostnameID), func(spec *network.HostnameSpec) error {
|
||||
*spec.TypedSpec() = final
|
||||
|
||||
return nil
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sys/unix"
|
||||
@ -91,9 +92,7 @@ func (ctrl *HostnameSpecController) Run(ctx context.Context, r controller.Runtim
|
||||
return fmt.Errorf("error removing finalizer: %w", err)
|
||||
}
|
||||
case resource.PhaseRunning:
|
||||
if err = r.Modify(ctx, network.NewHostnameStatus(network.NamespaceName, spec.Metadata().ID()), func(r resource.Resource) error {
|
||||
status := r.(*network.HostnameStatus) //nolint:forcetypeassert,errcheck
|
||||
|
||||
if err = safe.WriterModify(ctx, r, network.NewHostnameStatus(network.NamespaceName, spec.Metadata().ID()), func(status *network.HostnameStatus) error {
|
||||
status.TypedSpec().Hostname = spec.TypedSpec().Hostname
|
||||
status.TypedSpec().Domainname = spec.TypedSpec().Domainname
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/maps"
|
||||
"github.com/siderolabs/gen/pair/ordered"
|
||||
@ -236,11 +237,12 @@ func (ctrl *LinkConfigController) apply(ctx context.Context, r controller.Runtim
|
||||
for _, link := range links {
|
||||
id := network.LayeredID(link.ConfigLayer, network.LinkID(link.Name))
|
||||
|
||||
if err := r.Modify(
|
||||
if err := safe.WriterModify(
|
||||
ctx,
|
||||
r,
|
||||
network.NewLinkSpec(network.ConfigNamespaceName, id),
|
||||
func(r resource.Resource) error {
|
||||
*r.(*network.LinkSpec).TypedSpec() = link
|
||||
func(r *network.LinkSpec) error {
|
||||
*r.TypedSpec() = link
|
||||
|
||||
return nil
|
||||
},
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -103,9 +104,7 @@ func (ctrl *LinkMergeController) Run(ctx context.Context, r controller.Runtime,
|
||||
conflictsDetected := 0
|
||||
|
||||
for id, link := range links {
|
||||
if err = r.Modify(ctx, network.NewLinkSpec(network.NamespaceName, id), func(res resource.Resource) error {
|
||||
l := res.(*network.LinkSpec) //nolint:errcheck,forcetypeassert
|
||||
|
||||
if err = safe.WriterModify(ctx, r, network.NewLinkSpec(network.NamespaceName, id), func(l *network.LinkSpec) error {
|
||||
*l.TypedSpec() = *link
|
||||
|
||||
return nil
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/jsimonetti/rtnetlink/v2"
|
||||
"github.com/mdlayher/ethtool"
|
||||
ethtoolioctl "github.com/safchain/ethtool"
|
||||
@ -221,8 +222,8 @@ func (ctrl *LinkStatusController) reconcile(
|
||||
}
|
||||
}
|
||||
|
||||
if err = r.Modify(ctx, network.NewLinkStatus(network.NamespaceName, link.Attributes.Name), func(r resource.Resource) error {
|
||||
status := r.(*network.LinkStatus).TypedSpec()
|
||||
if err = safe.WriterModify(ctx, r, network.NewLinkStatus(network.NamespaceName, link.Attributes.Name), func(r *network.LinkStatus) error {
|
||||
status := r.TypedSpec()
|
||||
|
||||
status.Index = link.Index
|
||||
status.HardwareAddr = nethelpers.HardwareAddr(link.Attributes.Address)
|
||||
|
||||
@ -61,7 +61,7 @@ func (ctrl *NfTablesChainConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *NfTablesChainConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) (err error) {
|
||||
func (ctrl *NfTablesChainConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) (err error) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -162,7 +162,10 @@ func (ctrl *NfTablesChainConfigController) Run(ctx context.Context, r controller
|
||||
network.NfTablesRule{
|
||||
MatchSourceAddress: &network.NfTablesAddressMatch{
|
||||
IncludeSubnets: xslices.Map(
|
||||
append(slices.Clone(cfg.Config().Cluster().Network().PodCIDRs()), cfg.Config().Cluster().Network().ServiceCIDRs()...),
|
||||
slices.Concat(
|
||||
cfg.Config().Cluster().Network().PodCIDRs(),
|
||||
cfg.Config().Cluster().Network().ServiceCIDRs(),
|
||||
),
|
||||
netip.MustParsePrefix,
|
||||
),
|
||||
},
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/siderolabs/gen/value"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -62,7 +63,7 @@ func (ctrl *NodeAddressController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
var addressStatusController AddressStatusController
|
||||
|
||||
addressStatusControllerName := addressStatusController.Name()
|
||||
@ -161,8 +162,8 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime
|
||||
|
||||
// update output resources
|
||||
if !value.IsZero(defaultAddress) {
|
||||
if err = r.Modify(ctx, network.NewNodeAddress(network.NamespaceName, network.NodeAddressDefaultID), func(r resource.Resource) error {
|
||||
spec := r.(*network.NodeAddress).TypedSpec()
|
||||
if err = safe.WriterModify(ctx, r, network.NewNodeAddress(network.NamespaceName, network.NodeAddressDefaultID), func(r *network.NodeAddress) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
// never overwrite default address if it's already set
|
||||
// we should start handing default address updates, but for now we're not ready
|
||||
@ -300,8 +301,8 @@ outer:
|
||||
}
|
||||
|
||||
func updateCurrentAddresses(ctx context.Context, r controller.Runtime, id resource.ID, current []netip.Prefix) error {
|
||||
if err := r.Modify(ctx, network.NewNodeAddress(network.NamespaceName, id), func(r resource.Resource) error {
|
||||
spec := r.(*network.NodeAddress).TypedSpec()
|
||||
if err := safe.WriterModify(ctx, r, network.NewNodeAddress(network.NamespaceName, id), func(r *network.NodeAddress) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
spec.Addresses = current
|
||||
|
||||
@ -314,8 +315,8 @@ func updateCurrentAddresses(ctx context.Context, r controller.Runtime, id resour
|
||||
}
|
||||
|
||||
func updateAccumulativeAddresses(ctx context.Context, r controller.Runtime, id resource.ID, accumulative []netip.Prefix) error {
|
||||
if err := r.Modify(ctx, network.NewNodeAddress(network.NamespaceName, id), func(r resource.Resource) error {
|
||||
spec := r.(*network.NodeAddress).TypedSpec()
|
||||
if err := safe.WriterModify(ctx, r, network.NewNodeAddress(network.NamespaceName, id), func(r *network.NodeAddress) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
for _, ip := range accumulative {
|
||||
// find insert position using binary search
|
||||
@ -328,9 +329,7 @@ func updateAccumulativeAddresses(ctx context.Context, r controller.Runtime, id r
|
||||
}
|
||||
|
||||
// insert at position i
|
||||
spec.Addresses = append(spec.Addresses, netip.Prefix{})
|
||||
copy(spec.Addresses[i+1:], spec.Addresses[i:])
|
||||
spec.Addresses[i] = ip
|
||||
spec.Addresses = slices.Insert(spec.Addresses, i, ip)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/siderolabs/gen/xslices"
|
||||
@ -303,11 +304,12 @@ func (ctrl *OperatorConfigController) apply(ctx context.Context, r controller.Ru
|
||||
for _, spec := range specs {
|
||||
id := network.LayeredID(spec.ConfigLayer, network.OperatorID(spec.Operator, spec.LinkName))
|
||||
|
||||
if err := r.Modify(
|
||||
if err := safe.WriterModify(
|
||||
ctx,
|
||||
r,
|
||||
network.NewOperatorSpec(network.ConfigNamespaceName, id),
|
||||
func(r resource.Resource) error {
|
||||
*r.(*network.OperatorSpec).TypedSpec() = spec
|
||||
func(r *network.OperatorSpec) error {
|
||||
*r.TypedSpec() = spec
|
||||
|
||||
return nil
|
||||
},
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -56,7 +57,7 @@ func (ctrl *OperatorMergeController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *OperatorMergeController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *OperatorMergeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -89,9 +90,7 @@ func (ctrl *OperatorMergeController) Run(ctx context.Context, r controller.Runti
|
||||
conflictsDetected := 0
|
||||
|
||||
for id, operator := range operators {
|
||||
if err = r.Modify(ctx, network.NewOperatorSpec(network.NamespaceName, id), func(res resource.Resource) error {
|
||||
op := res.(*network.OperatorSpec) //nolint:errcheck,forcetypeassert
|
||||
|
||||
if err = safe.WriterModify(ctx, r, network.NewOperatorSpec(network.NamespaceName, id), func(op *network.OperatorSpec) error {
|
||||
*op.TypedSpec() = *operator.TypedSpec()
|
||||
|
||||
return nil
|
||||
|
||||
@ -141,7 +141,7 @@ func (mock *mockOperator) TimeServerSpecs() []network.TimeServerSpecSpec {
|
||||
return mock.timeservers
|
||||
}
|
||||
|
||||
func (suite *OperatorSpecSuite) newOperator(logger *zap.Logger, spec *network.OperatorSpecSpec) operator.Operator {
|
||||
func (suite *OperatorSpecSuite) newOperator(_ *zap.Logger, spec *network.OperatorSpecSpec) operator.Operator {
|
||||
return &mockOperator{
|
||||
spec: *spec,
|
||||
}
|
||||
@ -311,7 +311,8 @@ func (suite *OperatorSpecSuite) TestScheduling() {
|
||||
retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
|
||||
func() error {
|
||||
return suite.assertRunning(
|
||||
[]string{"dhcp4/eth0", "vip/eth0"}, func(op *mockOperator) error {
|
||||
[]string{"dhcp4/eth0", "vip/eth0"},
|
||||
func(op *mockOperator) error {
|
||||
switch op.spec.Operator { //nolint:exhaustive
|
||||
case network.OperatorDHCP4:
|
||||
suite.Assert().EqualValues(1024, op.spec.DHCP4.RouteMetric)
|
||||
@ -339,7 +340,8 @@ func (suite *OperatorSpecSuite) TestScheduling() {
|
||||
retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
|
||||
func() error {
|
||||
return suite.assertRunning(
|
||||
[]string{"dhcp4/eth0", "vip/eth0"}, func(op *mockOperator) error {
|
||||
[]string{"dhcp4/eth0", "vip/eth0"},
|
||||
func(op *mockOperator) error {
|
||||
switch op.spec.Operator { //nolint:exhaustive
|
||||
case network.OperatorDHCP4:
|
||||
suite.Assert().EqualValues(1024, op.spec.DHCP4.RouteMetric)
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/siderolabs/gen/xslices"
|
||||
@ -176,11 +177,12 @@ func (ctrl *OperatorVIPConfigController) apply(ctx context.Context, r controller
|
||||
for _, spec := range specs {
|
||||
id := network.LayeredID(spec.ConfigLayer, network.OperatorID(spec.Operator, spec.LinkName))
|
||||
|
||||
if err := r.Modify(
|
||||
if err := safe.WriterModify(
|
||||
ctx,
|
||||
r,
|
||||
network.NewOperatorSpec(network.ConfigNamespaceName, id),
|
||||
func(r resource.Resource) error {
|
||||
*r.(*network.OperatorSpec).TypedSpec() = spec
|
||||
func(r *network.OperatorSpec) error {
|
||||
*r.TypedSpec() = spec
|
||||
|
||||
return nil
|
||||
},
|
||||
|
||||
@ -140,11 +140,12 @@ func (ctrl *ResolverConfigController) apply(ctx context.Context, r controller.Ru
|
||||
for _, spec := range specs {
|
||||
id := network.LayeredID(spec.ConfigLayer, network.ResolverID)
|
||||
|
||||
if err := r.Modify(
|
||||
if err := safe.WriterModify(
|
||||
ctx,
|
||||
r,
|
||||
network.NewResolverSpec(network.ConfigNamespaceName, id),
|
||||
func(r resource.Resource) error {
|
||||
*r.(*network.ResolverSpec).TypedSpec() = spec
|
||||
func(r *network.ResolverSpec) error {
|
||||
*r.TypedSpec() = spec
|
||||
|
||||
return nil
|
||||
},
|
||||
|
||||
@ -58,7 +58,7 @@ func (ctrl *ResolverMergeController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *ResolverMergeController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *ResolverMergeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -57,13 +58,13 @@ func (ctrl *ResolverSpecController) Run(ctx context.Context, r controller.Runtim
|
||||
}
|
||||
|
||||
// list source network configuration resources
|
||||
list, err := r.List(ctx, resource.NewMetadata(network.NamespaceName, network.ResolverSpecType, "", resource.VersionUndefined))
|
||||
list, err := safe.ReaderListAll[*network.ResolverSpec](ctx, r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing source network addresses: %w", err)
|
||||
}
|
||||
|
||||
// add finalizers for all live resources
|
||||
for _, res := range list.Items {
|
||||
for res := range list.All() {
|
||||
if res.Metadata().Phase() != resource.PhaseRunning {
|
||||
continue
|
||||
}
|
||||
@ -74,9 +75,7 @@ func (ctrl *ResolverSpecController) Run(ctx context.Context, r controller.Runtim
|
||||
}
|
||||
|
||||
// loop over specs and sync to statuses
|
||||
for _, res := range list.Items {
|
||||
spec := res.(*network.ResolverSpec) //nolint:forcetypeassert,errcheck
|
||||
|
||||
for spec := range list.All() {
|
||||
switch spec.Metadata().Phase() {
|
||||
case resource.PhaseTearingDown:
|
||||
if err = r.Destroy(ctx, resource.NewMetadata(network.NamespaceName, network.ResolverStatusType, spec.Metadata().ID(), resource.VersionUndefined)); err != nil && !state.IsNotFoundError(err) {
|
||||
@ -89,10 +88,8 @@ func (ctrl *ResolverSpecController) Run(ctx context.Context, r controller.Runtim
|
||||
case resource.PhaseRunning:
|
||||
logger.Info("setting resolvers", zap.Stringers("resolvers", spec.TypedSpec().DNSServers))
|
||||
|
||||
if err = r.Modify(ctx, network.NewResolverStatus(network.NamespaceName, spec.Metadata().ID()), func(r resource.Resource) error {
|
||||
status := r.(*network.ResolverStatus) //nolint:forcetypeassert,errcheck
|
||||
|
||||
status.TypedSpec().DNSServers = spec.TypedSpec().DNSServers
|
||||
if err = safe.WriterModify(ctx, r, network.NewResolverStatus(network.NamespaceName, spec.Metadata().ID()), func(r *network.ResolverStatus) error {
|
||||
r.TypedSpec().DNSServers = spec.TypedSpec().DNSServers
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/value"
|
||||
"github.com/siderolabs/go-procfs/procfs"
|
||||
@ -156,11 +157,12 @@ func (ctrl *RouteConfigController) apply(ctx context.Context, r controller.Runti
|
||||
for _, route := range routes {
|
||||
id := network.LayeredID(route.ConfigLayer, network.RouteID(route.Table, route.Family, route.Destination, route.Gateway, route.Priority, route.OutLinkName))
|
||||
|
||||
if err := r.Modify(
|
||||
if err := safe.WriterModify(
|
||||
ctx,
|
||||
r,
|
||||
network.NewRouteSpec(network.ConfigNamespaceName, id),
|
||||
func(r resource.Resource) error {
|
||||
*r.(*network.RouteSpec).TypedSpec() = route
|
||||
func(r *network.RouteSpec) error {
|
||||
*r.TypedSpec() = route
|
||||
|
||||
return nil
|
||||
},
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -54,7 +55,7 @@ func (ctrl *RouteMergeController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *RouteMergeController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *RouteMergeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -87,9 +88,7 @@ func (ctrl *RouteMergeController) Run(ctx context.Context, r controller.Runtime,
|
||||
conflictsDetected := 0
|
||||
|
||||
for id, route := range routes {
|
||||
if err = r.Modify(ctx, network.NewRouteSpec(network.NamespaceName, id), func(res resource.Resource) error {
|
||||
rt := res.(*network.RouteSpec) //nolint:errcheck,forcetypeassert
|
||||
|
||||
if err = safe.WriterModify(ctx, r, network.NewRouteSpec(network.NamespaceName, id), func(rt *network.RouteSpec) error {
|
||||
*rt.TypedSpec() = *route.TypedSpec()
|
||||
|
||||
return nil
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/jsimonetti/rtnetlink/v2"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sys/unix"
|
||||
@ -46,7 +47,7 @@ func (ctrl *RouteStatusController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_IPV4_MROUTE|unix.RTMGRP_IPV4_ROUTE|unix.RTMGRP_IPV6_MROUTE|unix.RTMGRP_IPV6_ROUTE)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -106,8 +107,8 @@ func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime
|
||||
|
||||
id := network.RouteID(nethelpers.RoutingTable(route.Table), nethelpers.Family(route.Family), dstPrefix, gatewayAddr, route.Attributes.Priority, outLinkName)
|
||||
|
||||
if err = r.Modify(ctx, network.NewRouteStatus(network.NamespaceName, id), func(r resource.Resource) error {
|
||||
status := r.(*network.RouteStatus).TypedSpec()
|
||||
if err = safe.WriterModify(ctx, r, network.NewRouteStatus(network.NamespaceName, id), func(r *network.RouteStatus) error {
|
||||
status := r.TypedSpec()
|
||||
|
||||
status.Family = nethelpers.Family(route.Family)
|
||||
status.Destination = dstPrefix
|
||||
|
||||
@ -76,7 +76,7 @@ func (ctrl *StatusController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *StatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *StatusController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -140,11 +140,12 @@ func (ctrl *TimeServerConfigController) apply(ctx context.Context, r controller.
|
||||
for _, spec := range specs {
|
||||
id := network.LayeredID(spec.ConfigLayer, network.TimeServerID)
|
||||
|
||||
if err := r.Modify(
|
||||
if err := safe.WriterModify(
|
||||
ctx,
|
||||
r,
|
||||
network.NewTimeServerSpec(network.ConfigNamespaceName, id),
|
||||
func(r resource.Resource) error {
|
||||
*r.(*network.TimeServerSpec).TypedSpec() = spec
|
||||
func(r *network.TimeServerSpec) error {
|
||||
*r.TypedSpec() = spec
|
||||
|
||||
return nil
|
||||
},
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -54,7 +55,7 @@ func (ctrl *TimeServerMergeController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *TimeServerMergeController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *TimeServerMergeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -89,9 +90,7 @@ func (ctrl *TimeServerMergeController) Run(ctx context.Context, r controller.Run
|
||||
}
|
||||
|
||||
if final.NTPServers != nil {
|
||||
if err = r.Modify(ctx, network.NewTimeServerSpec(network.NamespaceName, network.TimeServerID), func(res resource.Resource) error {
|
||||
spec := res.(*network.TimeServerSpec) //nolint:errcheck,forcetypeassert
|
||||
|
||||
if err = safe.WriterModify(ctx, r, network.NewTimeServerSpec(network.NamespaceName, network.TimeServerID), func(spec *network.TimeServerSpec) error {
|
||||
*spec.TypedSpec() = final
|
||||
|
||||
return nil
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -97,9 +98,7 @@ func (ctrl *TimeServerSpecController) Run(ctx context.Context, r controller.Runt
|
||||
|
||||
logger.Info("setting time servers", zap.Strings("addresses", ntps))
|
||||
|
||||
if err = r.Modify(ctx, network.NewTimeServerStatus(network.NamespaceName, spec.Metadata().ID()), func(r resource.Resource) error {
|
||||
status := r.(*network.TimeServerStatus) //nolint:forcetypeassert,errcheck
|
||||
|
||||
if err = safe.WriterModify(ctx, r, network.NewTimeServerStatus(network.NamespaceName, spec.Metadata().ID()), func(status *network.TimeServerStatus) error {
|
||||
status.TypedSpec().NTPServers = spec.TypedSpec().NTPServers
|
||||
|
||||
return nil
|
||||
|
||||
@ -9,7 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/prometheus/procfs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -47,7 +47,7 @@ func (ctrl *StatsController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.StatsController interface.
|
||||
func (ctrl *StatsController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *StatsController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
ticker := time.NewTicker(updateInterval)
|
||||
|
||||
defer ticker.Stop()
|
||||
@ -90,8 +90,8 @@ func (ctrl *StatsController) updateCPU(ctx context.Context, r controller.Runtime
|
||||
return err
|
||||
}
|
||||
|
||||
return r.Modify(ctx, cpu, func(r resource.Resource) error {
|
||||
perfadapter.CPU(r.(*perf.CPU)).Update(&stat)
|
||||
return safe.WriterModify(ctx, r, cpu, func(r *perf.CPU) error {
|
||||
perfadapter.CPU(r).Update(&stat)
|
||||
|
||||
return nil
|
||||
})
|
||||
@ -105,8 +105,8 @@ func (ctrl *StatsController) updateMemory(ctx context.Context, r controller.Runt
|
||||
return err
|
||||
}
|
||||
|
||||
return r.Modify(ctx, mem, func(r resource.Resource) error {
|
||||
perfadapter.Memory(r.(*perf.Memory)).Update(&info)
|
||||
return safe.WriterModify(ctx, r, mem, func(r *perf.Memory) error {
|
||||
perfadapter.Memory(r).Update(&info)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -42,7 +42,7 @@ func (ctrl *DevicesStatusController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *DevicesStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *DevicesStatusController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
// in container mode, devices are always ready
|
||||
if ctrl.V1Alpha1Mode != machineruntime.ModeContainer {
|
||||
if err := v1alpha1.WaitForServiceHealthy(ctx, r, "udevd", nil); err != nil {
|
||||
|
||||
@ -57,7 +57,7 @@ func (ctrl *EventsSinkConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *EventsSinkConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) (err error) {
|
||||
func (ctrl *EventsSinkConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) (err error) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -53,7 +53,7 @@ func (ctrl *ExtensionServiceConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *ExtensionServiceConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *ExtensionServiceConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -55,7 +55,7 @@ func (ctrl *ExtensionServiceConfigFilesController) Outputs() []controller.Output
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *ExtensionServiceConfigFilesController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *ExtensionServiceConfigFilesController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
if ctrl.V1Alpha1Mode == v1alpha1runtime.ModeContainer {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -12,7 +12,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/siderolabs/talos/pkg/machinery/constants"
|
||||
@ -44,7 +44,7 @@ func (ctrl *ExtensionStatusController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *ExtensionStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *ExtensionStatusController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
// controller runs once, as extensions are static
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -66,8 +66,8 @@ func (ctrl *ExtensionStatusController) Run(ctx context.Context, r controller.Run
|
||||
for _, layer := range cfg.Layers {
|
||||
id := strings.TrimSuffix(layer.Image, ".sqsh")
|
||||
|
||||
if err := r.Modify(ctx, runtime.NewExtensionStatus(runtime.NamespaceName, id), func(res resource.Resource) error {
|
||||
*res.(*runtime.ExtensionStatus).TypedSpec() = *layer
|
||||
if err := safe.WriterModify(ctx, r, runtime.NewExtensionStatus(runtime.NamespaceName, id), func(res *runtime.ExtensionStatus) error {
|
||||
*res.TypedSpec() = *layer
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
||||
@ -50,7 +50,7 @@ func (ctrl *KernelModuleConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *KernelModuleConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KernelModuleConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -46,7 +46,7 @@ func (ctrl *KernelModuleSpecController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *KernelModuleSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KernelModuleSpecController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
if ctrl.V1Alpha1Mode == v1alpha1runtime.ModeContainer {
|
||||
// not supported in container mode
|
||||
return nil
|
||||
|
||||
@ -9,7 +9,6 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
@ -52,7 +51,7 @@ func (ctrl *KernelParamConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *KernelParamConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KernelParamConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -72,8 +71,8 @@ func (ctrl *KernelParamConfigController) Run(ctx context.Context, r controller.R
|
||||
setKernelParam := func(kind, key, value string) error {
|
||||
item := runtime.NewKernelParamSpec(runtime.NamespaceName, kind+"."+key)
|
||||
|
||||
return r.Modify(ctx, item, func(res resource.Resource) error {
|
||||
res.(*runtime.KernelParamSpec).TypedSpec().Value = value
|
||||
return safe.WriterModify(ctx, r, item, func(res *runtime.KernelParamSpec) error {
|
||||
res.TypedSpec().Value = value
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -10,7 +10,7 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"go.uber.org/zap"
|
||||
|
||||
v1alpha1runtime "github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
|
||||
@ -45,7 +45,7 @@ func (ctrl *KernelParamDefaultsController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *KernelParamDefaultsController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KernelParamDefaultsController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
@ -59,8 +59,8 @@ func (ctrl *KernelParamDefaultsController) Run(ctx context.Context, r controller
|
||||
value := prop.Value
|
||||
item := runtime.NewKernelParamDefaultSpec(runtime.NamespaceName, prop.Key)
|
||||
|
||||
if err := r.Modify(ctx, item, func(res resource.Resource) error {
|
||||
res.(*runtime.KernelParamDefaultSpec).TypedSpec().Value = value
|
||||
if err := safe.WriterModify(ctx, r, item, func(res *runtime.KernelParamDefaultSpec) error {
|
||||
res.TypedSpec().Value = value
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
||||
@ -8,10 +8,12 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -93,9 +95,7 @@ func (ctrl *KernelParamSpecController) Run(ctx context.Context, r controller.Run
|
||||
|
||||
configsCounts := len(configs.Items)
|
||||
|
||||
list := configs.Items
|
||||
|
||||
list = append(list, defaults.Items...)
|
||||
list := slices.Concat(configs.Items, defaults.Items)
|
||||
|
||||
touchedIDs := map[string]string{}
|
||||
|
||||
@ -117,8 +117,8 @@ func (ctrl *KernelParamSpecController) Run(ctx context.Context, r controller.Run
|
||||
if errors.Is(err, os.ErrNotExist) && spec.IgnoreErrors {
|
||||
status := runtime.NewKernelParamStatus(runtime.NamespaceName, id)
|
||||
|
||||
if e := r.Modify(ctx, status, func(res resource.Resource) error {
|
||||
res.(*runtime.KernelParamStatus).TypedSpec().Unsupported = true
|
||||
if e := safe.WriterModify(ctx, r, status, func(res *runtime.KernelParamStatus) error {
|
||||
res.TypedSpec().Unsupported = true
|
||||
|
||||
return nil
|
||||
}); e != nil {
|
||||
@ -154,10 +154,7 @@ func (ctrl *KernelParamSpecController) Run(ctx context.Context, r controller.Run
|
||||
}
|
||||
|
||||
func (ctrl *KernelParamSpecController) updateKernelParam(ctx context.Context, r controller.Runtime, key, value string) error {
|
||||
prop := &kernel.Param{
|
||||
Key: key,
|
||||
Value: value,
|
||||
}
|
||||
prop := &kernel.Param{Key: key, Value: value}
|
||||
|
||||
if _, ok := ctrl.defaults[key]; !ok {
|
||||
if data, err := krnl.ReadParam(prop); err == nil {
|
||||
@ -175,9 +172,9 @@ func (ctrl *KernelParamSpecController) updateKernelParam(ctx context.Context, r
|
||||
|
||||
status := runtime.NewKernelParamStatus(runtime.NamespaceName, key)
|
||||
|
||||
return r.Modify(ctx, status, func(res resource.Resource) error {
|
||||
res.(*runtime.KernelParamStatus).TypedSpec().Current = value
|
||||
res.(*runtime.KernelParamStatus).TypedSpec().Default = strings.TrimSpace(ctrl.defaults[key])
|
||||
return safe.WriterModify(ctx, r, status, func(res *runtime.KernelParamStatus) error {
|
||||
res.TypedSpec().Current = value
|
||||
res.TypedSpec().Default = strings.TrimSpace(ctrl.defaults[key])
|
||||
|
||||
return nil
|
||||
})
|
||||
@ -187,10 +184,7 @@ func (ctrl *KernelParamSpecController) resetKernelParam(ctx context.Context, r c
|
||||
var err error
|
||||
|
||||
if def, ok := ctrl.defaults[key]; ok {
|
||||
err = krnl.WriteParam(&kernel.Param{
|
||||
Key: key,
|
||||
Value: def,
|
||||
})
|
||||
err = krnl.WriteParam(&kernel.Param{Key: key, Value: def})
|
||||
} else {
|
||||
err = krnl.DeleteParam(&kernel.Param{Key: key})
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ func (ctrl *KmsgLogConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *KmsgLogConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) (err error) {
|
||||
func (ctrl *KmsgLogConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) (err error) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -48,7 +48,7 @@ func (ctrl *MachineStatusPublisherController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *MachineStatusPublisherController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *MachineStatusPublisherController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -61,7 +61,7 @@ func (ctrl *MaintenanceConfigController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *MaintenanceConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *MaintenanceConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -62,7 +62,7 @@ func (ctrl *SecurityStateController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *SecurityStateController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *SecurityStateController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -49,7 +49,7 @@ func (ctrl *WatchdogTimerConfigController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *WatchdogTimerConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) (err error) {
|
||||
func (ctrl *WatchdogTimerConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) (err error) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/crypto/x509"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
@ -82,7 +83,7 @@ func (ctrl *APIController) Run(ctx context.Context, r controller.Runtime, logger
|
||||
return err
|
||||
}
|
||||
|
||||
machineTypeRes, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineTypeType, config.MachineTypeID, resource.VersionUndefined))
|
||||
machineTypeRes, err := safe.ReaderGet[*config.MachineType](ctx, r, resource.NewMetadata(config.NamespaceName, config.MachineTypeType, config.MachineTypeID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
@ -91,9 +92,9 @@ func (ctrl *APIController) Run(ctx context.Context, r controller.Runtime, logger
|
||||
return fmt.Errorf("error getting machine type: %w", err)
|
||||
}
|
||||
|
||||
machineType := machineTypeRes.(*config.MachineType).MachineType()
|
||||
machineType := machineTypeRes.MachineType()
|
||||
|
||||
networkResource, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.StatusType, network.StatusID, resource.VersionUndefined))
|
||||
networkResource, err := safe.ReaderGet[*network.Status](ctx, r, resource.NewMetadata(network.NamespaceName, network.StatusType, network.StatusID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
@ -102,7 +103,7 @@ func (ctrl *APIController) Run(ctx context.Context, r controller.Runtime, logger
|
||||
return err
|
||||
}
|
||||
|
||||
networkStatus := networkResource.(*network.Status).TypedSpec()
|
||||
networkStatus := networkResource.TypedSpec()
|
||||
|
||||
if !(networkStatus.AddressReady && networkStatus.HostnameReady) {
|
||||
continue
|
||||
@ -189,7 +190,7 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime,
|
||||
case <-refreshTicker.C:
|
||||
}
|
||||
|
||||
machineTypeRes, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineTypeType, config.MachineTypeID, resource.VersionUndefined))
|
||||
machineTypeRes, err := safe.ReaderGet[*config.MachineType](ctx, r, resource.NewMetadata(config.NamespaceName, config.MachineTypeType, config.MachineTypeID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
@ -198,7 +199,7 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime,
|
||||
return fmt.Errorf("error getting machine type: %w", err)
|
||||
}
|
||||
|
||||
machineType := machineTypeRes.(*config.MachineType).MachineType()
|
||||
machineType := machineTypeRes.MachineType()
|
||||
|
||||
switch machineType {
|
||||
case machine.TypeInit, machine.TypeControlPlane:
|
||||
@ -215,7 +216,7 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime,
|
||||
panic(fmt.Sprintf("unexpected machine type %v", machineType))
|
||||
}
|
||||
|
||||
rootResource, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.OSRootType, secrets.OSRootID, resource.VersionUndefined))
|
||||
rootResource, err := safe.ReaderGet[*secrets.OSRoot](ctx, r, resource.NewMetadata(secrets.NamespaceName, secrets.OSRootType, secrets.OSRootID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
if err = ctrl.teardownAll(ctx, r); err != nil {
|
||||
@ -228,9 +229,9 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime,
|
||||
return fmt.Errorf("error getting etcd root secrets: %w", err)
|
||||
}
|
||||
|
||||
rootSpec := rootResource.(*secrets.OSRoot).TypedSpec()
|
||||
rootSpec := rootResource.TypedSpec()
|
||||
|
||||
certSANResource, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.CertSANType, secrets.CertSANAPIID, resource.VersionUndefined))
|
||||
certSANResource, err := safe.ReaderGet[*secrets.CertSAN](ctx, r, resource.NewMetadata(secrets.NamespaceName, secrets.CertSANType, secrets.CertSANAPIID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
@ -239,7 +240,7 @@ func (ctrl *APIController) reconcile(ctx context.Context, r controller.Runtime,
|
||||
return fmt.Errorf("error getting certSANs: %w", err)
|
||||
}
|
||||
|
||||
certSANs := certSANResource.(*secrets.CertSAN).TypedSpec()
|
||||
certSANs := certSANResource.TypedSpec()
|
||||
|
||||
var endpointsStr []string
|
||||
|
||||
@ -310,9 +311,9 @@ func (ctrl *APIController) generateControlPlane(ctx context.Context, r controlle
|
||||
return fmt.Errorf("failed to generate API client cert: %w", err)
|
||||
}
|
||||
|
||||
if err := r.Modify(ctx, secrets.NewAPI(),
|
||||
func(r resource.Resource) error {
|
||||
apiSecrets := r.(*secrets.API).TypedSpec()
|
||||
if err := safe.WriterModify(ctx, r, secrets.NewAPI(),
|
||||
func(r *secrets.API) error {
|
||||
apiSecrets := r.TypedSpec()
|
||||
|
||||
apiSecrets.AcceptedCAs = rootSpec.AcceptedCAs
|
||||
apiSecrets.Server = x509.NewCertificateAndKeyFromKeyPair(serverCert)
|
||||
@ -387,9 +388,9 @@ func (ctrl *APIController) generateWorker(ctx context.Context, r controller.Runt
|
||||
return fmt.Errorf("failed to sign API server CSR: %w", err)
|
||||
}
|
||||
|
||||
if err := r.Modify(ctx, secrets.NewAPI(),
|
||||
func(r resource.Resource) error {
|
||||
apiSecrets := r.(*secrets.API).TypedSpec()
|
||||
if err := safe.WriterModify(ctx, r, secrets.NewAPI(),
|
||||
func(r *secrets.API) error {
|
||||
apiSecrets := r.TypedSpec()
|
||||
|
||||
apiSecrets.AcceptedCAs = []*x509.PEMEncodedCertificate{
|
||||
{
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
"go.uber.org/zap"
|
||||
@ -66,7 +67,7 @@ func (ctrl *APICertSANsController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *APICertSANsController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *APICertSANsController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -74,7 +75,7 @@ func (ctrl *APICertSANsController) Run(ctx context.Context, r controller.Runtime
|
||||
case <-r.EventCh():
|
||||
}
|
||||
|
||||
apiRootRes, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.OSRootType, secrets.OSRootID, resource.VersionUndefined))
|
||||
apiRootRes, err := safe.ReaderGet[*secrets.OSRoot](ctx, r, resource.NewMetadata(secrets.NamespaceName, secrets.OSRootType, secrets.OSRootID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
if err = ctrl.teardownAll(ctx, r); err != nil {
|
||||
@ -87,9 +88,9 @@ func (ctrl *APICertSANsController) Run(ctx context.Context, r controller.Runtime
|
||||
return fmt.Errorf("error getting root k8s secrets: %w", err)
|
||||
}
|
||||
|
||||
apiRoot := apiRootRes.(*secrets.OSRoot).TypedSpec()
|
||||
apiRoot := apiRootRes.TypedSpec()
|
||||
|
||||
hostnameResource, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.HostnameStatusType, network.HostnameID, resource.VersionUndefined))
|
||||
hostnameResource, err := safe.ReaderGet[*network.HostnameStatus](ctx, r, resource.NewMetadata(network.NamespaceName, network.HostnameStatusType, network.HostnameID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
@ -98,9 +99,9 @@ func (ctrl *APICertSANsController) Run(ctx context.Context, r controller.Runtime
|
||||
return err
|
||||
}
|
||||
|
||||
hostnameStatus := hostnameResource.(*network.HostnameStatus).TypedSpec()
|
||||
hostnameStatus := hostnameResource.TypedSpec()
|
||||
|
||||
addressesResource, err := r.Get(ctx,
|
||||
addressesResource, err := safe.ReaderGet[*network.NodeAddress](ctx, r,
|
||||
resource.NewMetadata(network.NamespaceName, network.NodeAddressType, network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, k8s.NodeAddressFilterNoK8s), resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
@ -110,10 +111,10 @@ func (ctrl *APICertSANsController) Run(ctx context.Context, r controller.Runtime
|
||||
return err
|
||||
}
|
||||
|
||||
nodeAddresses := addressesResource.(*network.NodeAddress).TypedSpec()
|
||||
nodeAddresses := addressesResource.TypedSpec()
|
||||
|
||||
if err = r.Modify(ctx, secrets.NewCertSAN(secrets.NamespaceName, secrets.CertSANAPIID), func(r resource.Resource) error {
|
||||
spec := r.(*secrets.CertSAN).TypedSpec()
|
||||
if err = safe.WriterModify(ctx, r, secrets.NewCertSAN(secrets.NamespaceName, secrets.CertSANAPIID), func(r *secrets.CertSAN) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
spec.Reset()
|
||||
|
||||
|
||||
@ -80,7 +80,7 @@ func (ctrl *EtcdController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *EtcdController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *EtcdController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -120,7 +120,7 @@ func (ctrl *EtcdController) Run(ctx context.Context, r controller.Runtime, logge
|
||||
}
|
||||
|
||||
// wait for time sync as certs depend on current time
|
||||
timeSyncResource, err := r.Get(ctx, resource.NewMetadata(v1alpha1.NamespaceName, time.StatusType, time.StatusID, resource.VersionUndefined))
|
||||
timeSyncResource, err := safe.ReaderGet[*time.Status](ctx, r, resource.NewMetadata(v1alpha1.NamespaceName, time.StatusType, time.StatusID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
@ -129,7 +129,7 @@ func (ctrl *EtcdController) Run(ctx context.Context, r controller.Runtime, logge
|
||||
return err
|
||||
}
|
||||
|
||||
if !timeSyncResource.(*time.Status).TypedSpec().Synced {
|
||||
if !timeSyncResource.TypedSpec().Synced {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@ -71,7 +71,7 @@ func (ctrl *KubernetesController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *KubernetesController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KubernetesController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
refreshTicker := time.NewTicker(KubernetesCertificateValidityDuration / 2)
|
||||
defer refreshTicker.Stop()
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
"go.uber.org/zap"
|
||||
@ -67,7 +68,7 @@ func (ctrl *KubernetesCertSANsController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (ctrl *KubernetesCertSANsController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KubernetesCertSANsController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -75,7 +76,7 @@ func (ctrl *KubernetesCertSANsController) Run(ctx context.Context, r controller.
|
||||
case <-r.EventCh():
|
||||
}
|
||||
|
||||
k8sRootRes, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesRootType, secrets.KubernetesRootID, resource.VersionUndefined))
|
||||
k8sRootRes, err := safe.ReaderGet[*secrets.KubernetesRoot](ctx, r, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesRootType, secrets.KubernetesRootID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
if err = ctrl.teardownAll(ctx, r); err != nil {
|
||||
@ -88,9 +89,9 @@ func (ctrl *KubernetesCertSANsController) Run(ctx context.Context, r controller.
|
||||
return fmt.Errorf("error getting root k8s secrets: %w", err)
|
||||
}
|
||||
|
||||
k8sRoot := k8sRootRes.(*secrets.KubernetesRoot).TypedSpec()
|
||||
k8sRoot := k8sRootRes.TypedSpec()
|
||||
|
||||
hostnameResource, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.HostnameStatusType, network.HostnameID, resource.VersionUndefined))
|
||||
hostnameResource, err := safe.ReaderGet[*network.HostnameStatus](ctx, r, resource.NewMetadata(network.NamespaceName, network.HostnameStatusType, network.HostnameID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
continue
|
||||
@ -99,9 +100,10 @@ func (ctrl *KubernetesCertSANsController) Run(ctx context.Context, r controller.
|
||||
return err
|
||||
}
|
||||
|
||||
hostnameStatus := hostnameResource.(*network.HostnameStatus).TypedSpec()
|
||||
hostnameStatus := hostnameResource.TypedSpec()
|
||||
|
||||
addressesResource, err := r.Get(ctx,
|
||||
addressesResource, err := safe.ReaderGet[*network.NodeAddress](ctx,
|
||||
r,
|
||||
resource.NewMetadata(network.NamespaceName, network.NodeAddressType, network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, k8s.NodeAddressFilterNoK8s), resource.VersionUndefined))
|
||||
if err != nil {
|
||||
if state.IsNotFoundError(err) {
|
||||
@ -111,10 +113,10 @@ func (ctrl *KubernetesCertSANsController) Run(ctx context.Context, r controller.
|
||||
return err
|
||||
}
|
||||
|
||||
nodeAddresses := addressesResource.(*network.NodeAddress).TypedSpec()
|
||||
nodeAddresses := addressesResource.TypedSpec()
|
||||
|
||||
if err = r.Modify(ctx, secrets.NewCertSAN(secrets.NamespaceName, secrets.CertSANKubernetesID), func(r resource.Resource) error {
|
||||
spec := r.(*secrets.CertSAN).TypedSpec()
|
||||
if err = safe.WriterModify(ctx, r, secrets.NewCertSAN(secrets.NamespaceName, secrets.CertSANKubernetesID), func(r *secrets.CertSAN) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
spec.Reset()
|
||||
|
||||
|
||||
@ -51,7 +51,7 @@ func (ctrl *KubernetesDynamicCertsController) Outputs() []controller.Output {
|
||||
// Run implements controller.Controller interface.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (ctrl *KubernetesDynamicCertsController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *KubernetesDynamicCertsController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
// wait for the network to be ready first, then switch to regular inputs
|
||||
if err := r.UpdateInputs([]controller.Input{
|
||||
{
|
||||
|
||||
@ -10,7 +10,6 @@ import (
|
||||
"net/netip"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"github.com/siderolabs/gen/optional"
|
||||
@ -58,7 +57,7 @@ func (ctrl *MaintenanceCertSANsController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *MaintenanceCertSANsController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *MaintenanceCertSANsController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -80,8 +79,8 @@ func (ctrl *MaintenanceCertSANsController) Run(ctx context.Context, r controller
|
||||
return err
|
||||
}
|
||||
|
||||
if err = r.Modify(ctx, secrets.NewCertSAN(secrets.NamespaceName, secrets.CertSANMaintenanceID), func(r resource.Resource) error {
|
||||
spec := r.(*secrets.CertSAN).TypedSpec()
|
||||
if err = safe.WriterModify(ctx, r, secrets.NewCertSAN(secrets.NamespaceName, secrets.CertSANMaintenanceID), func(r *secrets.CertSAN) error {
|
||||
spec := r.TypedSpec()
|
||||
|
||||
spec.Reset()
|
||||
|
||||
|
||||
@ -40,7 +40,7 @@ func (ctrl *MaintenanceRootController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *MaintenanceRootController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *MaintenanceRootController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
// run this controller only once, as the CA never changes
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -56,7 +56,7 @@ func (ctrl *TrustedRootsController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *TrustedRootsController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *TrustedRootsController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@ -45,7 +45,7 @@ func (ctrl *AdjtimeStatusController) Outputs() []controller.Output {
|
||||
}
|
||||
|
||||
// Run implements controller.Controller interface.
|
||||
func (ctrl *AdjtimeStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
|
||||
func (ctrl *AdjtimeStatusController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
|
||||
if ctrl.V1Alpha1Mode == v1alpha1runtime.ModeContainer {
|
||||
// in container mode, clock is managed by the host
|
||||
return nil
|
||||
|
||||
@ -132,7 +132,11 @@ func (ctrl *SyncController) Run(ctx context.Context, r controller.Runtime, logge
|
||||
timeSyncTimeoutTimer = nil
|
||||
}
|
||||
|
||||
timeServersStatus, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.TimeServerStatusType, network.TimeServerID, resource.VersionUndefined))
|
||||
timeServersStatus, err := safe.ReaderGet[*network.TimeServerStatus](
|
||||
ctx,
|
||||
r,
|
||||
resource.NewMetadata(network.NamespaceName, network.TimeServerStatusType, network.TimeServerID, resource.VersionUndefined),
|
||||
)
|
||||
if err != nil {
|
||||
if !state.IsNotFoundError(err) {
|
||||
return fmt.Errorf("error getting time server status: %w", err)
|
||||
@ -142,7 +146,7 @@ func (ctrl *SyncController) Run(ctx context.Context, r controller.Runtime, logge
|
||||
continue
|
||||
}
|
||||
|
||||
timeServers := timeServersStatus.(*network.TimeServerStatus).TypedSpec().NTPServers
|
||||
timeServers := timeServersStatus.TypedSpec().NTPServers
|
||||
|
||||
cfg, err := safe.ReaderGetByID[*config.MachineConfig](ctx, r, config.V1Alpha1ID)
|
||||
if err != nil {
|
||||
@ -227,8 +231,8 @@ func (ctrl *SyncController) Run(ctx context.Context, r controller.Runtime, logge
|
||||
timeSynced = true
|
||||
}
|
||||
|
||||
if err = r.Modify(ctx, time.NewStatus(), func(r resource.Resource) error {
|
||||
*r.(*time.Status).TypedSpec() = time.StatusSpec{
|
||||
if err = safe.WriterModify(ctx, r, time.NewStatus(), func(r *time.Status) error {
|
||||
*r.TypedSpec() = time.StatusSpec{
|
||||
Epoch: epoch,
|
||||
Synced: timeSynced,
|
||||
SyncDisabled: syncDisabled,
|
||||
|
||||
@ -9,7 +9,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/controller"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -74,9 +74,7 @@ func (ctrl *ServiceController) Run(ctx context.Context, r controller.Runtime, lo
|
||||
|
||||
switch msg.Action { //nolint:exhaustive
|
||||
case machine.ServiceStateEvent_RUNNING:
|
||||
if err := r.Modify(ctx, service, func(r resource.Resource) error {
|
||||
svc := r.(*v1alpha1.Service) //nolint:errcheck,forcetypeassert
|
||||
|
||||
if err := safe.WriterModify(ctx, r, service, func(svc *v1alpha1.Service) error {
|
||||
*svc.TypedSpec() = v1alpha1.ServiceSpec{
|
||||
Running: true,
|
||||
Healthy: msg.GetHealth().GetHealthy(),
|
||||
|
||||
@ -195,12 +195,16 @@ func (r *Runtime) Logging() runtime.LoggingManager {
|
||||
|
||||
// NodeName implements the Runtime interface.
|
||||
func (r *Runtime) NodeName() (string, error) {
|
||||
nodenameResource, err := r.s.V1Alpha2().Resources().Get(context.Background(), resource.NewMetadata(k8s.NamespaceName, k8s.NodenameType, k8s.NodenameID, resource.VersionUndefined))
|
||||
nodenameResource, err := safe.ReaderGet[*k8s.Nodename](
|
||||
context.Background(),
|
||||
r.s.V1Alpha2().Resources(),
|
||||
resource.NewMetadata(k8s.NamespaceName, k8s.NodenameType, k8s.NodenameID, resource.VersionUndefined),
|
||||
)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error getting nodename resource: %w", err)
|
||||
}
|
||||
|
||||
return nodenameResource.(*k8s.Nodename).TypedSpec().Nodename, nil
|
||||
return nodenameResource.TypedSpec().Nodename, nil
|
||||
}
|
||||
|
||||
// IsBootstrapAllowed checks for CRI to be up, checked in the bootstrap method.
|
||||
|
||||
@ -76,7 +76,7 @@ type Etcd struct {
|
||||
}
|
||||
|
||||
// ID implements the Service interface.
|
||||
func (e *Etcd) ID(r runtime.Runtime) string {
|
||||
func (e *Etcd) ID(runtime.Runtime) string {
|
||||
return "etcd"
|
||||
}
|
||||
|
||||
@ -149,7 +149,7 @@ func (e *Etcd) PreFunc(ctx context.Context, r runtime.Runtime) error {
|
||||
}
|
||||
|
||||
// PostFunc implements the Service interface.
|
||||
func (e *Etcd) PostFunc(r runtime.Runtime, state events.ServiceState) (err error) {
|
||||
func (e *Etcd) PostFunc(runtime.Runtime, events.ServiceState) (err error) {
|
||||
if e.promoteCtxCancel != nil {
|
||||
e.promoteCtxCancel()
|
||||
}
|
||||
@ -173,7 +173,7 @@ func (e *Etcd) Condition(r runtime.Runtime) conditions.Condition {
|
||||
}
|
||||
|
||||
// DependsOn implements the Service interface.
|
||||
func (e *Etcd) DependsOn(r runtime.Runtime) []string {
|
||||
func (e *Etcd) DependsOn(runtime.Runtime) []string {
|
||||
return []string{"cri"}
|
||||
}
|
||||
|
||||
@ -330,12 +330,15 @@ func buildInitialCluster(ctx context.Context, r runtime.Runtime, name string, pe
|
||||
|
||||
// we "allow" a failure here since we want to fallthrough and attempt to add the etcd member regardless of
|
||||
// whether we can print our IPs
|
||||
currentAddresses, addrErr := r.State().V1Alpha2().Resources().Get(ctx,
|
||||
resource.NewMetadata(network.NamespaceName, network.NodeAddressType, network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s), resource.VersionUndefined))
|
||||
currentAddresses, addrErr := safe.ReaderGet[*network.NodeAddress](
|
||||
ctx,
|
||||
r.State().V1Alpha2().Resources(),
|
||||
resource.NewMetadata(network.NamespaceName, network.NodeAddressType, network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s), resource.VersionUndefined),
|
||||
)
|
||||
if addrErr != nil {
|
||||
log.Printf("error getting node addresses: %s", addrErr.Error())
|
||||
} else {
|
||||
ips := currentAddresses.(*network.NodeAddress).TypedSpec().IPs()
|
||||
ips := currentAddresses.TypedSpec().IPs()
|
||||
log.Printf("%s", ips)
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/containerd/containerd/v2/pkg/namespaces"
|
||||
"github.com/containerd/containerd/v2/pkg/oci"
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/safe"
|
||||
"github.com/cosi-project/runtime/pkg/state"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
|
||||
@ -45,18 +46,18 @@ type Kubelet struct {
|
||||
}
|
||||
|
||||
// ID implements the Service interface.
|
||||
func (k *Kubelet) ID(r runtime.Runtime) string {
|
||||
func (k *Kubelet) ID(runtime.Runtime) string {
|
||||
return "kubelet"
|
||||
}
|
||||
|
||||
// PreFunc implements the Service interface.
|
||||
func (k *Kubelet) PreFunc(ctx context.Context, r runtime.Runtime) error {
|
||||
specResource, err := r.State().V1Alpha2().Resources().Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, k8s.KubeletID, resource.VersionUndefined))
|
||||
specResource, err := safe.ReaderGet[*k8s.KubeletSpec](ctx, r.State().V1Alpha2().Resources(), resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, k8s.KubeletID, resource.VersionUndefined))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
spec := specResource.(*k8s.KubeletSpec).TypedSpec()
|
||||
spec := specResource.TypedSpec()
|
||||
|
||||
client, err := containerdapi.New(constants.CRIContainerdAddress)
|
||||
if err != nil {
|
||||
@ -85,7 +86,7 @@ func (k *Kubelet) PreFunc(ctx context.Context, r runtime.Runtime) error {
|
||||
}
|
||||
|
||||
// PostFunc implements the Service interface.
|
||||
func (k *Kubelet) PostFunc(r runtime.Runtime, state events.ServiceState) (err error) {
|
||||
func (k *Kubelet) PostFunc(runtime.Runtime, events.ServiceState) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -98,18 +99,22 @@ func (k *Kubelet) Condition(r runtime.Runtime) conditions.Condition {
|
||||
}
|
||||
|
||||
// DependsOn implements the Service interface.
|
||||
func (k *Kubelet) DependsOn(r runtime.Runtime) []string {
|
||||
func (k *Kubelet) DependsOn(runtime.Runtime) []string {
|
||||
return []string{"cri"}
|
||||
}
|
||||
|
||||
// Runner implements the Service interface.
|
||||
func (k *Kubelet) Runner(r runtime.Runtime) (runner.Runner, error) {
|
||||
specResource, err := r.State().V1Alpha2().Resources().Get(context.Background(), resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, k8s.KubeletID, resource.VersionUndefined))
|
||||
specResource, err := safe.ReaderGet[*k8s.KubeletSpec](
|
||||
context.Background(),
|
||||
r.State().V1Alpha2().Resources(),
|
||||
resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, k8s.KubeletID, resource.VersionUndefined),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
spec := specResource.(*k8s.KubeletSpec).TypedSpec()
|
||||
spec := specResource.TypedSpec()
|
||||
|
||||
// Set the process arguments.
|
||||
args := runner.Args{
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
x
Reference in New Issue
Block a user