fix: implement the controller for handling machine status snapshot

Make the controller run tasks that can collect machine status from each
machine.
Instead of changing the `MachineStatusSnapshot` directly in the
siderolink events handler, pass these events to the controller through
a channel, so that all events are handled in the same place.

Whether the event comes from siderolink or the task runner collects the
machine status, the `MachineStatusSnapshot` resource is updated the same way.

Signed-off-by: Artem Chernyshev <artem.chernyshev@talos-systems.com>
This commit is contained in:
Artem Chernyshev 2024-05-31 21:05:58 +03:00
parent 6aa21409e5
commit ed26122ce0
No known key found for this signature in database
GPG Key ID: E084A2DF1143C14D
60 changed files with 1151 additions and 1161 deletions

File diff suppressed because it is too large Load Diff

View File

@ -177,11 +177,6 @@ message MachineStatusSpec {
bool enabled = 1;
}
message TalosMachineStatus {
machine.MachineStatusEvent status = 1;
google.protobuf.Timestamp updated_at = 2;
}
// Talos version.
string talos_version = 1;
@ -231,7 +226,7 @@ message MachineStatusSpec {
// InitialTalosVersion is set only once when the machine first joined Omni.
string initial_talos_version = 16;
TalosMachineStatus talos_machine_status = 17;
reserved 17;
SecureBootStatus secure_boot_status = 18;
}

View File

