feat: add API and command to save etcd snapshot (backup)

This adds a simple API and `talosctl etcd snapshot` command to stream
snapshot of etcd from one of the control plane nodes to the local file.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2021-03-30 23:27:10 +03:00 committed by talos-bot
parent 61b694b948
commit e664362cec
10 changed files with 828 additions and 511 deletions

View File

@ -30,6 +30,13 @@ service MachineService {
returns (EtcdLeaveClusterResponse);
rpc EtcdForfeitLeadership(EtcdForfeitLeadershipRequest)
returns (EtcdForfeitLeadershipResponse);
// EtcdSnapshot method creates etcd data snapshot (backup) from the local etcd instance
// and streams it back to the client.
//
// This method is available only on control plane nodes (which run etcd).
rpc EtcdSnapshot(EtcdSnapshotRequest) returns (stream common.Data);
rpc GenerateConfiguration(GenerateConfigurationRequest)
returns (GenerateConfigurationResponse);
rpc Hostname(google.protobuf.Empty) returns (HostnameResponse);
@ -762,6 +769,8 @@ message EtcdMemberList {
}
message EtcdMemberListResponse { repeated EtcdMemberList messages = 1; }
message EtcdSnapshotRequest {}
// rpc generateConfiguration
message RouteConfig {

View File

@ -6,13 +6,17 @@ package talos
import (
"context"
"crypto/sha256"
"fmt"
"io"
"os"
"strings"
"sync"
"text/tabwriter"
"github.com/spf13/cobra"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/cli"
"github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
@ -117,7 +121,74 @@ var etcdMemberListCmd = &cobra.Command{
},
}
var etcdSnapshotCmd = &cobra.Command{
Use: "snapshot <path>",
Short: "Stream snapshot of the etcd node to the path.",
Long: ``,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return WithClient(func(ctx context.Context, c *client.Client) error {
if err := helpers.FailIfMultiNodes(ctx, "etcd snapshot"); err != nil {
return err
}
dbPath := args[0]
partPath := dbPath + ".part"
defer os.RemoveAll(partPath) //nolint:errcheck
dest, err := os.OpenFile(partPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
if err != nil {
return fmt.Errorf("error creating temporary file: %w", err)
}
defer dest.Close() //nolint:errcheck
r, errCh, err := c.EtcdSnapshot(ctx, &machine.EtcdSnapshotRequest{})
if err != nil {
return fmt.Errorf("error reading file: %w", err)
}
defer r.Close() //nolint:errcheck
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for err := range errCh {
fmt.Fprintln(os.Stderr, err.Error())
}
}()
defer wg.Wait()
size, err := io.Copy(dest, r)
if err != nil {
return fmt.Errorf("error reading: %w", err)
}
if err = dest.Sync(); err != nil {
return fmt.Errorf("failed to fsync: %w", err)
}
// this check is from https://github.com/etcd-io/etcd/blob/client/v3.5.0-alpha.0/client/v3/snapshot/v3_snapshot.go#L46
if (size % 512) != sha256.Size {
return fmt.Errorf("sha256 checksum not found (size %d)", size)
}
if err = os.Rename(partPath, dbPath); err != nil {
return fmt.Errorf("error renaming to final location: %w", err)
}
fmt.Printf("etcd snapshot saved to %q (%d bytes)\n", dbPath, size)
return nil
})
},
}
func init() {
etcdCmd.AddCommand(etcdLeaveCmd, etcdForfeitLeadershipCmd, etcdMemberListCmd, etcdMemberRemoveCmd)
etcdCmd.AddCommand(etcdLeaveCmd, etcdForfeitLeadershipCmd, etcdMemberListCmd, etcdMemberRemoveCmd, etcdSnapshotCmd)
addCommand(etcdCmd)
}

View File

@ -83,6 +83,7 @@ func Main() {
"/machine.MachineService/Copy",
"/machine.MachineService/DiskUsage",
"/machine.MachineService/Dmesg",
"/machine.MachineService/EtcdSnapshot",
"/machine.MachineService/Events",
"/machine.MachineService/Kubeconfig",
"/machine.MachineService/List",

View File

@ -1785,6 +1785,39 @@ func (s *Server) EtcdForfeitLeadership(ctx context.Context, in *machine.EtcdForf
return reply, nil
}
// EtcdSnapshot implements the machine.MachineServer interface.
func (s *Server) EtcdSnapshot(in *machine.EtcdSnapshotRequest, srv machine.MachineService_EtcdSnapshotServer) error {
client, err := etcd.NewLocalClient()
if err != nil {
return fmt.Errorf("failed to create etcd client: %w", err)
}
//nolint:errcheck
defer client.Close()
rd, err := client.Snapshot(srv.Context())
if err != nil {
return fmt.Errorf("failed reading etcd snapshot: %w", err)
}
ctx, cancel := context.WithCancel(srv.Context())
defer cancel()
chunker := stream.NewChunker(ctx, rd)
chunkCh := chunker.Read()
for data := range chunkCh {
err := srv.SendMsg(&common.Data{Bytes: data})
if err != nil {
cancel()
return err
}
}
return nil
}
// RemoveBootkubeInitializedKey implements machine.MachineService.
//
// Temporary API only used when converting from self-hosted to Talos-managed control plane.

View File

@ -7,6 +7,11 @@
package cli
import (
"io/ioutil"
"os"
"path/filepath"
"regexp"
"github.com/talos-systems/talos/internal/integration/base"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
)
@ -39,6 +44,22 @@ func (suite *EtcdSuite) TestForfeitLeadership() {
)
}
// TestSnapshot tests etcd snapshot (backup).
func (suite *EtcdSuite) TestSnapshot() {
tempDir, err := ioutil.TempDir("", "talos")
suite.Require().NoError(err)
defer func() {
suite.Assert().NoError(os.RemoveAll(tempDir))
}()
dbPath := filepath.Join(tempDir, "snapshot.db")
suite.RunCLI([]string{"etcd", "snapshot", dbPath, "--nodes", suite.RandomDiscoveredNode(machine.TypeControlPlane)},
base.StdoutShouldMatch(regexp.MustCompile(`etcd snapshot saved to .+\d+ bytes.+`)),
)
}
func init() {
allSuites = append(allSuites, new(EtcdSuite))
}

File diff suppressed because it is too large Load Diff

View File

@ -34,6 +34,11 @@ type MachineServiceClient interface {
EtcdRemoveMember(ctx context.Context, in *EtcdRemoveMemberRequest, opts ...grpc.CallOption) (*EtcdRemoveMemberResponse, error)
EtcdLeaveCluster(ctx context.Context, in *EtcdLeaveClusterRequest, opts ...grpc.CallOption) (*EtcdLeaveClusterResponse, error)
EtcdForfeitLeadership(ctx context.Context, in *EtcdForfeitLeadershipRequest, opts ...grpc.CallOption) (*EtcdForfeitLeadershipResponse, error)
// EtcdSnapshot method creates etcd data snapshot (backup) from the local etcd instance
// and streams it back to the client.
//
// This method is available only on control plane nodes (which run etcd).
EtcdSnapshot(ctx context.Context, in *EtcdSnapshotRequest, opts ...grpc.CallOption) (MachineService_EtcdSnapshotClient, error)
GenerateConfiguration(ctx context.Context, in *GenerateConfigurationRequest, opts ...grpc.CallOption) (*GenerateConfigurationResponse, error)
Hostname(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*HostnameResponse, error)
Kubeconfig(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (MachineService_KubeconfigClient, error)
@ -248,6 +253,38 @@ func (c *machineServiceClient) EtcdForfeitLeadership(ctx context.Context, in *Et
return out, nil
}
func (c *machineServiceClient) EtcdSnapshot(ctx context.Context, in *EtcdSnapshotRequest, opts ...grpc.CallOption) (MachineService_EtcdSnapshotClient, error) {
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[3], "/machine.MachineService/EtcdSnapshot", opts...)
if err != nil {
return nil, err
}
x := &machineServiceEtcdSnapshotClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type MachineService_EtcdSnapshotClient interface {
Recv() (*common.Data, error)
grpc.ClientStream
}
type machineServiceEtcdSnapshotClient struct {
grpc.ClientStream
}
func (x *machineServiceEtcdSnapshotClient) Recv() (*common.Data, error) {
m := new(common.Data)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *machineServiceClient) GenerateConfiguration(ctx context.Context, in *GenerateConfigurationRequest, opts ...grpc.CallOption) (*GenerateConfigurationResponse, error) {
out := new(GenerateConfigurationResponse)
err := c.cc.Invoke(ctx, "/machine.MachineService/GenerateConfiguration", in, out, opts...)
@ -267,7 +304,7 @@ func (c *machineServiceClient) Hostname(ctx context.Context, in *emptypb.Empty,
}
func (c *machineServiceClient) Kubeconfig(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (MachineService_KubeconfigClient, error) {
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[3], "/machine.MachineService/Kubeconfig", opts...)
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[4], "/machine.MachineService/Kubeconfig", opts...)
if err != nil {
return nil, err
}
@ -299,7 +336,7 @@ func (x *machineServiceKubeconfigClient) Recv() (*common.Data, error) {
}
func (c *machineServiceClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (MachineService_ListClient, error) {
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[4], "/machine.MachineService/List", opts...)
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[5], "/machine.MachineService/List", opts...)
if err != nil {
return nil, err
}
@ -331,7 +368,7 @@ func (x *machineServiceListClient) Recv() (*FileInfo, error) {
}
func (c *machineServiceClient) DiskUsage(ctx context.Context, in *DiskUsageRequest, opts ...grpc.CallOption) (MachineService_DiskUsageClient, error) {
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[5], "/machine.MachineService/DiskUsage", opts...)
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[6], "/machine.MachineService/DiskUsage", opts...)
if err != nil {
return nil, err
}
@ -372,7 +409,7 @@ func (c *machineServiceClient) LoadAvg(ctx context.Context, in *emptypb.Empty, o
}
func (c *machineServiceClient) Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (MachineService_LogsClient, error) {
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[6], "/machine.MachineService/Logs", opts...)
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[7], "/machine.MachineService/Logs", opts...)
if err != nil {
return nil, err
}
@ -440,7 +477,7 @@ func (c *machineServiceClient) Processes(ctx context.Context, in *emptypb.Empty,
}
func (c *machineServiceClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (MachineService_ReadClient, error) {
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[7], "/machine.MachineService/Read", opts...)
stream, err := c.cc.NewStream(ctx, &MachineService_ServiceDesc.Streams[8], "/machine.MachineService/Read", opts...)
if err != nil {
return nil, err
}
@ -622,6 +659,11 @@ type MachineServiceServer interface {
EtcdRemoveMember(context.Context, *EtcdRemoveMemberRequest) (*EtcdRemoveMemberResponse, error)
EtcdLeaveCluster(context.Context, *EtcdLeaveClusterRequest) (*EtcdLeaveClusterResponse, error)
EtcdForfeitLeadership(context.Context, *EtcdForfeitLeadershipRequest) (*EtcdForfeitLeadershipResponse, error)
// EtcdSnapshot method creates etcd data snapshot (backup) from the local etcd instance
// and streams it back to the client.
//
// This method is available only on control plane nodes (which run etcd).
EtcdSnapshot(*EtcdSnapshotRequest, MachineService_EtcdSnapshotServer) error
GenerateConfiguration(context.Context, *GenerateConfigurationRequest) (*GenerateConfigurationResponse, error)
Hostname(context.Context, *emptypb.Empty) (*HostnameResponse, error)
Kubeconfig(*emptypb.Empty, MachineService_KubeconfigServer) error
@ -704,6 +746,10 @@ func (UnimplementedMachineServiceServer) EtcdForfeitLeadership(context.Context,
return nil, status.Errorf(codes.Unimplemented, "method EtcdForfeitLeadership not implemented")
}
func (UnimplementedMachineServiceServer) EtcdSnapshot(*EtcdSnapshotRequest, MachineService_EtcdSnapshotServer) error {
return status.Errorf(codes.Unimplemented, "method EtcdSnapshot not implemented")
}
func (UnimplementedMachineServiceServer) GenerateConfiguration(context.Context, *GenerateConfigurationRequest) (*GenerateConfigurationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GenerateConfiguration not implemented")
}
@ -1049,6 +1095,27 @@ func _MachineService_EtcdForfeitLeadership_Handler(srv interface{}, ctx context.
return interceptor(ctx, in, info, handler)
}
func _MachineService_EtcdSnapshot_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(EtcdSnapshotRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(MachineServiceServer).EtcdSnapshot(m, &machineServiceEtcdSnapshotServer{stream})
}
type MachineService_EtcdSnapshotServer interface {
Send(*common.Data) error
grpc.ServerStream
}
type machineServiceEtcdSnapshotServer struct {
grpc.ServerStream
}
func (x *machineServiceEtcdSnapshotServer) Send(m *common.Data) error {
return x.ServerStream.SendMsg(m)
}
func _MachineService_GenerateConfiguration_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GenerateConfigurationRequest)
if err := dec(in); err != nil {
@ -1698,6 +1765,11 @@ var MachineService_ServiceDesc = grpc.ServiceDesc{
Handler: _MachineService_Events_Handler,
ServerStreams: true,
},
{
StreamName: "EtcdSnapshot",
Handler: _MachineService_EtcdSnapshot_Handler,
ServerStreams: true,
},
{
StreamName: "Kubeconfig",
Handler: _MachineService_Kubeconfig_Handler,

View File

@ -895,6 +895,16 @@ func (c *Client) EtcdMemberList(ctx context.Context, req *machineapi.EtcdMemberL
return resp, err
}
// EtcdSnapshot receives a snapshot of the etcd from the node.
func (c *Client) EtcdSnapshot(ctx context.Context, req *machineapi.EtcdSnapshotRequest, callOptions ...grpc.CallOption) (io.ReadCloser, <-chan error, error) {
stream, err := c.MachineClient.EtcdSnapshot(ctx, req, callOptions...)
if err != nil {
return nil, nil, err
}
return ReadStream(stream)
}
// MachineStream is a common interface for streams returned by streaming APIs.
type MachineStream interface {
Recv() (*common.Data, error)

View File

@ -76,6 +76,7 @@ description: Talos gRPC API reference.
- [EtcdRemoveMember](#machine.EtcdRemoveMember)
- [EtcdRemoveMemberRequest](#machine.EtcdRemoveMemberRequest)
- [EtcdRemoveMemberResponse](#machine.EtcdRemoveMemberResponse)
- [EtcdSnapshotRequest](#machine.EtcdSnapshotRequest)
- [Event](#machine.Event)
- [EventsRequest](#machine.EventsRequest)
- [FileInfo](#machine.FileInfo)
@ -1233,6 +1234,16 @@ dmesg
<a name="machine.EtcdSnapshotRequest"></a>
### EtcdSnapshotRequest
<a name="machine.Event"></a>
### Event
@ -2853,6 +2864,9 @@ The machine service definition.
| EtcdRemoveMember | [EtcdRemoveMemberRequest](#machine.EtcdRemoveMemberRequest) | [EtcdRemoveMemberResponse](#machine.EtcdRemoveMemberResponse) | |
| EtcdLeaveCluster | [EtcdLeaveClusterRequest](#machine.EtcdLeaveClusterRequest) | [EtcdLeaveClusterResponse](#machine.EtcdLeaveClusterResponse) | |
| EtcdForfeitLeadership | [EtcdForfeitLeadershipRequest](#machine.EtcdForfeitLeadershipRequest) | [EtcdForfeitLeadershipResponse](#machine.EtcdForfeitLeadershipResponse) | |
| EtcdSnapshot | [EtcdSnapshotRequest](#machine.EtcdSnapshotRequest) | [.common.Data](#common.Data) stream | EtcdSnapshot method creates etcd data snapshot (backup) from the local etcd instance and streams it back to the client.
This method is available only on control plane nodes (which run etcd). |
| GenerateConfiguration | [GenerateConfigurationRequest](#machine.GenerateConfigurationRequest) | [GenerateConfigurationResponse](#machine.GenerateConfigurationResponse) | |
| Hostname | [.google.protobuf.Empty](#google.protobuf.Empty) | [HostnameResponse](#machine.HostnameResponse) | |
| Kubeconfig | [.google.protobuf.Empty](#google.protobuf.Empty) | [.common.Data](#common.Data) stream | |

View File

@ -882,6 +882,33 @@ talosctl etcd remove-member <hostname> [flags]
* [talosctl etcd](#talosctl-etcd) - Manage etcd
## talosctl etcd snapshot
Stream snapshot of the etcd node to the path.
```
talosctl etcd snapshot <path> [flags]
```
### Options
```
-h, --help help for snapshot
```
### Options inherited from parent commands
```
--context string Context to be used in command
-e, --endpoints strings override default endpoints in Talos configuration
-n, --nodes strings target the specified nodes
--talosconfig string The path to the Talos configuration file (default "/home/user/.talos/config")
```
### SEE ALSO
* [talosctl etcd](#talosctl-etcd) - Manage etcd
## talosctl etcd
Manage etcd
@ -908,6 +935,7 @@ Manage etcd
* [talosctl etcd leave](#talosctl-etcd-leave) - Tell nodes to leave etcd cluster
* [talosctl etcd members](#talosctl-etcd-members) - Get the list of etcd cluster members
* [talosctl etcd remove-member](#talosctl-etcd-remove-member) - Remove the node from etcd cluster
* [talosctl etcd snapshot](#talosctl-etcd-snapshot) - Stream snapshot of the etcd node to the path.
## talosctl events