mirror of
https://github.com/siderolabs/talos.git
synced 2025-10-22 04:51:14 +02:00
This replaces codegen version of apid proxying with talos-systems/grpc-proxy based version. Proxying is transparent, it doesn't require exact information about methods and response types. It requires some common layout response to enhance it properly with node metadata or errors. There should be no significant changes to the API with the previous version, but it's worth mentioning a few changes: 1. grpc.ClientConn is established just once per upstream (either local service or remote apid instance). 2. When called without `-t` (`targets`), apid proxies immediately down to local service skipping proxying to itself (as before), which results in empty node metadata in response (before it had local node IP). Might revert this later to proxy to itself (?). 3. Streaming APIs are now fully supported with multiple targets, but message definition doesn't contain `ResponseMetadata`, so streaming APIs are broken now with targets (needs a fix). 4. Errors are now returned as responses with `Error` field set in `ResponseMetadata`, this requires client library update and `osctl` to handle it properly. Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
415 lines
11 KiB
Go
415 lines
11 KiB
Go
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package client
|
|
|
|
import (
|
|
"archive/tar"
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/golang/protobuf/ptypes/empty"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"github.com/talos-systems/talos/api/common"
|
|
machineapi "github.com/talos-systems/talos/api/machine"
|
|
networkapi "github.com/talos-systems/talos/api/network"
|
|
osapi "github.com/talos-systems/talos/api/os"
|
|
timeapi "github.com/talos-systems/talos/api/time"
|
|
"github.com/talos-systems/talos/cmd/osctl/pkg/client/config"
|
|
"github.com/talos-systems/talos/pkg/net"
|
|
)
|
|
|
|
// Credentials holds the PEM-encoded material NewClient needs to set up a
// mutually authenticated TLS connection: the CA bundle used to verify the
// server, and the client certificate with its private key.
type Credentials struct {
	ca, crt, key []byte
}
|
|
|
|
// Client implements the proto.OSClient interface. It serves as the
|
|
// concrete type with the required methods.
|
|
type Client struct {
|
|
conn *grpc.ClientConn
|
|
client osapi.OSClient
|
|
MachineClient machineapi.MachineClient
|
|
TimeClient timeapi.TimeClient
|
|
NetworkClient networkapi.NetworkClient
|
|
}
|
|
|
|
// NewClientTargetAndCredentialsFromConfig initializes ClientCredentials using default paths
|
|
// to the required CA, certificate, and key.
|
|
func NewClientTargetAndCredentialsFromConfig(p string, ctx string) (target string, creds *Credentials, err error) {
|
|
c, err := config.Open(p)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if ctx != "" {
|
|
c.Context = ctx
|
|
}
|
|
|
|
if c.Context == "" {
|
|
return "", nil, fmt.Errorf("'context' key is not set in the config")
|
|
}
|
|
|
|
context := c.Contexts[c.Context]
|
|
if context == nil {
|
|
return "", nil, fmt.Errorf("context %q is not defined in 'contexts' key in config", c.Context)
|
|
}
|
|
|
|
caBytes, err := base64.StdEncoding.DecodeString(context.CA)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("error decoding CA: %w", err)
|
|
}
|
|
|
|
crtBytes, err := base64.StdEncoding.DecodeString(context.Crt)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("error decoding certificate: %w", err)
|
|
}
|
|
|
|
keyBytes, err := base64.StdEncoding.DecodeString(context.Key)
|
|
if err != nil {
|
|
return "", nil, fmt.Errorf("error decoding key: %w", err)
|
|
}
|
|
|
|
creds = &Credentials{
|
|
ca: caBytes,
|
|
crt: crtBytes,
|
|
key: keyBytes,
|
|
}
|
|
|
|
return context.Target, creds, nil
|
|
}
|
|
|
|
// NewClientCredentials initializes ClientCredentials using default paths
|
|
// to the required CA, certificate, and key.
|
|
func NewClientCredentials(ca, crt, key []byte) (creds *Credentials) {
|
|
creds = &Credentials{
|
|
ca: ca,
|
|
crt: crt,
|
|
key: key,
|
|
}
|
|
|
|
return creds
|
|
}
|
|
|
|
// NewClient initializes a Client.
|
|
func NewClient(creds *Credentials, target string, port int) (c *Client, err error) {
|
|
grpcOpts := []grpc.DialOption{}
|
|
|
|
c = &Client{}
|
|
|
|
crt, err := tls.X509KeyPair(creds.crt, creds.key)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not load client key pair: %s", err)
|
|
}
|
|
|
|
certPool := x509.NewCertPool()
|
|
if ok := certPool.AppendCertsFromPEM(creds.ca); !ok {
|
|
return nil, fmt.Errorf("failed to append client certs")
|
|
}
|
|
|
|
// TODO(andrewrynhard): Do not parse the address. Pass the IP and port in as separate
|
|
// parameters.
|
|
transportCreds := credentials.NewTLS(&tls.Config{
|
|
ServerName: target,
|
|
Certificates: []tls.Certificate{crt},
|
|
// Set the root certificate authorities to use the self-signed
|
|
// certificate.
|
|
RootCAs: certPool,
|
|
})
|
|
|
|
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(transportCreds))
|
|
|
|
c.conn, err = grpc.Dial(fmt.Sprintf("%s:%d", net.FormatAddress(target), port), grpcOpts...)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
c.client = osapi.NewOSClient(c.conn)
|
|
c.MachineClient = machineapi.NewMachineClient(c.conn)
|
|
c.TimeClient = timeapi.NewTimeClient(c.conn)
|
|
c.NetworkClient = networkapi.NewNetworkClient(c.conn)
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// Close shuts down client protocol
|
|
func (c *Client) Close() error {
|
|
return c.conn.Close()
|
|
}
|
|
|
|
// KubeconfigRaw returns K8s client config (kubeconfig).
|
|
func (c *Client) KubeconfigRaw(ctx context.Context) (io.Reader, <-chan error, error) {
|
|
stream, err := c.MachineClient.Kubeconfig(ctx, &empty.Empty{})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return readStream(stream)
|
|
}
|
|
|
|
// Kubeconfig returns K8s client config (kubeconfig).
|
|
func (c *Client) Kubeconfig(ctx context.Context) ([]byte, error) {
|
|
r, errCh, err := c.KubeconfigRaw(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
gzR, err := gzip.NewReader(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// returned .tar.gz should contain only single file (kubeconfig)
|
|
var kubeconfigBuf bytes.Buffer
|
|
|
|
tar := tar.NewReader(gzR)
|
|
|
|
for {
|
|
_, err = tar.Next()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = io.Copy(&kubeconfigBuf, tar)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if err = gzR.Close(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = <-errCh; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return kubeconfigBuf.Bytes(), nil
|
|
}
|
|
|
|
// Stats implements the proto.OSClient interface.
|
|
func (c *Client) Stats(ctx context.Context, namespace string, driver common.ContainerDriver) (reply *osapi.StatsReply, err error) {
|
|
reply, err = c.client.Stats(ctx, &osapi.StatsRequest{
|
|
Namespace: namespace,
|
|
Driver: driver,
|
|
})
|
|
|
|
return
|
|
}
|
|
|
|
// Containers implements the proto.OSClient interface.
|
|
func (c *Client) Containers(ctx context.Context, namespace string, driver common.ContainerDriver) (reply *osapi.ContainersReply, err error) {
|
|
reply, err = c.client.Containers(ctx, &osapi.ContainersRequest{
|
|
Namespace: namespace,
|
|
Driver: driver,
|
|
})
|
|
|
|
return
|
|
}
|
|
|
|
// Restart implements the proto.OSClient interface.
|
|
func (c *Client) Restart(ctx context.Context, namespace string, driver common.ContainerDriver, id string) (err error) {
|
|
_, err = c.client.Restart(ctx, &osapi.RestartRequest{
|
|
Id: id,
|
|
Namespace: namespace,
|
|
Driver: driver,
|
|
})
|
|
|
|
return
|
|
}
|
|
|
|
// Reset implements the proto.OSClient interface.
|
|
func (c *Client) Reset(ctx context.Context) (err error) {
|
|
_, err = c.MachineClient.Reset(ctx, &empty.Empty{})
|
|
return
|
|
}
|
|
|
|
// Reboot implements the proto.OSClient interface.
|
|
func (c *Client) Reboot(ctx context.Context) (err error) {
|
|
_, err = c.MachineClient.Reboot(ctx, &empty.Empty{})
|
|
return
|
|
}
|
|
|
|
// Shutdown implements the proto.OSClient interface.
|
|
func (c *Client) Shutdown(ctx context.Context) (err error) {
|
|
_, err = c.MachineClient.Shutdown(ctx, &empty.Empty{})
|
|
return
|
|
}
|
|
|
|
// Dmesg implements the proto.OSClient interface.
|
|
func (c *Client) Dmesg(ctx context.Context) (*common.DataReply, error) {
|
|
return c.client.Dmesg(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// Logs implements the proto.OSClient interface.
|
|
func (c *Client) Logs(ctx context.Context, namespace string, driver common.ContainerDriver, id string) (stream machineapi.Machine_LogsClient, err error) {
|
|
stream, err = c.MachineClient.Logs(ctx, &machineapi.LogsRequest{
|
|
Namespace: namespace,
|
|
Driver: driver,
|
|
Id: id,
|
|
})
|
|
|
|
return
|
|
}
|
|
|
|
// Version implements the proto.OSClient interface.
|
|
func (c *Client) Version(ctx context.Context) (*machineapi.VersionReply, error) {
|
|
return c.MachineClient.Version(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// Routes implements the networkdproto.NetworkClient interface.
|
|
func (c *Client) Routes(ctx context.Context) (*networkapi.RoutesReply, error) {
|
|
return c.NetworkClient.Routes(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// Interfaces implements the proto.OSClient interface.
|
|
func (c *Client) Interfaces(ctx context.Context) (*networkapi.InterfacesReply, error) {
|
|
return c.NetworkClient.Interfaces(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// Processes implements the proto.OSClient interface.
|
|
func (c *Client) Processes(ctx context.Context) (reply *osapi.ProcessesReply, err error) {
|
|
return c.client.Processes(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// Memory implements the proto.OSClient interface.
|
|
func (c *Client) Memory(ctx context.Context) (*osapi.MemInfoReply, error) {
|
|
return c.client.Memory(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// Mounts implements the proto.OSClient interface.
|
|
func (c *Client) Mounts(ctx context.Context) (*machineapi.MountsReply, error) {
|
|
return c.MachineClient.Mounts(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// LS implements the proto.OSClient interface.
|
|
func (c *Client) LS(ctx context.Context, req machineapi.LSRequest) (stream machineapi.Machine_LSClient, err error) {
|
|
return c.MachineClient.LS(ctx, &req)
|
|
}
|
|
|
|
// CopyOut implements the proto.OSClient interface
|
|
func (c *Client) CopyOut(ctx context.Context, rootPath string) (io.Reader, <-chan error, error) {
|
|
stream, err := c.MachineClient.CopyOut(ctx, &machineapi.CopyOutRequest{
|
|
RootPath: rootPath,
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return readStream(stream)
|
|
}
|
|
|
|
// Upgrade initiates a Talos upgrade ... and implements the proto.OSClient
|
|
// interface
|
|
func (c *Client) Upgrade(ctx context.Context, image string) (*machineapi.UpgradeReply, error) {
|
|
return c.MachineClient.Upgrade(ctx, &machineapi.UpgradeRequest{Image: image})
|
|
}
|
|
|
|
// ServiceList returns list of services with their state
|
|
func (c *Client) ServiceList(ctx context.Context) (*machineapi.ServiceListReply, error) {
|
|
return c.MachineClient.ServiceList(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// ServiceInfo returns info about a single service
|
|
//
|
|
// This is implemented via service list API, as we don't have many services
|
|
// If service with given id is not registered, function returns nil
|
|
func (c *Client) ServiceInfo(ctx context.Context, id string) (*machineapi.ServiceListReply, error) {
|
|
return c.MachineClient.ServiceList(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// ServiceStart starts a service.
|
|
func (c *Client) ServiceStart(ctx context.Context, id string) (*machineapi.ServiceStartReply, error) {
|
|
return c.MachineClient.ServiceStart(ctx, &machineapi.ServiceStartRequest{Id: id})
|
|
}
|
|
|
|
// ServiceStop stops a service.
|
|
func (c *Client) ServiceStop(ctx context.Context, id string) (*machineapi.ServiceStopReply, error) {
|
|
return c.MachineClient.ServiceStop(ctx, &machineapi.ServiceStopRequest{Id: id})
|
|
}
|
|
|
|
// ServiceRestart restarts a service.
|
|
func (c *Client) ServiceRestart(ctx context.Context, id string) (*machineapi.ServiceRestartReply, error) {
|
|
return c.MachineClient.ServiceRestart(ctx, &machineapi.ServiceRestartRequest{Id: id})
|
|
}
|
|
|
|
// Time returns the time
|
|
func (c *Client) Time(ctx context.Context) (*timeapi.TimeReply, error) {
|
|
return c.TimeClient.Time(ctx, &empty.Empty{})
|
|
}
|
|
|
|
// TimeCheck returns the time compared to the specified ntp server
|
|
func (c *Client) TimeCheck(ctx context.Context, server string) (*timeapi.TimeReply, error) {
|
|
return c.TimeClient.TimeCheck(ctx, &timeapi.TimeRequest{Server: server})
|
|
}
|
|
|
|
// Read reads a file.
|
|
func (c *Client) Read(ctx context.Context, path string) (io.Reader, <-chan error, error) {
|
|
stream, err := c.MachineClient.Read(ctx, &machineapi.ReadRequest{Path: path})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return readStream(stream)
|
|
}
|
|
|
|
type machineStream interface {
|
|
Recv() (*common.DataResponse, error)
|
|
grpc.ClientStream
|
|
}
|
|
|
|
func readStream(stream machineStream) (io.Reader, <-chan error, error) {
|
|
errCh := make(chan error)
|
|
pr, pw := io.Pipe()
|
|
|
|
go func() {
|
|
//nolint: errcheck
|
|
defer pw.Close()
|
|
defer close(errCh)
|
|
|
|
for {
|
|
data, err := stream.Recv()
|
|
if err != nil {
|
|
if err == io.EOF || status.Code(err) == codes.Canceled {
|
|
return
|
|
}
|
|
//nolint: errcheck
|
|
pw.CloseWithError(err)
|
|
return
|
|
}
|
|
|
|
if data.Bytes != nil {
|
|
_, err = pw.Write(data.Bytes)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
if data.Metadata != nil && data.Metadata.Error != "" {
|
|
errCh <- errors.New(data.Metadata.Error)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return pr, errCh, nil
|
|
}
|