@ -319,32 +319,6 @@ func (m *MachineStatusSpec_SecureBootStatus) CloneMessageVT() proto.Message {
return m.CloneVT()
}
// CloneVT returns a deep copy of the message (vtproto-generated code;
// do not hand-edit).
func (m *MachineStatusSpec_TalosMachineStatus) CloneVT() *MachineStatusSpec_TalosMachineStatus {
	if m == nil {
		return (*MachineStatusSpec_TalosMachineStatus)(nil)
	}
	r := new(MachineStatusSpec_TalosMachineStatus)
	r.UpdatedAt = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.UpdatedAt).CloneVT())
	if rhs := m.Status; rhs != nil {
		// Use the vtproto fast-path clone when the message implements it,
		// otherwise fall back to the reflection-based proto.Clone.
		if vtpb, ok := interface{}(rhs).(interface {
			CloneVT() *machine.MachineStatusEvent
		}); ok {
			r.Status = vtpb.CloneVT()
		} else {
			r.Status = proto.Clone(rhs).(*machine.MachineStatusEvent)
		}
	}
	if len(m.unknownFields) > 0 {
		// Preserve any unknown fields so round-tripping is lossless.
		r.unknownFields = make([]byte, len(m.unknownFields))
		copy(r.unknownFields, m.unknownFields)
	}
	return r
}
// CloneMessageVT implements the generic proto.Message clone interface.
func (m *MachineStatusSpec_TalosMachineStatus) CloneMessageVT() proto.Message {
	return m.CloneVT()
}
func (m *MachineStatusSpec) CloneVT() *MachineStatusSpec {
if m == nil {
return (*MachineStatusSpec)(nil)
@ -362,7 +336,6 @@ func (m *MachineStatusSpec) CloneVT() *MachineStatusSpec {
r.PlatformMetadata = m.PlatformMetadata.CloneVT()
r.Schematic = m.Schematic.CloneVT()
r.InitialTalosVersion = m.InitialTalosVersion
r.TalosMachineStatus = m.TalosMachineStatus.CloneVT()
r.SecureBootStatus = m.SecureBootStatus.CloneVT()
if rhs := m.ImageLabels; rhs != nil {
tmpContainer := make(map[string]string, len(rhs))
@ -2502,34 +2475,6 @@ func (this *MachineStatusSpec_SecureBootStatus) EqualMessageVT(thatMsg proto.Mes
}
return this.EqualVT(that)
}
// EqualVT reports whether this and that are deeply equal, including any
// preserved unknown fields (vtproto-generated code).
func (this *MachineStatusSpec_TalosMachineStatus) EqualVT(that *MachineStatusSpec_TalosMachineStatus) bool {
	if this == that {
		return true
	} else if this == nil || that == nil {
		return false
	}
	// Use the vtproto equality fast path for Status when available.
	if equal, ok := interface{}(this.Status).(interface {
		EqualVT(*machine.MachineStatusEvent) bool
	}); ok {
		if !equal.EqualVT(that.Status) {
			return false
		}
	} else if !proto.Equal(this.Status, that.Status) {
		return false
	}
	if !(*timestamppb1.Timestamp)(this.UpdatedAt).EqualVT((*timestamppb1.Timestamp)(that.UpdatedAt)) {
		return false
	}
	return string(this.unknownFields) == string(that.unknownFields)
}
// EqualMessageVT implements the generic proto.Message equality interface.
func (this *MachineStatusSpec_TalosMachineStatus) EqualMessageVT(thatMsg proto.Message) bool {
	that, ok := thatMsg.(*MachineStatusSpec_TalosMachineStatus)
	if !ok {
		return false
	}
	return this.EqualVT(that)
}
func (this *MachineStatusSpec) EqualVT(that *MachineStatusSpec) bool {
if this == that {
return true
@ -2584,9 +2529,6 @@ func (this *MachineStatusSpec) EqualVT(that *MachineStatusSpec) bool {
if this.InitialTalosVersion != that.InitialTalosVersion {
return false
}
if !this.TalosMachineStatus.EqualVT(that.TalosMachineStatus) {
return false
}
if !this.SecureBootStatus.EqualVT(that.SecureBootStatus) {
return false
}
@ -5621,71 +5563,6 @@ func (m *MachineStatusSpec_SecureBootStatus) MarshalToSizedBufferVT(dAtA []byte)
return len(dAtA) - i, nil
}
// MarshalVT encodes m into a freshly allocated wire-format buffer.
func (m *MachineStatusSpec_TalosMachineStatus) MarshalVT() (dAtA []byte, err error) {
	if m == nil {
		return nil, nil
	}
	size := m.SizeVT()
	dAtA = make([]byte, size)
	n, err := m.MarshalToSizedBufferVT(dAtA[:size])
	if err != nil {
		return nil, err
	}
	return dAtA[:n], nil
}
// MarshalToVT encodes m into the caller-supplied buffer dAtA.
func (m *MachineStatusSpec_TalosMachineStatus) MarshalToVT(dAtA []byte) (int, error) {
	size := m.SizeVT()
	return m.MarshalToSizedBufferVT(dAtA[:size])
}
// MarshalToSizedBufferVT encodes m into the tail of dAtA, filling the buffer
// back-to-front (so fields are emitted highest tag first), as vtproto does.
func (m *MachineStatusSpec_TalosMachineStatus) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
	if m == nil {
		return 0, nil
	}
	i := len(dAtA)
	_ = i
	var l int
	_ = l
	if m.unknownFields != nil {
		// Unknown fields are appended verbatim at the end of the message.
		i -= len(m.unknownFields)
		copy(dAtA[i:], m.unknownFields)
	}
	if m.UpdatedAt != nil {
		size, err := (*timestamppb1.Timestamp)(m.UpdatedAt).MarshalToSizedBufferVT(dAtA[:i])
		if err != nil {
			return 0, err
		}
		i -= size
		i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
		i--
		dAtA[i] = 0x12 // field 2, wire type 2 (length-delimited)
	}
	if m.Status != nil {
		// Prefer the vtproto marshaler; fall back to reflection-based proto.Marshal.
		if vtmsg, ok := interface{}(m.Status).(interface {
			MarshalToSizedBufferVT([]byte) (int, error)
		}); ok {
			size, err := vtmsg.MarshalToSizedBufferVT(dAtA[:i])
			if err != nil {
				return 0, err
			}
			i -= size
			i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
		} else {
			encoded, err := proto.Marshal(m.Status)
			if err != nil {
				return 0, err
			}
			i -= len(encoded)
			copy(dAtA[i:], encoded)
			i = protohelpers.EncodeVarint(dAtA, i, uint64(len(encoded)))
		}
		i--
		dAtA[i] = 0xa // field 1, wire type 2 (length-delimited)
	}
	return len(dAtA) - i, nil
}
func (m *MachineStatusSpec) MarshalVT() (dAtA []byte, err error) {
if m == nil {
return nil, nil
@ -5728,18 +5605,6 @@ func (m *MachineStatusSpec) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i--
dAtA[i] = 0x92
}
if m.TalosMachineStatus != nil {
size, err := m.TalosMachineStatus.MarshalToSizedBufferVT(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
i--
dAtA[i] = 0x1
i--
dAtA[i] = 0x8a
}
if len(m.InitialTalosVersion) > 0 {
i -= len(m.InitialTalosVersion)
copy(dAtA[i:], m.InitialTalosVersion)
@ -10574,30 +10439,6 @@ func (m *MachineStatusSpec_SecureBootStatus) SizeVT() (n int) {
return n
}
// SizeVT returns the wire-format encoded size of m in bytes, including
// preserved unknown fields.
func (m *MachineStatusSpec_TalosMachineStatus) SizeVT() (n int) {
	if m == nil {
		return 0
	}
	var l int
	_ = l
	if m.Status != nil {
		// Prefer the vtproto sizing fast path when available.
		if size, ok := interface{}(m.Status).(interface {
			SizeVT() int
		}); ok {
			l = size.SizeVT()
		} else {
			l = proto.Size(m.Status)
		}
		n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
	}
	if m.UpdatedAt != nil {
		l = (*timestamppb1.Timestamp)(m.UpdatedAt).SizeVT()
		n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
	}
	n += len(m.unknownFields)
	return n
}
func (m *MachineStatusSpec) SizeVT() (n int) {
if m == nil {
return 0
@ -10657,10 +10498,6 @@ func (m *MachineStatusSpec) SizeVT() (n int) {
if l > 0 {
n += 2 + l + protohelpers.SizeOfVarint(uint64(l))
}
if m.TalosMachineStatus != nil {
l = m.TalosMachineStatus.SizeVT()
n += 2 + l + protohelpers.SizeOfVarint(uint64(l))
}
if m.SecureBootStatus != nil {
l = m.SecureBootStatus.SizeVT()
n += 2 + l + protohelpers.SizeOfVarint(uint64(l))
@ -14543,137 +14380,6 @@ func (m *MachineStatusSpec_SecureBootStatus) UnmarshalVT(dAtA []byte) error {
}
return nil
}
// UnmarshalVT decodes wire-format bytes in dAtA into m
// (vtproto-generated code; do not hand-edit).
func (m *MachineStatusSpec_TalosMachineStatus) UnmarshalVT(dAtA []byte) error {
	l := len(dAtA)
	iNdEx := 0
	for iNdEx < l {
		preIndex := iNdEx
		var wire uint64
		// Decode the field tag varint (field number << 3 | wire type).
		for shift := uint(0); ; shift += 7 {
			if shift >= 64 {
				return protohelpers.ErrIntOverflow
			}
			if iNdEx >= l {
				return io.ErrUnexpectedEOF
			}
			b := dAtA[iNdEx]
			iNdEx++
			wire |= uint64(b&0x7F) << shift
			if b < 0x80 {
				break
			}
		}
		fieldNum := int32(wire >> 3)
		wireType := int(wire & 0x7)
		if wireType == 4 {
			return fmt.Errorf("proto: MachineStatusSpec_TalosMachineStatus: wiretype end group for non-group")
		}
		if fieldNum <= 0 {
			return fmt.Errorf("proto: MachineStatusSpec_TalosMachineStatus: illegal tag %d (wire type %d)", fieldNum, wire)
		}
		switch fieldNum {
		case 1:
			if wireType != 2 {
				return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType)
			}
			var msglen int
			for shift := uint(0); ; shift += 7 {
				if shift >= 64 {
					return protohelpers.ErrIntOverflow
				}
				if iNdEx >= l {
					return io.ErrUnexpectedEOF
				}
				b := dAtA[iNdEx]
				iNdEx++
				msglen |= int(b&0x7F) << shift
				if b < 0x80 {
					break
				}
			}
			if msglen < 0 {
				return protohelpers.ErrInvalidLength
			}
			postIndex := iNdEx + msglen
			if postIndex < 0 {
				return protohelpers.ErrInvalidLength
			}
			if postIndex > l {
				return io.ErrUnexpectedEOF
			}
			if m.Status == nil {
				m.Status = &machine.MachineStatusEvent{}
			}
			// Prefer the sub-message's vtproto unmarshaler when present.
			if unmarshal, ok := interface{}(m.Status).(interface {
				UnmarshalVT([]byte) error
			}); ok {
				if err := unmarshal.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
					return err
				}
			} else {
				if err := proto.Unmarshal(dAtA[iNdEx:postIndex], m.Status); err != nil {
					return err
				}
			}
			iNdEx = postIndex
		case 2:
			if wireType != 2 {
				return fmt.Errorf("proto: wrong wireType = %d for field UpdatedAt", wireType)
			}
			var msglen int
			for shift := uint(0); ; shift += 7 {
				if shift >= 64 {
					return protohelpers.ErrIntOverflow
				}
				if iNdEx >= l {
					return io.ErrUnexpectedEOF
				}
				b := dAtA[iNdEx]
				iNdEx++
				msglen |= int(b&0x7F) << shift
				if b < 0x80 {
					break
				}
			}
			if msglen < 0 {
				return protohelpers.ErrInvalidLength
			}
			postIndex := iNdEx + msglen
			if postIndex < 0 {
				return protohelpers.ErrInvalidLength
			}
			if postIndex > l {
				return io.ErrUnexpectedEOF
			}
			if m.UpdatedAt == nil {
				m.UpdatedAt = &timestamppb.Timestamp{}
			}
			if err := (*timestamppb1.Timestamp)(m.UpdatedAt).UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
				return err
			}
			iNdEx = postIndex
		default:
			// Unknown field: skip past it, retaining the raw bytes.
			iNdEx = preIndex
			skippy, err := protohelpers.Skip(dAtA[iNdEx:])
			if err != nil {
				return err
			}
			if (skippy < 0) || (iNdEx+skippy) < 0 {
				return protohelpers.ErrInvalidLength
			}
			if (iNdEx + skippy) > l {
				return io.ErrUnexpectedEOF
			}
			m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...)
			iNdEx += skippy
		}
	}

	if iNdEx > l {
		return io.ErrUnexpectedEOF
	}
	return nil
}
func (m *MachineStatusSpec) UnmarshalVT(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -15193,42 +14899,6 @@ func (m *MachineStatusSpec) UnmarshalVT(dAtA []byte) error {
}
m.InitialTalosVersion = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 17:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TalosMachineStatus", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return protohelpers.ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return protohelpers.ErrInvalidLength
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return protohelpers.ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.TalosMachineStatus == nil {
m.TalosMachineStatus = &MachineStatusSpec_TalosMachineStatus{}
}
if err := m.TalosMachineStatus.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 18:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SecureBootStatus", wireType)

View File

@ -28,6 +28,7 @@ import (
"github.com/siderolabs/omni/client/pkg/constants"
authres "github.com/siderolabs/omni/client/pkg/omni/resources/auth"
omnires "github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend"
"github.com/siderolabs/omni/internal/backend/discovery"
"github.com/siderolabs/omni/internal/backend/dns"
@ -171,6 +172,7 @@ func runWithState(logger *zap.Logger) func(context.Context, state.State, *virtua
}
linkCounterDeltaCh := make(chan siderolink.LinkCounterDeltas)
siderolinkEventsCh := make(chan *omnires.MachineStatusSnapshot)
discoveryClient, err := discovery.NewClient(ctx)
if err != nil {
@ -184,7 +186,7 @@ func runWithState(logger *zap.Logger) func(context.Context, state.State, *virtua
}()
omniRuntime, err := omni.New(talosClientFactory, dnsService, workloadProxyServiceRegistry, resourceLogger,
imageFactoryClient, linkCounterDeltaCh, resourceState, virtualState,
imageFactoryClient, linkCounterDeltaCh, siderolinkEventsCh, resourceState, virtualState,
prometheus.DefaultRegisterer, discoveryClient, logger.With(logging.Component("omni_runtime")))
if err != nil {
return fmt.Errorf("failed to set up the controller runtime: %w", err)
@ -230,6 +232,7 @@ func runWithState(logger *zap.Logger) func(context.Context, state.State, *virtua
workloadProxyServiceRegistry,
imageFactoryClient,
linkCounterDeltaCh,
siderolinkEventsCh,
omniRuntime,
talosRuntime,
logHandler,

View File

@ -214,11 +214,6 @@ export type MachineStatusSpecSecureBootStatus = {
enabled?: boolean
}
// Frontend binding for the protobuf MachineStatusSpec.TalosMachineStatus
// message — presumably generated from the specs proto; verify before editing.
export type MachineStatusSpecTalosMachineStatus = {
  status?: MachineMachine.MachineStatusEvent
  updated_at?: GoogleProtobufTimestamp.Timestamp
}
export type MachineStatusSpec = {
talos_version?: string
hardware?: MachineStatusSpecHardwareStatus
@ -233,7 +228,6 @@ export type MachineStatusSpec = {
image_labels?: {[key: string]: string}
schematic?: MachineStatusSpecSchematic
initial_talos_version?: string
talos_machine_status?: MachineStatusSpecTalosMachineStatus
secure_boot_status?: MachineStatusSpecSecureBootStatus
}

View File

@ -158,5 +158,7 @@ func (suite *ServiceSuite) assertResolve(node string, expected dns.Info) {
}
// TestServiceSuite runs ServiceSuite in parallel with the other top-level tests.
func TestServiceSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ServiceSuite))
}

View File

@ -94,7 +94,7 @@ func (suite *GrpcSuite) SetupTest() {
suite.Require().NoError(err)
suite.runtime, err = omniruntime.New(clientFactory, dnsService, workloadProxyServiceRegistry, nil,
imageFactoryClient, nil, suite.state, nil, prometheus.NewRegistry(), discoveryServiceClientMock, logger)
imageFactoryClient, nil, nil, suite.state, nil, prometheus.NewRegistry(), discoveryServiceClientMock, logger)
suite.Require().NoError(err)
runtime.Install(omniruntime.Name, suite.runtime)
@ -324,5 +324,7 @@ func (d *discoveryClientMock) AffiliateDelete(context.Context, string, string) e
}
// TestGrpcSuite runs GrpcSuite in parallel with the other top-level tests.
func TestGrpcSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(GrpcSuite))
}

View File

@ -7,13 +7,18 @@
package helpers
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strings"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/controller/generic"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/kvutils"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
@ -127,3 +132,57 @@ func ClearUserLabels(res resource.Resource) {
}
})
}
// HandleInputOptions optional args for HandleInput.
type HandleInputOptions struct {
	id string // ID of the input resource to read; defaults to the main resource's ID
}

// HandleInputOption optional arg for HandleInput.
type HandleInputOption func(*HandleInputOptions)

// WithID maps the resource using another id.
func WithID(id string) HandleInputOption {
	return func(hio *HandleInputOptions) {
		hio.id = id
	}
}
// HandleInput reads the additional input resource and automatically manages
// finalizers on it.
//
// By default the input is looked up with the same ID as main; use WithID to
// override. A zero value is returned when the input does not exist or when
// either resource is tearing down (after dropping our finalizer).
func HandleInput[T generic.ResourceWithRD, S generic.ResourceWithRD](ctx context.Context, r controller.QRuntime, finalizer string, main S, opts ...HandleInputOption) (T, error) {
	var zero T

	cfg := HandleInputOptions{id: main.Metadata().ID()}

	for _, apply := range opts {
		apply(&cfg)
	}

	res, err := safe.ReaderGetByID[T](ctx, r, cfg.id)
	if err != nil {
		if state.IsNotFoundError(err) {
			// Missing input is not an error — report "no input".
			return zero, nil
		}

		return zero, err
	}

	tearingDown := res.Metadata().Phase() == resource.PhaseTearingDown ||
		main.Metadata().Phase() == resource.PhaseTearingDown

	if tearingDown {
		// Release our finalizer so the resource can be destroyed; a concurrent
		// destroy (not found) is fine here.
		if removeErr := r.RemoveFinalizer(ctx, res.Metadata(), finalizer); removeErr != nil && !state.IsNotFoundError(removeErr) {
			return zero, removeErr
		}

		return zero, nil
	}

	if !res.Metadata().Finalizers().Has(finalizer) {
		if addErr := r.AddFinalizer(ctx, res.Metadata(), finalizer); addErr != nil {
			return zero, addErr
		}
	}

	return res, nil
}

View File

@ -50,5 +50,7 @@ func (suite *CertRefreshTickSuite) TestReconcile() {
}
// TestCertRefreshTickSuite runs CertRefreshTickSuite in parallel with other tests.
func TestCertRefreshTickSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(CertRefreshTickSuite))
}

View File

@ -177,5 +177,7 @@ func (suite *ClusterBootstrapStatusSuite) testRecoverControlPlaneFromEtcdBackup(
}
// TestClusterBootstrapStatusSuite runs ClusterBootstrapStatusSuite in parallel.
func TestClusterBootstrapStatusSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterBootstrapStatusSuite))
}

View File

