/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ package system import ( "context" "fmt" "log" "sync" "time" "github.com/pkg/errors" "github.com/talos-systems/talos/internal/app/machined/pkg/system/conditions" "github.com/talos-systems/talos/internal/app/machined/pkg/system/events" "github.com/talos-systems/talos/internal/app/machined/pkg/system/health" "github.com/talos-systems/talos/internal/app/machined/pkg/system/runner" "github.com/talos-systems/talos/internal/app/machined/proto" "github.com/talos-systems/talos/pkg/userdata" ) // WaitConditionCheckInterval is time between checking for wait condition // description changes. // // Exposed here for unit-tests to override var WaitConditionCheckInterval = time.Second // ServiceRunner wraps the state of the service (running, stopped, ...) type ServiceRunner struct { mu sync.Mutex userData *userdata.UserData service Service id string state events.ServiceState events events.ServiceEvents healthState health.State stateSubscribers map[StateEvent][]chan<- struct{} ctxMu sync.Mutex ctx context.Context ctxCancel context.CancelFunc } // NewServiceRunner creates new ServiceRunner around Service instance func NewServiceRunner(service Service, userData *userdata.UserData) *ServiceRunner { ctx, ctxCancel := context.WithCancel(context.Background()) return &ServiceRunner{ service: service, userData: userData, id: service.ID(userData), state: events.StateInitialized, stateSubscribers: make(map[StateEvent][]chan<- struct{}), ctx: ctx, ctxCancel: ctxCancel, } } // UpdateState implements events.Recorder func (svcrunner *ServiceRunner) UpdateState(newstate events.ServiceState, message string, args ...interface{}) { svcrunner.mu.Lock() event := events.ServiceEvent{ Message: fmt.Sprintf(message, args...), State: newstate, Timestamp: time.Now(), } svcrunner.state = newstate svcrunner.events.Push(event) log.Printf("service[%s](%s): %s", svcrunner.id, svcrunner.state, event.Message) isUp := svcrunner.inStateLocked(StateEventUp) isDown := svcrunner.inStateLocked(StateEventDown) svcrunner.mu.Unlock() if isUp { svcrunner.notifyEvent(StateEventUp) } if isDown { svcrunner.notifyEvent(StateEventDown) } } func (svcrunner *ServiceRunner) healthUpdate(change health.StateChange) { svcrunner.mu.Lock() // service not running, suppress event if svcrunner.state != events.StateRunning { svcrunner.mu.Unlock() return } var message string if *change.New.Healthy { message = "Health check successful" } else { message = fmt.Sprintf("Health check failed: %s", change.New.LastMessage) } event := events.ServiceEvent{ Message: message, State: svcrunner.state, Timestamp: time.Now(), } svcrunner.events.Push(event) log.Printf("service[%s](%s): %s", svcrunner.id, svcrunner.state, event.Message) isUp := svcrunner.inStateLocked(StateEventUp) svcrunner.mu.Unlock() if isUp { svcrunner.notifyEvent(StateEventUp) } } // GetEventHistory returns history of events for this service func (svcrunner *ServiceRunner) GetEventHistory(count int) []events.ServiceEvent { svcrunner.mu.Lock() defer svcrunner.mu.Unlock() return svcrunner.events.Get(count) } func (svcrunner *ServiceRunner) waitFor(ctx context.Context, condition conditions.Condition) error { description := condition.String() svcrunner.UpdateState(events.StateWaiting, "Waiting for %s", description) errCh := make(chan error) go func() { errCh <- condition.Wait(ctx) }() ticker := time.NewTicker(WaitConditionCheckInterval) defer ticker.Stop() // update state if condition description changes (some conditions are satisfied) for { select { case err := <-errCh: return err case <-ticker.C: newDescription := condition.String() if newDescription != description && newDescription != "" { description = newDescription svcrunner.UpdateState(events.StateWaiting, "Waiting for %s", description) } } } } // Start initializes the service and runs it // // Start should be run in a goroutine. // nolint: gocyclo func (svcrunner *ServiceRunner) Start() { defer func() { // reset context for the next run svcrunner.ctxMu.Lock() svcrunner.ctx, svcrunner.ctxCancel = context.WithCancel(context.Background()) svcrunner.ctxMu.Unlock() }() svcrunner.ctxMu.Lock() ctx := svcrunner.ctx svcrunner.ctxMu.Unlock() condition := svcrunner.service.Condition(svcrunner.userData) dependencies := svcrunner.service.DependsOn(svcrunner.userData) if len(dependencies) > 0 { serviceConditions := make([]conditions.Condition, len(dependencies)) for i := range dependencies { serviceConditions[i] = WaitForService(StateEventUp, dependencies[i]) } serviceDependencies := conditions.WaitForAll(serviceConditions...) if condition != nil { condition = conditions.WaitForAll(serviceDependencies, condition) } else { condition = serviceDependencies } } if condition != nil { if err := svcrunner.waitFor(ctx, condition); err != nil { svcrunner.UpdateState(events.StateFailed, "Condition failed: %v", err) return } } svcrunner.UpdateState(events.StatePreparing, "Running pre state") if err := svcrunner.service.PreFunc(ctx, svcrunner.userData); err != nil { svcrunner.UpdateState(events.StateFailed, "Failed to run pre stage: %v", err) return } svcrunner.UpdateState(events.StatePreparing, "Creating service runner") runnr, err := svcrunner.service.Runner(svcrunner.userData) if err != nil { svcrunner.UpdateState(events.StateFailed, "Failed to create runner: %v", err) return } if runnr == nil { svcrunner.UpdateState(events.StateSkipped, "Service skipped") return } if err := svcrunner.run(ctx, runnr); err != nil { svcrunner.UpdateState(events.StateFailed, "Failed running service: %v", err) } else { svcrunner.UpdateState(events.StateFinished, "Service finished successfully") } if err := svcrunner.service.PostFunc(svcrunner.userData); err != nil { svcrunner.UpdateState(events.StateFailed, "Failed to run post stage: %v", err) return } } // nolint: gocyclo func (svcrunner *ServiceRunner) run(ctx context.Context, runnr runner.Runner) error { if runnr == nil { // special case - run nothing (TODO: we should handle it better, e.g. in PreFunc) return nil } if err := runnr.Open(ctx); err != nil { return errors.Wrap(err, "error opening runner") } // nolint: errcheck defer runnr.Close() errCh := make(chan error) go func() { errCh <- runnr.Run(svcrunner.UpdateState) }() if healthSvc, ok := svcrunner.service.(HealthcheckedService); ok { var healthWg sync.WaitGroup defer healthWg.Wait() healthWg.Add(1) go func() { defer healthWg.Done() // nolint: errcheck health.Run(ctx, healthSvc.HealthSettings(svcrunner.userData), &svcrunner.healthState, healthSvc.HealthFunc(svcrunner.userData)) }() notifyCh := make(chan health.StateChange, 2) svcrunner.healthState.Subscribe(notifyCh) defer svcrunner.healthState.Unsubscribe(notifyCh) healthWg.Add(1) go func() { defer healthWg.Done() for { select { case <-ctx.Done(): return case change := <-notifyCh: svcrunner.healthUpdate(change) } } }() } // when service run finishes, cancel context, this is important if service // terminates on its own before being terminated by Stop() defer svcrunner.ctxCancel() select { case <-ctx.Done(): err := runnr.Stop() <-errCh if err != nil { return errors.Wrap(err, "error stopping service") } case err := <-errCh: if err != nil { return errors.Wrap(err, "error running service") } } return nil } // Shutdown initiates shutdown of the service runner // // Shutdown completes when Start() returns func (svcrunner *ServiceRunner) Shutdown() { svcrunner.ctxMu.Lock() defer svcrunner.ctxMu.Unlock() svcrunner.ctxCancel() } // AsProto returns protobuf struct with the state of the service runner func (svcrunner *ServiceRunner) AsProto() *proto.ServiceInfo { svcrunner.mu.Lock() defer svcrunner.mu.Unlock() return &proto.ServiceInfo{ Id: svcrunner.id, State: svcrunner.state.String(), Events: svcrunner.events.AsProto(events.MaxEventsToKeep), Health: svcrunner.healthState.AsProto(), } } // Subscribe to a specific event for this service. // // Channel `ch` should be buffered or it should have listener attached to it, // as event might be delivered before Subscribe() returns. func (svcrunner *ServiceRunner) Subscribe(event StateEvent, ch chan<- struct{}) { svcrunner.mu.Lock() if svcrunner.inStateLocked(event) { svcrunner.mu.Unlock() // svcrunner is already in expected state, notify immediately select { case ch <- struct{}{}: default: } return } svcrunner.stateSubscribers[event] = append(svcrunner.stateSubscribers[event], ch) svcrunner.mu.Unlock() } // Unsubscribe cancels subscription established with Subscribe. func (svcrunner *ServiceRunner) Unsubscribe(event StateEvent, ch chan<- struct{}) { svcrunner.mu.Lock() defer svcrunner.mu.Unlock() channels := svcrunner.stateSubscribers[event] for i := 0; i < len(channels); { if channels[i] == ch { channels[i], channels[len(channels)-1] = channels[len(channels)-1], nil channels = channels[:len(channels)-1] } else { i++ } } svcrunner.stateSubscribers[event] = channels } func (svcrunner *ServiceRunner) notifyEvent(event StateEvent) { svcrunner.mu.Lock() channels := append([]chan<- struct{}(nil), svcrunner.stateSubscribers[event]...) svcrunner.mu.Unlock() for _, ch := range channels { select { case ch <- struct{}{}: default: } } } // nolint: gocyclo func (svcrunner *ServiceRunner) inStateLocked(event StateEvent) bool { switch event { case StateEventUp: // up when: // a) either skipped or already finished // b) or running and healthy (if supports health checks) switch svcrunner.state { case events.StateSkipped, events.StateFinished: return true case events.StateRunning: // check if service supports health checks _, supportsHealth := svcrunner.service.(HealthcheckedService) health := svcrunner.healthState.Get() return !supportsHealth || (health.Healthy != nil && *health.Healthy) default: return false } case StateEventDown: // down when in any of the terminal states switch svcrunner.state { case events.StateFailed, events.StateFinished, events.StateSkipped: return true default: return false } default: panic("unsupported event") } }