feat(osd): Enable hitting multiple OSD endpoints

This enables the ability to specify additional <talos> endpoints to connect to
to pull back data.

Signed-off-by: Brad Beam <brad.beam@talos-systems.com>
This commit is contained in:
Brad Beam 2019-10-08 22:40:33 -05:00 committed by Brad Beam
parent d0111fe617
commit e6bf92ce31
14 changed files with 1320 additions and 571 deletions

View File

@ -24,6 +24,7 @@ RUN curl -sfL https://github.com/uber/prototool/releases/download/v1.8.0/prototo
FROM scratch AS build
COPY --from=tools / /
COPY --from=autonomy/protoc-gen-proxy:a87401e /protoc-gen-proxy /toolchain/bin/protoc-gen-proxy
SHELL ["/toolchain/bin/bash", "-c"]
ENV PATH /toolchain/bin:/toolchain/go/bin
ENV GO111MODULE on
@ -36,7 +37,8 @@ WORKDIR /src
FROM build AS generate-build
WORKDIR /osd
COPY ./api/os ./proto
RUN protoc -I./proto --go_out=plugins=grpc:proto proto/api.proto
# Generate additional grpc functionality only for OSD
RUN protoc -I./proto --plugin=proxy --proxy_out=plugins=grpc+proxy:proto proto/api.proto
WORKDIR /trustd
COPY ./api/security ./proto
RUN protoc -I./proto --go_out=plugins=grpc:proto proto/api.proto

File diff suppressed because it is too large Load Diff

View File

