// This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. package system import ( "context" "fmt" "log" "sort" "sync" "time" "github.com/hashicorp/go-multierror" "github.com/siderolabs/gen/maps" "github.com/siderolabs/gen/slices" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/pkg/conditions" ) // singleton the system services API interface. type singleton struct { runtime runtime.Runtime // State of running services by ID state map[string]*ServiceRunner // List of running services at the moment. // // Service might be in any state, but service ID in the map // implies ServiceRunner.Start() method is running at the momemnt runningMu sync.Mutex running map[string]struct{} mu sync.Mutex wg sync.WaitGroup terminating bool } var ( instance *singleton once sync.Once ) // Services returns the instance of the system services API. // //nolint:revive,golint func Services(runtime runtime.Runtime) *singleton { once.Do(func() { instance = &singleton{ runtime: runtime, state: make(map[string]*ServiceRunner), running: make(map[string]struct{}), } }) return instance } // Load adds service to the list of services managed by the runner. // // Load returns service IDs for each of the services. func (s *singleton) Load(services ...Service) []string { s.mu.Lock() defer s.mu.Unlock() if s.terminating { return nil } ids := make([]string, 0, len(services)) for _, service := range services { id := service.ID(s.runtime) ids = append(ids, id) if _, exists := s.state[id]; exists { // service already loaded, ignore continue } svcrunner := NewServiceRunner(service, s.runtime) s.state[id] = svcrunner } return ids } // Unload stops the service and removes it from the list of running services. // // It is not an error to unload a service which was already removed or stopped. func (s *singleton) Unload(ctx context.Context, serviceIDs ...string) error { s.mu.Lock() if s.terminating { s.mu.Unlock() return nil } servicesToRemove := []string{} for _, id := range serviceIDs { if _, exists := s.state[id]; exists { servicesToRemove = append(servicesToRemove, id) } } s.mu.Unlock() if err := s.Stop(ctx, servicesToRemove...); err != nil { return fmt.Errorf("error stopping services %v: %w", servicesToRemove, err) } s.mu.Lock() defer s.mu.Unlock() s.runningMu.Lock() defer s.runningMu.Unlock() for _, id := range servicesToRemove { delete(s.state, id) delete(s.running, id) // this fixes an edge case when defer() in Start() doesn't have time to remove stopped service from running } return nil } // Start will invoke the service's Pre, Condition, and Type funcs. If any // error occurs in the Pre or Condition invocations, it is up to the caller to // restart the service. func (s *singleton) Start(serviceIDs ...string) error { s.mu.Lock() defer s.mu.Unlock() if s.terminating { return nil } var multiErr *multierror.Error for _, id := range serviceIDs { svcrunner := s.state[id] if svcrunner == nil { multiErr = multierror.Append(multiErr, fmt.Errorf("service %q not defined", id)) } s.runningMu.Lock() _, running := s.running[id] if !running { s.running[id] = struct{}{} } s.runningMu.Unlock() if running { // service already running, skip continue } s.wg.Add(1) go func(id string, svcrunner *ServiceRunner) { defer func() { s.runningMu.Lock() delete(s.running, id) s.runningMu.Unlock() }() defer s.wg.Done() svcrunner.Start() }(id, svcrunner) } return multiErr.ErrorOrNil() } // StartAll starts all the services. func (s *singleton) StartAll() { s.mu.Lock() serviceIDs := maps.Keys(s.state) s.mu.Unlock() //nolint:errcheck s.Start(serviceIDs...) } // LoadAndStart combines Load and Start into single call. func (s *singleton) LoadAndStart(services ...Service) { err := s.Start(s.Load(services...)...) if err != nil { // should never happen panic(err) } } // Shutdown all the services. func (s *singleton) Shutdown(ctx context.Context) { s.mu.Lock() if s.terminating { s.mu.Unlock() return } s.terminating = true _ = s.stopServices(ctx, nil, true) //nolint:errcheck } // Stop will initiate a shutdown of the specified service. func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) (err error) { if len(serviceIDs) == 0 { return } s.mu.Lock() if s.terminating { s.mu.Unlock() return nil } return s.stopServices(ctx, serviceIDs, false) } // StopWithRevDepenencies will initiate a shutdown of the specified services waiting for reverse dependencies to finish first. // // If reverse dependency is not stopped, this method might block waiting on it being stopped for up to 30 seconds. func (s *singleton) StopWithRevDepenencies(ctx context.Context, serviceIDs ...string) (err error) { if len(serviceIDs) == 0 { return } s.mu.Lock() if s.terminating { s.mu.Unlock() return nil } return s.stopServices(ctx, serviceIDs, true) } //nolint:gocyclo func (s *singleton) stopServices(ctx context.Context, services []string, waitForRevDependencies bool) error { servicesToStop := make(map[string]*ServiceRunner) if services == nil { for name, svcrunner := range s.state { servicesToStop[name] = svcrunner } } else { for _, name := range services { if _, ok := s.state[name]; !ok { continue } servicesToStop[name] = s.state[name] } } // build reverse dependencies, and expand the list of services to stop // with services which depend on the one being stopped reverseDependencies := make(map[string][]string) if waitForRevDependencies { // expand the list of services to stop with the list of services which depend // on the ones being stopped // the loop is run as long as more dependencies are added to the list for { expanded := false for name, svcrunner := range s.state { if _, scheduledToStop := servicesToStop[name]; scheduledToStop { continue } dependencies := svcrunner.service.DependsOn(s.runtime) shouldStopService := false for _, dependency := range dependencies { for scheduledService := range servicesToStop { if scheduledService == dependency { shouldStopService = true break } } if shouldStopService { break } } if shouldStopService { servicesToStop[name] = svcrunner expanded = true } } if !expanded { break } } // build a list of dependencies to wait for before stopping each of the services for name, svcrunner := range servicesToStop { for _, dependency := range svcrunner.service.DependsOn(s.runtime) { reverseDependencies[dependency] = append(reverseDependencies[dependency], name) } } } s.mu.Unlock() // shutdown all the services waiting for rev deps var shutdownWg sync.WaitGroup // wait max 30 seconds for reverse deps to shut down shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, 30*time.Second) defer shutdownCtxCancel() stoppedConds := []conditions.Condition{} for name, svcrunner := range servicesToStop { shutdownWg.Add(1) stoppedConds = append(stoppedConds, WaitForService(StateEventDown, name)) go func(svcrunner *ServiceRunner, reverseDeps []string) { defer shutdownWg.Done() conds := slices.Map(reverseDeps, func(dep string) conditions.Condition { return WaitForService(StateEventDown, dep) }) allDeps := conditions.WaitForAll(conds...) if err := allDeps.Wait(shutdownCtx); err != nil { log.Printf("gave up on %s while stopping %q", allDeps, svcrunner.id) } svcrunner.Shutdown() }(svcrunner, reverseDependencies[name]) } shutdownWg.Wait() return conditions.WaitForAll(stoppedConds...).Wait(ctx) } // List returns snapshot of ServiceRunner instances. func (s *singleton) List() (result []*ServiceRunner) { s.mu.Lock() defer s.mu.Unlock() result = maps.Values(s.state) // TODO: results should be sorted properly with topological sort on dependencies // but, we don't have dependencies yet, so sort by service id for now to get stable order sort.Slice(result, func(i, j int) bool { return result[i].id < result[j].id }) return } // IsRunning checks service status (started/stopped). // // It doesn't check if service runner was started or not, just pure // check for service status in terms of start/stop. func (s *singleton) IsRunning(id string) (Service, bool, error) { s.mu.Lock() runner, exists := s.state[id] s.mu.Unlock() if !exists { return nil, false, fmt.Errorf("service %q not defined", id) } s.runningMu.Lock() _, running := s.running[id] s.runningMu.Unlock() return runner.service, running, nil } // APIStart processes service start request from the API. func (s *singleton) APIStart(ctx context.Context, id string) error { service, running, err := s.IsRunning(id) if err != nil { return err } if running { // already started, skip return nil } if svc, ok := service.(APIStartableService); ok && svc.APIStartAllowed(s.runtime) { return s.Start(id) } return fmt.Errorf("service %q doesn't support start operation via API", id) } // APIStop processes services stop request from the API. func (s *singleton) APIStop(ctx context.Context, id string) error { service, running, err := s.IsRunning(id) if err != nil { return err } if !running { // already stopped, skip return nil } if svc, ok := service.(APIStoppableService); ok && svc.APIStopAllowed(s.runtime) { return s.Stop(ctx, id) } return fmt.Errorf("service %q doesn't support stop operation via API", id) } // APIRestart processes services restart request from the API. func (s *singleton) APIRestart(ctx context.Context, id string) error { service, running, err := s.IsRunning(id) if err != nil { return err } if !running { // restart for not running service is equivalent to Start() return s.APIStart(ctx, id) } if svc, ok := service.(APIRestartableService); ok && svc.APIRestartAllowed(s.runtime) { if err := s.Stop(ctx, id); err != nil { return err } return s.Start(id) } return fmt.Errorf("service %q doesn't support restart operation via API", id) }