@ -49,5 +49,7 @@ func (suite *ClusterEndpointSuite) TestReconcile() {
}
// TestClusterEndpointSuite runs ClusterEndpointSuite in parallel with other tests.
func TestClusterEndpointSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterEndpointSuite))
}

View File

@ -66,5 +66,7 @@ func (suite *ClusterKubernetesNodesSuite) TestReconcile() {
}
// TestClusterKubernetesNodesSuite runs ClusterKubernetesNodesSuite in parallel.
func TestClusterKubernetesNodesSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterKubernetesNodesSuite))
}

View File

@ -110,5 +110,7 @@ func (suite *ClusterLoadBalancerSuite) TestReconcile() {
}
// TestClusterLoadbalancerSuite runs ClusterLoadBalancerSuite in parallel.
func TestClusterLoadbalancerSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterLoadBalancerSuite))
}

View File

@ -696,5 +696,7 @@ func (suite *ClusterMachineConfigStatusSuite) TestGenerationErrorPropagation() {
}
// TestClusterMachineConfigStatusSuite runs ClusterMachineConfigStatusSuite in parallel.
func TestClusterMachineConfigStatusSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterMachineConfigStatusSuite))
}

View File

@ -179,5 +179,7 @@ func (suite *ClusterMachineConfigSuite) TestGenerationError() {
}
// TestClusterMachineConfigSuite runs ClusterMachineConfigSuite in parallel.
func TestClusterMachineConfigSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterMachineConfigSuite))
}

View File

@ -219,5 +219,7 @@ func (suite *ClusterMachineStatusSuite) TestApidAvailable() {
}
// TestClusterMachineStatusSuite runs ClusterMachineStatusSuite in parallel.
func TestClusterMachineStatusSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterMachineStatusSuite))
}

View File

@ -222,5 +222,7 @@ func (suite *ClusterStatusSuite) TestReconcile() {
}
// TestClusterStatusSuite runs ClusterStatusSuite in parallel with other tests.
func TestClusterStatusSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterStatusSuite))
}

View File

@ -82,5 +82,7 @@ func (suite *ClusterSuite) TestReconcile() {
}
// TestClusterSuite runs ClusterSuite in parallel with other top-level tests.
func TestClusterSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterSuite))
}

View File

@ -50,6 +50,8 @@ import (
)
// TestEtcdBackupControllerSuite runs EtcdBackupControllerSuite in parallel.
func TestEtcdBackupControllerSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(EtcdBackupControllerSuite))
}

View File

@ -68,6 +68,8 @@ func (m *mockImageClient) PullImageToNode(_ context.Context, cluster, node, imag
}
// TestImagePullStatusControllerSuite runs ImagePullStatusControllerSuite in parallel.
func TestImagePullStatusControllerSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ImagePullStatusControllerSuite))
}

View File

@ -7,9 +7,12 @@ package machine
import (
"context"
"fmt"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/meta"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/talos/pkg/machinery/client"
)
@ -35,3 +38,20 @@ func forEachResource[T resource.Resource](
return nil
}
// QueryRegisteredTypes gets all registered types from the meta namespace.
//
// The result is used to watch only resource types defined by the Talos
// version running on the machine.
func QueryRegisteredTypes(ctx context.Context, st state.State) (map[resource.Type]struct{}, error) {
	md := resource.NewMetadata(meta.NamespaceName, meta.ResourceDefinitionType, "", resource.VersionUndefined)

	defs, err := safe.StateList[*meta.ResourceDefinition](ctx, st, md)
	if err != nil {
		return nil, fmt.Errorf("failed to list resource definitions: %w", err)
	}

	result := make(map[resource.Type]struct{}, defs.Len())

	defs.ForEach(func(rd *meta.ResourceDefinition) {
		result[rd.TypedSpec().Type] = struct{}{}
	})

	return result, nil
}

View File

