diff --git a/internal/app/init/pkg/system/health/health_test.go b/internal/app/init/pkg/system/health/health_test.go index 497716ea5..0681c44b4 100644 --- a/internal/app/init/pkg/system/health/health_test.go +++ b/internal/app/init/pkg/system/health/health_test.go @@ -72,6 +72,9 @@ func (suite *CheckSuite) TestHealthChange() { var state health.State + notifyCh := make(chan health.StateChange, 2) + state.Subscribe(notifyCh) + errCh := make(chan error) ctx, ctxCancel := context.WithCancel(context.Background()) @@ -94,6 +97,18 @@ func (suite *CheckSuite) TestHealthChange() { ctxCancel() suite.Assert().EqualError(<-errCh, context.Canceled.Error()) + + state.Unsubscribe(notifyCh) + + close(notifyCh) + + change := <-notifyCh + suite.Assert().Nil(change.Old.Healthy) + suite.Assert().False(*change.New.Healthy) + + change = <-notifyCh + suite.Assert().False(*change.Old.Healthy) + suite.Assert().True(*change.New.Healthy) } func (suite *CheckSuite) TestCheckAbort() { @@ -107,7 +122,7 @@ func (suite *CheckSuite) TestCheckAbort() { select { case <-ctx.Done(): return ctx.Err() - case <-time.After(25 * time.Millisecond): + case <-time.After(10 * time.Millisecond): return nil } } diff --git a/internal/app/init/pkg/system/health/settings.go b/internal/app/init/pkg/system/health/settings.go index bf643ba81..15d4149c5 100644 --- a/internal/app/init/pkg/system/health/settings.go +++ b/internal/app/init/pkg/system/health/settings.go @@ -17,7 +17,7 @@ type Settings struct { // DefaultSettings provides some default health check settings var DefaultSettings = Settings{ - InitialDelay: 500 * time.Millisecond, + InitialDelay: 200 * time.Millisecond, Period: time.Second, Timeout: 500 * time.Millisecond, } diff --git a/internal/app/init/pkg/system/health/status.go b/internal/app/init/pkg/system/health/status.go index 347590406..f6a602615 100644 --- a/internal/app/init/pkg/system/health/status.go +++ b/internal/app/init/pkg/system/health/status.go @@ -16,23 +16,74 @@ type Status struct { 
LastMessage string } +// StateChange is used to notify about status changes +type StateChange struct { + Old Status + New Status +} + // State provides proper locking around health state type State struct { sync.Mutex - status Status + status Status + subscribers []chan<- StateChange } // Update health status (locked) func (state *State) Update(healthy bool, message string) { state.Lock() - defer state.Unlock() - state.status.LastMessage = message + oldStatus := state.status + notify := false + if state.status.Healthy == nil || *state.status.Healthy != healthy { + notify = true state.status.Healthy = &healthy state.status.LastChange = time.Now() } + state.status.LastMessage = message + + newStatus := state.status + + var subscribers []chan<- StateChange + if notify { + subscribers = append([]chan<- StateChange(nil), state.subscribers...) + } + + state.Unlock() + + if notify { + for _, ch := range subscribers { + select { + case ch <- StateChange{oldStatus, newStatus}: + default: + // drop messages to clients which don't consume them + } + } + } +} + +// Subscribe for the notifications on state changes +func (state *State) Subscribe(ch chan<- StateChange) { + state.Lock() + defer state.Unlock() + + state.subscribers = append(state.subscribers, ch) +} + +// Unsubscribe removes the first registration of ch from the state change subscribers +func (state *State) Unsubscribe(ch chan<- StateChange) { + state.Lock() + defer state.Unlock() + + for i := range state.subscribers { + if state.subscribers[i] == ch { + state.subscribers[i], state.subscribers[len(state.subscribers)-1] = state.subscribers[len(state.subscribers)-1], nil + state.subscribers = state.subscribers[:len(state.subscribers)-1] + break // the range bound is stale once the slice shrinks; indexing further would panic + } + } } // Init health status (locked) diff --git a/internal/app/init/pkg/system/mocks_test.go b/internal/app/init/pkg/system/mocks_test.go index bb4b2bc91..79da560f5 100644 --- a/internal/app/init/pkg/system/mocks_test.go +++ b/internal/app/init/pkg/system/mocks_test.go @@ -6,9 +6,13 @@ package system_test import ( "context" +
"sync/atomic" + "time" + "github.com/pkg/errors" "github.com/talos-systems/talos/internal/app/init/pkg/system/conditions" "github.com/talos-systems/talos/internal/app/init/pkg/system/events" + "github.com/talos-systems/talos/internal/app/init/pkg/system/health" "github.com/talos-systems/talos/internal/app/init/pkg/system/runner" "github.com/talos-systems/talos/pkg/userdata" ) @@ -52,6 +56,38 @@ func (m *MockService) ConditionFunc(*userdata.UserData) conditions.ConditionFunc return conditions.None() } +type MockHealthcheckedService struct { + MockService + + notHealthy uint32 +} + +func (m *MockHealthcheckedService) SetHealthy(healthy bool) { + if healthy { + atomic.StoreUint32(&m.notHealthy, 0) + } else { + atomic.StoreUint32(&m.notHealthy, 1) + } +} + +func (m *MockHealthcheckedService) HealthFunc(*userdata.UserData) health.Check { + return func(context.Context) error { + if atomic.LoadUint32(&m.notHealthy) == 0 { + return nil + } + + return errors.New("not healthy") + } +} + +func (m *MockHealthcheckedService) HealthSettings(*userdata.UserData) *health.Settings { + return &health.Settings{ + InitialDelay: time.Millisecond, + Timeout: time.Second, + Period: time.Millisecond, + } +} + type MockRunner struct { exitCh chan error } diff --git a/internal/app/init/pkg/system/service.go b/internal/app/init/pkg/system/service.go new file mode 100644 index 000000000..794e7f290 --- /dev/null +++ b/internal/app/init/pkg/system/service.go @@ -0,0 +1,35 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
*/ + +package system + +import ( + "github.com/talos-systems/talos/internal/app/init/pkg/system/conditions" + "github.com/talos-systems/talos/internal/app/init/pkg/system/health" + "github.com/talos-systems/talos/internal/app/init/pkg/system/runner" + "github.com/talos-systems/talos/pkg/userdata" +) + +// Service is an interface describing a system service. +type Service interface { + // ID is the service id. + ID(*userdata.UserData) string + // PreFunc is invoked before a runner is created + PreFunc(*userdata.UserData) error + // Runner creates runner for the service + Runner(*userdata.UserData) (runner.Runner, error) + // PostFunc is invoked after a runner is closed. + PostFunc(*userdata.UserData) error + // ConditionFunc describes the conditions under which a service should + // start. + ConditionFunc(*userdata.UserData) conditions.ConditionFunc +} + +// HealthcheckedService is a service which provides a health check +type HealthcheckedService interface { + // HealthFunc provides the function that checks the health of the service + HealthFunc(*userdata.UserData) health.Check + // HealthSettings returns the settings for the health check + HealthSettings(*userdata.UserData) *health.Settings +} diff --git a/internal/app/init/pkg/system/service_runner.go b/internal/app/init/pkg/system/service_runner.go index c92042dd5..4ed3146e5 100644 --- a/internal/app/init/pkg/system/service_runner.go +++ b/internal/app/init/pkg/system/service_runner.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/talos-systems/talos/internal/app/init/pkg/system/events" + "github.com/talos-systems/talos/internal/app/init/pkg/system/health" "github.com/talos-systems/talos/internal/app/init/pkg/system/runner" "github.com/talos-systems/talos/pkg/userdata" ) @@ -28,6 +29,8 @@ type ServiceRunner struct { state events.ServiceState events events.ServiceEvents + healthState health.State + ctx context.Context ctxCancel context.CancelFunc } @@ -63,6 +66,32 @@ func (svcrunner *ServiceRunner)
UpdateState(newstate events.ServiceState, messag log.Printf("service[%s](%s): %s", svcrunner.id, svcrunner.state, event.Message) } +func (svcrunner *ServiceRunner) healthUpdate(change health.StateChange) { + svcrunner.mu.Lock() + defer svcrunner.mu.Unlock() + + // service not running, suppress event + if svcrunner.state != events.StateRunning { + return + } + + var message string + if *change.New.Healthy { + message = "Health check successful" + } else { + message = fmt.Sprintf("Health check failed: %s", change.New.LastMessage) + } + + event := events.ServiceEvent{ + Message: message, + State: svcrunner.state, + Timestamp: time.Now(), + } + svcrunner.events.Push(event) + + log.Printf("service[%s](%s): %s", svcrunner.id, svcrunner.state, event.Message) +} + // GetEventHistory returns history of events for this service func (svcrunner *ServiceRunner) GetEventHistory(count int) []events.ServiceEvent { svcrunner.mu.Lock() @@ -107,6 +136,7 @@ func (svcrunner *ServiceRunner) Start() { } } +// nolint: gocyclo func (svcrunner *ServiceRunner) run(runnr runner.Runner) error { if runnr == nil { // special case - run nothing (TODO: we should handle it better, e.g. 
in PreFunc) @@ -126,6 +156,42 @@ func (svcrunner *ServiceRunner) run(runnr runner.Runner) error { errCh <- runnr.Run(svcrunner.UpdateState) }() + if healthSvc, ok := svcrunner.service.(HealthcheckedService); ok { + var healthWg sync.WaitGroup + defer healthWg.Wait() + + healthWg.Add(1) + go func() { + defer healthWg.Done() + + // nolint: errcheck + health.Run(svcrunner.ctx, healthSvc.HealthSettings(svcrunner.userData), &svcrunner.healthState, healthSvc.HealthFunc(svcrunner.userData)) + }() + + notifyCh := make(chan health.StateChange, 2) + svcrunner.healthState.Subscribe(notifyCh) + defer svcrunner.healthState.Unsubscribe(notifyCh) + + healthWg.Add(1) + go func() { + defer healthWg.Done() + + for { + select { + case <-svcrunner.ctx.Done(): + return + case change := <-notifyCh: + svcrunner.healthUpdate(change) + } + } + }() + + } + + // when service run finishes, cancel context, this is important if service + // terminates on its own before being terminated by Stop() + defer svcrunner.ctxCancel() + select { case <-svcrunner.ctx.Done(): err := runnr.Stop() diff --git a/internal/app/init/pkg/system/service_runner_test.go b/internal/app/init/pkg/system/service_runner_test.go index 63bfc1342..f65f373a0 100644 --- a/internal/app/init/pkg/system/service_runner_test.go +++ b/internal/app/init/pkg/system/service_runner_test.go @@ -59,6 +59,73 @@ func (suite *ServiceRunnerSuite) TestFullFlow() { }, sr) } +func (suite *ServiceRunnerSuite) TestFullFlowHealthy() { + sr := system.NewServiceRunner(&MockHealthcheckedService{}, nil) + + finished := make(chan struct{}) + go func() { + defer close(finished) + sr.Start() + }() + + time.Sleep(50 * time.Millisecond) + + select { + case <-finished: + suite.Require().Fail("service running should be still running") + default: + } + + sr.Shutdown() + + <-finished + + suite.assertStateSequence([]events.ServiceState{ + events.StatePreparing, + events.StateWaiting, + events.StatePreparing, + events.StateRunning, + events.StateRunning, // one 
more notification when service is healthy + events.StateFinished, + }, sr) +} + +func (suite *ServiceRunnerSuite) TestFullFlowHealthChanges() { + m := MockHealthcheckedService{} + sr := system.NewServiceRunner(&m, nil) + + finished := make(chan struct{}) + go func() { + defer close(finished) + sr.Start() + }() + + time.Sleep(50 * time.Millisecond) + + m.SetHealthy(false) + + time.Sleep(50 * time.Millisecond) + + m.SetHealthy(true) + + time.Sleep(50 * time.Millisecond) + + sr.Shutdown() + + <-finished + + suite.assertStateSequence([]events.ServiceState{ + events.StatePreparing, + events.StateWaiting, + events.StatePreparing, + events.StateRunning, + events.StateRunning, // initial: healthy + events.StateRunning, // not healthy + events.StateRunning, // once again healthy + events.StateFinished, + }, sr) +} + func (suite *ServiceRunnerSuite) TestPreStageFail() { svc := &MockService{ preError: errors.New("pre failed"), diff --git a/internal/app/init/pkg/system/services/containerd.go b/internal/app/init/pkg/system/services/containerd.go index 73ce95369..0e5ffe3e7 100644 --- a/internal/app/init/pkg/system/services/containerd.go +++ b/internal/app/init/pkg/system/services/containerd.go @@ -5,11 +5,18 @@ package services import ( + "context" "fmt" "os" + "github.com/containerd/containerd" "github.com/containerd/containerd/defaults" + "github.com/pkg/errors" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/talos-systems/talos/internal/app/init/pkg/system" "github.com/talos-systems/talos/internal/app/init/pkg/system/conditions" + "github.com/talos-systems/talos/internal/app/init/pkg/system/health" "github.com/talos-systems/talos/internal/app/init/pkg/system/runner" "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process" "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart" @@ -62,3 +69,34 @@ func (c *Containerd) Runner(data *userdata.UserData) (runner.Runner, error) { restart.WithType(restart.Forever), ), nil } +
+// HealthFunc implements the HealthcheckedService interface +func (c *Containerd) HealthFunc(*userdata.UserData) health.Check { + return func(ctx context.Context) error { + client, err := containerd.New(constants.ContainerdAddress) + if err != nil { + return err + } + defer client.Close() // nolint: errcheck + resp, err := client.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + if err != nil { + return err + } + + if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { + return errors.Errorf("unexpected serving status: %d", resp.Status) + } + + return nil + } +} + +// HealthSettings implements the HealthcheckedService interface +func (c *Containerd) HealthSettings(*userdata.UserData) *health.Settings { + return &health.DefaultSettings +} + +// Verify healthchecked interface +var ( + _ system.HealthcheckedService = &Containerd{} +) diff --git a/internal/app/init/pkg/system/system.go b/internal/app/init/pkg/system/system.go index 312bdf946..2b45b8b2b 100644 --- a/internal/app/init/pkg/system/system.go +++ b/internal/app/init/pkg/system/system.go @@ -8,8 +8,6 @@ import ( "sync" "time" - "github.com/talos-systems/talos/internal/app/init/pkg/system/conditions" - "github.com/talos-systems/talos/internal/app/init/pkg/system/runner" "github.com/talos-systems/talos/pkg/userdata" ) @@ -27,21 +25,6 @@ type singleton struct { var instance *singleton var once sync.Once -// Service is an interface describing a system service. -type Service interface { - // ID is the service id. - ID(*userdata.UserData) string - // PreFunc is invoked before a runner is created - PreFunc(*userdata.UserData) error - // Runner creates runner for the service - Runner(*userdata.UserData) (runner.Runner, error) - // PostFunc is invoked after a runner is closed. - PostFunc(*userdata.UserData) error - // ConditionFunc describes the conditions under which a service should - // start. - ConditionFunc(*userdata.UserData) conditions.ConditionFunc -} - // Services returns the instance of the system services API.
// TODO(andrewrynhard): This should be a gRPC based API availale on a local // unix socket.