feat: add actor ID to events & emit an initial empty event

Add a new field `actorID` to the events and populate it with a UUID for the lifecycle actions `reboot`, `reset`, `upgrade` and `shutdown`. This actor ID will be present on all events emitted by this triggered action. We can use this ID later on the client side to be able to track triggered actions.

We also emit an event with an empty payload on the events streaming GRPC endpoint when a client connects. The purpose of this event is to signal to the client that the event streaming has actually started.

Server-side part of siderolabs/talos#5499.

Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
This commit is contained in:
Utku Ozdemir 2022-08-10 17:58:55 +02:00
parent fec0ed29d4
commit b5da686a7b
No known key found for this signature in database
GPG Key ID: 65933E76F0549B0D
16 changed files with 1699 additions and 1287 deletions

View File

@ -125,6 +125,7 @@ message RebootRequest {
// The reboot message containing the reboot status.
message Reboot {
common.Metadata metadata = 1;
string actor_id = 2;
}
message RebootResponse {
@ -248,12 +249,14 @@ message EventsRequest {
int32 tail_events = 1;
string tail_id = 2;
int32 tail_seconds = 3;
string with_actor_id = 4;
}
message Event {
common.Metadata metadata = 1;
google.protobuf.Any data = 2;
string id = 3;
string actor_id = 4;
}
// rpc reset
@ -276,6 +279,7 @@ message ResetRequest {
// The reset message containing the restart status.
message Reset {
common.Metadata metadata = 1;
string actor_id = 2;
}
message ResetResponse {
@ -286,6 +290,7 @@ message ResetResponse {
// The messages message containing the shutdown status.
message Shutdown {
common.Metadata metadata = 1;
string actor_id = 2;
}
message ShutdownRequest {
@ -308,6 +313,7 @@ message UpgradeRequest {
message Upgrade {
common.Metadata metadata = 1;
string ack = 2;
string actor_id = 3;
}
message UpgradeResponse {

View File

@ -34,6 +34,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"
"github.com/google/uuid"
multierror "github.com/hashicorp/go-multierror"
"github.com/prometheus/procfs"
"github.com/rs/xid"
@ -310,14 +311,18 @@ func (s *Server) GenerateConfiguration(ctx context.Context, in *machine.Generate
//
//nolint:dupl
func (s *Server) Reboot(ctx context.Context, in *machine.RebootRequest) (reply *machine.RebootResponse, err error) {
log.Printf("reboot via API received")
actorID := uuid.New().String()
log.Printf("reboot via API received. actor id: %s", actorID)
if err := s.checkSupported(runtime.Reboot); err != nil {
return nil, err
}
rebootCtx := context.WithValue(context.Background(), runtime.ActorIDCtxKey{}, actorID)
go func() {
if err := s.Controller.Run(context.Background(), runtime.SequenceReboot, in); err != nil {
if err := s.Controller.Run(rebootCtx, runtime.SequenceReboot, in); err != nil {
if !runtime.IsRebootError(err) {
log.Println("reboot failed:", err)
}
@ -326,7 +331,9 @@ func (s *Server) Reboot(ctx context.Context, in *machine.RebootRequest) (reply *
reply = &machine.RebootResponse{
Messages: []*machine.Reboot{
{},
{
ActorId: actorID,
},
},
}
@ -443,14 +450,18 @@ func (s *Server) Bootstrap(ctx context.Context, in *machine.BootstrapRequest) (r
//
//nolint:dupl
func (s *Server) Shutdown(ctx context.Context, in *machine.ShutdownRequest) (reply *machine.ShutdownResponse, err error) {
log.Printf("shutdown via API received")
actorID := uuid.New().String()
log.Printf("shutdown via API received. actor id: %s", actorID)
if err = s.checkSupported(runtime.Shutdown); err != nil {
return nil, err
}
shutdownCtx := context.WithValue(context.Background(), runtime.ActorIDCtxKey{}, actorID)
go func() {
if err := s.Controller.Run(context.Background(), runtime.SequenceShutdown, in, runtime.WithTakeover()); err != nil {
if err := s.Controller.Run(shutdownCtx, runtime.SequenceShutdown, in, runtime.WithTakeover()); err != nil {
if !runtime.IsRebootError(err) {
log.Println("shutdown failed:", err)
}
@ -459,7 +470,9 @@ func (s *Server) Shutdown(ctx context.Context, in *machine.ShutdownRequest) (rep
reply = &machine.ShutdownResponse{
Messages: []*machine.Shutdown{
{},
{
ActorId: actorID,
},
},
}
@ -470,8 +483,12 @@ func (s *Server) Shutdown(ctx context.Context, in *machine.ShutdownRequest) (rep
//
//nolint:gocyclo,cyclop
func (s *Server) Upgrade(ctx context.Context, in *machine.UpgradeRequest) (reply *machine.UpgradeResponse, err error) {
actorID := uuid.New().String()
var mu *concurrency.Mutex
ctx = context.WithValue(ctx, runtime.ActorIDCtxKey{}, actorID)
if err = s.checkSupported(runtime.Upgrade); err != nil {
return nil, err
}
@ -506,7 +523,7 @@ func (s *Server) Upgrade(ctx context.Context, in *machine.UpgradeRequest) (reply
}
}
runCtx := context.Background()
runCtx := context.WithValue(context.Background(), runtime.ActorIDCtxKey{}, actorID)
if in.GetStage() {
meta, err := bootloader.NewMeta()
@ -566,7 +583,8 @@ func (s *Server) Upgrade(ctx context.Context, in *machine.UpgradeRequest) (reply
reply = &machine.UpgradeResponse{
Messages: []*machine.Upgrade{
{
Ack: "Upgrade request received",
Ack: "Upgrade request received",
ActorId: actorID,
},
},
}
@ -594,7 +612,9 @@ func (opt *ResetOptions) GetSystemDiskTargets() []runtime.PartitionTarget {
//
//nolint:gocyclo
func (s *Server) Reset(ctx context.Context, in *machine.ResetRequest) (reply *machine.ResetResponse, err error) {
log.Printf("reset request received")
actorID := uuid.New().String()
log.Printf("reset request received. actorID: %s", actorID)
opts := ResetOptions{
ResetRequest: in,
@ -641,8 +661,10 @@ func (s *Server) Reset(ctx context.Context, in *machine.ResetRequest) (reply *ma
}
}
resetCtx := context.WithValue(context.Background(), runtime.ActorIDCtxKey{}, actorID)
go func() {
if err := s.Controller.Run(context.Background(), runtime.SequenceReset, &opts); err != nil {
if err := s.Controller.Run(resetCtx, runtime.SequenceReset, &opts); err != nil {
if !runtime.IsRebootError(err) {
log.Println("reset failed:", err)
}
@ -651,7 +673,9 @@ func (s *Server) Reset(ctx context.Context, in *machine.ResetRequest) (reply *ma
reply = &machine.ResetResponse{
Messages: []*machine.Reset{
{},
{
ActorId: actorID,
},
},
}
@ -1305,6 +1329,12 @@ func (s *Server) Read(in *machine.ReadRequest, srv machine.MachineService_ReadSe
//
//nolint:gocyclo
func (s *Server) Events(req *machine.EventsRequest, l machine.MachineService_EventsServer) error {
// send an empty (hello) event to indicate to client that streaming has started
err := sendEmptyEvent(req, l)
if err != nil {
return err
}
errCh := make(chan error)
var opts []runtime.WatchOptionFunc
@ -1326,6 +1356,10 @@ func (s *Server) Events(req *machine.EventsRequest, l machine.MachineService_Eve
opts = append(opts, runtime.WithTailDuration(time.Duration(req.TailSeconds)*time.Second))
}
if req.WithActorId != "" {
opts = append(opts, runtime.WithActorID(req.WithActorId))
}
if err := s.Controller.Runtime().Events().Watch(func(events <-chan runtime.EventInfo) {
errCh <- func() error {
for {
@ -1357,6 +1391,15 @@ func (s *Server) Events(req *machine.EventsRequest, l machine.MachineService_Eve
return <-errCh
}
func sendEmptyEvent(req *machine.EventsRequest, l machine.MachineService_EventsServer) error {
emptyEvent, err := pointer.To(runtime.NewEvent(nil, req.WithActorId)).ToMachineEvent()
if err != nil {
return err
}
return l.Send(emptyEvent)
}
//nolint:gocyclo
func pullAndValidateInstallerImage(ctx context.Context, reg config.Registries, ref string) error {
// Pull down specified installer image early so we can bail if it doesn't exist in the upstream registry

View File

@ -106,7 +106,7 @@ func (ctrl *AddressEventController) Run(ctx context.Context, r controller.Runtim
hostname = hostnameStatus.(*network.HostnameStatus).TypedSpec().Hostname
}
ctrl.V1Alpha1Events.Publish(&machine.AddressEvent{
ctrl.V1Alpha1Events.Publish(ctx, &machine.AddressEvent{
Hostname: hostname,
Addresses: addresses,
})

View File

@ -33,7 +33,7 @@ type mockEventsStream struct {
messages []proto.Message
}
func (s *mockEventsStream) Publish(m proto.Message) {
func (s *mockEventsStream) Publish(_ context.Context, m proto.Message) {
s.messagesMu.Lock()
defer s.messagesMu.Unlock()

View File

@ -147,12 +147,14 @@ func (suite *EventsSinkSuite) TestPublish() {
defer cancel()
suite.events.Publish(
ctx,
&machine.AddressEvent{
Hostname: "localhost",
},
)
suite.events.Publish(
ctx,
&machine.PhaseEvent{
Phase: "test",
Action: machine.PhaseEvent_START,
@ -184,12 +186,14 @@ func (suite *EventsSinkSuite) TestDrain() {
for i := 0; i < 10; i++ {
suite.events.Publish(
ctx,
&machine.PhaseEvent{
Phase: "test",
Action: machine.PhaseEvent_START,
},
)
suite.events.Publish(
ctx,
&machine.PhaseEvent{
Phase: "test",
Action: machine.PhaseEvent_STOP,

View File

@ -65,7 +65,7 @@ func (ctrl *MachineStatusPublisherController) Run(ctx context.Context, r control
return fmt.Errorf("error reading machine status: %w", err)
}
ctrl.V1Alpha1Events.Publish(&machine.MachineStatusEvent{
ctrl.V1Alpha1Events.Publish(ctx, &machine.MachineStatusEvent{
Stage: machine.MachineStatusEvent_MachineStage(machineStatus.TypedSpec().Stage),
Status: &machine.MachineStatusEvent_MachineStatus{
Ready: machineStatus.TypedSpec().Status.Ready,

View File

@ -5,6 +5,7 @@
package runtime
import (
"context"
"fmt"
"time"
@ -15,11 +16,15 @@ import (
"github.com/talos-systems/talos/pkg/machinery/proto"
)
// ActorIDCtxKey is the context key used for event actor id.
type ActorIDCtxKey struct{}
// Event is what is sent on the wire.
type Event struct {
TypeURL string
ID xid.ID
Payload proto.Message
ActorID string
}
// EventInfo unifies event and queue information for the WatchFunc.
@ -43,6 +48,8 @@ type WatchOptions struct {
TailID xid.ID
// Start at timestamp Now() - TailDuration.
TailDuration time.Duration
// ActorID to ID of the actor to filter events by.
ActorID string
}
// WatchOptionFunc defines the options for the watcher.
@ -89,6 +96,15 @@ func WithTailDuration(dur time.Duration) WatchOptionFunc {
}
}
// WithActorID sets up Watcher to return events filtered by given actor id.
func WithActorID(actorID string) WatchOptionFunc {
return func(opts *WatchOptions) error {
opts.ActorID = actorID
return nil
}
}
// Watcher defines a runtime event watcher.
type Watcher interface {
Watch(WatchFunc, ...WatchOptionFunc) error
@ -96,7 +112,7 @@ type Watcher interface {
// Publisher defines a runtime event publisher.
type Publisher interface {
Publish(proto.Message)
Publish(context.Context, proto.Message)
}
// EventStream defines the runtime event stream.
@ -105,6 +121,24 @@ type EventStream interface {
Publisher
}
// NewEvent creates a new event with the provided payload and actor ID.
func NewEvent(payload proto.Message, actorID string) Event {
typeURL := ""
if payload != nil {
typeURL = fmt.Sprintf("talos/runtime/%s", payload.ProtoReflect().Descriptor().FullName())
}
return Event{
// In the future, we can publish `talos/runtime`, and
// `talos/plugin/<plugin>` (or something along those lines) events.
// TypeURL: fmt.Sprintf("talos/runtime/%s", protoreflect.MessageDescriptor.FullName(msg)),
TypeURL: typeURL,
Payload: payload,
ID: xid.New(),
ActorID: actorID,
}
}
// ToMachineEvent serializes Event as proto message machine.Event.
func (event *Event) ToMachineEvent() (*machine.Event, error) {
value, err := proto.Marshal(event.Payload)
@ -117,6 +151,7 @@ func (event *Event) ToMachineEvent() (*machine.Event, error) {
TypeUrl: event.TypeURL,
Value: value,
},
Id: event.ID.String(),
Id: event.ID.String(),
ActorId: event.ActorID,
}, nil
}

View File

@ -84,7 +84,7 @@ func (c *Controller) Run(ctx context.Context, seq runtime.Sequence, data interfa
ctx, err := c.priorityLock.Lock(ctx, time.Minute, seq, setters...)
if err != nil {
if errors.Is(err, runtime.ErrLocked) {
c.Runtime().Events().Publish(&machine.SequenceEvent{
c.Runtime().Events().Publish(context.Background(), &machine.SequenceEvent{
Sequence: seq.String(),
Action: machine.SequenceEvent_NOOP,
Error: &common.Error{
@ -112,7 +112,7 @@ func (c *Controller) Run(ctx context.Context, seq runtime.Sequence, data interfa
code = common.Code_CANCELED
}
c.Runtime().Events().Publish(&machine.SequenceEvent{
c.Runtime().Events().Publish(ctx, &machine.SequenceEvent{
Sequence: seq.String(),
Action: machine.SequenceEvent_NOOP,
Error: &common.Error{
@ -190,12 +190,12 @@ func (c *Controller) ListenForEvents(ctx context.Context) error {
}
func (c *Controller) run(ctx context.Context, seq runtime.Sequence, phases []runtime.Phase, data interface{}) error {
c.Runtime().Events().Publish(&machine.SequenceEvent{
c.Runtime().Events().Publish(ctx, &machine.SequenceEvent{
Sequence: seq.String(),
Action: machine.SequenceEvent_START,
})
defer c.Runtime().Events().Publish(&machine.SequenceEvent{
defer c.Runtime().Events().Publish(ctx, &machine.SequenceEvent{
Sequence: seq.String(),
Action: machine.SequenceEvent_STOP,
})
@ -251,12 +251,12 @@ func (c *Controller) run(ctx context.Context, seq runtime.Sequence, phases []run
}
func (c *Controller) runPhase(ctx context.Context, phase runtime.Phase, seq runtime.Sequence, data interface{}) error {
c.Runtime().Events().Publish(&machine.PhaseEvent{
c.Runtime().Events().Publish(ctx, &machine.PhaseEvent{
Phase: phase.Name,
Action: machine.PhaseEvent_START,
})
defer c.Runtime().Events().Publish(&machine.PhaseEvent{
defer c.Runtime().Events().Publish(ctx, &machine.PhaseEvent{
Phase: phase.Name,
Action: machine.PhaseEvent_START,
})
@ -293,7 +293,7 @@ func (c *Controller) runTask(ctx context.Context, progress string, f runtime.Tas
start := time.Now()
c.Runtime().Events().Publish(&machine.TaskEvent{
c.Runtime().Events().Publish(ctx, &machine.TaskEvent{
Task: taskName,
Action: machine.TaskEvent_START,
})
@ -312,7 +312,7 @@ func (c *Controller) runTask(ctx context.Context, progress string, f runtime.Tas
}
}()
defer c.Runtime().Events().Publish(&machine.TaskEvent{
defer c.Runtime().Events().Publish(ctx, &machine.TaskEvent{
Task: taskName,
Action: machine.TaskEvent_STOP,
})

View File

@ -6,13 +6,10 @@ package v1alpha1
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/rs/xid"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/pkg/machinery/proto"
)
@ -198,6 +195,11 @@ func (e *Events) Watch(f runtime.WatchFunc, opt ...runtime.WatchOptionFunc) erro
e.mu.Unlock()
// if actor id filter is specified and does not match the event, skip it
if opts.ActorID != "" && event.ActorID != opts.ActorID {
continue
}
// send event to WatchFunc, wait for it to process the event
select {
case ch <- runtime.EventInfo{
@ -214,17 +216,14 @@ func (e *Events) Watch(f runtime.WatchFunc, opt ...runtime.WatchOptionFunc) erro
}
// Publish implements the Events interface.
func (e *Events) Publish(msg proto.Message) {
event := runtime.Event{
// In the future, we can publish `talos/runtime`, and
// `talos/plugin/<plugin>` (or something along those lines) events.
// TypeURL: fmt.Sprintf("talos/runtime/%s", protoreflect.MessageDescriptor.FullName(msg)),
TypeURL: fmt.Sprintf("talos/runtime/%s", msg.ProtoReflect().Descriptor().FullName()),
Payload: msg,
ID: xid.New(),
func (e *Events) Publish(ctx context.Context, msg proto.Message) {
actorID, ok := ctx.Value(runtime.ActorIDCtxKey{}).(string)
if !ok {
actorID = ""
}
event := runtime.NewEvent(msg, actorID)
e.mu.Lock()
defer e.mu.Unlock()

View File

@ -112,7 +112,7 @@ func TestEvents_Publish(t *testing.T) {
for i := 0; i < tt.messages; i++ {
_ = l.Wait(context.Background()) //nolint:errcheck
e.Publish(&machine.SequenceEvent{
e.Publish(context.Background(), &machine.SequenceEvent{
Sequence: strconv.Itoa(i),
})
}
@ -186,7 +186,7 @@ func TestEvents_WatchOptionsTailEvents(t *testing.T) {
e := NewEvents(100, 10)
for i := 0; i < 200; i++ {
e.Publish(&machine.SequenceEvent{
e.Publish(context.Background(), &machine.SequenceEvent{
Sequence: strconv.Itoa(i),
})
}
@ -203,7 +203,7 @@ func TestEvents_WatchOptionsTailEvents(t *testing.T) {
e = NewEvents(100, 10)
for i := 0; i < 30; i++ {
e.Publish(&machine.SequenceEvent{
e.Publish(context.Background(), &machine.SequenceEvent{
Sequence: strconv.Itoa(i),
})
}
@ -219,7 +219,7 @@ func TestEvents_WatchOptionsTailSeconds(t *testing.T) {
e := NewEvents(100, 10)
for i := 0; i < 20; i++ {
e.Publish(&machine.SequenceEvent{
e.Publish(context.Background(), &machine.SequenceEvent{
Sequence: strconv.Itoa(i),
})
}
@ -228,7 +228,7 @@ func TestEvents_WatchOptionsTailSeconds(t *testing.T) {
time.Sleep(3 * time.Second)
for i := 20; i < 30; i++ {
e.Publish(&machine.SequenceEvent{
e.Publish(context.Background(), &machine.SequenceEvent{
Sequence: strconv.Itoa(i),
})
}
@ -242,7 +242,7 @@ func TestEvents_WatchOptionsTailID(t *testing.T) {
e := NewEvents(100, 10)
for i := 0; i < 20; i++ {
e.Publish(&machine.SequenceEvent{
e.Publish(context.Background(), &machine.SequenceEvent{
Sequence: strconv.Itoa(i),
})
}
@ -296,7 +296,7 @@ func BenchmarkPublish(bb *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
e.Publish(&ev)
e.Publish(context.Background(), &ev)
}
wg.Wait()

View File

@ -490,7 +490,7 @@ func LoadConfig(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFu
}
if e != nil {
r.Events().Publish(&machineapi.ConfigLoadErrorEvent{
r.Events().Publish(ctx, &machineapi.ConfigLoadErrorEvent{
Error: e.Error(),
})
@ -510,7 +510,7 @@ func LoadConfig(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFu
cfg, e := r.LoadAndValidateConfig(b)
if e != nil {
r.Events().Publish(&machineapi.ConfigLoadErrorEvent{
r.Events().Publish(ctx, &machineapi.ConfigLoadErrorEvent{
Error: e.Error(),
})
@ -615,12 +615,12 @@ func fetchConfig(ctx context.Context, r runtime.Runtime) (out []byte, err error)
func receiveConfigViaMaintenanceService(ctx context.Context, logger *log.Logger, r runtime.Runtime) ([]byte, error) {
// add "fake" events to signal when Talos enters and leaves maintenance mode
r.Events().Publish(&machineapi.TaskEvent{
r.Events().Publish(ctx, &machineapi.TaskEvent{
Action: machineapi.TaskEvent_START,
Task: "runningMaintenance",
})
defer r.Events().Publish(&machineapi.TaskEvent{
defer r.Events().Publish(ctx, &machineapi.TaskEvent{
Action: machineapi.TaskEvent_STOP,
Task: "runningMaintenance",
})
@ -1652,7 +1652,7 @@ func Reboot(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc,
rebootCmd = unix.LINUX_REBOOT_CMD_KEXEC
}
r.Events().Publish(&machineapi.RestartEvent{
r.Events().Publish(ctx, &machineapi.RestartEvent{
Cmd: int64(rebootCmd),
})
@ -1680,7 +1680,7 @@ func Shutdown(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc
}
}
r.Events().Publish(&machineapi.RestartEvent{
r.Events().Publish(ctx, &machineapi.RestartEvent{
Cmd: int64(cmd),
})

View File

@ -70,7 +70,7 @@ func (svcrunner *ServiceRunner) GetState() events.ServiceState {
}
// UpdateState implements events.Recorder.
func (svcrunner *ServiceRunner) UpdateState(newstate events.ServiceState, message string, args ...interface{}) {
func (svcrunner *ServiceRunner) UpdateState(ctx context.Context, newstate events.ServiceState, message string, args ...interface{}) {
svcrunner.mu.Lock()
event := events.ServiceEvent{
@ -90,7 +90,7 @@ func (svcrunner *ServiceRunner) UpdateState(newstate events.ServiceState, messag
svcrunner.mu.Unlock()
if svcrunner.runtime != nil {
svcrunner.runtime.Events().Publish(event.AsProto(svcrunner.id))
svcrunner.runtime.Events().Publish(ctx, event.AsProto(svcrunner.id))
}
if isUp {
@ -106,7 +106,7 @@ func (svcrunner *ServiceRunner) UpdateState(newstate events.ServiceState, messag
}
}
func (svcrunner *ServiceRunner) healthUpdate(change health.StateChange) {
func (svcrunner *ServiceRunner) healthUpdate(ctx context.Context, change health.StateChange) {
svcrunner.mu.Lock()
// service not running, suppress event
@ -141,7 +141,7 @@ func (svcrunner *ServiceRunner) healthUpdate(change health.StateChange) {
}
if svcrunner.runtime != nil {
svcrunner.runtime.Events().Publish(event.AsProto(svcrunner.id))
svcrunner.runtime.Events().Publish(ctx, event.AsProto(svcrunner.id))
}
}
@ -155,7 +155,7 @@ func (svcrunner *ServiceRunner) GetEventHistory(count int) []events.ServiceEvent
func (svcrunner *ServiceRunner) waitFor(ctx context.Context, condition conditions.Condition) error {
description := condition.String()
svcrunner.UpdateState(events.StateWaiting, "Waiting for %s", description)
svcrunner.UpdateState(ctx, events.StateWaiting, "Waiting for %s", description)
errCh := make(chan error)
@ -175,7 +175,7 @@ func (svcrunner *ServiceRunner) waitFor(ctx context.Context, condition condition
newDescription := condition.String()
if newDescription != description && newDescription != "" {
description = newDescription
svcrunner.UpdateState(events.StateWaiting, "Waiting for %s", description)
svcrunner.UpdateState(ctx, events.StateWaiting, "Waiting for %s", description)
}
}
}
@ -214,25 +214,25 @@ func (svcrunner *ServiceRunner) Start() {
if condition != nil {
if err := svcrunner.waitFor(ctx, condition); err != nil {
svcrunner.UpdateState(events.StateFailed, "Condition failed: %v", err)
svcrunner.UpdateState(ctx, events.StateFailed, "Condition failed: %v", err)
return
}
}
svcrunner.UpdateState(events.StatePreparing, "Running pre state")
svcrunner.UpdateState(ctx, events.StatePreparing, "Running pre state")
if err := svcrunner.service.PreFunc(ctx, svcrunner.runtime); err != nil {
svcrunner.UpdateState(events.StateFailed, "Failed to run pre stage: %v", err)
svcrunner.UpdateState(ctx, events.StateFailed, "Failed to run pre stage: %v", err)
return
}
svcrunner.UpdateState(events.StatePreparing, "Creating service runner")
svcrunner.UpdateState(ctx, events.StatePreparing, "Creating service runner")
runnr, err := svcrunner.service.Runner(svcrunner.runtime)
if err != nil {
svcrunner.UpdateState(events.StateFailed, "Failed to create runner: %v", err)
svcrunner.UpdateState(ctx, events.StateFailed, "Failed to create runner: %v", err)
return
}
@ -242,20 +242,20 @@ func (svcrunner *ServiceRunner) Start() {
state := svcrunner.GetState()
if err := svcrunner.service.PostFunc(svcrunner.runtime, state); err != nil {
svcrunner.UpdateState(events.StateFailed, "Failed to run post stage: %v", err)
svcrunner.UpdateState(ctx, events.StateFailed, "Failed to run post stage: %v", err)
}
}()
if runnr == nil {
svcrunner.UpdateState(events.StateSkipped, "Service skipped")
svcrunner.UpdateState(ctx, events.StateSkipped, "Service skipped")
return
}
if err := svcrunner.run(ctx, runnr); err != nil {
svcrunner.UpdateState(events.StateFailed, "Failed running service: %v", err)
svcrunner.UpdateState(ctx, events.StateFailed, "Failed running service: %v", err)
} else {
svcrunner.UpdateState(events.StateFinished, "Service finished successfully")
svcrunner.UpdateState(ctx, events.StateFinished, "Service finished successfully")
}
}
@ -276,7 +276,9 @@ func (svcrunner *ServiceRunner) run(ctx context.Context, runnr runner.Runner) er
errCh := make(chan error)
go func() {
errCh <- runnr.Run(svcrunner.UpdateState)
errCh <- runnr.Run(func(s events.ServiceState, msg string, args ...interface{}) {
svcrunner.UpdateState(ctx, s, msg, args...)
})
}()
if healthSvc, ok := svcrunner.service.(HealthcheckedService); ok {
@ -312,7 +314,7 @@ func (svcrunner *ServiceRunner) run(ctx context.Context, runnr runner.Runner) er
case <-ctx.Done():
return
case change := <-notifyCh:
svcrunner.healthUpdate(change)
svcrunner.healthUpdate(ctx, change)
}
}
}()

View File

@ -668,7 +668,7 @@ func BootstrapEtcd(ctx context.Context, r runtime.Runtime, req *machineapi.Boots
// wait in the boot sequence to unblock.
for _, svc := range system.Services(r).List() {
if svc.AsProto().GetId() == "etcd" {
svc.UpdateState(events.StateFinished, "Bootstrap requested")
svc.UpdateState(ctx, events.StateFinished, "Bootstrap requested")
break
}

File diff suppressed because it is too large Load Diff

View File

@ -313,6 +313,13 @@ func (m *Reboot) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.ActorId) > 0 {
i -= len(m.ActorId)
copy(dAtA[i:], m.ActorId)
i = encodeVarint(dAtA, i, uint64(len(m.ActorId)))
i--
dAtA[i] = 0x12
}
if m.Metadata != nil {
if marshalto, ok := interface{}(m.Metadata).(interface {
MarshalToSizedBufferVT([]byte) (int, error)
@ -1102,6 +1109,13 @@ func (m *EventsRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.WithActorId) > 0 {
i -= len(m.WithActorId)
copy(dAtA[i:], m.WithActorId)
i = encodeVarint(dAtA, i, uint64(len(m.WithActorId)))
i--
dAtA[i] = 0x22
}
if m.TailSeconds != 0 {
i = encodeVarint(dAtA, i, uint64(m.TailSeconds))
i--
@ -1152,6 +1166,13 @@ func (m *Event) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.ActorId) > 0 {
i -= len(m.ActorId)
copy(dAtA[i:], m.ActorId)
i = encodeVarint(dAtA, i, uint64(len(m.ActorId)))
i--
dAtA[i] = 0x22
}
if len(m.Id) > 0 {
i -= len(m.Id)
copy(dAtA[i:], m.Id)
@ -1351,6 +1372,13 @@ func (m *Reset) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.ActorId) > 0 {
i -= len(m.ActorId)
copy(dAtA[i:], m.ActorId)
i = encodeVarint(dAtA, i, uint64(len(m.ActorId)))
i--
dAtA[i] = 0x12
}
if m.Metadata != nil {
if marshalto, ok := interface{}(m.Metadata).(interface {
MarshalToSizedBufferVT([]byte) (int, error)
@ -1451,6 +1479,13 @@ func (m *Shutdown) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.ActorId) > 0 {
i -= len(m.ActorId)
copy(dAtA[i:], m.ActorId)
i = encodeVarint(dAtA, i, uint64(len(m.ActorId)))
i--
dAtA[i] = 0x12
}
if m.Metadata != nil {
if marshalto, ok := interface{}(m.Metadata).(interface {
MarshalToSizedBufferVT([]byte) (int, error)
@ -1664,6 +1699,13 @@ func (m *Upgrade) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.ActorId) > 0 {
i -= len(m.ActorId)
copy(dAtA[i:], m.ActorId)
i = encodeVarint(dAtA, i, uint64(len(m.ActorId)))
i--
dAtA[i] = 0x1a
}
if len(m.Ack) > 0 {
i -= len(m.Ack)
copy(dAtA[i:], m.Ack)
@ -8356,6 +8398,10 @@ func (m *Reboot) SizeVT() (n int) {
}
n += 1 + l + sov(uint64(l))
}
l = len(m.ActorId)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -8677,6 +8723,10 @@ func (m *EventsRequest) SizeVT() (n int) {
if m.TailSeconds != 0 {
n += 1 + sov(uint64(m.TailSeconds))
}
l = len(m.WithActorId)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -8713,6 +8763,10 @@ func (m *Event) SizeVT() (n int) {
if l > 0 {
n += 1 + l + sov(uint64(l))
}
l = len(m.ActorId)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -8778,6 +8832,10 @@ func (m *Reset) SizeVT() (n int) {
}
n += 1 + l + sov(uint64(l))
}
l = len(m.ActorId)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -8818,6 +8876,10 @@ func (m *Shutdown) SizeVT() (n int) {
}
n += 1 + l + sov(uint64(l))
}
l = len(m.ActorId)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -8902,6 +8964,10 @@ func (m *Upgrade) SizeVT() (n int) {
if l > 0 {
n += 1 + l + sov(uint64(l))
}
l = len(m.ActorId)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -12323,6 +12389,38 @@ func (m *Reboot) UnmarshalVT(dAtA []byte) error {
}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ActorId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ActorId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -13997,6 +14095,38 @@ func (m *EventsRequest) UnmarshalVT(dAtA []byte) error {
break
}
}
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field WithActorId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.WithActorId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -14168,6 +14298,38 @@ func (m *Event) UnmarshalVT(dAtA []byte) error {
}
m.Id = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ActorId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ActorId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -14491,6 +14653,38 @@ func (m *Reset) UnmarshalVT(dAtA []byte) error {
}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ActorId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ActorId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -14671,6 +14865,38 @@ func (m *Shutdown) UnmarshalVT(dAtA []byte) error {
}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ActorId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ActorId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -15097,6 +15323,38 @@ func (m *Upgrade) UnmarshalVT(dAtA []byte) error {
}
m.Ack = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ActorId", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ActorId = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])

View File

@ -1254,6 +1254,7 @@ EtcdMembers contains the list of members registered on the host.
| metadata | [common.Metadata](#common.Metadata) | | |
| data | [google.protobuf.Any](#google.protobuf.Any) | | |
| id | [string](#string) | | |
| actor_id | [string](#string) | | |
@ -1271,6 +1272,7 @@ EtcdMembers contains the list of members registered on the host.
| tail_events | [int32](#int32) | | |
| tail_id | [string](#string) | | |
| tail_seconds | [int32](#int32) | | |
| with_actor_id | [string](#string) | | |
@ -1973,6 +1975,7 @@ The reboot message containing the reboot status.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| metadata | [common.Metadata](#common.Metadata) | | |
| actor_id | [string](#string) | | |
@ -2018,6 +2021,7 @@ The reset message containing the restart status.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| metadata | [common.Metadata](#common.Metadata) | | |
| actor_id | [string](#string) | | |
@ -2474,6 +2478,7 @@ The messages message containing the shutdown status.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| metadata | [common.Metadata](#common.Metadata) | | |
| actor_id | [string](#string) | | |
@ -2668,6 +2673,7 @@ The request message containing the containerd namespace.
| ----- | ---- | ----- | ----------- |
| metadata | [common.Metadata](#common.Metadata) | | |
| ack | [string](#string) | | |
| actor_id | [string](#string) | | |