This commit is contained in:
Simon Baier 2021-10-31 02:46:48 +05:30 committed by GitHub
commit 1566f75f54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 254 additions and 21 deletions

View File

@ -59,16 +59,28 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of
Args: cobra.MinimumNArgs(1), Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
images, clusters := parseLoadImageCmd(cmd, args) images, clusters := parseLoadImageCmd(cmd, args)
loadModeStr, err := cmd.Flags().GetString("load-mode")
if err != nil {
l.Log().Errorln("No load-mode specified")
l.Log().Fatalln(err)
}
if mode, ok := k3d.LoadModes[loadModeStr]; !ok {
l.Log().Fatalf("Unknown image loading mode '%s'\n", loadModeStr)
} else {
loadImageOpts.LoadingMode = mode
}
l.Log().Debugf("Importing image(s) [%+v] from runtime [%s] into cluster(s) [%+v]...", images, runtimes.SelectedRuntime, clusters) l.Log().Debugf("Importing image(s) [%+v] from runtime [%s] into cluster(s) [%+v]...", images, runtimes.SelectedRuntime, clusters)
errOccured := false errOccurred := false
for _, cluster := range clusters { for _, cluster := range clusters {
l.Log().Infof("Importing image(s) into cluster '%s'", cluster.Name) l.Log().Infof("Importing image(s) into cluster '%s'", cluster.Name)
if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, &cluster, loadImageOpts); err != nil { if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, cluster, loadImageOpts); err != nil {
l.Log().Errorf("Failed to import image(s) into cluster '%s': %+v", cluster.Name, err) l.Log().Errorf("Failed to import image(s) into cluster '%s': %+v", cluster.Name, err)
errOccured = true errOccurred = true
} }
} }
if errOccured { if errOccurred {
l.Log().Warnln("At least one error occured while trying to import the image(s) into the selected cluster(s)") l.Log().Warnln("At least one error occured while trying to import the image(s) into the selected cluster(s)")
os.Exit(1) os.Exit(1)
} }
@ -86,7 +98,7 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of
cmd.Flags().BoolVarP(&loadImageOpts.KeepTar, "keep-tarball", "k", false, "Do not delete the tarball containing the saved images from the shared volume") cmd.Flags().BoolVarP(&loadImageOpts.KeepTar, "keep-tarball", "k", false, "Do not delete the tarball containing the saved images from the shared volume")
cmd.Flags().BoolVarP(&loadImageOpts.KeepToolsNode, "keep-tools", "t", false, "Do not delete the tools node after import") cmd.Flags().BoolVarP(&loadImageOpts.KeepToolsNode, "keep-tools", "t", false, "Do not delete the tools node after import")
cmd.Flags().StringP("load-mode", "m", string(k3d.AutoDetect), "Which method to use to load images to the cluster [auto, direct, tools-node].")
/* Subcommands */ /* Subcommands */
// done // done
@ -94,16 +106,30 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of
} }
// parseLoadImageCmd parses the command input into variables required to create a cluster // parseLoadImageCmd parses the command input into variables required to create a cluster
func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []k3d.Cluster) { func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []*k3d.Cluster) {
// --cluster // --cluster
clusterNames, err := cmd.Flags().GetStringArray("cluster") clusterNames, err := cmd.Flags().GetStringArray("cluster")
if err != nil { if err != nil {
l.Log().Fatalln(err) l.Log().Fatalln(err)
} }
clusters := []k3d.Cluster{} clusters := []*k3d.Cluster{}
for _, clusterName := range clusterNames { for _, clusterName := range clusterNames {
clusters = append(clusters, k3d.Cluster{Name: clusterName}) clusters = append(clusters, &k3d.Cluster{Name: clusterName})
}
// TODO is this actually necessary? looks like it never worked as intended at first glance.
// Figure out the nodes for each cluster
nodeList, err := client.NodeList(cmd.Context(), runtimes.SelectedRuntime)
if err != nil {
l.Log().Fatalf("Failed to list clusters %v", err)
}
for _, node := range nodeList {
for _, cluster := range clusters {
if cluster.Name == node.RuntimeLabels[k3d.LabelClusterName] {
cluster.Nodes = append(cluster.Nodes, node)
}
}
} }
// images // images

View File

@ -29,6 +29,7 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags]
-h, --help help for import -h, --help help for import
-k, --keep-tarball Do not delete the tarball containing the saved images from the shared volume -k, --keep-tarball Do not delete the tarball containing the saved images from the shared volume
-t, --keep-tools Do not delete the tools node after import -t, --keep-tools Do not delete the tools node after import
-m, --load-mode Which method to use to load images to the cluster [auto, direct, tools-node]. (default "auto")
``` ```
### Options inherited from parent commands ### Options inherited from parent commands
@ -39,6 +40,25 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags]
--verbose Enable verbose output (debug logging) --verbose Enable verbose output (debug logging)
``` ```
### Loading modes
#### Auto
Auto-determine whether to use `direct` or `tools-node`.
For remote container runtimes, `tools-node` is faster due to less network overhead, thus it is automatically selected for remote runtimes.
Otherwise direct is used.
#### Direct
Directly load the given images to the k3s nodes. No separate container is spawned, no intermediate files are written.
#### Tools Node
Start a `k3d-tools` container in the container runtime, copy images to that runtime, then load the images to k3s nodes from there.
### SEE ALSO ### SEE ALSO
* [k3d image](k3d_image.md) - Handle container images. * [k3d image](k3d_image.md) - Handle container images.

