Andrew Rynhard 49307d554d refactor: improve machined
This is a rewrite of machined. It addresses some of the limitations and
complexity in the implementation. This introduces the idea of a
controller. A controller is responsible for managing the runtime, the
sequencer, and a new state type introduced in this PR.

A few highlights are:

- no more event bus
- functional approach to tasks (no more types defined for each task)
  - the task function definition now offers a lot more context, like
    access to raw API requests, the current sequence, a logger, the new
    state interface, and the runtime interface.
- no more panics to handle reboots
- additional initialize and reboot sequences
- graceful gRPC server shutdown on critical errors
- config is now stored at install time to avoid having to download it both
  at install time and at boot time
- upgrades now use the local config instead of downloading it
- the upgrade API's preserve option takes precedence over the config's
  install force option

Additionally, this moves various packages under machined to make the
code easier to navigate.

Signed-off-by: Andrew Rynhard <andrew@andrewrynhard.com>
2020-04-28 08:20:55 -07:00

351 lines
7.7 KiB
Go

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package system
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/pkg/conditions"
)
// singleton is the system services manager: it owns the ServiceRunner of
// every loaded service and tracks which of them are currently being run.
type singleton struct {
	runtime runtime.Runtime

	// mu guards state and terminating.
	mu sync.Mutex
	// state maps a service ID to its ServiceRunner; entries are added by Load.
	state map[string]*ServiceRunner
	// terminating is set once Shutdown begins; afterwards Load, Start and
	// Stop do nothing.
	terminating bool

	// runningMu guards running.
	runningMu sync.Mutex
	// running lists the services running at the moment.
	//
	// The service might be in any state, but the presence of its ID in the
	// map implies that its ServiceRunner.Start() method is running at the
	// moment.
	running map[string]struct{}

	// wg tracks the goroutines spawned by Start so Shutdown can wait for them.
	wg sync.WaitGroup
}
var (
	once     sync.Once
	instance *singleton
)

