Andrew Rynhard 4ae8186107 feat: add configurator interface
This moves from translating a config into an internal config
representation, to using an interface. The idea is that an interface
gives us stronger compile time checks, and will prevent us from having to copy
from one struct to another. As long as a concrete type implements the
Configurator interface, it can be used to provide instructions to Talos.

Signed-off-by: Andrew Rynhard <andrew@andrewrynhard.com>
2019-10-04 07:53:09 -07:00

331 lines
7.7 KiB
Go

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package system
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/conditions"
"github.com/talos-systems/talos/pkg/config"
)
// singleton manages the lifecycle of the system services; the single
// instance is created lazily by Services.
type singleton struct {
	// Config is the machine configuration passed to every service callback
	// (ID, DependsOn, API*Allowed) and to each ServiceRunner.
	Config config.Configurator

	// state holds the ServiceRunner of every loaded service, keyed by
	// service ID. Guarded by mu.
	state map[string]*ServiceRunner

	// running is the set of service IDs whose ServiceRunner.Start() method
	// is executing at the moment. The service itself might be in any state;
	// presence of the ID only means the runner goroutine is active.
	// Guarded by runningMu.
	runningMu sync.Mutex
	running   map[string]struct{}

	// mu guards state and terminating.
	mu sync.Mutex
	// wg tracks the runner goroutines spawned by Start; Shutdown waits on it.
	wg sync.WaitGroup
	// terminating is set by the first Shutdown call; once set, Load, Start
	// and Stop return early without doing any work.
	terminating bool
}
var (
	// instance is the process-wide service manager, built on first use by Services.
	instance *singleton
	// once ensures instance is constructed exactly one time.
	once sync.Once
)
// Services returns the instance of the system services API.
//
// The instance is constructed on the first call only; the config argument of
// any later call is ignored and the already-built instance is returned.
// nolint: golint
func Services(config config.Configurator) *singleton {
	once.Do(func() {
		s := new(singleton)
		s.Config = config
		s.state = map[string]*ServiceRunner{}
		s.running = map[string]struct{}{}

		instance = s
	})

	return instance
}
// Load registers services with the runner and returns their IDs, in the same
// order as the arguments.
//
// Loading is idempotent: a service whose ID is already registered keeps its
// existing ServiceRunner. While the singleton is terminating nothing is
// loaded and nil is returned.
func (s *singleton) Load(services ...Service) []string {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.terminating {
		return nil
	}

	ids := make([]string, 0, len(services))

	for _, svc := range services {
		id := svc.ID(s.Config)
		ids = append(ids, id)

		if _, loaded := s.state[id]; !loaded {
			s.state[id] = NewServiceRunner(svc, s.Config)
		}
	}

	return ids
}
// Start launches the given services, each in its own goroutine running
// ServiceRunner.Start (which invokes the service's Pre, Condition, and Type
// funcs). Start does not wait for the services to come up. If any error occurs
// in the Pre or Condition invocations, it is up to the caller to restart the
// service.
//
// IDs whose runner goroutine is already active are skipped. IDs that were
// never loaded are skipped as well and reported together in the returned
// error. While the singleton is terminating, Start does nothing and returns nil.
func (s *singleton) Start(serviceIDs ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.terminating {
		return nil
	}

	var multiErr *multierror.Error

	for _, id := range serviceIDs {
		svcrunner := s.state[id]
		if svcrunner == nil {
			multiErr = multierror.Append(multiErr, errors.Errorf("service %q not defined", id))

			// Skip unknown IDs. Without this, the ID would be recorded as
			// running and a goroutine would call Start() on a nil runner.
			continue
		}

		// Check and mark the ID as running in one critical section, so two
		// concurrent Start calls cannot both launch the same runner.
		s.runningMu.Lock()
		_, running := s.running[id]
		if !running {
			s.running[id] = struct{}{}
		}
		s.runningMu.Unlock()

		if running {
			// service already running, skip
			continue
		}

		s.wg.Add(1)

		go func(id string, svcrunner *ServiceRunner) {
			// Deferred calls run LIFO: wg.Done fires first, then the ID is
			// removed from the running set.
			defer func() {
				s.runningMu.Lock()
				delete(s.running, id)
				s.runningMu.Unlock()
			}()
			defer s.wg.Done()

			svcrunner.Start()
		}(id, svcrunner)
	}

	return multiErr.ErrorOrNil()
}
// StartAll starts every service that has been loaded so far.
//
// The set of IDs is snapshotted under the lock, and the lock is released
// before Start is called. The error returned by Start is discarded.
func (s *singleton) StartAll() {
	ids := func() []string {
		s.mu.Lock()
		defer s.mu.Unlock()

		snapshot := make([]string, 0, len(s.state))
		for id := range s.state {
			snapshot = append(snapshot, id)
		}

		return snapshot
	}()

	// nolint: errcheck
	_ = s.Start(ids...)
}
// LoadAndStart loads the given services and immediately starts them.
//
// It panics if Start reports an error. The only error Start produces is for
// an ID missing from the loaded state, and every ID passed here was just
// loaded. An error therefore indicates a broken internal invariant.
func (s *singleton) LoadAndStart(services ...Service) {
	ids := s.Load(services...)

	if err := s.Start(ids...); err != nil {
		// should never happen
		panic(err)
	}
}
// Shutdown stops all loaded services and waits for them to finish.
//
// Only the first call does anything. It marks the singleton as terminating,
// so that Load, Start and Stop subsequently return early. It then shuts
// services down in reverse dependency order: before a service is shut down,
// Shutdown waits (at most 30 seconds in total) for every service that depends
// on it to report StateEventDown. It returns after every per-service shutdown
// has completed and every runner goroutine started by Start has exited.
func (s *singleton) Shutdown() {
	s.mu.Lock()
	if s.terminating {
		s.mu.Unlock()

		return
	}

	s.terminating = true

	runners := make(map[string]*ServiceRunner, len(s.state))
	for id, runner := range s.state {
		runners[id] = runner
	}
	s.mu.Unlock()

	// dependents[x] lists the services whose DependsOn includes x.
	dependents := make(map[string][]string)
	for id, runner := range runners {
		for _, dep := range runner.service.DependsOn(s.Config) {
			dependents[dep] = append(dependents[dep], id)
		}
	}

	// Bound the wait for dependents; once it expires, each remaining service
	// is shut down regardless.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var pending sync.WaitGroup

	for id, runner := range runners {
		pending.Add(1)

		go func(runner *ServiceRunner, waitFor []string) {
			defer pending.Done()

			conds := make([]conditions.Condition, 0, len(waitFor))
			for _, dependent := range waitFor {
				conds = append(conds, WaitForService(StateEventDown, dependent))
			}

			// Best effort: a timeout only means we stop waiting for dependents.
			// nolint: errcheck
			_ = conditions.WaitForAll(conds...).Wait(ctx)

			runner.Shutdown()
		}(runner, dependents[id])
	}

	pending.Wait()
	s.wg.Wait()
}
// List returns a snapshot of the loaded ServiceRunner instances, ordered by
// service ID.
func (s *singleton) List() (result []*ServiceRunner) {
	s.mu.Lock()
	defer s.mu.Unlock()

	result = make([]*ServiceRunner, 0, len(s.state))
	for _, runner := range s.state {
		result = append(result, runner)
	}

	// TODO: results should be sorted properly with topological sort on dependencies
	// but, we don't have dependencies yet, so sort by service id for now to get stable order
	byID := func(a, b int) bool {
		return result[a].id < result[b].id
	}
	sort.Slice(result, byID)

	return result
}
// Stop initiates a shutdown of the specified services and blocks until each
// of them reports StateEventDown or ctx is done.
//
// It returns nil immediately when no IDs are given or the singleton is
// terminating. If any ID is not loaded, it returns an error without stopping
// any service.
func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) (err error) {
	if len(serviceIDs) == 0 {
		return nil
	}

	s.mu.Lock()
	if s.terminating {
		s.mu.Unlock()

		return nil
	}

	// Copy the current service state so that the shutdown and the wait below
	// run without holding s.mu.
	stateCopy := make(map[string]*ServiceRunner, len(serviceIDs))
	for _, id := range serviceIDs {
		svcrunner, ok := s.state[id]
		if !ok {
			// Release the lock before bailing out. Returning with s.mu held
			// would deadlock every later call on the singleton.
			s.mu.Unlock()

			return fmt.Errorf("service not found: %s", id)
		}

		stateCopy[id] = svcrunner
	}
	s.mu.Unlock()

	conds := make([]conditions.Condition, 0, len(stateCopy))

	// Initiate a shutdown on each specific service.
	for id, svcrunner := range stateCopy {
		svcrunner.Shutdown()
		conds = append(conds, WaitForService(StateEventDown, id))
	}

	// Wait for the services to actually shut down.
	return conditions.WaitForAll(conds...).Wait(ctx)
}
// IsRunning reports the Service registered under id and whether its runner
// goroutine is currently active.
//
// It doesn't check if service runner was started or not, just pure
// check for service status in terms of start/stop. An error is returned
// when no service with that ID has been loaded.
func (s *singleton) IsRunning(id string) (Service, bool, error) {
	s.mu.Lock()
	runner, loaded := s.state[id]
	s.mu.Unlock()

	if !loaded {
		return nil, false, errors.Errorf("service %q not defined", id)
	}

	s.runningMu.Lock()
	_, active := s.running[id]
	s.runningMu.Unlock()

	return runner.service, active, nil
}
// APIStart handles a service start request coming from the API.
//
// Starting a service that is already running is a no-op. Otherwise the
// service must implement APIStartableService and allow the operation for the
// current config.
func (s *singleton) APIStart(ctx context.Context, id string) error {
	service, running, err := s.IsRunning(id)

	switch {
	case err != nil:
		return err
	case running:
		return nil
	}

	startable, ok := service.(APIStartableService)
	if !ok || !startable.APIStartAllowed(s.Config) {
		return errors.Errorf("service %q doesn't support start operation via API", id)
	}

	return s.Start(id)
}
// APIStop handles a service stop request coming from the API.
//
// Stopping a service that is not running is a no-op. Otherwise the service
// must implement APIStoppableService and allow the operation for the current
// config; ctx bounds how long Stop waits for the service to go down.
func (s *singleton) APIStop(ctx context.Context, id string) error {
	service, running, err := s.IsRunning(id)

	switch {
	case err != nil:
		return err
	case !running:
		return nil
	}

	stoppable, ok := service.(APIStoppableService)
	if !ok || !stoppable.APIStopAllowed(s.Config) {
		return errors.Errorf("service %q doesn't support stop operation via API", id)
	}

	return s.Stop(ctx, id)
}
// APIRestart handles a service restart request coming from the API.
//
// Restarting a service that is not running is delegated to APIStart. Otherwise
// the service must implement APIRestartableService and allow the operation for
// the current config; it is then stopped (waiting on ctx) and started again.
func (s *singleton) APIRestart(ctx context.Context, id string) error {
	service, running, err := s.IsRunning(id)

	switch {
	case err != nil:
		return err
	case !running:
		return s.APIStart(ctx, id)
	}

	restartable, ok := service.(APIRestartableService)
	if !ok || !restartable.APIRestartAllowed(s.Config) {
		return errors.Errorf("service %q doesn't support restart operation via API", id)
	}

	if err = s.Stop(ctx, id); err != nil {
		return err
	}

	return s.Start(id)
}