@ -15,8 +15,6 @@ import (
"time"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/meta"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/maps"
@ -50,9 +48,8 @@ type Info struct { //nolint:govet
MemoryModules []*specs.MachineStatusSpec_HardwareStatus_MemoryModule
Blockdevices []*specs.MachineStatusSpec_HardwareStatus_BlockDevice
TalosMachineStatus *specs.MachineStatusSpec_TalosMachineStatus
PlatformMetadata *specs.MachineStatusSpec_PlatformMetadata
Schematic *specs.MachineStatusSpec_Schematic
PlatformMetadata *specs.MachineStatusSpec_PlatformMetadata
Schematic *specs.MachineStatusSpec_Schematic
LastError error
MachineID string
@ -193,8 +190,7 @@ func (spec CollectTaskSpec) RunTask(ctx context.Context, logger *zap.Logger, not
watchCh := make(chan state.Event)
// query all resources to start watching only resources that are defined for running version of talos
resources, err := safe.StateList[*meta.ResourceDefinition](ctx, c.COSI, resource.NewMetadata(meta.NamespaceName, meta.ResourceDefinitionType, "", resource.VersionUndefined))
registeredTypes, err := QueryRegisteredTypes(ctx, c.COSI)
if err != nil {
// this is the first request to the Talos API
// if it fails we handle it and update the machine status with the request error
@ -202,15 +198,9 @@ func (spec CollectTaskSpec) RunTask(ctx context.Context, logger *zap.Logger, not
return nil
}
return fmt.Errorf("failed to list resource definitions: %w", err)
return err
}
registeredTypes := map[resource.Type]struct{}{}
resources.ForEach(func(rd *meta.ResourceDefinition) {
registeredTypes[rd.TypedSpec().Type] = struct{}{}
})
// as Talos < 1.3.0 doesn't support Bootstrapped event, we use a mixed approach:
// watch is used to trigger polling on changes to the resources
watchers := map[resource.Type]struct {

View File

@ -21,7 +21,6 @@ import (
"github.com/siderolabs/go-pointer"
"github.com/siderolabs/go-procfs/procfs"
"github.com/siderolabs/image-factory/pkg/schematic"
"github.com/siderolabs/talos/pkg/machinery/api/machine"
"github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/nethelpers"
@ -31,7 +30,6 @@ import (
"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/siderolabs/omni/client/api/omni/specs"
omnimeta "github.com/siderolabs/omni/client/pkg/meta"
@ -51,7 +49,6 @@ var resourcePollers = map[string]machinePollFunction{
runtime.PlatformMetadataType: pollPlatformMetadata,
runtime.MetaKeyType: pollMeta,
runtime.ExtensionStatusType: pollExtensions,
runtime.MachineStatusType: pollTalosMachineStatus,
}
var machinePollers = map[string]machinePollFunction{
@ -279,74 +276,6 @@ func pollPlatformMetadata(ctx context.Context, c *client.Client, info *Info) err
})
}
// pollTalosMachineStatus reads the Talos runtime MachineStatus resources and
// stores the last one (converted to a protobuf MachineStatusEvent, stamped
// with the resource's update time) into info.TalosMachineStatus.
func pollTalosMachineStatus(ctx context.Context, c *client.Client, info *Info) error {
	// convertStage maps a Talos runtime machine stage onto the machine API enum;
	// an unrecognized stage yields UNKNOWN plus an error.
	convertStage := func(stage runtime.MachineStage) (machine.MachineStatusEvent_MachineStage, error) {
		switch stage {
		case runtime.MachineStageUnknown:
			return machine.MachineStatusEvent_UNKNOWN, nil
		case runtime.MachineStageBooting:
			return machine.MachineStatusEvent_BOOTING, nil
		case runtime.MachineStageInstalling:
			return machine.MachineStatusEvent_INSTALLING, nil
		case runtime.MachineStageMaintenance:
			return machine.MachineStatusEvent_MAINTENANCE, nil
		case runtime.MachineStageRunning:
			return machine.MachineStatusEvent_RUNNING, nil
		case runtime.MachineStageRebooting:
			return machine.MachineStatusEvent_REBOOTING, nil
		case runtime.MachineStageShuttingDown:
			return machine.MachineStatusEvent_SHUTTING_DOWN, nil
		case runtime.MachineStageResetting:
			return machine.MachineStatusEvent_RESETTING, nil
		case runtime.MachineStageUpgrading:
			return machine.MachineStatusEvent_UPGRADING, nil
		default:
			return machine.MachineStatusEvent_UNKNOWN, fmt.Errorf("unknown stage: %d", stage)
		}
	}

	// convertStatus builds the protobuf spec from the COSI resource.
	convertStatus := func(r *runtime.MachineStatus) (*specs.MachineStatusSpec_TalosMachineStatus, error) {
		spec := r.TypedSpec()

		statusEventMachineStage, err := convertStage(spec.Stage)
		if err != nil {
			return nil, err
		}

		return &specs.MachineStatusSpec_TalosMachineStatus{
			UpdatedAt: timestamppb.New(r.Metadata().Updated()),
			Status: &machine.MachineStatusEvent{
				Stage: statusEventMachineStage,
				Status: &machine.MachineStatusEvent_MachineStatus{
					Ready: spec.Status.Ready,
					UnmetConditions: xslices.Map(spec.Status.UnmetConditions, func(t runtime.UnmetCondition) *machine.MachineStatusEvent_MachineStatus_UnmetCondition {
						return &machine.MachineStatusEvent_MachineStatus_UnmetCondition{
							Name:   t.Name,
							Reason: t.Reason,
						}
					}),
				},
			},
		}, nil
	}

	return forEachResource(
		ctx,
		c,
		runtime.NamespaceName,
		runtime.MachineStatusType,
		func(r *runtime.MachineStatus) error {
			machineStatusEvent, err := convertStatus(r)
			if err != nil {
				return err
			}

			info.TalosMachineStatus = machineStatusEvent

			return nil
		})
}
func pollSecureBootStatus(ctx context.Context, c *client.Client, info *Info) error {
isSecureBootEnabled := func() (bool, error) {
_, err := safe.StateGetByID[*meta.ResourceDefinition](ctx, c.COSI, strings.ToLower(runtime.SecurityStateType))

View File

@ -7,6 +7,7 @@ package task
import (
"context"
"sync"
"go.uber.org/zap"
)
@ -18,6 +19,7 @@ type EqualityFunc[T any] func(x, y T) bool
// Runner keeps the set of running tasks keyed by ID.
type Runner[T any, S Spec[T]] struct {
	running      map[ID]*Task[T, S]
	equalityFunc EqualityFunc[S]
	mu           sync.Mutex // guards running across StartTask/StopTask/Reconcile
}
// NewRunner creates a new task runner.
@ -44,8 +46,37 @@ func (runner *Runner[T, S]) Stop() {
}
}
// StartTask starts a new task.
//
// If a task with the same ID is already running, it is stopped first so its
// goroutine is not leaked when the map entry is overwritten.
func (runner *Runner[T, S]) StartTask(ctx context.Context, logger *zap.Logger, id string, spec S, task T) {
	runner.mu.Lock()
	defer runner.mu.Unlock()

	// Bug fix: previously an existing task with the same ID was silently
	// overwritten, leaking it.
	if previous, ok := runner.running[id]; ok {
		logger.Debug("stopping task", zap.String("task", id))

		previous.Stop()
	}

	runner.running[id] = New(logger, spec, task)

	logger.Debug("starting task", zap.String("task", id))

	runner.running[id].Start(ctx)
}
// StopTask stops the task with the given ID, doing nothing if it is not running.
func (runner *Runner[T, S]) StopTask(logger *zap.Logger, id string) {
	runner.mu.Lock()
	defer runner.mu.Unlock()

	task, ok := runner.running[id]
	if !ok {
		return
	}

	logger.Debug("stopping task", zap.String("task", id))

	task.Stop()

	delete(runner.running, id)
}
// Reconcile running tasks.
func (runner *Runner[T, S]) Reconcile(ctx context.Context, logger *zap.Logger, shouldRun map[ID]S, in T) {
runner.mu.Lock()
defer runner.mu.Unlock()
// stop running tasks which shouldn't run
for id := range runner.running {
if _, exists := shouldRun[id]; !exists {

View File

@ -0,0 +1,217 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// Package snapshot implements a task which collects MachineStatus resource from a Machine.
package snapshot
import (
"context"
"crypto/tls"
"fmt"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/talos/pkg/machinery/api/machine"
"github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
"go.uber.org/zap"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
machinetask "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/task/machine"
"github.com/siderolabs/omni/internal/backend/runtime/talos"
)
// InfoChan is a channel for sending machine info from tasks back to the controller.
type InfoChan chan<- *omni.MachineStatusSnapshot

// CollectTaskSpec describes a task to collect machine information.
type CollectTaskSpec struct {
	_ [0]func() // make uncomparable

	// TalosConfig to use; nil makes the task connect insecurely
	// (see getClient), e.g. for machines in maintenance mode.
	TalosConfig *omni.TalosConfig

	Endpoint  string // machine endpoint used to build the Talos client
	MachineID string // ID of the machine this task watches; also the task ID
}
// resourceEqual compares two (possibly nil) resource pointers for equality.
func resourceEqual[T any, S interface {
	resource.Resource
	*T
}](a, b S) bool {
	switch {
	case a == nil && b == nil:
		return true
	case a == nil || b == nil:
		return false
	default:
		return resource.Equal(a, b)
	}
}
// Equal compares two task specs for the same machine.
//
// If the task spec changes, the task will be restarted.
func (spec CollectTaskSpec) Equal(other CollectTaskSpec) bool {
	return spec.Endpoint == other.Endpoint && resourceEqual(spec.TalosConfig, other.TalosConfig)
}
// ID returns the task ID.
//
// Tasks are keyed by the machine they collect the status from.
func (spec CollectTaskSpec) ID() string {
	return spec.MachineID
}
// sendInfo delivers a snapshot to the controller via notifyCh.
// It reports false when ctx was canceled before the send completed.
func (spec CollectTaskSpec) sendInfo(ctx context.Context, info *omni.MachineStatusSnapshot, notifyCh InfoChan) bool {
	return channel.SendWithContext(ctx, notifyCh, info)
}
// RunTask runs the machine status collector.
//
// It watches the Talos runtime MachineStatus resource on the machine and
// converts every create/update into a MachineStatusSnapshot sent to notifyCh.
// Returns nil on context cancellation.
func (spec CollectTaskSpec) RunTask(ctx context.Context, _ *zap.Logger, notifyCh InfoChan) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	client, err := spec.getClient(ctx)
	if err != nil {
		return err
	}

	defer client.Close() //nolint:errcheck

	registeredTypes, err := machinetask.QueryRegisteredTypes(ctx, client.COSI)
	if err != nil {
		return err
	}

	// The running Talos version may predate the MachineStatus resource;
	// nothing to watch in that case.
	if _, registered := registeredTypes[runtime.MachineStatusType]; !registered {
		return nil
	}

	watchCh := make(chan state.Event)

	if err = client.COSI.Watch(ctx, runtime.NewMachineStatus().Metadata(), watchCh); err != nil {
		return err
	}

	for {
		var event state.Event

		select {
		case <-ctx.Done():
			return nil
		case event = <-watchCh:
		}

		switch event.Type {
		case state.Errored:
			return fmt.Errorf("error watching COSI resource: %w", event.Error)
		case state.Bootstrapped, state.Destroyed:
			// ignore
		case state.Created, state.Updated:
			snapshot := omni.NewMachineStatusSnapshot(resources.DefaultNamespace, spec.MachineID)

			machineStatusSpec := event.Resource.Spec().(*runtime.MachineStatusSpec) //nolint:forcetypeassert,errcheck

			ev, err := convertStatus(machineStatusSpec)
			if err != nil {
				return err
			}

			snapshot.TypedSpec().Value.MachineStatus = ev

			// Send failure means the context was canceled: stop quietly.
			if !spec.sendInfo(ctx, snapshot, notifyCh) {
				return nil
			}
		}
	}
}
// getClient builds a Talos API client for the machine.
//
// When a TalosConfig is available, the authenticated client is tried first;
// if the probe request fails (e.g. the machine is in maintenance mode), it
// falls back to an insecure connection. A nil TalosConfig connects
// insecurely right away.
func (spec CollectTaskSpec) getClient(ctx context.Context) (*client.Client, error) {
	opts := talos.GetSocketOptions(spec.Endpoint)

	// insecureClient connects without TLS verification, which is how machines
	// without a config (maintenance mode) are reached.
	insecureClient := func() (*client.Client, error) {
		return client.New(ctx, append(opts, client.WithTLSConfig(&tls.Config{
			InsecureSkipVerify: true,
		}), client.WithEndpoints(spec.Endpoint))...)
	}

	if spec.TalosConfig == nil {
		return insecureClient()
	}

	config := omni.NewTalosClientConfig(spec.TalosConfig, spec.Endpoint)

	c, err := client.New(ctx, append(opts, client.WithConfig(config))...)
	if err != nil {
		return nil, fmt.Errorf("error building Talos API client: %w", err)
	}

	// Probe the authenticated client; if the request fails, retry once with
	// the insecure client.
	if _, err = c.Version(ctx); err != nil {
		// Bug fix: close the failed secure client before retrying — it was
		// previously leaked on the insecure retry path.
		c.Close() //nolint:errcheck

		return insecureClient()
	}

	return c, nil
}
// convertStage maps a Talos runtime machine stage onto the machine API
// MachineStatusEvent stage enum.
//
// An unrecognized stage yields UNKNOWN together with an error.
func convertStage(stage runtime.MachineStage) (machine.MachineStatusEvent_MachineStage, error) {
	stageMapping := map[runtime.MachineStage]machine.MachineStatusEvent_MachineStage{
		runtime.MachineStageUnknown:      machine.MachineStatusEvent_UNKNOWN,
		runtime.MachineStageBooting:      machine.MachineStatusEvent_BOOTING,
		runtime.MachineStageInstalling:   machine.MachineStatusEvent_INSTALLING,
		runtime.MachineStageMaintenance:  machine.MachineStatusEvent_MAINTENANCE,
		runtime.MachineStageRunning:      machine.MachineStatusEvent_RUNNING,
		runtime.MachineStageRebooting:    machine.MachineStatusEvent_REBOOTING,
		runtime.MachineStageShuttingDown: machine.MachineStatusEvent_SHUTTING_DOWN,
		runtime.MachineStageResetting:    machine.MachineStatusEvent_RESETTING,
		runtime.MachineStageUpgrading:    machine.MachineStatusEvent_UPGRADING,
	}

	converted, ok := stageMapping[stage]
	if !ok {
		return machine.MachineStatusEvent_UNKNOWN, fmt.Errorf("unknown stage: %d", stage)
	}

	return converted, nil
}
// convertStatus converts a Talos runtime MachineStatus spec into the machine
// API MachineStatusEvent message.
func convertStatus(spec *runtime.MachineStatusSpec) (*machine.MachineStatusEvent, error) {
	stage, err := convertStage(spec.Stage)
	if err != nil {
		return nil, err
	}

	unmet := make([]*machine.MachineStatusEvent_MachineStatus_UnmetCondition, 0, len(spec.Status.UnmetConditions))

	for _, cond := range spec.Status.UnmetConditions {
		unmet = append(unmet, &machine.MachineStatusEvent_MachineStatus_UnmetCondition{
			Name:   cond.Name,
			Reason: cond.Reason,
		})
	}

	return &machine.MachineStatusEvent{
		Stage: stage,
		Status: &machine.MachineStatusEvent_MachineStatus{
			Ready:           spec.Status.Ready,
			UnmetConditions: unmet,
		},
	}, nil
}

View File

@ -22,6 +22,8 @@ import (
)
// TestKeyPrunerSuite runs KeyPrunerSuite in parallel with other top-level tests.
func TestKeyPrunerSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(KeyPrunerSuite))
}

View File

@ -90,5 +90,7 @@ func (suite *KubeconfigSuite) TestReconcile() {
}
// TestKubeconfigSuite runs KubeconfigSuite in parallel with other tests.
func TestKubeconfigSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(KubeconfigSuite))
}

View File

@ -131,5 +131,7 @@ func (k *kubernetesClientMock) DeleteNode(_ context.Context, node string) error
}
// TestKubernetesNodeAuditSuite runs KubernetesNodeAuditSuite in parallel with the other suites.
func TestKubernetesNodeAuditSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(KubernetesNodeAuditSuite))
}

View File

@ -95,6 +95,8 @@ type LoadBalancerSuite struct {
}
// TestLoadBalancerSuite runs LoadBalancerSuite in parallel with the other suites.
func TestLoadBalancerSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(LoadBalancerSuite))
}

View File

@ -89,5 +89,7 @@ func (suite *MachineCleanupSuite) TestSkipMachineSetNodeWithOwner() {
}
// TestMachineCleanupSuite runs MachineCleanupSuite in parallel with the other suites.
func TestMachineCleanupSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MachineCleanupSuite))
}

View File

@ -130,5 +130,7 @@ func (suite *ExtensionsConfigurationStatusSuite) TestReconcile() {
}
// TestExtensionsConfigurationStatusSuite runs ExtensionsConfigurationStatusSuite in parallel with the other suites.
func TestExtensionsConfigurationStatusSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ExtensionsConfigurationStatusSuite))
}

View File