// Services returns the process-wide system services manager, creating it on
// the first call. The runtime argument is only used by that first call; later
// calls return the already-initialized instance unchanged.
// nolint: golint
func Services(runtime runtime.Runtime) *singleton {
	once.Do(func() {
		mgr := new(singleton)
		mgr.runtime = runtime
		mgr.state = map[string]*ServiceRunner{}
		mgr.running = map[string]struct{}{}

		instance = mgr
	})

	return instance
}
// Load registers services with the manager so they can be started later.
// A service whose ID is already registered keeps its existing ServiceRunner.
//
// Load returns the ID of every service passed in, in the same order, including
// the ones that were already loaded. Once Shutdown has begun, Load registers
// nothing and returns nil.
func (s *singleton) Load(services ...Service) []string {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.terminating {
		return nil
	}

	ids := make([]string, len(services))

	for i, svc := range services {
		id := svc.ID(s.runtime)
		ids[i] = id

		if _, loaded := s.state[id]; !loaded {
			s.state[id] = NewServiceRunner(svc, s.runtime)
		}
	}

	return ids
}
// Start runs ServiceRunner.Start for each listed service in its own goroutine,
// which invokes the service's Pre, Condition, and Type funcs. If any error
// occurs in the Pre or Condition invocations, it is up to the caller to
// restart the service.
//
// Services that are already running are skipped, and nothing is started once
// Shutdown has begun (nil is returned in that case). IDs that were never
// loaded are collected into the returned error; the remaining IDs are still
// processed.
func (s *singleton) Start(serviceIDs ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.terminating {
		return nil
	}

	var multiErr *multierror.Error

	for _, id := range serviceIDs {
		svcrunner := s.state[id]
		if svcrunner == nil {
			multiErr = multierror.Append(multiErr, fmt.Errorf("service %q not defined", id))

			// Skip the undefined ID. Otherwise it would be marked as running
			// and Start would be called on a nil *ServiceRunner in a
			// background goroutine.
			continue
		}

		s.runningMu.Lock()
		_, running := s.running[id]
		if !running {
			s.running[id] = struct{}{}
		}
		s.runningMu.Unlock()

		if running {
			// service already running, skip
			continue
		}

		s.wg.Add(1)

		go func(id string, svcrunner *ServiceRunner) {
			// Deferred calls run in reverse order: wg.Done fires first, then
			// the ID is removed from the running set.
			defer func() {
				s.runningMu.Lock()
				delete(s.running, id)
				s.runningMu.Unlock()
			}()
			defer s.wg.Done()

			svcrunner.Start()
		}(id, svcrunner)
	}

	return multiErr.ErrorOrNil()
}
// StartAll starts every service that has been loaded so far.
func (s *singleton) StartAll() {
	ids := func() []string {
		s.mu.Lock()
		defer s.mu.Unlock()

		out := make([]string, 0, len(s.state))
		for serviceID := range s.state {
			out = append(out, serviceID)
		}

		return out
	}()

	// The IDs were just read from s.state, so Start is not expected to report
	// undefined services; its error is deliberately discarded.
	_ = s.Start(ids...)
}
// LoadAndStart loads the given services and immediately starts them.
//
// It panics if Start reports an error. That can only mean an inconsistency,
// because Start is given exactly the IDs Load has just registered.
func (s *singleton) LoadAndStart(services ...Service) {
	ids := s.Load(services...)

	if err := s.Start(ids...); err != nil {
		// should never happen
		panic(err)
	}
}
// Shutdown stops every loaded service and marks the manager as terminating.
// After that, Load, Start and Stop do nothing. Repeated calls return
// immediately.
//
// All services are shut down concurrently. Each one first waits for every
// service that lists it in DependsOn to report StateEventDown, so dependents
// go down before their dependencies. That wait shares a single 30 second
// deadline; its error is ignored, so a service proceeds to shut down even if
// the wait fails. Finally, Shutdown waits for all goroutines spawned by Start
// to exit.
func (s *singleton) Shutdown() {
	s.mu.Lock()
	if s.terminating {
		s.mu.Unlock()

		return
	}
	s.terminating = true

	runners := make(map[string]*ServiceRunner, len(s.state))
	for id, runner := range s.state {
		runners[id] = runner
	}
	s.mu.Unlock()

	// dependents[x] lists the services which depend on x
	dependents := map[string][]string{}

	for id, runner := range runners {
		for _, dep := range runner.service.DependsOn(s.runtime) {
			dependents[dep] = append(dependents[dep], id)
		}
	}

	// one shared deadline for dependents to go down
	waitCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var pending sync.WaitGroup

	pending.Add(len(runners))

	for id, runner := range runners {
		go func(r *ServiceRunner, waitFor []string) {
			defer pending.Done()

			conds := make([]conditions.Condition, 0, len(waitFor))
			for _, dependent := range waitFor {
				conds = append(conds, WaitForService(StateEventDown, dependent))
			}

			// best effort: the wait result is ignored on purpose
			_ = conditions.WaitForAll(conds...).Wait(waitCtx)

			r.Shutdown()
		}(runner, dependents[id])
	}

	pending.Wait()
	s.wg.Wait()
}
// List returns a snapshot of all loaded ServiceRunner instances, ordered by
// service ID.
func (s *singleton) List() []*ServiceRunner {
	s.mu.Lock()
	defer s.mu.Unlock()

	snapshot := make([]*ServiceRunner, len(s.state))

	i := 0
	for _, runner := range s.state {
		snapshot[i] = runner
		i++
	}

	// TODO: results should be sorted properly with topological sort on dependencies
	// but, we don't have dependencies yet, so sort by service id for now to get stable order
	sort.Slice(snapshot, func(a, b int) bool {
		return snapshot[a].id < snapshot[b].id
	})

	return snapshot
}
// Stop initiates a shutdown of the specified services and then waits until
// each of them reports StateEventDown. The wait is given ctx, so the caller
// controls how long to wait.
//
// Stop returns nil without doing anything when no IDs are given or when
// Shutdown has already begun. If any ID does not refer to a loaded service,
// nothing is stopped and an error is returned.
func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) error {
	if len(serviceIDs) == 0 {
		return nil
	}

	s.mu.Lock()
	if s.terminating {
		s.mu.Unlock()

		return nil
	}

	// Copy the requested runners while holding the lock; the actual shutdown
	// happens after the lock is released.
	stateCopy := make(map[string]*ServiceRunner, len(serviceIDs))

	for _, id := range serviceIDs {
		svcrunner, ok := s.state[id]
		if !ok {
			// Release the lock before bailing out. Otherwise every later call
			// that takes s.mu would block forever.
			s.mu.Unlock()

			return fmt.Errorf("service not found: %s", id)
		}

		stateCopy[id] = svcrunner
	}
	s.mu.Unlock()

	conds := make([]conditions.Condition, 0, len(stateCopy))

	// Initiate a shutdown on each of the services
	for id, svcrunner := range stateCopy {
		svcrunner.Shutdown()

		conds = append(conds, WaitForService(StateEventDown, id))
	}

	// Wait for the services to actually shut down
	return conditions.WaitForAll(conds...).Wait(ctx)
}
// IsRunning reports whether the ServiceRunner.Start goroutine for the given
// service is currently active, together with the service definition itself.
//
// It does not look at the service's lifecycle state, only at whether it has
// been started and has not yet stopped. An error is returned if the service
// was never loaded.
func (s *singleton) IsRunning(id string) (Service, bool, error) {
	s.mu.Lock()
	runner, ok := s.state[id]
	s.mu.Unlock()

	if !ok {
		return nil, false, fmt.Errorf("service %q not defined", id)
	}

	s.runningMu.Lock()
	defer s.runningMu.Unlock()

	_, isRunning := s.running[id]

	return runner.service, isRunning, nil
}
// APIStart handles a service start request coming from the API.
//
// Starting a service that is already running is a no-op. Otherwise the
// service must implement APIStartableService and report that starting via
// the API is allowed.
func (s *singleton) APIStart(ctx context.Context, id string) error {
	service, running, err := s.IsRunning(id)

	switch {
	case err != nil:
		return err
	case running:
		// already started, nothing to do
		return nil
	}

	startable, ok := service.(APIStartableService)
	if !ok || !startable.APIStartAllowed(s.runtime) {
		return fmt.Errorf("service %q doesn't support start operation via API", id)
	}

	return s.Start(id)
}
// APIStop handles a service stop request coming from the API.
//
// Stopping a service that is not running is a no-op. Otherwise the service
// must implement APIStoppableService and report that stopping via the API is
// allowed. ctx is passed on to Stop for the wait until the service is down.
func (s *singleton) APIStop(ctx context.Context, id string) error {
	service, running, err := s.IsRunning(id)

	switch {
	case err != nil:
		return err
	case !running:
		// already stopped, nothing to do
		return nil
	}

	stoppable, ok := service.(APIStoppableService)
	if !ok || !stoppable.APIStopAllowed(s.runtime) {
		return fmt.Errorf("service %q doesn't support stop operation via API", id)
	}

	return s.Stop(ctx, id)
}
// APIRestart handles a service restart request coming from the API.
//
// A service that is not running is simply started, with the same checks as
// APIStart. A running service must implement APIRestartableService and allow
// restarts. It is then stopped (Stop waits for it to go down, using ctx) and
// started again.
func (s *singleton) APIRestart(ctx context.Context, id string) error {
	service, running, err := s.IsRunning(id)
	if err != nil {
		return err
	}

	if !running {
		// restarting a stopped service is the same as starting it
		return s.APIStart(ctx, id)
	}

	restartable, ok := service.(APIRestartableService)
	if !ok || !restartable.APIRestartAllowed(s.runtime) {
		return fmt.Errorf("service %q doesn't support restart operation via API", id)
	}

	if stopErr := s.Stop(ctx, id); stopErr != nil {
		return stopErr
	}

	// NOTE(review): Start skips IDs still present in s.running, and that entry
	// is removed only after ServiceRunner.Start returns. If that can happen
	// after the StateEventDown event Stop waits for, this Start may silently
	// do nothing. Confirm against ServiceRunner.
	return s.Start(id)
}