@ -13,30 +13,34 @@ import "google/protobuf/empty.proto";
//
// OS Service also implements all the API of Init Service
service OS {
rpc Dmesg(google.protobuf.Empty) returns (Data);
rpc Kubeconfig(google.protobuf.Empty) returns (Data);
rpc Logs(LogsRequest) returns (stream Data);
rpc Containers(ContainersRequest) returns (ContainersReply);
rpc Dmesg(google.protobuf.Empty) returns (DataReply);
rpc Kubeconfig(google.protobuf.Empty) returns (DataReply);
rpc Logs(LogsRequest) returns (stream Data);
rpc Processes(google.protobuf.Empty) returns (ProcessesReply);
rpc Restart(RestartRequest) returns (RestartReply);
rpc Stats(StatsRequest) returns (StatsReply);
rpc Processes(google.protobuf.Empty) returns (ProcessesReply);
}
// Common metadata message nested in all reply message types
message NodeMetadata {
string hostname = 1;
}
// rpc Containers
// The request message containing the containerd namespace.
enum ContainerDriver {
CONTAINERD = 0;
CRI = 1;
}
// The request message containing the containerd namespace.
message ContainersRequest {
string namespace = 1;
// driver might be default "containerd" or "cri"
ContainerDriver driver = 2;
}
// The response message containing the requested containers.
message ContainersReply { repeated Container containers = 1; }
// The response message containing the requested containers.
message Container {
string namespace = 1;
@ -48,36 +52,34 @@ message Container {
string name = 7;
}
// The request message containing the containerd namespace.
message StatsRequest {
string namespace = 1;
// driver might be default "containerd" or "cri"
ContainerDriver driver = 2;
// The response message containing the requested containers.
message ContainerResponse {
NodeMetadata metadata = 1;
repeated Container containers = 2;
}
// The response message containing the requested stats.
message StatsReply { repeated Stat stats = 1; }
// The response message containing the requested stat.
message Stat {
string namespace = 1;
string id = 2;
uint64 memory_usage = 4;
uint64 cpu_usage = 5;
string pod_id = 6;
string name = 7;
message ContainersReply {
repeated ContainerResponse response = 1;
}
// The request message containing the process to restart.
message RestartRequest {
string namespace = 1;
string id = 2;
// driver might be default "containerd" or "cri"
ContainerDriver driver = 3;
// rpc dmesg
// rpc kubeconfig
// The response message containing the requested logs.
message Data {
bytes bytes = 1;
}
// The response message containing the restart status.
message RestartReply {}
message DataResponse {
NodeMetadata metadata = 1;
Data bytes = 2;
}
message DataReply {
repeated DataResponse response = 1;
}
// rpc logs
// The request message containing the process name.
message LogsRequest {
@ -87,12 +89,17 @@ message LogsRequest {
ContainerDriver driver = 3;
}
// The response message containing the requested logs.
message Data { bytes bytes = 1; }
// rpc processes
message ProcessesRequest {}
message ProcessesReply { repeated Process processes = 1; }
message ProcessesReply {
repeated ProcessResponse response = 1;
}
message ProcessResponse {
NodeMetadata metadata = 1;
repeated Process processes = 2;
}
message Process {
int32 pid = 1;
@ -106,3 +113,51 @@ message Process {
string executable = 9;
string args = 10;
}
// rpc restart
// The request message containing the process to restart.
message RestartRequest {
string namespace = 1;
string id = 2;
// driver might be default "containerd" or "cri"
ContainerDriver driver = 3;
}
message RestartResponse {
NodeMetadata metadata = 1;
}
// The response message containing the restart status.
message RestartReply {
repeated RestartResponse response = 1;
}
// rpc stats
// The request message containing the containerd namespace.
message StatsRequest {
string namespace = 1;
// driver might be default "containerd" or "cri"
ContainerDriver driver = 2;
}
// The response message containing the requested stats.
message StatsResponse {
NodeMetadata metadata = 1;
repeated Stat stats = 2;
}
message StatsReply {
repeated StatsResponse response = 1;
}
// The response message containing the requested stat.
message Stat {
string namespace = 1;
string id = 2;
uint64 memory_usage = 4;
uint64 cpu_usage = 5;
string pod_id = 6;
string name = 7;
}

View File

@ -42,11 +42,6 @@ var configTargetCmd = &cobra.Command{
Short: "Set the target for the current context",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
helpers.Should(cmd.Usage())
os.Exit(1)
}
target = args[0]
c, err := config.Open(talosconfig)
if err != nil {
helpers.Fatalf("error reading config: %s", err)
@ -54,7 +49,8 @@ var configTargetCmd = &cobra.Command{
if c.Context == "" {
helpers.Fatalf("no context is set")
}
c.Contexts[c.Context].Target = target
c.Contexts[c.Context].Target = args[0]
if err := c.Save(talosconfig); err != nil {
helpers.Fatalf("error writing config: %s", err)
}

View File

@ -14,6 +14,7 @@ import (
criconstants "github.com/containerd/cri/pkg/constants"
"github.com/spf13/cobra"
"google.golang.org/grpc/metadata"
osapi "github.com/talos-systems/talos/api/os"
"github.com/talos-systems/talos/cmd/osctl/pkg/client"
@ -44,7 +45,11 @@ var containersCmd = &cobra.Command{
if useCRI {
driver = osapi.ContainerDriver_CRI
}
reply, err := c.Containers(globalCtx, namespace, driver)
md := metadata.New(make(map[string]string))
md.Set("targets", target...)
reply, err := c.Containers(metadata.NewOutgoingContext(globalCtx, md), namespace, driver)
if err != nil {
helpers.Fatalf("error getting process list: %s", err)
}
@ -55,22 +60,25 @@ var containersCmd = &cobra.Command{
}
func containerRender(reply *osapi.ContainersReply) {
sort.Slice(reply.Containers,
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
fmt.Fprintln(w, "NODE\tNAMESPACE\tID\tIMAGE\tPID\tSTATUS")
for _, rep := range reply.Response {
resp := rep
sort.Slice(resp.Containers,
func(i, j int) bool {
return strings.Compare(reply.Containers[i].Id, reply.Containers[j].Id) < 0
return strings.Compare(resp.Containers[i].Id, resp.Containers[j].Id) < 0
})
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
fmt.Fprintln(w, "NAMESPACE\tID\tIMAGE\tPID\tSTATUS")
for _, p := range reply.Containers {
for _, p := range resp.Containers {
display := p.Id
if p.Id != p.PodId {
// container in a sandbox
display = "└─ " + display
}
fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\n", p.Namespace, display, p.Image, p.Pid, p.Status)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\t%s\n", resp.Metadata.Hostname, p.Namespace, display, p.Image, p.Pid, p.Status)
}
}
helpers.Should(w.Flush())

View File

@ -5,9 +5,11 @@
package cmd
import (
"fmt"
"os"
"github.com/spf13/cobra"
"google.golang.org/grpc/metadata"
"github.com/talos-systems/talos/cmd/osctl/pkg/client"
"github.com/talos-systems/talos/cmd/osctl/pkg/helpers"
@ -25,13 +27,20 @@ var dmesgCmd = &cobra.Command{
}
setupClient(func(c *client.Client) {
msg, err := c.Dmesg(globalCtx)
md := metadata.New(make(map[string]string))
md.Set("targets", target...)
reply, err := c.Dmesg(metadata.NewOutgoingContext(globalCtx, md))
if err != nil {
helpers.Fatalf("error getting dmesg: %s", err)
}
_, err = os.Stdout.Write(msg)
for _, resp := range reply.Response {
if len(reply.Response) > 1 {
fmt.Println(resp.Metadata.Hostname)
}
_, err = os.Stdout.Write(resp.Bytes.Bytes)
helpers.Should(err)
}
})
},
}

View File

@ -6,9 +6,11 @@
package cmd
import (
"fmt"
"os"
"github.com/spf13/cobra"
"google.golang.org/grpc/metadata"
"github.com/talos-systems/talos/cmd/osctl/pkg/client"
"github.com/talos-systems/talos/cmd/osctl/pkg/helpers"
@ -26,12 +28,21 @@ var kubeconfigCmd = &cobra.Command{
}
setupClient(func(c *client.Client) {
kubeconfig, err := c.Kubeconfig(globalCtx)
md := metadata.New(make(map[string]string))
md.Set("targets", target...)
reply, err := c.Kubeconfig(metadata.NewOutgoingContext(globalCtx, md))
if err != nil {
helpers.Fatalf("error fetching kubeconfig: %s", err)
}
_, err = os.Stdout.Write(kubeconfig)
for _, resp := range reply.Response {
if len(reply.Response) > 1 {
fmt.Println(resp.Metadata.Hostname)
}
_, err = os.Stdout.Write(resp.Bytes.Bytes)
helpers.Should(err)
}
})
},
}

View File

@ -20,6 +20,7 @@ import (
"github.com/ryanuber/columnize"
"github.com/spf13/cobra"
"golang.org/x/crypto/ssh/terminal"
"google.golang.org/grpc/metadata"
osapi "github.com/talos-systems/talos/api/os"
"github.com/talos-systems/talos/cmd/osctl/pkg/client"
@ -44,17 +45,26 @@ var processesCmd = &cobra.Command{
setupClient(func(c *client.Client) {
var err error
md := metadata.New(make(map[string]string))
md.Set("targets", target...)
switch {
case watchProcesses:
// Only allow single node view refresh..
// No hard limitiation that I can think of to prevent aggregating all nodes
if len(target) > 1 {
md.Set("targets", target[0])
}
if err = ui.Init(); err != nil {
log.Fatalf("failed to initialize termui: %v", err)
}
defer ui.Close()
processesUI(globalCtx, c)
processesUI(metadata.NewOutgoingContext(globalCtx, md), c)
default:
var output string
output, err = processesOutput(globalCtx, c)
output, err = processesOutput(metadata.NewOutgoingContext(globalCtx, md), c)
helpers.Should(err)
// Note this is unlimited output of process lines
// we arent artificially limited by the box we would otherwise draw
@ -178,7 +188,12 @@ func processesOutput(ctx context.Context, c *client.Client) (output string, err
return output, nil
}
procs := reply.Processes
s := []string{}
s = append(s, "NODE | PID | STATE | THREADS | CPU-TIME | VIRTMEM | RESMEM | COMMAND")
for _, resp := range reply.Response {
procs := resp.Processes
switch sortMethod {
case "cpu":
@ -187,9 +202,6 @@ func processesOutput(ctx context.Context, c *client.Client) (output string, err
by(rss).sort(procs)
}
s := make([]string, 0, len(procs))
s = append(s, "PID | STATE | THREADS | CPU-TIME | VIRTMEM | RESMEM | COMMAND")
var args string
for _, p := range procs {
@ -203,8 +215,9 @@ func processesOutput(ctx context.Context, c *client.Client) (output string, err
}
s = append(s,
fmt.Sprintf("%6d | %1s | %4d | %8.2f | %7s | %7s | %s",
p.Pid, p.State, p.Threads, p.CpuTime, bytefmt.ByteSize(p.VirtualMemory), bytefmt.ByteSize(p.ResidentMemory), args))
fmt.Sprintf("%12s | %6d | %1s | %4d | %8.2f | %7s | %7s | %s",
resp.Metadata.Hostname, p.Pid, p.State, p.Threads, p.CpuTime, bytefmt.ByteSize(p.VirtualMemory), bytefmt.ByteSize(p.ResidentMemory), args))
}
}
return columnize.SimpleFormat(s), err

View File

@ -34,7 +34,7 @@ var (
organization string
rsa bool
talosconfig string
target string
target []string
)
// rootCmd represents the base command when called without any subcommands
@ -96,7 +96,7 @@ func Execute() {
}
rootCmd.PersistentFlags().StringVar(&talosconfig, "talosconfig", defaultTalosConfig, "The path to the Talos configuration file")
rootCmd.PersistentFlags().StringVarP(&target, "target", "t", "", "target the specificed node")
rootCmd.PersistentFlags().StringSliceVarP(&target, "target", "t", []string{}, "target the specificed node")
if err := rootCmd.Execute(); err != nil {
helpers.Fatalf("%s", err)
@ -110,10 +110,6 @@ func setupClient(action func(*client.Client)) {
helpers.Fatalf("error getting client credentials: %s", err)
}
if target != "" {
t = target
}
c, err := client.NewClient(creds, t, constants.OsdPort)
if err != nil {
helpers.Fatalf("error constructing client: %s", err)

View File

@ -14,6 +14,7 @@ import (
criconstants "github.com/containerd/cri/pkg/constants"
"github.com/spf13/cobra"
"google.golang.org/grpc/metadata"
osapi "github.com/talos-systems/talos/api/os"
"github.com/talos-systems/talos/cmd/osctl/pkg/client"
@ -43,7 +44,9 @@ var statsCmd = &cobra.Command{
if useCRI {
driver = osapi.ContainerDriver_CRI
}
reply, err := c.Stats(globalCtx, namespace, driver)
md := metadata.New(make(map[string]string))
md.Set("targets", target...)
reply, err := c.Stats(metadata.NewOutgoingContext(globalCtx, md), namespace, driver)
if err != nil {
helpers.Fatalf("error getting stats: %s", err)
}
@ -54,22 +57,26 @@ var statsCmd = &cobra.Command{
}
func statsRender(reply *osapi.StatsReply) {
sort.Slice(reply.Stats,
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
fmt.Fprintln(w, "NODE\tNAMESPACE\tID\tMEMORY(MB)\tCPU")
for _, rep := range reply.Response {
resp := rep
sort.Slice(resp.Stats,
func(i, j int) bool {
return strings.Compare(reply.Stats[i].Id, reply.Stats[j].Id) < 0
return strings.Compare(resp.Stats[i].Id, resp.Stats[j].Id) < 0
})
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
fmt.Fprintln(w, "NAMESPACE\tID\tMEMORY(MB)\tCPU")
for _, s := range reply.Stats {
for _, s := range resp.Stats {
display := s.Id
if s.Id != s.PodId {
// container in a sandbox
display = "└─ " + display
}
fmt.Fprintf(w, "%s\t%s\t%.2f\t%d\n", s.Namespace, display, float64(s.MemoryUsage)*1e-6, s.CpuUsage)
fmt.Fprintf(w, "%s\t%s\t%s\t%.2f\t%d\n", resp.Metadata.Hostname, s.Namespace, display, float64(s.MemoryUsage)*1e-6, s.CpuUsage)
}
}
helpers.Should(w.Flush())

View File

@ -145,13 +145,8 @@ func (c *Client) Close() error {
}
// Kubeconfig implements the proto.OSClient interface.
func (c *Client) Kubeconfig(ctx context.Context) ([]byte, error) {
r, err := c.client.Kubeconfig(ctx, &empty.Empty{})
if err != nil {
return nil, err
}
return r.Bytes, nil
func (c *Client) Kubeconfig(ctx context.Context) (*osapi.DataReply, error) {
return c.client.Kubeconfig(ctx, &empty.Empty{})
}
// Stats implements the proto.OSClient interface.
@ -204,13 +199,8 @@ func (c *Client) Shutdown(ctx context.Context) (err error) {
}
// Dmesg implements the proto.OSClient interface.
func (c *Client) Dmesg(ctx context.Context) ([]byte, error) {
data, err := c.client.Dmesg(ctx, &empty.Empty{})
if err != nil {
return nil, err
}
return data.Bytes, nil
func (c *Client) Dmesg(ctx context.Context) (*osapi.DataReply, error) {
return c.client.Dmesg(ctx, &empty.Empty{})
}
// Logs implements the proto.OSClient interface.

View File

@ -91,6 +91,7 @@ run "kubectl wait --timeout=${TIMEOUT}s --for=condition=ready=true pod -l k8s-ap
# Wait for DNS addon to report ready
run "kubectl wait --timeout=${TIMEOUT}s --for=condition=ready=true pod -l k8s-app=kube-dns -n kube-system"
run "osctl -t 10.5.0.2 service etcd | grep Running"
run "osctl -t 10.5.0.3 service etcd | grep Running"
run "osctl -t 10.5.0.4 service etcd | grep Running"
run "osctl config target 10.5.0.2 && osctl -t 10.5.0.2 service etcd | grep Running"
run "osctl config target 10.5.0.3 && osctl -t 10.5.0.3 service etcd | grep Running"
run "osctl config target 10.5.0.4 && osctl -t 10.5.0.4 service etcd | grep Running"
run "osctl --target 10.5.0.2,10.5.0.3,10.5.0.4,10.5.0.5 containers | grep osd"

View File

@ -52,14 +52,18 @@ func (r *Registrator) Register(s *grpc.Server) {
}
// Kubeconfig implements the osapi.OSDServer interface.
func (r *Registrator) Kubeconfig(ctx context.Context, in *empty.Empty) (data *osapi.Data, err error) {
func (r *Registrator) Kubeconfig(ctx context.Context, in *empty.Empty) (data *osapi.DataReply, err error) {
fileBytes, err := ioutil.ReadFile(constants.AdminKubeconfig)
if err != nil {
return
}
data = &osapi.Data{
Bytes: fileBytes,
data = &osapi.DataReply{
Response: []*osapi.DataResponse{
{
Bytes: &osapi.Data{Bytes: fileBytes},
},
},
}
return data, err
@ -101,7 +105,15 @@ func (r *Registrator) Containers(ctx context.Context, in *osapi.ContainersReques
}
}
return &osapi.ContainersReply{Containers: containers}, nil
reply = &osapi.ContainersReply{
Response: []*osapi.ContainerResponse{
{
Containers: containers,
},
},
}
return reply, nil
}
// Stats implements the osapi.OSDServer interface.
@ -145,7 +157,13 @@ func (r *Registrator) Stats(ctx context.Context, in *osapi.StatsRequest) (reply
}
}
reply = &osapi.StatsReply{Stats: stats}
reply = &osapi.StatsReply{
Response: []*osapi.StatsResponse{
{
Stats: stats,
},
},
}
return reply, nil
}
@ -180,7 +198,7 @@ func (r *Registrator) Restart(ctx context.Context, in *osapi.RestartRequest) (*o
// to read from the ring buffer at /proc/kmsg by taking the
// SYSLOG_ACTION_READ_ALL action. This action reads all messages remaining in
// the ring buffer non-destructively.
func (r *Registrator) Dmesg(ctx context.Context, in *empty.Empty) (data *osapi.Data, err error) {
func (r *Registrator) Dmesg(ctx context.Context, in *empty.Empty) (data *osapi.DataReply, err error) {
// Return the size of the kernel ring buffer
size, err := unix.Klogctl(constants.SYSLOG_ACTION_SIZE_BUFFER, nil)
if err != nil {
@ -194,7 +212,13 @@ func (r *Registrator) Dmesg(ctx context.Context, in *empty.Empty) (data *osapi.D
return
}
data = &osapi.Data{Bytes: buf[:n]}
data = &osapi.DataReply{
Response: []*osapi.DataResponse{
{
Bytes: &osapi.Data{Bytes: buf[:n]},
},
},
}
return data, err
}
@ -296,7 +320,13 @@ func (r *Registrator) Processes(ctx context.Context, in *empty.Empty) (reply *os
processes = append(processes, p)
}
reply = &osapi.ProcessesReply{Processes: processes}
reply = &osapi.ProcessesReply{
Response: []*osapi.ProcessResponse{
{
Processes: processes,
},
},
}
return reply, nil
}

View File

@ -14,6 +14,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
osapi "github.com/talos-systems/talos/api/os"
"github.com/talos-systems/talos/internal/app/osd/internal/reg"
"github.com/talos-systems/talos/pkg/config"
"github.com/talos-systems/talos/pkg/constants"
@ -43,35 +44,7 @@ func main() {
log.Fatalf("failed to seed RNG: %v", err)
}
content, err := config.FromFile(*configPath)
if err != nil {
log.Fatalf("open config: %v", err)
}
config, err := config.New(content)
if err != nil {
log.Fatalf("open config: %v", err)
}
ips, err := net.IPAddrs()
if err != nil {
log.Fatalf("failed to discover IP addresses: %+v", err)
}
// TODO(andrewrynhard): Allow for DNS names.
for _, san := range config.Machine().Security().CertSANs() {
if ip := stdlibnet.ParseIP(san); ip != nil {
ips = append(ips, ip)
}
}
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("failed to discover hostname: %+v", err)
}
var provider tls.CertificateProvider
provider, err = tls.NewRemoteRenewingFileCertificateProvider(config.Machine().Security().Token(), strings.Split(*endpoints, ","), constants.TrustdPort, hostname, ips)
provider, err := createProvider()
if err != nil {
log.Fatalf("failed to create remote certificate provider: %+v", err)
}
@ -105,6 +78,13 @@ func main() {
log.Fatalf("networkd client: %v", err)
}
interceptorProvider, err := createProvider()
if err != nil {
log.Fatalf("failed to create remote certificate provider for interceptor: %+v", err)
}
protoProxy := osapi.NewOSProxy(interceptorProvider)
err = factory.ListenAndServe(
&reg.Registrator{
MachineClient: machineClient,
@ -116,9 +96,40 @@ func main() {
grpc.Creds(
credentials.NewTLS(tlsConfig),
),
grpc.UnaryInterceptor(protoProxy.UnaryInterceptor()),
),
)
if err != nil {
log.Fatalf("listen: %v", err)
}
}
func createProvider() (tls.CertificateProvider, error) {
content, err := config.FromFile(*configPath)
if err != nil {
log.Fatalf("open config: %v", err)
}
config, err := config.New(content)
if err != nil {
log.Fatalf("open config: %v", err)
}
ips, err := net.IPAddrs()
if err != nil {
log.Fatalf("failed to discover IP addresses: %+v", err)
}
// TODO(andrewrynhard): Allow for DNS names.
for _, san := range config.Machine().Security().CertSANs() {
if ip := stdlibnet.ParseIP(san); ip != nil {
ips = append(ips, ip)
}
}
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("failed to discover hostname: %+v", err)
}
return tls.NewRemoteRenewingFileCertificateProvider(config.Machine().Security().Token(), strings.Split(*endpoints, ","), constants.TrustdPort, hostname, ips)
}