@ -69,5 +69,7 @@ func (suite *MachineLabelsSuite) TestLabelsReconcile() {
}
// TestMachineLabelsSuite runs MachineLabelsSuite in parallel with the other suites.
func TestMachineLabelsSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MachineLabelsSuite))
}

View File

@ -530,5 +530,7 @@ func (suite *MachineSetEtcdAuditSuite) cleanupResources() {
}
// TestMachineSetEtcdAuditSuite runs MachineSetEtcdAuditSuite in parallel with the other suites.
func TestMachineSetEtcdAuditSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MachineSetEtcdAuditSuite))
}

View File

@ -923,6 +923,8 @@ func (suite *MachineSetStatusSuite) assertMachineSetPhase(machineSet *omni.Machi
}
// TestMachineSetStatusSuite runs MachineSetStatusSuite in parallel with the other suites.
func TestMachineSetStatusSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MachineSetStatusSuite))
}

View File

@ -417,10 +417,6 @@ func (ctrl *MachineStatusController) handleNotification(ctx context.Context, r c
spec.PlatformMetadata = event.PlatformMetadata
}
if event.TalosMachineStatus != nil {
spec.TalosMachineStatus = event.TalosMachineStatus
}
if event.ImageLabels != nil {
spec.ImageLabels = event.ImageLabels

View File

@ -208,5 +208,7 @@ func makeMD[T generic.ResourceWithRD](id resource.ID) resource.Metadata {
}
// TestMachineStatusLinkSuiteSuite runs MachineStatusLinkSuite in parallel with the other suites.
func TestMachineStatusLinkSuiteSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MachineStatusLinkSuite))
}

View File

@ -0,0 +1,241 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
package omni
import (
"context"
"fmt"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/controller/generic"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
"go.uber.org/zap"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/helpers"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/task"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/internal/task/snapshot"
)
// MachineStatusSnapshotControllerName is the name of the MachineStatusSnapshotController.
const MachineStatusSnapshotControllerName = "MachineStatusSnapshotController"

// MachineStatusSnapshotController manages omni.MachineStatusSnapshot resources
// based on information from the Talos API and from siderolink events.
type MachineStatusSnapshotController struct {
	// runner manages the per-machine status collection tasks.
	runner *task.Runner[snapshot.InfoChan, snapshot.CollectTaskSpec]
	// notifyCh receives snapshots produced by the collection tasks.
	notifyCh chan *omni.MachineStatusSnapshot
	// siderolinkCh receives snapshots produced by the siderolink events handler.
	siderolinkCh <-chan *omni.MachineStatusSnapshot
	generic.NamedController
}
// NewMachineStatusSnapshotController initializes MachineStatusSnapshotController.
//
// The siderolinkEventsCh channel carries snapshots built by the siderolink
// events handler; the controller merges them with the ones produced by its
// own collection tasks.
func NewMachineStatusSnapshotController(siderolinkEventsCh <-chan *omni.MachineStatusSnapshot) *MachineStatusSnapshotController {
	ctrl := MachineStatusSnapshotController{
		NamedController: generic.NamedController{
			ControllerName: MachineStatusSnapshotControllerName,
		},
	}

	ctrl.siderolinkCh = siderolinkEventsCh
	ctrl.notifyCh = make(chan *omni.MachineStatusSnapshot)
	ctrl.runner = task.NewEqualRunner[snapshot.CollectTaskSpec]()

	return &ctrl
}
// Settings implements controller.QController interface.
func (ctrl *MachineStatusSnapshotController) Settings() controller.QSettings {
	return controller.QSettings{
		Inputs: []controller.Input{
			{
				// Machine is the primary input: each Machine maps 1:1 to a snapshot.
				Namespace: resources.DefaultNamespace,
				Type:      omni.MachineType,
				Kind:      controller.InputQPrimary,
			},
			{
				Namespace: resources.DefaultNamespace,
				Type:      omni.TalosConfigType,
				Kind:      controller.InputQMapped,
			},
			{
				Namespace: resources.DefaultNamespace,
				Type:      omni.ClusterMachineType,
				Kind:      controller.InputQMapped,
			},
		},
		Outputs: []controller.Output{
			{
				Kind: controller.OutputExclusive,
				Type: omni.MachineStatusSnapshotType,
			},
		},
		Concurrency: optional.Some[uint](4),
		// RunHook funnels snapshots from both sources (siderolink events handler
		// and the status collection tasks) into reconcileSnapshot, so that all
		// events are handled in a single place.
		RunHook: func(ctx context.Context, _ *zap.Logger, r controller.QRuntime) error {
			for {
				select {
				case <-ctx.Done():
					return nil
				case resource := <-ctrl.siderolinkCh:
					if err := ctrl.reconcileSnapshot(ctx, r, resource); err != nil {
						return err
					}
				case resource := <-ctrl.notifyCh:
					if err := ctrl.reconcileSnapshot(ctx, r, resource); err != nil {
						return err
					}
				}
			}
		},
		// ShutdownHook stops all running collection tasks when the controller shuts down.
		ShutdownHook: func() {
			ctrl.runner.Stop()
		},
	}
}
// MapInput implements controller.QController interface.
//
// Machine and ClusterMachine resources map directly to the snapshot with the
// same ID; a TalosConfig change maps to the snapshots of every machine in the
// corresponding cluster.
func (ctrl *MachineStatusSnapshotController) MapInput(ctx context.Context, _ *zap.Logger,
	r controller.QRuntime, ptr resource.Pointer,
) ([]resource.Pointer, error) {
	if _, err := r.Get(ctx, ptr); err != nil {
		if state.IsNotFoundError(err) {
			return nil, nil
		}

		// bug fix: previously non-NotFound errors were silently dropped here,
		// causing the mapping to proceed with an unreadable resource
		return nil, err
	}

	switch ptr.Type() {
	case omni.ClusterMachineType, omni.MachineType:
		return []resource.Pointer{
			omni.NewMachineStatusSnapshot(resources.DefaultNamespace, ptr.ID()).Metadata(),
		}, nil
	case omni.TalosConfigType:
		machines, err := safe.ReaderListAll[*omni.ClusterMachineStatus](ctx, r, state.WithLabelQuery(resource.LabelEqual(omni.LabelCluster, ptr.ID())))
		if err != nil {
			return nil, err
		}

		res := make([]resource.Pointer, 0, machines.Len())

		machines.ForEach(func(r *omni.ClusterMachineStatus) {
			res = append(res, omni.NewMachineStatusSnapshot(resources.DefaultNamespace, r.Metadata().ID()).Metadata())
		})

		return res, nil
	}

	return nil, fmt.Errorf("unexpected resource type %q", ptr.Type())
}
// Reconcile implements controller.QController interface.
//
// It dispatches to the running or tearing-down flow based on the Machine phase;
// a missing Machine is not an error.
func (ctrl *MachineStatusSnapshotController) Reconcile(ctx context.Context,
	logger *zap.Logger, r controller.QRuntime, ptr resource.Pointer,
) error {
	res, err := safe.ReaderGetByID[*omni.Machine](ctx, r, ptr.ID())
	if err != nil {
		if state.IsNotFoundError(err) {
			return nil
		}

		return err
	}

	if res.Metadata().Phase() != resource.PhaseTearingDown {
		return ctrl.reconcileRunning(ctx, r, logger, res)
	}

	return ctrl.reconcileTearingDown(ctx, r, logger, res)
}
// reconcileRunning handles a Machine which is not being torn down: it ensures
// the controller's finalizer is set on the Machine, tracks the ClusterMachine
// input and (re)starts the status collection task for a connected machine.
func (ctrl *MachineStatusSnapshotController) reconcileRunning(ctx context.Context, r controller.QRuntime, logger *zap.Logger, machine *omni.Machine) error {
	if !machine.Metadata().Finalizers().Has(ctrl.Name()) {
		if err := r.AddFinalizer(ctx, machine.Metadata(), ctrl.Name()); err != nil {
			return err
		}
	}

	// stop any existing task first; it is started again below with a fresh spec
	// if the machine is connected
	ctrl.runner.StopTask(logger, machine.Metadata().ID())

	// HandleInput reads the ClusterMachine and manages this controller's finalizer on it
	clusterMachine, err := helpers.HandleInput[*omni.ClusterMachine](ctx, r, ctrl.Name(), machine)
	if err != nil {
		return err
	}

	// TalosConfig exists only for machines which joined a cluster, so NotFound is tolerated
	var talosConfig *omni.TalosConfig

	if clusterMachine != nil {
		clusterName, ok := clusterMachine.Metadata().Labels().Get(omni.LabelCluster)
		if ok {
			talosConfig, err = safe.ReaderGetByID[*omni.TalosConfig](ctx, r, clusterName)
			if err != nil && !state.IsNotFoundError(err) {
				return err
			}
		}
	}

	// a collection task is only useful while the machine is reachable
	if machine.TypedSpec().Value.Connected {
		ctrl.runner.StartTask(ctx, logger, machine.Metadata().ID(), snapshot.CollectTaskSpec{
			Endpoint:    machine.TypedSpec().Value.ManagementAddress,
			TalosConfig: talosConfig,
			MachineID:   machine.Metadata().ID(),
		}, ctrl.notifyCh)
	}

	return nil
}
// reconcileTearingDown handles a Machine in the tearing-down phase: it stops
// the collection task, destroys the snapshot and finally removes the
// controller's finalizer from the Machine so it can be destroyed.
func (ctrl *MachineStatusSnapshotController) reconcileTearingDown(ctx context.Context, r controller.QRuntime, logger *zap.Logger, machine *omni.Machine) error {
	ctrl.runner.StopTask(logger, machine.Metadata().ID())

	// release this controller's finalizer on the ClusterMachine input
	_, err := helpers.HandleInput[*omni.ClusterMachine](ctx, r, ctrl.Name(), machine)
	if err != nil {
		return err
	}

	md := omni.NewMachineStatusSnapshot(resources.DefaultNamespace, machine.Metadata().ID()).Metadata()

	// NOTE(review): assumes Teardown tolerates a snapshot which was never created — confirm
	ready, err := r.Teardown(ctx, md)
	if err != nil {
		return err
	}

	// other finalizers still present on the snapshot; retry on a later reconcile
	if !ready {
		return nil
	}

	if err = r.Destroy(ctx, md); err != nil {
		return err
	}

	return r.RemoveFinalizer(ctx, machine.Metadata(), ctrl.Name())
}
// reconcileSnapshot writes an incoming snapshot (from either event source)
// into the MachineStatusSnapshot resource.
//
// Events for machines which do not exist or are being torn down are dropped.
func (ctrl *MachineStatusSnapshotController) reconcileSnapshot(ctx context.Context, r controller.QRuntime, snapshot *omni.MachineStatusSnapshot) error {
	machineRes, err := safe.ReaderGetByID[*omni.Machine](ctx, r, snapshot.Metadata().ID())
	if err != nil {
		if state.IsNotFoundError(err) {
			return nil
		}

		return err
	}

	if machineRes.Metadata().Phase() == resource.PhaseTearingDown {
		return nil
	}

	updateSpec := func(m *omni.MachineStatusSnapshot) error {
		m.TypedSpec().Value = snapshot.TypedSpec().Value

		return nil
	}

	// phase conflicts are expected while the snapshot is being torn down, so they are ignored
	err = safe.WriterModify(ctx, r, omni.NewMachineStatusSnapshot(resources.DefaultNamespace, snapshot.Metadata().ID()), updateSpec)
	if err != nil && !state.IsPhaseConflictError(err) {
		return fmt.Errorf("error modifying resource: %w", err)
	}

	return nil
}

