feat: implement power states as machine stage events

Introduce a new component that watches infra.MachineStatus resources and produces complementary "synthetic" machine status events for the "powering on" and "powered off" stages.

Update the logic in the `MachineStatusSnapshot` and `ClusterMachineStatus` controllers to take these events into account.

This is required to display the correct status for machines that are powered on/off by the infra providers.
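In rough terms, the rule the new component implements: a powered-off machine that is allocated to a cluster is reported as "powering on", while an unallocated powered-off machine is reported as "powered off". A minimal sketch of that mapping (illustrative only; the real logic lives in internal/backend/powerstage below, and powerStage is a hypothetical helper):

import "github.com/siderolabs/omni/client/api/omni/specs"

// powerStage sketches the mapping rule implemented by this change.
func powerStage(poweredOff, allocatedToCluster bool) specs.MachineStatusSnapshotSpec_PowerStage {
	switch {
	case !poweredOff:
		return specs.MachineStatusSnapshotSpec_POWER_STAGE_NONE
	case allocatedToCluster:
		return specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERING_ON
	default:
		return specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERED_OFF
	}
}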

Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
Utku Ozdemir, 2025-01-16 17:09:03 +01:00
parent 2a2c648141
commit 1495ca007f
12 changed files with 1210 additions and 989 deletions

File diff suppressed because it is too large.


@@ -442,7 +442,7 @@ message RedactedClusterMachineConfigSpec {
// CompressedData contains the compressed config bytes. It is only set if the config is compressed. Otherwise, Data is set instead.
//
-// Deprecated: use accessor methods GetUncompressedData/SetUncompressedDataa to manage this field.
+// Deprecated: use accessor methods GetUncompressedData/SetUncompressedData to manage this field.
bytes compressed_data = 2;
}
@@ -502,6 +502,7 @@ message ClusterMachineStatusSpec {
BEFORE_DESTROY = 9;
DESTROYING = 5;
POWERING_ON = 10;
+POWERED_OFF = 11;
}
Stage stage = 2;
@@ -829,7 +830,14 @@ message MachineLabelsSpec {}
// MachineStatusSnapshotSpec describes latest known status of MachineStatus Talos resource.
message MachineStatusSnapshotSpec {
+enum PowerStage {
+POWER_STAGE_NONE = 0;
+POWER_STAGE_POWERED_OFF = 1;
+POWER_STAGE_POWERING_ON = 2;
+}
machine.MachineStatusEvent machine_status = 1;
+PowerStage power_stage = 2;
}
enum ConditionType {
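For reference, populating the new field from Go looks roughly like this (a sketch built on the generated types and resource helpers that appear later in this diff; newPoweredOffSnapshot is a hypothetical helper):

import (
	"github.com/siderolabs/omni/client/api/omni/specs"
	"github.com/siderolabs/omni/client/pkg/omni/resources"
	"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
)

// newPoweredOffSnapshot builds a power-stage-only snapshot:
// MachineStatus stays nil on purpose, only PowerStage is set.
func newPoweredOffSnapshot(machineID string) *omni.MachineStatusSnapshot {
	snapshot := omni.NewMachineStatusSnapshot(resources.DefaultNamespace, machineID)
	snapshot.TypedSpec().Value.PowerStage = specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERED_OFF

	return snapshot
}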


@@ -1328,6 +1328,7 @@ func (m *MachineStatusSnapshotSpec) CloneVT() *MachineStatusSnapshotSpec {
return (*MachineStatusSnapshotSpec)(nil)
}
r := new(MachineStatusSnapshotSpec)
+r.PowerStage = m.PowerStage
if rhs := m.MachineStatus; rhs != nil {
if vtpb, ok := interface{}(rhs).(interface {
CloneVT() *machine.MachineStatusEvent
@@ -4184,6 +4185,9 @@ func (this *MachineStatusSnapshotSpec) EqualVT(that *MachineStatusSnapshotSpec)
} else if !proto.Equal(this.MachineStatus, that.MachineStatus) {
return false
}
+if this.PowerStage != that.PowerStage {
+return false
+}
return string(this.unknownFields) == string(that.unknownFields)
}
@@ -9350,6 +9354,11 @@ func (m *MachineStatusSnapshotSpec) MarshalToSizedBufferVT(dAtA []byte) (int, er
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
+if m.PowerStage != 0 {
+i = protohelpers.EncodeVarint(dAtA, i, uint64(m.PowerStage))
+i--
+dAtA[i] = 0x10
+}
if m.MachineStatus != nil {
if vtmsg, ok := interface{}(m.MachineStatus).(interface {
MarshalToSizedBufferVT([]byte) (int, error)
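The hard-coded 0x10 written before the PowerStage varint above is the protobuf tag byte: (field_number << 3) | wire_type, with field number 2 and varint wire type 0. A self-contained check (plain Go, no generated code involved):

package main

import "fmt"

func main() {
	// Protobuf tag byte for PowerStage: field 2, varint wire type 0.
	const fieldNumber, wireTypeVarint = 2, 0

	fmt.Printf("0x%02x\n", fieldNumber<<3|wireTypeVarint) // prints 0x10
}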
@@ -13447,6 +13456,9 @@ func (m *MachineStatusSnapshotSpec) SizeVT() (n int) {
}
n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
}
+if m.PowerStage != 0 {
+n += 1 + protohelpers.SizeOfVarint(uint64(m.PowerStage))
+}
n += len(m.unknownFields)
return n
}
@@ -24051,6 +24063,25 @@ func (m *MachineStatusSnapshotSpec) UnmarshalVT(dAtA []byte) error {
}
}
iNdEx = postIndex
+case 2:
+if wireType != 0 {
+return fmt.Errorf("proto: wrong wireType = %d for field PowerStage", wireType)
+}
+m.PowerStage = 0
+for shift := uint(0); ; shift += 7 {
+if shift >= 64 {
+return protohelpers.ErrIntOverflow
+}
+if iNdEx >= l {
+return io.ErrUnexpectedEOF
+}
+b := dAtA[iNdEx]
+iNdEx++
+m.PowerStage |= MachineStatusSnapshotSpec_PowerStage(b&0x7F) << shift
+if b < 0x80 {
+break
+}
+}
default:
iNdEx = preIndex
skippy, err := protohelpers.Skip(dAtA[iNdEx:])
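The case 2 loop above is the standard protobuf base-128 varint decoder. The same scheme in isolation (a self-contained sketch, independent of the generated code):

package main

import "fmt"

// decodeVarint mirrors the UnmarshalVT loop: each byte contributes its low
// seven bits, least-significant group first; a byte with the high bit clear
// terminates the value.
func decodeVarint(data []byte) (value uint64, n int, err error) {
	for shift := uint(0); shift < 64; shift += 7 {
		if n >= len(data) {
			return 0, 0, fmt.Errorf("unexpected end of data")
		}

		b := data[n]
		n++

		value |= uint64(b&0x7F) << shift

		if b < 0x80 {
			return value, n, nil
		}
	}

	return 0, 0, fmt.Errorf("varint overflows 64 bits")
}

func main() {
	value, n, err := decodeVarint([]byte{0x02})
	fmt.Println(value, n, err) // 2 1 <nil>, i.e. POWER_STAGE_POWERING_ON
}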


@@ -129,6 +129,8 @@ func clusterMachineStageString(phase specs.ClusterMachineStatusSpec_Stage) strin
c = color.HiRedString
case specs.ClusterMachineStatusSpec_RUNNING:
c = color.GreenString
+case specs.ClusterMachineStatusSpec_POWERED_OFF:
+c = color.WhiteString
default:
c = fmt.Sprintf
}


@@ -78,6 +78,7 @@ export enum ClusterMachineStatusSpecStage {
BEFORE_DESTROY = 9,
DESTROYING = 5,
POWERING_ON = 10,
+POWERED_OFF = 11,
}
export enum ClusterStatusSpecPhase {
@@ -117,6 +118,12 @@ export enum TalosUpgradeStatusSpecPhase {
InstallingExtensions = 5,
}
+export enum MachineStatusSnapshotSpecPowerStage {
+POWER_STAGE_NONE = 0,
+POWER_STAGE_POWERED_OFF = 1,
+POWER_STAGE_POWERING_ON = 2,
+}
export enum ControlPlaneStatusSpecConditionStatus {
Unknown = 0,
Ready = 1,
@@ -570,6 +577,7 @@ export type MachineLabelsSpec = {
export type MachineStatusSnapshotSpec = {
machine_status?: MachineMachine.MachineStatusEvent
+power_stage?: MachineStatusSnapshotSpecPowerStage
}
export type ControlPlaneStatusSpecCondition = {


@@ -0,0 +1,98 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// Package powerstage provides a power stage watcher that produces MachineStatusSnapshot events containing power stage information.
package powerstage
import (
"context"
"fmt"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"
"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/infra"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
)
// Watcher watches the infra.MachineStatus resources and sends MachineStatusSnapshot resources with the power stage information to the snapshot channel.
type Watcher struct {
state state.State
snapshotCh chan<- *omni.MachineStatusSnapshot
logger *zap.Logger
}
// NewWatcher creates a new Watcher.
func NewWatcher(state state.State, snapshotCh chan<- *omni.MachineStatusSnapshot, logger *zap.Logger) *Watcher {
return &Watcher{
state: state,
snapshotCh: snapshotCh,
logger: logger,
}
}
// Run runs the Watcher.
func (watcher *Watcher) Run(ctx context.Context) error {
eventCh := make(chan safe.WrappedStateEvent[*infra.MachineStatus])
if err := safe.StateWatchKind[*infra.MachineStatus](ctx, watcher.state, infra.NewMachineStatus("").Metadata(), eventCh); err != nil {
return err
}
for {
var event safe.WrappedStateEvent[*infra.MachineStatus]
select {
case <-ctx.Done():
watcher.logger.Info("power status watcher stopped")
return nil
case event = <-eventCh:
}
switch event.Type() { //nolint:exhaustive
case state.Created, state.Updated:
default: // ignore other events
continue
}
if err := event.Error(); err != nil {
return fmt.Errorf("failed to watch machine status: %w", err)
}
resource, err := event.Resource()
if err != nil {
return fmt.Errorf("failed to read resource from the event: %w", err)
}
if resource.TypedSpec().Value.PowerState == specs.InfraMachineStatusSpec_POWER_STATE_OFF {
snapshot := omni.NewMachineStatusSnapshot(resources.DefaultNamespace, resource.Metadata().ID())
// find out if it is assigned to a cluster, and if so, mark it as "powering on"
if _, err = watcher.state.Get(ctx, omni.NewClusterMachine(resources.DefaultNamespace, resource.Metadata().ID()).Metadata()); err != nil {
if !state.IsNotFoundError(err) {
return err
}
snapshot.TypedSpec().Value.PowerStage = specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERED_OFF
} else {
snapshot.TypedSpec().Value.PowerStage = specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERING_ON
}
select {
case <-ctx.Done():
watcher.logger.Info("power status watcher stopped before sending a snapshot")
case watcher.snapshotCh <- snapshot:
}
watcher.logger.Debug("sent power stage snapshot",
zap.String("machine_id", resource.Metadata().ID()),
zap.Stringer("power_stage", snapshot.TypedSpec().Value.PowerStage))
}
}
}
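A sketch of how this watcher is meant to be wired up (it mirrors the runtime changes at the bottom of this diff; runPowerStageWatcher is a hypothetical helper):

import (
	"context"

	"github.com/cosi-project/runtime/pkg/state"
	"go.uber.org/zap"

	"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
	"github.com/siderolabs/omni/internal/backend/powerstage"
)

// runPowerStageWatcher starts the watcher and returns the channel that is
// then handed to NewMachineStatusSnapshotController as its power stage source.
func runPowerStageWatcher(ctx context.Context, st state.State, logger *zap.Logger) <-chan *omni.MachineStatusSnapshot {
	snapshotCh := make(chan *omni.MachineStatusSnapshot)

	watcher := powerstage.NewWatcher(st, snapshotCh, logger)

	go func() {
		if err := watcher.Run(ctx); err != nil {
			logger.Error("power stage watcher failed", zap.Error(err))
		}
	}()

	return snapshotCh
}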


@@ -202,8 +202,14 @@ func NewClusterMachineStatusController() *ClusterMachineStatusController {
cmsVal.Stage = specs.ClusterMachineStatusSpec_CONFIGURING
}
-if err = reconcilePoweringOn(ctx, r, clusterMachineStatus); err != nil {
-return err
-}
+// if the machine snapshot has a power stage set (not NONE), overwrite the stage
+switch statusSnapshot.TypedSpec().Value.PowerStage {
+case specs.MachineStatusSnapshotSpec_POWER_STAGE_NONE:
+// do not override
+case specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERED_OFF:
+cmsVal.Stage = specs.ClusterMachineStatusSpec_POWERED_OFF
+case specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERING_ON:
+cmsVal.Stage = specs.ClusterMachineStatusSpec_POWERING_ON
+}
clusterMachineIdentity, err := safe.ReaderGet[*omni.ClusterMachineIdentity](ctx, r, omni.NewClusterMachineIdentity(
@@ -269,30 +275,6 @@
)
}
-// reconcilePoweringOn overwrites the machine stage if the machine is managed by the bare metal infra provider and is being powered on right now.
-func reconcilePoweringOn(ctx context.Context, r controller.Reader, clusterMachineStatus *omni.ClusterMachineStatus) error {
-_, err := safe.ReaderGetByID[*infra.Machine](ctx, r, clusterMachineStatus.Metadata().ID())
-if err != nil {
-if state.IsNotFoundError(err) {
-return nil
-}
-return err
-}
-infraMachineStatus, err := safe.ReaderGetByID[*infra.MachineStatus](ctx, r, clusterMachineStatus.Metadata().ID())
-if err != nil && !state.IsNotFoundError(err) {
-return err
-}
-// if the expected state is powered on, but the actual power state is off overwrite the cluster machine state to be "POWERING_ON"
-if infraMachineStatus == nil || infraMachineStatus.TypedSpec().Value.PowerState != specs.InfraMachineStatusSpec_POWER_STATE_ON {
-clusterMachineStatus.TypedSpec().Value.Stage = specs.ClusterMachineStatusSpec_POWERING_ON
-}
-return nil
-}
func updateMachineProvisionStatus(ctx context.Context, r controller.Reader, machineStatus *omni.MachineStatus, cmsVal *specs.ClusterMachineStatusSpec) error {
machineRequestID, ok := machineStatus.Metadata().Labels().Get(omni.LabelMachineRequest)


@@ -32,17 +32,19 @@ type MachineStatusSnapshotController struct {
runner *task.Runner[snapshot.InfoChan, snapshot.CollectTaskSpec]
notifyCh chan *omni.MachineStatusSnapshot
siderolinkCh <-chan *omni.MachineStatusSnapshot
+powerStageCh <-chan *omni.MachineStatusSnapshot
generic.NamedController
}
// NewMachineStatusSnapshotController initializes MachineStatusSnapshotController.
-func NewMachineStatusSnapshotController(siderolinkEventsCh <-chan *omni.MachineStatusSnapshot) *MachineStatusSnapshotController {
+func NewMachineStatusSnapshotController(siderolinkEventsCh, powerStageEventsCh <-chan *omni.MachineStatusSnapshot) *MachineStatusSnapshotController {
return &MachineStatusSnapshotController{
NamedController: generic.NamedController{
ControllerName: MachineStatusSnapshotControllerName,
},
notifyCh: make(chan *omni.MachineStatusSnapshot),
siderolinkCh: siderolinkEventsCh,
+powerStageCh: powerStageEventsCh,
runner: task.NewEqualRunner[snapshot.CollectTaskSpec](),
}
}
@@ -87,6 +89,10 @@ func (ctrl *MachineStatusSnapshotController) Settings() controller.QSettings {
if err := ctrl.reconcileSnapshot(ctx, r, resource); err != nil {
return err
}
+case resource := <-ctrl.powerStageCh:
+if err := ctrl.reconcileSnapshot(ctx, r, resource); err != nil {
+return err
+}
}
}
},
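Stripped of the controller plumbing, the loop above is a plain Go select fan-in over the two snapshot sources (a simplified sketch; reconcile stands in for ctrl.reconcileSnapshot):

import (
	"context"

	"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
)

// fanIn forwards snapshots from both sources into a single reconcile path.
func fanIn(
	ctx context.Context,
	siderolinkCh, powerStageCh <-chan *omni.MachineStatusSnapshot,
	reconcile func(context.Context, *omni.MachineStatusSnapshot) error,
) error {
	for {
		var snapshot *omni.MachineStatusSnapshot

		select {
		case <-ctx.Done():
			return nil
		case snapshot = <-siderolinkCh:
		case snapshot = <-powerStageCh:
		}

		if err := reconcile(ctx, snapshot); err != nil {
			return err
		}
	}
}

A receive from a nil channel blocks forever, so a nil source simply never fires; that is what lets the test below pass nil as the power stage channel.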
@@ -235,8 +241,12 @@ func (ctrl *MachineStatusSnapshotController) reconcileSnapshot(ctx context.Conte
return nil
}
-if err := safe.WriterModify(ctx, r, omni.NewMachineStatusSnapshot(resources.DefaultNamespace, snapshot.Metadata().ID()), func(m *omni.MachineStatusSnapshot) error {
-m.TypedSpec().Value = snapshot.TypedSpec().Value
+if err = safe.WriterModify(ctx, r, omni.NewMachineStatusSnapshot(resources.DefaultNamespace, snapshot.Metadata().ID()), func(m *omni.MachineStatusSnapshot) error {
+if snapshot.TypedSpec().Value.MachineStatus != nil { // if this is a power stage snapshot, it will not contain machine status, so we preserve the existing value
+m.TypedSpec().Value.MachineStatus = snapshot.TypedSpec().Value.MachineStatus
+}
+m.TypedSpec().Value.PowerStage = snapshot.TypedSpec().Value.PowerStage // always set the power stage
return nil
}); err != nil && !state.IsPhaseConflictError(err) {
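The modify callback above amounts to a small merge rule; isolated into a pure function it would read roughly as follows (mergeSnapshotSpec is a hypothetical helper, not part of the diff):

import "github.com/siderolabs/omni/client/api/omni/specs"

// mergeSnapshotSpec applies an incoming snapshot spec onto the stored one.
// Power stage events carry no MachineStatus, so the previously collected
// Talos status is preserved in that case; the power stage is always taken
// from the incoming event.
func mergeSnapshotSpec(stored, incoming *specs.MachineStatusSnapshotSpec) {
	if incoming.MachineStatus != nil {
		stored.MachineStatus = incoming.MachineStatus
	}

	stored.PowerStage = incoming.PowerStage
}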


@@ -40,7 +40,7 @@ func (suite *MachineStatusSnapshotControllerSuite) TestReconcile() {
siderolinkEventsCh := make(chan *omni.MachineStatusSnapshot)
-suite.Require().NoError(suite.runtime.RegisterQController(omnictrl.NewMachineStatusSnapshotController(siderolinkEventsCh)))
+suite.Require().NoError(suite.runtime.RegisterQController(omnictrl.NewMachineStatusSnapshotController(siderolinkEventsCh, nil)))
m := omni.NewMachine(resources.DefaultNamespace, "1")
m.TypedSpec().Value.Connected = true


@@ -1393,7 +1393,7 @@ func (suite *MigrationSuite) TestSetMachineStatusSnapshotOwner() {
suite.Require().NoError(suite.state.Create(
ctx,
item,
-state.WithCreateOwner(omnictrl.NewMachineStatusSnapshotController(nil).Name())),
+state.WithCreateOwner(omnictrl.MachineStatusSnapshotControllerName)),
)
}
@@ -1405,7 +1405,7 @@
check := func(item *omni.MachineStatusSnapshot, expectedVersion int) {
result, err := safe.StateGet[*omni.MachineStatusSnapshot](ctx, suite.state, item.Metadata())
suite.Require().NoError(err)
-suite.Require().Equal(omnictrl.NewMachineStatusSnapshotController(nil).Name(), result.Metadata().Owner())
+suite.Require().Equal(omnictrl.MachineStatusSnapshotControllerName, result.Metadata().Owner())
suite.Require().EqualValues(result.Metadata().Version().Value(), expectedVersion)
}


@@ -1175,7 +1175,7 @@ func setMachineStatusSnapshotOwner(ctx context.Context, st state.State, logger *
logger.Info("updating machine status snapshot with empty owner", zap.String("id", item.Metadata().String()))
_, err = safe.StateUpdateWithConflicts(ctx, st, item.Metadata(), func(res *omni.MachineStatusSnapshot) error {
-return res.Metadata().SetOwner(omnictrl.NewMachineStatusSnapshotController(nil).Name())
+return res.Metadata().SetOwner(omnictrl.MachineStatusSnapshotControllerName)
}, state.WithExpectedPhaseAny(), state.WithUpdateOwner(item.Metadata().Owner()))
if err != nil {
return err


@@ -39,6 +39,8 @@ import (
pkgruntime "github.com/siderolabs/omni/client/pkg/runtime"
"github.com/siderolabs/omni/internal/backend/dns"
"github.com/siderolabs/omni/internal/backend/imagefactory"
"github.com/siderolabs/omni/internal/backend/logging"
"github.com/siderolabs/omni/internal/backend/powerstage"
"github.com/siderolabs/omni/internal/backend/resourcelogger"
"github.com/siderolabs/omni/internal/backend/runtime"
"github.com/siderolabs/omni/internal/backend/runtime/cosi"
@@ -75,7 +77,8 @@ type Runtime struct {
cachedState state.State
virtual *virtual.State
-logger *zap.Logger
+logger *zap.Logger
+powerStageWatcher *powerstage.Watcher
}
// New creates a new Omni runtime.
@@ -155,6 +158,9 @@ func New(talosClientFactory *talos.ClientFactory, dnsService *dns.Service, workl
)
}
+powerStageEventsCh := make(chan *omni.MachineStatusSnapshot)
+powerStageWatcher := powerstage.NewWatcher(resourceState, powerStageEventsCh, logger.With(logging.Component("power_stage_watcher")))
controllerRuntime, err := cosiruntime.NewRuntime(resourceState, logger, opts...)
if err != nil {
return nil, err
@@ -258,7 +264,7 @@ func New(talosClientFactory *talos.ClientFactory, dnsService *dns.Service, workl
omnictrl.NewTalosConfigController(constants.CertificateValidityTime),
omnictrl.NewTalosExtensionsController(imageFactoryClient),
omnictrl.NewTalosUpgradeStatusController(),
-omnictrl.NewMachineStatusSnapshotController(siderolinkEventsCh),
+omnictrl.NewMachineStatusSnapshotController(siderolinkEventsCh, powerStageEventsCh),
omnictrl.NewMachineProvisionController(),
omnictrl.NewMachineRequestLinkController(resourceState),
omnictrl.NewLabelsExtractorController[*omni.MachineStatus](),
@@ -342,6 +348,7 @@ func New(talosClientFactory *talos.ClientFactory, dnsService *dns.Service, workl
dnsService: dnsService,
workloadProxyReconciler: workloadProxyReconciler,
resourceLogger: resourceLogger,
+powerStageWatcher: powerStageWatcher,
state: state.WrapCore(validated.NewState(resourceState, validationOptions...)),
cachedState: state.WrapCore(validated.NewState(controllerRuntime.CachedState(), validationOptions...)),
virtual: virtualState,
@@ -365,6 +372,7 @@ func (r *Runtime) Run(ctx context.Context, eg newgroup.EGroup) {
newgroup.GoWithContext(ctx, eg, makeWrap(r.resourceLogger.Start, "resource logger failed"))
}
+newgroup.GoWithContext(ctx, eg, makeWrap(r.powerStageWatcher.Run, "power stage watcher failed"))
newgroup.GoWithContext(ctx, eg, makeWrap(r.talosClientFactory.StartCacheManager, "talos client factory failed"))
newgroup.GoWithContext(ctx, eg, makeWrap(r.dnsService.Start, "dns service failed"))
newgroup.GoWithContext(ctx, eg, makeWrap(r.controllerRuntime.Run, "controller runtime failed"))