View File

@ -25,6 +25,7 @@ package client
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"os" "os"
"path" "path"
"strings" "strings"
@ -39,6 +40,13 @@ import (
// ImageImportIntoClusterMulti starts up a k3d tools container for the selected cluster and uses it to export // ImageImportIntoClusterMulti starts up a k3d tools container for the selected cluster and uses it to export
// images from the runtime to import them into the nodes of the selected cluster // images from the runtime to import them into the nodes of the selected cluster
func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, images []string, cluster *k3d.Cluster, opts k3d.ImageImportOpts) error { func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, images []string, cluster *k3d.Cluster, opts k3d.ImageImportOpts) error {
// stdin case
if len(images) == 1 && images[0] == "-" {
loadImageFromStream(ctx, runtime, os.Stdin, cluster)
return nil
}
imagesFromRuntime, imagesFromTar, err := findImages(ctx, runtime, images) imagesFromRuntime, imagesFromTar, err := findImages(ctx, runtime, images)
if err != nil { if err != nil {
return fmt.Errorf("failed to find images: %w", err) return fmt.Errorf("failed to find images: %w", err)
@ -46,13 +54,28 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime,
// no images found to load -> exit early // no images found to load -> exit early
if len(imagesFromRuntime)+len(imagesFromTar) == 0 { if len(imagesFromRuntime)+len(imagesFromTar) == 0 {
return fmt.Errorf("No valid images specified") return fmt.Errorf("no valid images specified")
} }
// create tools node to export images loadWithToolsNode := false
toolsNode, err := EnsureToolsNode(ctx, runtime, cluster)
if err != nil { switch opts.LoadingMode {
return fmt.Errorf("failed to ensure that tools node is running: %w", err) case k3d.AutoDetect:
if err != nil {
return fmt.Errorf("failed to retrieve container runtime information: %w", err)
}
if err != nil {
return fmt.Errorf("failed to compile remote runtime endpoint regexp: %w", err)
}
runtimeHost := runtime.GetHost()
if runtimeHost != "" && runtimeHost != "localhost" && runtimeHost != "127.0.0.1" {
l.Log().Infof("Auto-detected a remote docker daemon, using tools node for loading images")
loadWithToolsNode = true
}
case k3d.ToolsNode:
loadWithToolsNode = true
case k3d.Direct:
loadWithToolsNode = false
} }
/* TODO: /* TODO:
@ -63,6 +86,26 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime,
* 3. From stdin: save to tar -> import * 3. From stdin: save to tar -> import
* Note: temporary storage location is always the shared image volume and actions are always executed by the tools node * Note: temporary storage location is always the shared image volume and actions are always executed by the tools node
*/ */
if loadWithToolsNode {
err = importWithToolsNode(ctx, runtime, err, cluster, imagesFromRuntime, imagesFromTar, opts)
} else {
err = importWithStream(ctx, runtime, imagesFromRuntime, cluster, imagesFromTar)
}
if err != nil {
return err
}
l.Log().Infoln("Successfully imported image(s)")
return nil
}
func importWithToolsNode(ctx context.Context, runtime runtimes.Runtime, err error, cluster *k3d.Cluster, imagesFromRuntime []string, imagesFromTar []string, opts k3d.ImageImportOpts) error {
// create tools node to export images
toolsNode, err := EnsureToolsNode(ctx, runtime, cluster)
if err != nil {
return fmt.Errorf("failed to ensure that tools node is running: %w", err)
}
var importTarNames []string var importTarNames []string
if len(imagesFromRuntime) > 0 { if len(imagesFromRuntime) > 0 {
@ -123,11 +166,97 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime,
l.Log().Errorf("failed to delete tools node '%s' (try to delete it manually): %v", toolsNode.Name, err) l.Log().Errorf("failed to delete tools node '%s' (try to delete it manually): %v", toolsNode.Name, err)
} }
} }
l.Log().Infoln("Successfully imported image(s)")
return nil return nil
}
func importWithStream(ctx context.Context, runtime runtimes.Runtime, imagesFromRuntime []string, cluster *k3d.Cluster, imagesFromTar []string) error {
if len(imagesFromRuntime) > 0 {
l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime))
// open a stream to all given images
stream, err := runtime.GetImageStream(ctx, imagesFromRuntime)
loadImageFromStream(ctx, runtime, stream, cluster)
if err != nil {
return fmt.Errorf("Could not open image stream for given images %s: %w", imagesFromRuntime, err)
}
// load the images directly into the nodes
}
if len(imagesFromTar) > 0 {
// copy tarfiles to shared volume
l.Log().Infof("Saving %d tarball(s) to shared image volume...", len(imagesFromTar))
files := make([]*os.File, len(imagesFromTar))
readers := make([]io.Reader, len(imagesFromTar))
failedFiles := 0
for i, fileName := range imagesFromTar {
file, err := os.Open(fileName)
if err != nil {
l.Log().Errorf("failed to read file '%s', skipping. Error below:\n%+v", fileName, err)
failedFiles++
continue
}
files[i] = file
readers[i] = file
}
multiReader := io.MultiReader(readers...)
loadImageFromStream(ctx, runtime, io.NopCloser(multiReader), cluster)
for _, file := range files {
err := file.Close()
if err != nil {
l.Log().Errorf("Failed to close file '%s' after reading. Error below:\n%+v", file.Name(), err)
}
}
}
return nil
}
func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) {
var importWaitgroup sync.WaitGroup
numNodes := 0
for _, node := range cluster.Nodes {
// only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer)
if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole {
numNodes++
}
}
// multiplex the stream so we can write to multiple nodes
pipeReaders := make([]*io.PipeReader, numNodes)
pipeWriters := make([]io.Writer, numNodes)
for i := 0; i < numNodes; i++ {
reader, writer := io.Pipe()
pipeReaders[i] = reader
pipeWriters[i] = writer
}
go func() {
_, err := io.Copy(io.MultiWriter(pipeWriters...), stream)
if err != nil {
l.Log().Errorf("Failed to copy read stream. %v", err)
}
err = stream.Close()
if err != nil {
l.Log().Errorf("Failed to close stream. %v", err)
}
}()
pipeId := 0
for _, node := range cluster.Nodes {
// only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer)
if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole {
importWaitgroup.Add(1)
go func(node *k3d.Node, wg *sync.WaitGroup, stream io.ReadCloser) {
l.Log().Infof("Importing images into node '%s'...", node.Name)
if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, stream); err != nil {
l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err)
}
wg.Done()
}(node, &importWaitgroup, pipeReaders[pipeId])
pipeId++
}
}
importWaitgroup.Wait()
} }
type runtimeImageGetter interface { type runtimeImageGetter interface {

View File

@ -309,7 +309,7 @@ func (d Docker) GetNodeLogs(ctx context.Context, node *k3d.Node, since time.Time
// ExecInNodeGetLogs executes a command inside a node and returns the logs to the caller, e.g. to parse them // ExecInNodeGetLogs executes a command inside a node and returns the logs to the caller, e.g. to parse them
func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []string) (*bufio.Reader, error) { func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []string) (*bufio.Reader, error) {
resp, err := executeInNode(ctx, node, cmd) resp, err := executeInNode(ctx, node, cmd, nil)
if resp != nil { if resp != nil {
defer resp.Close() defer resp.Close()
} }
@ -322,9 +322,23 @@ func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []str
return resp.Reader, nil return resp.Reader, nil
} }
// GetImageStream creates a tar stream for the given images, to be read (and closed) by the caller
func (d Docker) GetImageStream(ctx context.Context, image []string) (io.ReadCloser, error) {
docker, err := GetDockerClient()
if err != nil {
return nil, err
}
reader, err := docker.ImageSave(ctx, image)
return reader, err
}
// ExecInNode execs a command inside a node // ExecInNode execs a command inside a node
func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) error { func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) error {
execConnection, err := executeInNode(ctx, node, cmd) return execInNode(ctx, node, cmd, nil)
}
func execInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) error {
execConnection, err := executeInNode(ctx, node, cmd, stdin)
if execConnection != nil { if execConnection != nil {
defer execConnection.Close() defer execConnection.Close()
} }
@ -340,7 +354,11 @@ func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) er
return err return err
} }
func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.HijackedResponse, error) { func (d Docker) ExecInNodeWithStdin(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) error {
return execInNode(ctx, node, cmd, stdin)
}
func executeInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) (*types.HijackedResponse, error) {
l.Log().Debugf("Executing command '%+v' in node '%s'", cmd, node.Name) l.Log().Debugf("Executing command '%+v' in node '%s'", cmd, node.Name)
@ -357,12 +375,19 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.Hi
} }
defer docker.Close() defer docker.Close()
attachStdin := false
if stdin != nil {
attachStdin = true
}
// exec // exec
exec, err := docker.ContainerExecCreate(ctx, container.ID, types.ExecConfig{ exec, err := docker.ContainerExecCreate(ctx, container.ID, types.ExecConfig{
Privileged: true, Privileged: true,
Tty: true, // Don't use tty true when piping stdin.
Tty: !attachStdin,
AttachStderr: true, AttachStderr: true,
AttachStdout: true, AttachStdout: true,
AttachStdin: attachStdin,
Cmd: cmd, Cmd: cmd,
}) })
if err != nil { if err != nil {
@ -380,6 +405,20 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.Hi
return nil, fmt.Errorf("docker failed to start exec process in node '%s': %w", node.Name, err) return nil, fmt.Errorf("docker failed to start exec process in node '%s': %w", node.Name, err)
} }
// If we need to write to stdin pipe, start a new goroutine that writes the stream to stdin
if stdin != nil {
go func() {
_, err := io.Copy(execConnection.Conn, stdin)
if err != nil {
l.Log().Errorf("Failed to copy read stream. %v", err)
}
err = stdin.Close()
if err != nil {
l.Log().Errorf("Failed to close stdin stream. %v", err)
}
}()
}
for { for {
// get info about exec process inside container // get info about exec process inside container
execInfo, err := docker.ContainerExecInspect(ctx, exec.ID) execInfo, err := docker.ContainerExecInspect(ctx, exec.ID)

View File

@ -65,8 +65,10 @@ type Runtime interface {
CreateVolume(context.Context, string, map[string]string) error CreateVolume(context.Context, string, map[string]string) error
DeleteVolume(context.Context, string) error DeleteVolume(context.Context, string) error
GetVolume(string) (string, error) GetVolume(string) (string, error)
GetImageStream(context.Context, []string) (io.ReadCloser, error)
GetRuntimePath() string // returns e.g. '/var/run/docker.sock' for a default docker setup GetRuntimePath() string // returns e.g. '/var/run/docker.sock' for a default docker setup
ExecInNode(context.Context, *k3d.Node, []string) error ExecInNode(context.Context, *k3d.Node, []string) error
ExecInNodeWithStdin(context.Context, *k3d.Node, []string, io.ReadCloser) error
ExecInNodeGetLogs(context.Context, *k3d.Node, []string) (*bufio.Reader, error) ExecInNodeGetLogs(context.Context, *k3d.Node, []string) (*bufio.Reader, error)
GetNodeLogs(context.Context, *k3d.Node, time.Time, *runtimeTypes.NodeLogsOpts) (io.ReadCloser, error) GetNodeLogs(context.Context, *k3d.Node, time.Time, *runtimeTypes.NodeLogsOpts) (io.ReadCloser, error)
GetImages(context.Context) ([]string, error) GetImages(context.Context) ([]string, error)

View File

@ -178,10 +178,27 @@ type NodeHookAction interface {
Run(ctx context.Context, node *Node) error Run(ctx context.Context, node *Node) error
} }
// LoadMode describes how images are loaded into the cluster
type LoadMode string
const (
AutoDetect LoadMode = "auto"
Direct LoadMode = "direct"
ToolsNode LoadMode = "tools-node"
)
// LoadModes defines the loading methods for image loading
var LoadModes = map[string]LoadMode{
string(AutoDetect): AutoDetect,
string(Direct): Direct,
string(ToolsNode): ToolsNode,
}
// ImageImportOpts describes a set of options one can set for loading image(s) into cluster(s) // ImageImportOpts describes a set of options one can set for loading image(s) into cluster(s)
type ImageImportOpts struct { type ImageImportOpts struct {
KeepTar bool KeepTar bool
KeepToolsNode bool KeepToolsNode bool
LoadingMode LoadMode
} }
type IPAM struct { type IPAM struct {