View File

@ -0,0 +1,106 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
package omni_test
import (
"context"
"testing"
"time"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/talos/pkg/machinery/api/machine"
"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
omnictrl "github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni"
)
// MachineStatusSnapshotControllerSuite runs integration tests for MachineStatusSnapshotController.
type MachineStatusSnapshotControllerSuite struct {
	OmniSuite
}
// TestReconcile verifies that the controller updates MachineStatusSnapshot from
// both event sources: the siderolink events channel and the status collection
// task which reads the MachineStatus resource from the (mock) machine API.
func (suite *MachineStatusSnapshotControllerSuite) TestReconcile() {
	ctx, cancel := context.WithTimeout(suite.ctx, time.Second*5)
	defer cancel()

	require := suite.Require()

	suite.startRuntime()

	siderolinkEventsCh := make(chan *omni.MachineStatusSnapshot)

	suite.Require().NoError(suite.runtime.RegisterQController(omnictrl.NewMachineStatusSnapshotController(siderolinkEventsCh)))

	// connected machine pointing at the mock machine service
	m := omni.NewMachine(resources.DefaultNamespace, "1")
	m.TypedSpec().Value.Connected = true
	m.TypedSpec().Value.ManagementAddress = suite.socketConnectionString

	require.NoError(suite.state.Create(suite.ctx, m))

	snapshot := omni.NewMachineStatusSnapshot(resources.DefaultNamespace, m.Metadata().ID())
	snapshot.TypedSpec().Value.MachineStatus = &machine.MachineStatusEvent{
		Stage: machine.MachineStatusEvent_BOOTING,
		Status: &machine.MachineStatusEvent_MachineStatus{
			Ready: false,
			UnmetConditions: []*machine.MachineStatusEvent_MachineStatus_UnmetCondition{
				{
					Name:   "name",
					Reason: "nope",
				},
			},
		},
	}

	// handle siderolink
	suite.Require().True(channel.SendWithContext(ctx, siderolinkEventsCh, snapshot))

	rtestutils.AssertResources(ctx, suite.T(), suite.state, []string{m.Metadata().ID()}, func(r *omni.MachineStatusSnapshot, assertion *assert.Assertions) {
		assertion.EqualValues(snapshot.TypedSpec().Value, r.TypedSpec().Value)
	})

	snapshot = omni.NewMachineStatusSnapshot(resources.DefaultNamespace, "not exists")

	// ignore events for machines that do not exist
	suite.Require().True(channel.SendWithContext(ctx, siderolinkEventsCh, snapshot))

	rtestutils.AssertNoResource[*omni.MachineStatusSnapshot](ctx, suite.T(), suite.state, snapshot.Metadata().ID())

	// publish a MachineStatus through the mock machine API; the collection task should pick it up
	ms := runtime.NewMachineStatus()
	ms.TypedSpec().Stage = runtime.MachineStageInstalling
	ms.TypedSpec().Status = runtime.MachineStatusStatus{
		Ready: false,
		UnmetConditions: []runtime.UnmetCondition{
			{
				Name:   "you",
				Reason: "failed",
			},
		},
	}

	suite.Require().NoError(suite.machineService.state.Create(ctx, ms))

	rtestutils.AssertResources(ctx, suite.T(), suite.state, []string{m.Metadata().ID()}, func(r *omni.MachineStatusSnapshot, assertion *assert.Assertions) {
		assertion.EqualValues(machine.MachineStatusEvent_INSTALLING, r.TypedSpec().Value.MachineStatus.Stage)
		assertion.EqualValues(false, r.TypedSpec().Value.MachineStatus.Status.Ready)
		assertion.EqualValues("you", r.TypedSpec().Value.MachineStatus.Status.UnmetConditions[0].Name)
		assertion.EqualValues("failed", r.TypedSpec().Value.MachineStatus.Status.UnmetConditions[0].Reason)
	})

	// destroying the machine must also remove its snapshot
	rtestutils.DestroyAll[*omni.Machine](ctx, suite.T(), suite.state)

	rtestutils.AssertNoResource[*omni.MachineStatusSnapshot](ctx, suite.T(), suite.state, m.Metadata().ID())
}
// TestMachineStatusSnapshotControllerSuite runs MachineStatusSnapshotControllerSuite in parallel with the other suites.
func TestMachineStatusSnapshotControllerSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MachineStatusSnapshotControllerSuite))
}

View File

@ -390,5 +390,7 @@ func (suite *MachineStatusSuite) TestMachineSchematic() {
}
// TestMachineStatusSuite runs MachineStatusSuite in parallel with the other suites.
func TestMachineStatusSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MachineStatusSuite))
}

View File

@ -62,5 +62,7 @@ func (suite *MachineSuite) TestReconcile() {
}
// TestMachineSuite runs MachineSuite in parallel with the other suites.
func TestMachineSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MachineSuite))
}

View File

@ -85,5 +85,7 @@ url: tcp://[fdae:41e4:649b:9303::1]:8092
}
// TestMaintenanceConfigPatchSuite runs MaintenanceConfigPatchSuite in parallel with the other suites.
func TestMaintenanceConfigPatchSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MaintenanceConfigPatchSuite))
}

View File

@ -347,7 +347,7 @@ func (suite *OmniSuite) newServer(suffix string, opts ...grpc.ServerOption) (*ma
}
func (suite *OmniSuite) SetupTest() {
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 60*time.Second)
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 20*time.Second)
suite.stateBuilder = dynamicStateBuilder{m: map[resource.Namespace]state.CoreState{}}

View File

@ -69,5 +69,7 @@ func (suite *RedactedClusterMachineConfigSuite) generateConfig() []byte {
}
// TestRedactedClusterMachineConfigSuite runs RedactedClusterMachineConfigSuite in parallel with the other suites.
func TestRedactedClusterMachineConfigSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(RedactedClusterMachineConfigSuite))
}

View File

@ -232,5 +232,7 @@ func (suite *SchematicConfigurationSuite) TestReconcile() {
}
// TestSchematicConfigurationSuite runs SchematicConfigurationSuite in parallel with the other suites.
func TestSchematicConfigurationSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(SchematicConfigurationSuite))
}

View File

@ -154,5 +154,7 @@ func (suite *ClusterSecretsSuite) TestSecretsFromBackup() {
}
// TestClusterSecretsSuite runs ClusterSecretsSuite in parallel with the other suites.
func TestClusterSecretsSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(ClusterSecretsSuite))
}

View File

@ -266,5 +266,7 @@ func (suite *TalosExtensionsSuite) TestReconcile() {
}
// TestTalosExtensionsSuite runs TalosExtensionsSuite in parallel with the other suites.
func TestTalosExtensionsSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(TalosExtensionsSuite))
}

View File

@ -312,5 +312,7 @@ func (suite *TalosUpgradeStatusSuite) TestReconcileLocked() {
}
// TestTalosUpgradeStatusSuite runs TalosUpgradeStatusSuite in parallel with the other suites.
func TestTalosUpgradeStatusSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(TalosUpgradeStatusSuite))
}

View File

@ -96,5 +96,7 @@ func (suite *TalosConfigSuite) TestReconcile() {
}
// TestTalosConfigSuite runs TalosConfigSuite in parallel with the other suites.
func TestTalosConfigSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(TalosConfigSuite))
}

View File

@ -156,6 +156,10 @@ func NewManager(state state.State, logger *zap.Logger) *Manager {
callback: generateAllMaintenanceConfigs,
name: "generateAllMaintenanceConfigs",
},
{
callback: setMachineStatusSnapshotOwner,
name: "setMachineStatusSnapshotOwner",
},
},
}
}

View File

@ -1452,6 +1452,51 @@ func (suite *MigrationSuite) TestGenerateAllMaintenanceConfigs() {
suite.Require().Equal(oldVer, config.Metadata().Version())
}
// TestSetMachineStatusSnapshotOwner verifies that the setMachineStatusSnapshotOwner
// migration assigns the controller as the owner of snapshots created without an
// owner (bumping their version), while leaving already-owned snapshots untouched.
func (suite *MigrationSuite) TestSetMachineStatusSnapshotOwner() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	items := []*omni.MachineStatusSnapshot{
		omni.NewMachineStatusSnapshot(resources.DefaultNamespace, "test1"),
		omni.NewMachineStatusSnapshot(resources.DefaultNamespace, "test2"),
		omni.NewMachineStatusSnapshot(resources.DefaultNamespace, "test3"),
	}

	// first two snapshots have no owner — the migration must adopt them
	for _, item := range items[:2] {
		suite.Require().NoError(suite.state.Create(ctx, item))
	}

	// the last one is already owned by the controller — must stay untouched
	for _, item := range items[2:] {
		suite.Require().NoError(suite.state.Create(
			ctx,
			item,
			state.WithCreateOwner(omnictrl.NewMachineStatusSnapshotController(nil).Name())),
		)
	}

	// test migration in isolation
	suite.Require().NoError(suite.manager.Run(ctx, migration.WithFilter(func(name string) bool {
		return name == "setMachineStatusSnapshotOwner"
	})))

	check := func(item *omni.MachineStatusSnapshot, expectedVersion int) {
		result, err := safe.StateGet[*omni.MachineStatusSnapshot](ctx, suite.state, item.Metadata())

		suite.Require().NoError(err)
		suite.Require().Equal(omnictrl.NewMachineStatusSnapshotController(nil).Name(), result.Metadata().Owner())
		suite.Require().EqualValues(result.Metadata().Version().Value(), expectedVersion)
	}

	// adopted snapshots were updated once, so their version is 2
	for _, item := range items[:2] {
		check(item, 2)
	}

	// the pre-owned snapshot keeps its initial version
	for _, item := range items[2:] {
		check(item, 1)
	}
}
// TestMigrationSuite runs MigrationSuite in parallel with the other suites.
func TestMigrationSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(MigrationSuite))
}

