Before this commit, when cluster creation failed, some resources could be left lingering behind. This commit fixes those cases and makes the cluster creation model easier to reason about: either the cluster is created successfully, or every resource created along the way is cleaned up again. The intention is to keep this "atomic create" model going forward for k3d.
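In outline, the model now looks like this (a minimal, self-contained sketch rather than the real implementation; createNetwork, createServer and rollback are hypothetical stand-ins for the helpers in the file below):

package main

import (
	"errors"
	"log"
)

// Hypothetical stand-ins for the real k3d helpers.
func createNetwork() error { return nil }
func createServer() error  { return errors.New("server failed to start") }
func rollback()            { log.Println("removing every resource created so far") }

// createAtomically either ends with a fully created cluster or
// cleans up everything it created before returning the error.
func createAtomically() error {
	if err := createNetwork(); err != nil {
		return err // nothing created yet, nothing to clean up
	}
	if err := createServer(); err != nil {
		rollback() // the network (and anything else) is removed again
		return err
	}
	return nil
}

func main() {
	if err := createAtomically(); err != nil {
		log.Printf("create failed, nothing lingers: %v", err)
	}
}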
package run

/*
 * This file contains the "backend" functionality for the CLI commands (and flags)
 */

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/client"
	"github.com/urfave/cli"
)

const (
	defaultRegistry    = "docker.io"
	defaultServerCount = 1
)

// CheckTools checks if the docker API server is responding
func CheckTools(c *cli.Context) error {
	log.Print("Checking docker...")
	ctx := context.Background()
	docker, err := client.NewEnvClient()
	if err != nil {
		return err
	}
	ping, err := docker.Ping(ctx)

	if err != nil {
		return fmt.Errorf("ERROR: checking docker failed\n%+v", err)
	}
	log.Printf("SUCCESS: Checking docker succeeded (API: v%s)\n", ping.APIVersion)
	return nil
}

// CreateCluster creates a new single-node cluster container and initializes the cluster directory
func CreateCluster(c *cli.Context) error {

	if err := CheckClusterName(c.String("name")); err != nil {
		return err
	}

	if cluster, err := getClusters(false, c.String("name")); err != nil {
		return err
	} else if len(cluster) != 0 {
		// A cluster with the same name already exists. Return with an error.
		return fmt.Errorf("ERROR: Cluster %s already exists", c.String("name"))
	}

	// validate the flag combination before any resources are created,
	// so a bad invocation can't leave a cluster network lingering behind
	if c.IsSet("timeout") && !c.IsSet("wait") {
		return errors.New("Cannot use --timeout flag without --wait flag")
	}

	// define image
	image := c.String("image")
	if c.IsSet("version") {
		// TODO: --version to be deprecated
		log.Println("[WARNING] The `--version` flag will be deprecated soon, please use `--image rancher/k3s:<version>` instead")
		if c.IsSet("image") {
			// version specified + custom image = error (to push deprecation of the version flag)
			log.Fatalln("[ERROR] Please use `--image <image>:<version>` instead of --image and --version")
		} else {
			// version specified + default image = ok (until deprecation of the version flag)
			image = fmt.Sprintf("%s:%s", strings.Split(image, ":")[0], c.String("version"))
		}
	}
	if len(strings.Split(image, "/")) <= 2 {
		// fall back to the default registry
		image = fmt.Sprintf("%s/%s", defaultRegistry, image)
	}

	// create the cluster network
	networkID, err := createClusterNetwork(c.String("name"))
	if err != nil {
		return err
	}
	log.Printf("Created cluster network with ID %s", networkID)

	// environment variables
	env := []string{"K3S_KUBECONFIG_OUTPUT=/output/kubeconfig.yaml"}
	if c.IsSet("env") || c.IsSet("e") {
		env = append(env, c.StringSlice("env")...)
	}
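	// when workers are requested, generate a shared cluster secret:
	// the server and every worker must carry the same K3S_CLUSTER_SECRET to join the same cluster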
	k3sClusterSecret := ""
	if c.Int("workers") > 0 {
		k3sClusterSecret = fmt.Sprintf("K3S_CLUSTER_SECRET=%s", GenerateRandomString(20))
		env = append(env, k3sClusterSecret)
	}

	// k3s server arguments
	// TODO: --port will soon be --api-port, since we want to re-use --port for arbitrary port mappings
	if c.IsSet("port") {
		log.Println("INFO: As of v2.0.0, --port will be used for arbitrary port mapping. Please use --api-port/-a instead for configuring the API port")
	}
	k3sServerArgs := []string{"--https-listen-port", c.String("api-port")}
	if c.IsSet("server-arg") || c.IsSet("x") {
		k3sServerArgs = append(k3sServerArgs, c.StringSlice("server-arg")...)
	}

	// new port map
	portmap, err := mapNodesToPortSpecs(c.StringSlice("publish"), GetAllContainerNames(c.String("name"), defaultServerCount, c.Int("workers")))
	if err != nil {
		log.Fatal(err)
	}

	// create the server
	log.Printf("Creating cluster [%s]", c.String("name"))
	dockerID, err := createServer(
		c.GlobalBool("verbose"),
		image,
		c.String("api-port"),
		k3sServerArgs,
		env,
		c.String("name"),
		c.StringSlice("volume"),
		portmap,
	)
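	// roll back on failure: remove everything created so far, so that a failed create leaves nothing behind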
	if err != nil {
		log.Printf("ERROR: failed to create cluster\n%+v", err)
		delErr := DeleteCluster(c)
		if delErr != nil {
			return delErr
		}
		os.Exit(1)
	}

	ctx := context.Background()
	docker, err := client.NewEnvClient()
	if err != nil {
		return err
	}

	// Wait for k3s to be up and running, if wanted.
	// We're simply scanning the container logs for a line that tells us that everything's up and running.
	// TODO: also wait for worker nodes
	start := time.Now()
	timeout := time.Duration(c.Int("timeout")) * time.Second
	for c.IsSet("wait") {
		// not running after the timeout was exceeded? Roll back and delete everything.
		if timeout != 0 && time.Now().After(start.Add(timeout)) {
			err := DeleteCluster(c)
			if err != nil {
				return err
			}
			return errors.New("Cluster creation exceeded specified timeout")
		}

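		// NOTE: ContainerLogs returns a multiplexed stdout/stderr stream for non-TTY containers;
		// a plain substring search over the buffered bytes is still enough to spot the marker line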
		// scan the container logs for a line that tells us that the required services are up and running
		out, err := docker.ContainerLogs(ctx, dockerID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
		if err != nil {
			return fmt.Errorf("ERROR: couldn't get docker logs for %s\n%+v", c.String("name"), err)
		}
		buf := new(bytes.Buffer)
		nRead, _ := buf.ReadFrom(out)
		out.Close()
		output := buf.String()
		if nRead > 0 && strings.Contains(output, "Running kubelet") {
			break
		}

		time.Sleep(1 * time.Second)
	}

	// create the directory where we will put the kubeconfig file by default (when running `k3d get-config`)
	// TODO: this can probably be moved to `k3d get-config` or be removed in a different approach
	createClusterDir(c.String("name"))

	// spin up the worker nodes
	// TODO: do this concurrently in different goroutines
	if c.Int("workers") > 0 {
		k3sWorkerArgs := []string{}
		env := []string{k3sClusterSecret}
		log.Printf("Booting %s workers for cluster %s", strconv.Itoa(c.Int("workers")), c.String("name"))
		for i := 0; i < c.Int("workers"); i++ {
			workerID, err := createWorker(
				c.GlobalBool("verbose"),
				image,
				k3sWorkerArgs,
				env,
				c.String("name"),
				c.StringSlice("volume"),
				i,
				c.String("api-port"),
				portmap,
				c.Int("port-auto-offset"),
			)
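			// roll back the whole cluster (server, network, and any workers created so far) to keep creation atomic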
			if err != nil {
				log.Printf("ERROR: failed to create worker node for cluster %s\n%+v", c.String("name"), err)
				delErr := DeleteCluster(c)
				if delErr != nil {
					return delErr
				}
				os.Exit(1)
			}
			log.Printf("Created worker with ID %s\n", workerID)
		}
	}

log.Printf("SUCCESS: created cluster [%s]", c.String("name"))
|
|
log.Printf(`You can now use the cluster with:
|
|
|
|
export KUBECONFIG="$(%s get-kubeconfig --name='%s')"
|
|
kubectl cluster-info`, os.Args[0], c.String("name"))
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteCluster removes the containers belonging to a cluster and its local directory
func DeleteCluster(c *cli.Context) error {
	clusters, err := getClusters(c.Bool("all"), c.String("name"))
	if err != nil {
		return err
	}

	// remove clusters one by one instead of appending all names to the docker command
	// this allows for more granular error handling and logging
	for _, cluster := range clusters {
		log.Printf("Removing cluster [%s]", cluster.name)
		if len(cluster.workers) > 0 {
			// TODO: this could be done in goroutines
			log.Printf("...Removing %d workers\n", len(cluster.workers))
			for _, worker := range cluster.workers {
				if err := removeContainer(worker.ID); err != nil {
					log.Println(err)
					continue
				}
			}
		}
		log.Println("...Removing server")
		deleteClusterDir(cluster.name)
		if err := removeContainer(cluster.server.ID); err != nil {
			return fmt.Errorf("ERROR: Couldn't remove server for cluster %s\n%+v", cluster.name, err)
		}

		if err := deleteClusterNetwork(cluster.name); err != nil {
			log.Printf("WARNING: couldn't delete cluster network for cluster %s\n%+v", cluster.name, err)
		}

		log.Printf("SUCCESS: removed cluster [%s]", cluster.name)
	}

	return nil
}

// StopCluster stops a running cluster container (restartable)
func StopCluster(c *cli.Context) error {
	clusters, err := getClusters(c.Bool("all"), c.String("name"))
	if err != nil {
		return err
	}

	ctx := context.Background()
	docker, err := client.NewEnvClient()
	if err != nil {
		return fmt.Errorf("ERROR: couldn't create docker client\n%+v", err)
	}

	// stop clusters one by one instead of appending all names to the docker command
	// this allows for more granular error handling and logging
	for _, cluster := range clusters {
		log.Printf("Stopping cluster [%s]", cluster.name)
		if len(cluster.workers) > 0 {
			log.Printf("...Stopping %d workers\n", len(cluster.workers))
			for _, worker := range cluster.workers {
				if err := docker.ContainerStop(ctx, worker.ID, nil); err != nil {
					log.Println(err)
					continue
				}
			}
		}
		log.Println("...Stopping server")
		if err := docker.ContainerStop(ctx, cluster.server.ID, nil); err != nil {
			return fmt.Errorf("ERROR: Couldn't stop server for cluster %s\n%+v", cluster.name, err)
		}

		log.Printf("SUCCESS: Stopped cluster [%s]", cluster.name)
	}

	return nil
}

// StartCluster starts a stopped cluster container
func StartCluster(c *cli.Context) error {
	clusters, err := getClusters(c.Bool("all"), c.String("name"))
	if err != nil {
		return err
	}

	ctx := context.Background()
	docker, err := client.NewEnvClient()
	if err != nil {
		return fmt.Errorf("ERROR: couldn't create docker client\n%+v", err)
	}

	// start clusters one by one instead of appending all names to the docker command
	// this allows for more granular error handling and logging
	for _, cluster := range clusters {
		log.Printf("Starting cluster [%s]", cluster.name)

		log.Println("...Starting server")
		if err := docker.ContainerStart(ctx, cluster.server.ID, types.ContainerStartOptions{}); err != nil {
			return fmt.Errorf("ERROR: Couldn't start server for cluster %s\n%+v", cluster.name, err)
		}

		if len(cluster.workers) > 0 {
			log.Printf("...Starting %d workers\n", len(cluster.workers))
			for _, worker := range cluster.workers {
				if err := docker.ContainerStart(ctx, worker.ID, types.ContainerStartOptions{}); err != nil {
					log.Println(err)
					continue
				}
			}
		}

		log.Printf("SUCCESS: Started cluster [%s]", cluster.name)
	}

	return nil
}

// ListClusters prints a list of created clusters
func ListClusters(c *cli.Context) error {
	if c.IsSet("all") {
		log.Println("INFO: --all is on by default, thus no longer required. This option will be removed in v2.0.0")
	}
	printClusters()
	return nil
}

// GetKubeConfig grabs the kubeconfig from the running cluster and prints the path to stdout
func GetKubeConfig(c *cli.Context) error {
	ctx := context.Background()
	docker, err := client.NewEnvClient()
	if err != nil {
		return err
	}

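	// find this cluster's server container by its k3d labels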
	filterArgs := filters.NewArgs()
	filterArgs.Add("label", "app=k3d")
	filterArgs.Add("label", fmt.Sprintf("cluster=%s", c.String("name")))
	filterArgs.Add("label", "component=server")
	server, err := docker.ContainerList(ctx, types.ContainerListOptions{
		Filters: filterArgs,
	})
	if err != nil {
		return fmt.Errorf("Failed to get server container for cluster %s\n%+v", c.String("name"), err)
	}

	if len(server) == 0 {
		return fmt.Errorf("No server container for cluster %s", c.String("name"))
	}

	// get the kubeconfig file from the container and read its contents
	reader, _, err := docker.CopyFromContainer(ctx, server[0].ID, "/output/kubeconfig.yaml")
	if err != nil {
		return fmt.Errorf("ERROR: couldn't copy kubeconfig.yaml from server container %s\n%+v", server[0].ID, err)
	}
	defer reader.Close()

	readBytes, err := ioutil.ReadAll(reader)
	if err != nil {
		return fmt.Errorf("ERROR: couldn't read kubeconfig from container\n%+v", err)
	}

	// create the destination kubeconfig file
	clusterDir, err := getClusterDir(c.String("name"))
	if err != nil {
		return err
	}
	destPath := fmt.Sprintf("%s/kubeconfig.yaml", clusterDir)

	kubeconfigfile, err := os.Create(destPath)
	if err != nil {
		return fmt.Errorf("ERROR: couldn't create kubeconfig.yaml in %s\n%+v", clusterDir, err)
	}
	defer kubeconfigfile.Close()

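	// CopyFromContainer returns a tar stream: a 512-byte header block followed by
	// the file contents, NUL-padded up to the next 512-byte boundary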
	// write to the file, skipping the 512-byte tar header and trimming any NUL padding
	_, err = kubeconfigfile.Write(bytes.Trim(readBytes[512:], "\x00"))
	if err != nil {
		return fmt.Errorf("ERROR: couldn't write to kubeconfig.yaml\n%+v", err)
	}

	// output the kubeconfig file path to stdout
	fmt.Println(destPath)

	return nil
}