diff --git a/cmd/talosctl/cmd/talos/containers.go b/cmd/talosctl/cmd/talos/containers.go index 2d6b4ff44..686b30892 100644 --- a/cmd/talosctl/cmd/talos/containers.go +++ b/cmd/talosctl/cmd/talos/containers.go @@ -19,7 +19,6 @@ import ( "github.com/talos-systems/talos/api/common" osapi "github.com/talos-systems/talos/api/os" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" "github.com/talos-systems/talos/pkg/constants" @@ -65,7 +64,7 @@ func containerRender(remotePeer *peer.Peer, resp *osapi.ContainersResponse) erro w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tNAMESPACE\tID\tIMAGE\tPID\tSTATUS") - defaultNode := helpers.AddrFromPeer(remotePeer) + defaultNode := client.AddrFromPeer(remotePeer) for _, msg := range resp.Messages { resp := msg diff --git a/cmd/talosctl/cmd/talos/dmesg.go b/cmd/talosctl/cmd/talos/dmesg.go index 9c4caa102..7101a204f 100644 --- a/cmd/talosctl/cmd/talos/dmesg.go +++ b/cmd/talosctl/cmd/talos/dmesg.go @@ -14,7 +14,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/client" ) @@ -33,7 +32,7 @@ var dmesgCmd = &cobra.Command{ return fmt.Errorf("error getting dmesg: %w", err) } - defaultNode := helpers.RemotePeer(stream.Context()) + defaultNode := client.RemotePeer(stream.Context()) for { resp, err := stream.Recv() diff --git a/cmd/talosctl/cmd/talos/events.go b/cmd/talosctl/cmd/talos/events.go index 536a601f9..0df7886e7 100644 --- a/cmd/talosctl/cmd/talos/events.go +++ b/cmd/talosctl/cmd/talos/events.go @@ -7,18 +7,12 @@ package talos import ( "context" "fmt" - "io" - "log" "os" "text/tabwriter" - "github.com/golang/protobuf/proto" "github.com/spf13/cobra" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/talos-systems/talos/api/machine" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/client" ) @@ -29,63 +23,48 @@ var eventsCmd = &cobra.Command{ Long: ``, RunE: func(cmd *cobra.Command, args []string) error { return WithClient(func(ctx context.Context, c *client.Client) error { - stream, err := c.Events(ctx) - if err != nil { - return fmt.Errorf("error fetching events: %s", err) - } - - defaultNode := helpers.RemotePeer(stream.Context()) - w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tEVENT\tMESSAGE") - for { - event, err := stream.Recv() - if err != nil { - if err == io.EOF || status.Code(err) == codes.Canceled { - return nil + return c.EventsWatch(ctx, func(ch <-chan client.Event) { + for { + var ( + event client.Event + ok bool + ) + + select { + case event, ok = <-ch: + if !ok { + return + } + case <-ctx.Done(): + return } - return fmt.Errorf("failed to watch events: %w", err) - } + format := "%s\t%s\t%s\n" - node := defaultNode + var args []interface{} - if event.Metadata != nil { - node = event.Metadata.Hostname - } - - typeURL := event.GetData().GetTypeUrl() - - format := "%s\t%s\t%s\n" - - var args []interface{} - - switch event.GetData().GetTypeUrl() { - case "talos/runtime/" + proto.MessageName(&machine.SequenceEvent{}): - msg := &machine.SequenceEvent{} - - if err = proto.Unmarshal(event.GetData().GetValue(), msg); err != nil { - log.Printf("failed to unmarshal message: %v", err) + switch msg := event.Payload.(type) { + case *machine.SequenceEvent: + if msg.Error != nil { + args = []interface{}{msg.GetSequence() + " error:" + " " + msg.GetError().GetMessage()} + } else { + args = []interface{}{msg.GetSequence() + " " + msg.GetAction().String()} + } + default: + // We haven't implemented the handling of this event yet. continue } - if msg.Error != nil { - args = []interface{}{msg.GetSequence() + " error:" + " " + msg.GetError().GetMessage()} - } else { - args = []interface{}{msg.GetSequence() + " " + msg.GetAction().String()} - } - default: - // We haven't implemented the handling of this event yet. - continue + args = append([]interface{}{event.Node, event.TypeURL}, args...) + fmt.Fprintf(w, format, args...) + + // nolint: errcheck + w.Flush() } - - args = append([]interface{}{node, typeURL}, args...) - fmt.Fprintf(w, format, args...) - - // nolint: errcheck - w.Flush() - } + }) }) }, } diff --git a/cmd/talosctl/cmd/talos/interfaces.go b/cmd/talosctl/cmd/talos/interfaces.go index 6fbf23a07..0c2a94ae6 100644 --- a/cmd/talosctl/cmd/talos/interfaces.go +++ b/cmd/talosctl/cmd/talos/interfaces.go @@ -15,7 +15,6 @@ import ( "google.golang.org/grpc/peer" networkapi "github.com/talos-systems/talos/api/network" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" ) @@ -48,7 +47,7 @@ func intersRender(remotePeer *peer.Peer, resp *networkapi.InterfacesResponse) er w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tINDEX\tNAME\tMAC\tMTU\tADDRESS") - defaultNode := helpers.AddrFromPeer(remotePeer) + defaultNode := client.AddrFromPeer(remotePeer) for _, msg := range resp.Messages { node := defaultNode diff --git a/cmd/talosctl/cmd/talos/list.go b/cmd/talosctl/cmd/talos/list.go index de85baccd..798bc0e33 100644 --- a/cmd/talosctl/cmd/talos/list.go +++ b/cmd/talosctl/cmd/talos/list.go @@ -18,7 +18,6 @@ import ( "google.golang.org/grpc/status" machineapi "github.com/talos-systems/talos/api/machine" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/client" ) @@ -55,7 +54,7 @@ var lsCmd = &cobra.Command{ return fmt.Errorf("error fetching logs: %s", err) } - defaultNode := helpers.RemotePeer(stream.Context()) + defaultNode := client.RemotePeer(stream.Context()) if !long { w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) diff --git a/cmd/talosctl/cmd/talos/logs.go b/cmd/talosctl/cmd/talos/logs.go index 2f7d35eb6..4b6e6d9bf 100644 --- a/cmd/talosctl/cmd/talos/logs.go +++ b/cmd/talosctl/cmd/talos/logs.go @@ -19,7 +19,6 @@ import ( "github.com/talos-systems/talos/api/common" "github.com/talos-systems/talos/api/machine" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" "github.com/talos-systems/talos/pkg/constants" @@ -55,7 +54,7 @@ var logsCmd = &cobra.Command{ return fmt.Errorf("error fetching logs: %s", err) } - defaultNode := helpers.RemotePeer(stream.Context()) + defaultNode := client.RemotePeer(stream.Context()) respCh, errCh := newLineSlicer(stream) diff --git a/cmd/talosctl/cmd/talos/memory.go b/cmd/talosctl/cmd/talos/memory.go index e3739ef8c..3a45d5d4f 100644 --- a/cmd/talosctl/cmd/talos/memory.go +++ b/cmd/talosctl/cmd/talos/memory.go @@ -15,7 +15,6 @@ import ( "google.golang.org/grpc/peer" osapi "github.com/talos-systems/talos/api/os" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" ) @@ -55,7 +54,7 @@ func briefRender(remotePeer *peer.Peer, resp *osapi.MemoryResponse) error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tTOTAL\tUSED\tFREE\tSHARED\tBUFFERS\tCACHE\tAVAILABLE") - defaultNode := helpers.AddrFromPeer(remotePeer) + defaultNode := client.AddrFromPeer(remotePeer) for _, msg := range resp.Messages { node := defaultNode @@ -81,7 +80,7 @@ func briefRender(remotePeer *peer.Peer, resp *osapi.MemoryResponse) error { } func verboseRender(remotePeer *peer.Peer, resp *osapi.MemoryResponse) error { - defaultNode := helpers.AddrFromPeer(remotePeer) + defaultNode := client.AddrFromPeer(remotePeer) // Dump as /proc/meminfo for _, msg := range resp.Messages { diff --git a/cmd/talosctl/cmd/talos/mounts.go b/cmd/talosctl/cmd/talos/mounts.go index 87daeb54a..9449446f4 100644 --- a/cmd/talosctl/cmd/talos/mounts.go +++ b/cmd/talosctl/cmd/talos/mounts.go @@ -16,7 +16,6 @@ import ( "google.golang.org/grpc/peer" machineapi "github.com/talos-systems/talos/api/machine" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" ) @@ -50,7 +49,7 @@ func mountsRender(remotePeer *peer.Peer, resp *machineapi.MountsResponse) error w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tFILESYSTEM\tSIZE(GB)\tUSED(GB)\tAVAILABLE(GB)\tPERCENT USED\tMOUNTED ON") - defaultNode := helpers.AddrFromPeer(remotePeer) + defaultNode := client.AddrFromPeer(remotePeer) for _, msg := range resp.Messages { for _, r := range msg.Stats { diff --git a/cmd/talosctl/cmd/talos/processes.go b/cmd/talosctl/cmd/talos/processes.go index 6fd016f09..fd4db9e48 100644 --- a/cmd/talosctl/cmd/talos/processes.go +++ b/cmd/talosctl/cmd/talos/processes.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc/peer" osapi "github.com/talos-systems/talos/api/os" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" ) @@ -184,7 +183,7 @@ func processesOutput(ctx context.Context, c *client.Client) (output string, err return output, nil } - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) s := []string{} diff --git a/cmd/talosctl/cmd/talos/routes.go b/cmd/talosctl/cmd/talos/routes.go index 36fd7d0b9..000030f2c 100644 --- a/cmd/talosctl/cmd/talos/routes.go +++ b/cmd/talosctl/cmd/talos/routes.go @@ -15,7 +15,6 @@ import ( "google.golang.org/grpc/peer" networkapi "github.com/talos-systems/talos/api/network" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" ) @@ -46,7 +45,7 @@ func routesRender(remotePeer *peer.Peer, resp *networkapi.RoutesResponse) error w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tINTERFACE\tDESTINATION\tGATEWAY\tMETRIC") - defaultNode := helpers.AddrFromPeer(remotePeer) + defaultNode := client.AddrFromPeer(remotePeer) for _, msg := range resp.Messages { node := defaultNode diff --git a/cmd/talosctl/cmd/talos/service.go b/cmd/talosctl/cmd/talos/service.go index 9a7d97358..f110bab5b 100644 --- a/cmd/talosctl/cmd/talos/service.go +++ b/cmd/talosctl/cmd/talos/service.go @@ -17,7 +17,6 @@ import ( "google.golang.org/grpc/peer" machineapi "github.com/talos-systems/talos/api/machine" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" ) @@ -79,7 +78,7 @@ func serviceList(ctx context.Context, c *client.Client) error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tSERVICE\tSTATE\tHEALTH\tLAST CHANGE\tLAST EVENT") - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) for _, msg := range resp.Messages { for _, s := range msg.Services { @@ -112,7 +111,7 @@ func serviceInfo(ctx context.Context, c *client.Client, id string) error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) for _, s := range services { node := defaultNode @@ -163,7 +162,7 @@ func serviceStart(ctx context.Context, c *client.Client, id string) error { cli.Warning("%s", err) } - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tRESPONSE") @@ -193,7 +192,7 @@ func serviceStop(ctx context.Context, c *client.Client, id string) error { cli.Warning("%s", err) } - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tRESPONSE") @@ -223,7 +222,7 @@ func serviceRestart(ctx context.Context, c *client.Client, id string) error { cli.Warning("%s", err) } - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tRESPONSE") diff --git a/cmd/talosctl/cmd/talos/stats.go b/cmd/talosctl/cmd/talos/stats.go index ea97fd01c..5fe12f545 100644 --- a/cmd/talosctl/cmd/talos/stats.go +++ b/cmd/talosctl/cmd/talos/stats.go @@ -19,7 +19,6 @@ import ( "github.com/talos-systems/talos/api/common" osapi "github.com/talos-systems/talos/api/os" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" "github.com/talos-systems/talos/pkg/constants" @@ -65,7 +64,7 @@ func statsRender(remotePeer *peer.Peer, resp *osapi.StatsResponse) error { fmt.Fprintln(w, "NODE\tNAMESPACE\tID\tMEMORY(MB)\tCPU") - defaultNode := helpers.AddrFromPeer(remotePeer) + defaultNode := client.AddrFromPeer(remotePeer) for _, msg := range resp.Messages { resp := msg diff --git a/cmd/talosctl/cmd/talos/time.go b/cmd/talosctl/cmd/talos/time.go index 74198b801..fbf7061f1 100644 --- a/cmd/talosctl/cmd/talos/time.go +++ b/cmd/talosctl/cmd/talos/time.go @@ -17,7 +17,6 @@ import ( "google.golang.org/grpc/peer" timeapi "github.com/talos-systems/talos/api/time" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" ) @@ -57,7 +56,7 @@ var timeCmd = &cobra.Command{ w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tNTP-SERVER\tLOCAL-TIME\tREMOTE-TIME") - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) var localtime, remotetime time.Time for _, msg := range resp.Messages { diff --git a/cmd/talosctl/cmd/talos/upgrade.go b/cmd/talosctl/cmd/talos/upgrade.go index b241af963..f4f72610b 100644 --- a/cmd/talosctl/cmd/talos/upgrade.go +++ b/cmd/talosctl/cmd/talos/upgrade.go @@ -15,7 +15,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/peer" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" ) @@ -60,7 +59,7 @@ func upgrade() error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0) fmt.Fprintln(w, "NODE\tACK\tSTARTED") - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) for _, msg := range resp.Messages { node := defaultNode diff --git a/cmd/talosctl/cmd/talos/version.go b/cmd/talosctl/cmd/talos/version.go index 976797c3c..61772b1f9 100644 --- a/cmd/talosctl/cmd/talos/version.go +++ b/cmd/talosctl/cmd/talos/version.go @@ -12,7 +12,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/peer" - "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/client" "github.com/talos-systems/talos/pkg/version" @@ -54,7 +53,7 @@ var versionCmd = &cobra.Command{ cli.Warning("%s", err) } - defaultNode := helpers.AddrFromPeer(&remotePeer) + defaultNode := client.AddrFromPeer(&remotePeer) for _, msg := range resp.Messages { node := defaultNode diff --git a/internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go b/internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go index ec3e8cff3..8e25c6a0e 100644 --- a/internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go +++ b/internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go @@ -689,7 +689,11 @@ func (s *Server) Events(req *machine.EventsRequest, l machine.MachineService_Eve select { case <-l.Context().Done(): return l.Context().Err() - case event := <-events: + case event, ok := <-events: + if !ok { + return nil + } + msg, err := event.ToMachineEvent() if err != nil { return err diff --git a/pkg/client/client.go b/pkg/client/client.go index d4f10c3bb..3138b8923 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -14,8 +14,11 @@ import ( "errors" "fmt" "io" + "log" "strings" + "sync" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -743,3 +746,81 @@ func ReadStream(stream MachineStream) (io.ReadCloser, <-chan error, error) { func (c *Client) Events(ctx context.Context) (stream machineapi.MachineService_EventsClient, err error) { return c.MachineClient.Events(ctx, &machineapi.EventsRequest{}) } + +// Event as received from the API. +type Event struct { + Node string + TypeURL string + Payload proto.Message +} + +// EventsWatch wraps Events by providing more simple interface. +// +//nolint: gocyclo +func (c *Client) EventsWatch(ctx context.Context, watchFunc func(<-chan Event)) error { + stream, err := c.Events(ctx) + if err != nil { + return fmt.Errorf("error fetching events: %s", err) + } + + defaultNode := RemotePeer(stream.Context()) + + var wg sync.WaitGroup + + defer wg.Wait() + + ch := make(chan Event) + defer close(ch) + + wg.Add(1) + + go func() { + defer wg.Done() + + watchFunc(ch) + }() + + for { + event, err := stream.Recv() + if err != nil { + if err == io.EOF || status.Code(err) == codes.Canceled { + return nil + } + + return fmt.Errorf("failed to watch events: %w", err) + } + + typeURL := event.GetData().GetTypeUrl() + + var msg proto.Message + + switch typeURL { + case "talos/runtime/" + proto.MessageName(&machineapi.SequenceEvent{}): + msg = &machineapi.SequenceEvent{} + + if err = proto.Unmarshal(event.GetData().GetValue(), msg); err != nil { + log.Printf("failed to unmarshal message: %v", err) // TODO: this should be fixed to return errors + continue + } + default: + // We haven't implemented the handling of this event yet. + continue + } + + ev := Event{ + Node: defaultNode, + TypeURL: typeURL, + Payload: msg, + } + + if event.Metadata != nil { + ev.Node = event.Metadata.Hostname + } + + select { + case ch <- ev: + case <-ctx.Done(): + return nil + } + } +} diff --git a/cmd/talosctl/pkg/talos/helpers/peer.go b/pkg/client/peer.go similarity index 97% rename from cmd/talosctl/pkg/talos/helpers/peer.go rename to pkg/client/peer.go index eeedc98f8..9b58c92f0 100644 --- a/cmd/talosctl/pkg/talos/helpers/peer.go +++ b/pkg/client/peer.go @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -package helpers +package client import ( "context"