View File

@ -1211,3 +1211,30 @@ func generateAllMaintenanceConfigs(ctx context.Context, st state.State, _ *zap.L
return reconcileConfigInputs(ctx, st, item, true)
})
}
// setMachineStatusSnapshotOwner sets the owner of every MachineStatusSnapshot
// which has no owner to the MachineStatusSnapshotController, as the snapshots
// are now managed by that controller instead of the siderolink events handler.
//
// (The previous doc comment was a copy-paste from the maintenance-config
// migration and described unrelated behavior.)
func setMachineStatusSnapshotOwner(ctx context.Context, st state.State, logger *zap.Logger) error {
	list, err := safe.StateListAll[*omni.MachineStatusSnapshot](ctx, st)
	if err != nil {
		return err
	}

	// hoisted out of the loop: the controller name is a constant
	owner := omnictrl.NewMachineStatusSnapshotController(nil).Name()

	for iter := list.Iterator(); iter.Next(); {
		item := iter.Value()

		if item.Metadata().Owner() != "" {
			continue
		}

		logger.Info("updating machine status snapshot with empty owner", zap.String("id", item.Metadata().String()))

		// the current owner is empty, so it is passed as the expected owner for the update
		_, err = safe.StateUpdateWithConflicts(ctx, st, item.Metadata(), func(res *omni.MachineStatusSnapshot) error {
			return res.Metadata().SetOwner(owner)
		}, state.WithExpectedPhaseAny(), state.WithUpdateOwner(item.Metadata().Owner()))
		if err != nil {
			return err
		}
	}

	return nil
}

View File

@ -74,8 +74,9 @@ type Runtime struct {
//
//nolint:maintidx
func New(talosClientFactory *talos.ClientFactory, dnsService *dns.Service, workloadProxyServiceRegistry *workloadproxy.ServiceRegistry,
resourceLogger *resourcelogger.Logger, imageFactoryClient *imagefactory.Client, linkCounterDeltaCh <-chan siderolink.LinkCounterDeltas, resourceState state.State,
virtualState *virtual.State, metricsRegistry prometheus.Registerer, discoveryClient omnictrl.DiscoveryClient, logger *zap.Logger,
resourceLogger *resourcelogger.Logger, imageFactoryClient *imagefactory.Client, linkCounterDeltaCh <-chan siderolink.LinkCounterDeltas,
siderolinkEventsCh <-chan *omni.MachineStatusSnapshot, resourceState state.State, virtualState *virtual.State, metricsRegistry prometheus.Registerer,
discoveryClient omnictrl.DiscoveryClient, logger *zap.Logger,
) (*Runtime, error) {
var opts []options.Option
@ -220,6 +221,7 @@ func New(talosClientFactory *talos.ClientFactory, dnsService *dns.Service, workl
omnictrl.NewTalosConfigController(constants.CertificateValidityTime),
omnictrl.NewTalosExtensionsController(imageFactoryClient),
omnictrl.NewTalosUpgradeStatusController(),
omnictrl.NewMachineStatusSnapshotController(siderolinkEventsCh),
}
if config.Config.Auth.SAML.Enabled {

View File

@ -84,7 +84,7 @@ func (suite *OmniRuntimeSuite) SetupTest() {
workloadProxyServiceRegistry, err := workloadproxy.NewServiceRegistry(resourceState, logger)
suite.Require().NoError(err)
suite.runtime, err = omniruntime.New(clientFactory, dnsService, workloadProxyServiceRegistry, nil, nil, nil,
suite.runtime, err = omniruntime.New(clientFactory, dnsService, workloadProxyServiceRegistry, nil, nil, nil, nil,
resourceState, nil, prometheus.NewRegistry(), discoveryServiceClient, logger)
suite.Require().NoError(err)

View File

@ -118,5 +118,7 @@ func (suite *ClientsSuite) TearDownTest() {
}
// TestClients runs ClientsSuite in parallel with the other suites.
func TestClients(t *testing.T) {
	t.Parallel()
	suite.Run(t, &ClientsSuite{})
}

View File

@ -99,6 +99,7 @@ type Server struct {
imageFactoryClient *imagefactory.Client
linkCounterDeltaCh chan<- siderolink.LinkCounterDeltas
siderolinkEventsCh chan<- *omnires.MachineStatusSnapshot
proxyServer Proxy
bindAddress string
@ -116,6 +117,7 @@ func NewServer(
workloadProxyServiceRegistry *workloadproxy.ServiceRegistry,
imageFactoryClient *imagefactory.Client,
linkCounterDeltaCh chan<- siderolink.LinkCounterDeltas,
siderolinkEventsCh chan<- *omnires.MachineStatusSnapshot,
omniRuntime *omni.Runtime,
talosRuntime *talos.Runtime,
logHandler *siderolink.LogHandler,
@ -133,6 +135,7 @@ func NewServer(
workloadProxyServiceRegistry: workloadProxyServiceRegistry,
imageFactoryClient: imageFactoryClient,
linkCounterDeltaCh: linkCounterDeltaCh,
siderolinkEventsCh: siderolinkEventsCh,
proxyServer: proxyServer,
bindAddress: bindAddress,
metricsBindAddress: metricsBindAddress,
@ -463,7 +466,7 @@ func (s *Server) runMachineAPI(ctx context.Context) error {
}
omniState := s.omniRuntime.State()
machineStatusHandler := machinestatus.NewHandler(omniState, s.logger)
machineStatusHandler := machinestatus.NewHandler(omniState, s.logger, s.siderolinkEventsCh)
slink, err := siderolink.NewManager(
ctx,
@ -505,10 +508,6 @@ func (s *Server) runMachineAPI(ctx context.Context) error {
slink.Register(server)
kms.Register(server)
eg.Go(func() error {
return machineStatusHandler.Start(groupCtx)
})
eg.Go(func() error {
return slink.Run(groupCtx,
"",

View File

@ -171,5 +171,7 @@ func (suite *SignatureTestSuite) TestValidSignature() {
}
// TestSignatureTestSuite runs SignatureTestSuite in parallel with the other suites.
func TestSignatureTestSuite(t *testing.T) {
	t.Parallel()
	suite.Run(t, new(SignatureTestSuite))
}

View File

@ -9,12 +9,11 @@ import (
"context"
"fmt"
"net/netip"
"sync"
"time"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/siderolink/pkg/events"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
"go.uber.org/zap"
@ -24,110 +23,22 @@ import (
"github.com/siderolabs/omni/internal/pkg/auth/actor"
)
// Clock is here to be able to mock time.Now() in the tests.
type Clock interface {
Now() time.Time
}
type clock struct{}
func (c *clock) Now() time.Time {
return time.Now()
}
type eventInfo struct {
timestamp time.Time
fromSideroLink bool
}
// Handler is a machine status handler.
type Handler struct {
logger *zap.Logger
state state.State
Clock Clock
machineToLastEventInfo map[resource.ID]eventInfo
lock sync.Mutex
logger *zap.Logger
state state.State
notifyCh chan<- *omni.MachineStatusSnapshot
}
// NewHandler creates a new machine status handler.
func NewHandler(state state.State, logger *zap.Logger) *Handler {
func NewHandler(state state.State, logger *zap.Logger, notifyCh chan<- *omni.MachineStatusSnapshot) *Handler {
return &Handler{
state: state,
logger: logger,
machineToLastEventInfo: make(map[resource.ID]eventInfo),
Clock: &clock{},
state: state,
logger: logger,
notifyCh: notifyCh,
}
}
// Start starts the machine status handler.
func (handler *Handler) Start(ctx context.Context) error {
eventCh := make(chan state.Event)
machineKind := omni.NewMachine(resources.DefaultNamespace, "").Metadata()
machineStatusKind := omni.NewMachineStatus(resources.DefaultNamespace, "").Metadata()
if err := handler.state.WatchKind(ctx, machineStatusKind, eventCh); err != nil {
return fmt.Errorf("error watching machine statuses: %w", err)
}
if err := handler.state.WatchKind(ctx, machineKind, eventCh); err != nil {
return fmt.Errorf("error watching machines: %w", err)
}
for {
var event state.Event
select {
case <-ctx.Done():
return nil
case event = <-eventCh:
switch event.Type {
case state.Errored:
return fmt.Errorf("error watching resources: %w", event.Error)
case state.Bootstrapped: // ignore
case state.Created, state.Updated:
if machineStatus, ok := event.Resource.(*omni.MachineStatus); ok {
handler.handleMachineStatusResourceUpdate(ctx, machineStatus)
}
case state.Destroyed:
if machine, ok := event.Resource.(*omni.Machine); ok {
handler.handleMachineDestroy(ctx, machine)
}
}
}
}
}
func (handler *Handler) handleMachineStatusResourceUpdate(ctx context.Context, machineStatus *omni.MachineStatus) {
machineStatusSpec := machineStatus.TypedSpec().Value
talosMachineStatus := machineStatusSpec.GetTalosMachineStatus()
if talosMachineStatus == nil {
return
}
if err := handler.handleMachineStatusEvent(ctx, talosMachineStatus.Status, machineStatus.Metadata().ID(),
handler.Clock.Now(), false); err != nil {
handler.logger.Error("error handling machine status machineStatus", zap.Error(err))
}
}
func (handler *Handler) handleMachineDestroy(ctx context.Context, machine *omni.Machine) {
id := machine.Metadata().ID()
if err := handler.state.Destroy(ctx, resource.NewMetadata(resources.DefaultNamespace, omni.MachineStatusSnapshotType, id, resource.VersionUndefined)); err != nil {
if !state.IsNotFoundError(err) {
handler.logger.Error("error destroying machine status snapshot", zap.Error(err))
}
}
handler.lock.Lock()
delete(handler.machineToLastEventInfo, id)
handler.lock.Unlock()
}
// HandleEvent is called on each event coming from the Talos nodes.
func (handler *Handler) HandleEvent(ctx context.Context, event events.Event) error {
ctx = actor.MarkContextAsInternalActor(ctx)
@ -154,80 +65,25 @@ func (handler *Handler) HandleEvent(ctx context.Context, event events.Event) err
switch event := event.Payload.(type) {
case *machineapi.MachineStatusEvent:
return handler.handleMachineStatusEvent(ctx, event, machines.Get(0).Metadata().ID(), handler.Clock.Now(), true)
return handler.handleMachineStatusEvent(ctx, event, machines.Get(0).Metadata().ID())
default: // nothing, we ignore other events
}
return nil
}
func (handler *Handler) handleMachineStatusEvent(ctx context.Context, event *machineapi.MachineStatusEvent, machineID resource.ID, timestamp time.Time, isFromSiderolink bool) error {
accept := handler.acceptEvent(machineID, timestamp, isFromSiderolink)
func (handler *Handler) handleMachineStatusEvent(ctx context.Context, event *machineapi.MachineStatusEvent, machineID resource.ID) error {
handler.logger.Info("got machine status event",
zap.String("machine", machineID),
zap.String("stage", event.Stage.String()),
zap.Any("status", event.Status),
zap.Time("timestamp", timestamp),
zap.Bool("from_siderolink", isFromSiderolink),
zap.Bool("accept", accept))
if !accept {
return nil
}
)
snapshot := omni.NewMachineStatusSnapshot(resources.DefaultNamespace, machineID)
if _, err := safe.StateUpdateWithConflicts(
ctx,
handler.state,
snapshot.Metadata(),
func(status *omni.MachineStatusSnapshot) error {
status.TypedSpec().Value.MachineStatus = event
snapshot.TypedSpec().Value.MachineStatus = event
return nil
},
); err != nil {
if !state.IsNotFoundError(err) {
return err
}
snapshot.TypedSpec().Value.MachineStatus = event
return handler.state.Create(ctx, snapshot)
}
channel.SendWithContext(ctx, handler.notifyCh, snapshot)
return nil
}
func (handler *Handler) acceptEvent(machineID resource.ID, timestamp time.Time, isFromSiderolink bool) bool {
handler.lock.Lock()
defer handler.lock.Unlock()
currentEventInfo := eventInfo{
timestamp: timestamp,
fromSideroLink: isFromSiderolink,
}
lastEventInfo, ok := handler.machineToLastEventInfo[machineID]
accept := false
switch {
// The last event is zero, i.e., current event is the first event
case !ok:
accept = true
// Both the source of the current event and the last one are from SideroLink
case lastEventInfo.fromSideroLink && currentEventInfo.fromSideroLink:
accept = true
// if the current event is newer than the last one, we accept the current one
case currentEventInfo.timestamp.After(lastEventInfo.timestamp):
accept = true
}
if accept {
handler.machineToLastEventInfo[machineID] = currentEventInfo
}
return accept
}

View File

@ -1,168 +0,0 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
package machinestatus_test
import (
"context"
"testing"
"time"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/cosi-project/runtime/pkg/state/registry"
"github.com/jonboulle/clockwork"
"github.com/rs/xid"
"github.com/siderolabs/siderolink/pkg/events"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/internal/pkg/machinestatus"
)
const (
machineID = "machine-status-handler-test"
machineIP = "127.0.0.42"
machinePort = "1234"
machineIPPort = machineIP + ":" + machinePort
)
func TestHandler(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
st := state.WrapCore(namespaced.NewState(inmem.Build))
resourceRegistry := registry.NewResourceRegistry(st)
require.NoError(t, resourceRegistry.Register(ctx, omni.NewMachineStatus(resources.DefaultNamespace, "")))
require.NoError(t, resourceRegistry.Register(ctx, omni.NewMachine(resources.DefaultNamespace, "")))
handler := machinestatus.NewHandler(st, zaptest.NewLogger(t))
// send an event without corresponding machine - assert that it is ignored
require.ErrorContains(t, handler.HandleEvent(ctx, events.Event{
Payload: &machineapi.MachineStatusEvent{
Stage: machineapi.MachineStatusEvent_BOOTING,
},
ID: xid.NewWithTime(time.Now()).String(),
Node: machineIPPort,
}), "no machines found for address "+machineIP)
rtestutils.AssertLength[*omni.MachineStatusSnapshot](ctx, t, st, 0)
// create a machine
machine := omni.NewMachine(resources.DefaultNamespace, machineID)
machine.Metadata().Labels().Set(omni.MachineAddressLabel, machineIP)
require.NoError(t, st.Create(ctx, machine))
var eg errgroup.Group
eg.Go(func() error {
return handler.Start(ctx)
})
// send an event over siderolink & assert that it is stored
timestamp := handler.Clock.Now()
sendEvent(ctx, t, handler, machineapi.MachineStatusEvent_BOOTING, timestamp)
assertStage(ctx, t, st, machineapi.MachineStatusEvent_BOOTING)
// send another event over siderolink in the past - it should be stored despite being in the past, as the previous status was also received over siderolink
sendEvent(ctx, t, handler, machineapi.MachineStatusEvent_INSTALLING, timestamp)
assertStage(ctx, t, st, machineapi.MachineStatusEvent_INSTALLING)
// send a machine status in the past - it should be ignored
sendMachineStatus(ctx, t, handler, st, machineapi.MachineStatusEvent_BOOTING, timestamp.Add(-time.Second))
time.Sleep(100 * time.Millisecond)
assertStage(ctx, t, st, machineapi.MachineStatusEvent_INSTALLING)
// send a machine status in the future - it should be stored
sendMachineStatus(ctx, t, handler, st, machineapi.MachineStatusEvent_REBOOTING, timestamp.Add(time.Second))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_REBOOTING)
// send an event in the past - it should be ignored
sendEvent(ctx, t, handler, machineapi.MachineStatusEvent_INSTALLING, timestamp.Add(-time.Second))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_REBOOTING)
// send an event in the future - it should be stored
sendEvent(ctx, t, handler, machineapi.MachineStatusEvent_MAINTENANCE, timestamp.Add(time.Second*2))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_MAINTENANCE)
// destroy and recreate the machine
require.NoError(t, st.Destroy(ctx, machine.Metadata()))
require.NoError(t, st.Create(ctx, machine))
time.Sleep(100 * time.Millisecond)
// send a machine status in the past - it should be stored despite being in the past, as the state should be cleared on machine destroy
sendMachineStatus(ctx, t, handler, st, machineapi.MachineStatusEvent_RESETTING, timestamp.Add(-time.Second))
assertStage(ctx, t, st, machineapi.MachineStatusEvent_RESETTING)
cancel()
require.NoError(t, eg.Wait())
}
// sendMachineStatus simulates a Talos-side machine status update by writing the
// given stage and timestamp into the TalosMachineStatus field of the
// MachineStatus resource, creating the resource when it does not exist yet.
//
// The handler's clock is pinned to the given timestamp so that staleness checks
// in the handler see a deterministic "now".
func sendMachineStatus(ctx context.Context, t *testing.T, handler *machinestatus.Handler, st state.State, stage machineapi.MachineStatusEvent_MachineStage, timestamp time.Time) {
	handler.Clock = clockwork.NewFakeClockAt(timestamp)

	res := omni.NewMachineStatus(resources.DefaultNamespace, machineID)

	talosStatus := &specs.MachineStatusSpec_TalosMachineStatus{
		Status:    &machineapi.MachineStatusEvent{Stage: stage},
		UpdatedAt: timestamppb.New(timestamp),
	}

	_, updateErr := safe.StateUpdateWithConflicts[*omni.MachineStatus](ctx, st, res.Metadata(), func(r *omni.MachineStatus) error {
		r.TypedSpec().Value.TalosMachineStatus = talosStatus

		return nil
	})

	switch {
	case updateErr == nil:
		// resource already existed and was updated in place - nothing more to do
	case state.IsNotFoundError(updateErr):
		// no resource yet - create it with the status pre-populated
		res.TypedSpec().Value.TalosMachineStatus = talosStatus

		require.NoError(t, st.Create(ctx, res))
	default:
		// any other update error fails the test immediately
		require.NoError(t, updateErr)
	}
}
// sendEvent feeds a single MachineStatusEvent into the siderolink events
// handler, as if it had been emitted by the machine at machineIPPort at the
// given timestamp.
//
// The handler's clock is pinned to the timestamp so that any time-based
// ordering logic in the handler behaves deterministically.
func sendEvent(ctx context.Context, t *testing.T, handler *machinestatus.Handler, stage machineapi.MachineStatusEvent_MachineStage, timestamp time.Time) {
	handler.Clock = clockwork.NewFakeClockAt(timestamp)

	// xid embeds the timestamp, so the event ID is consistent with the fake clock
	ev := events.Event{
		Payload: &machineapi.MachineStatusEvent{Stage: stage},
		ID:      xid.NewWithTime(timestamp).String(),
		Node:    machineIPPort,
	}

	require.NoError(t, handler.HandleEvent(ctx, ev))
}
// assertStage blocks until the MachineStatusSnapshot resource for machineID
// reports the expected machine stage, failing the test on timeout (the timeout
// is driven by ctx inside rtestutils.AssertResource).
func assertStage(ctx context.Context, t *testing.T, st state.State, expected machineapi.MachineStatusEvent_MachineStage) {
	check := func(r *omni.MachineStatusSnapshot, asrt *assert.Assertions) {
		asrt.Equal(expected, r.TypedSpec().Value.GetMachineStatus().GetStage())
	}

	rtestutils.AssertResource[*omni.MachineStatusSnapshot](ctx, t, st, machineID, check)
}

View File

@ -94,5 +94,7 @@ func sha256Hex(data []byte) string {
}
// TestLogStorageSuite runs the LogStorageSuite testify suite; t.Parallel
// allows it to run concurrently with the other top-level tests in the package.
func TestLogStorageSuite(t *testing.T) {
	t.Parallel()

	suite.Run(t, new(LogStorageSuite))
}

View File

@ -33,6 +33,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/omni/resources/siderolink"
"github.com/siderolabs/omni/internal/pkg/config"
"github.com/siderolabs/omni/internal/pkg/errgroup"
@ -110,15 +111,7 @@ func (suite *SiderolinkSuite) SetupTest() {
var err error
machineStatusHandler := machinestatus.NewHandler(suite.state, zaptest.NewLogger(suite.T()))
suite.wg.Add(1)
go func() {
defer suite.wg.Done()
suite.Require().NoError(machineStatusHandler.Start(suite.ctx))
}()
machineStatusHandler := machinestatus.NewHandler(suite.state, zaptest.NewLogger(suite.T()), make(chan *omni.MachineStatusSnapshot))
suite.manager, err = sideromanager.NewManager(suite.ctx, suite.state, &fakeWireguardHandler{}, params, zaptest.NewLogger(suite.T()), nil, machineStatusHandler, nil)
suite.Require().NoError(err)
@ -388,6 +381,8 @@ func (suite *SiderolinkSuite) TearDownTest() {
}
// TestSiderolinkSuite runs the SiderolinkSuite testify suite; t.Parallel
// allows it to run concurrently with the other top-level tests in the package.
func TestSiderolinkSuite(t *testing.T) {
	t.Parallel()

	suite.Run(t, new(SiderolinkSuite))
}