talos/internal/app/init/pkg/system/runner/containerd/containerd.go
2018-12-19 22:22:05 -08:00

151 lines
4.3 KiB
Go

package containerd
import (
"context"
"fmt"
"github.com/autonomy/talos/internal/app/init/pkg/system/conditions"
"github.com/autonomy/talos/internal/app/init/pkg/system/runner"
"github.com/autonomy/talos/internal/pkg/constants"
"github.com/autonomy/talos/internal/pkg/userdata"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/runtime/restart"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// Containerd represents a service to be run in a container.
// The type holds no state; its Run method builds a new containerd client on
// every call.
type Containerd struct{}
// WithMemoryLimit sets the linux resource memory limit field.
//
// The returned option replaces s.Linux.Resources.Memory with a new
// LinuxMemory value whose only populated field is Limit (expressed in bytes
// per the OCI runtime spec). The Linux and Resources sections are allocated
// when the spec does not carry them yet, so the option can be applied to a
// spec that was not pre-populated with defaults.
func WithMemoryLimit(limit int64) oci.SpecOpts {
	return func(_ context.Context, _ oci.Client, _ *containers.Container, s *specs.Spec) error {
		// Guard against specs that lack the Linux or Resources sections;
		// writing through a nil pointer would panic.
		if s.Linux == nil {
			s.Linux = &specs.Linux{}
		}
		if s.Linux.Resources == nil {
			s.Linux.Resources = &specs.LinuxResources{}
		}
		s.Linux.Resources.Memory = &specs.LinuxMemory{
			Limit: &limit,
			// DisableOOMKiller: &disable,
		}
		return nil
	}
}
// WithRootfsPropagation sets the root filesystem propagation.
//
// The returned option stores rp in s.Linux.RootfsPropagation, allocating the
// Linux section first when the spec does not have one. rp is written as
// given; it is not validated here.
func WithRootfsPropagation(rp string) oci.SpecOpts {
	return func(_ context.Context, _ oci.Client, _ *containers.Container, s *specs.Spec) error {
		// Guard against a spec without a Linux section; writing through a
		// nil pointer would panic.
		if s.Linux == nil {
			s.Linux = &specs.Linux{}
		}
		s.Linux.RootfsPropagation = rp
		return nil
	}
}
// Run implements the Runner interface.
//
// It waits for the containerd socket to appear, pulls and unpacks
// opts.ContainerImage in the "system" namespace, creates a container and a
// task identified by args.ID (task output goes to logPath(args)), and starts
// the task.
//
// For runner.Once services Run blocks until the task exits and returns an
// error when the exit code is non-zero. The task and the container (including
// its snapshot) are deleted before Run returns, also when creating or
// starting the task fails. For every other service type Run returns as soon
// as the task has been started and leaves the container and task in place.
//
// The data argument is not used by this runner.
// nolint: gocyclo
func (c *Containerd) Run(data *userdata.UserData, args runner.Args, setters ...runner.Option) error {
	// Wait for the containerd socket.
	if _, err := conditions.WaitForFileToExist(constants.ContainerdSocket)(); err != nil {
		return err
	}

	// Create the default runner options and apply the caller's overrides.
	opts := runner.DefaultOptions()
	for _, setter := range setters {
		setter(opts)
	}
	once := opts.Type == runner.Once

	// Create the containerd client.
	ctx := namespaces.WithNamespace(context.Background(), "system")
	client, err := containerd.New(constants.ContainerdSocket)
	if err != nil {
		return err
	}
	// nolint: errcheck
	defer client.Close()

	// Pull the image and unpack it.
	image, err := client.Pull(ctx, opts.ContainerImage, containerd.WithPullUnpack)
	if err != nil {
		return fmt.Errorf("failed to pull image %q: %v", opts.ContainerImage, err)
	}

	// Create the container.
	specOpts := newOCISpecOpts(image, args, opts)
	containerOpts := newContainerOpts(image, args, opts, specOpts)
	container, err := client.NewContainer(
		ctx,
		args.ID,
		containerOpts...,
	)
	if err != nil {
		return fmt.Errorf("failed to create container %q: %v", args.ID, err)
	}
	if once {
		// Register cleanup as soon as the container exists so that a failure
		// in NewTask or Start does not leave the container and its snapshot
		// behind. Deferred calls run in reverse order, so the task deletion
		// registered below runs before this one.
		defer container.Delete(ctx, containerd.WithSnapshotCleanup) // nolint: errcheck
	}

	// Create the task.
	task, err := container.NewTask(ctx, cio.LogFile(logPath(args)))
	if err != nil {
		return fmt.Errorf("failed to create task: %q: %v", args.ID, err)
	}

	var statusC <-chan containerd.ExitStatus
	if once {
		defer task.Delete(ctx) // nolint: errcheck

		// Subscribe to the exit status before starting the task so that the
		// exit of a short-lived process cannot be missed.
		if statusC, err = task.Wait(ctx); err != nil {
			return fmt.Errorf("failed waiting for task: %q: %v", args.ID, err)
		}
	}

	// Start the task.
	if err = task.Start(ctx); err != nil {
		return fmt.Errorf("failed to start task: %q: %v", args.ID, err)
	}

	if !once {
		return nil
	}

	// Wait for the task exit code.
	status := <-statusC
	if code := status.ExitCode(); code != 0 {
		return fmt.Errorf("task %q failed: exit code %d", args.ID, code)
	}

	return nil
}
// newContainerOpts assembles the options passed to client.NewContainer.
//
// The list always begins with the image, a new snapshot named after args.ID
// and a new spec built from specOpts. Services of type runner.Forever
// additionally get the restart status and restart log path options. Any
// caller-supplied opts.ContainerOpts are appended last.
func newContainerOpts(image containerd.Image, args runner.Args, opts *runner.Options, specOpts []oci.SpecOpts) []containerd.NewContainerOpts {
	base := []containerd.NewContainerOpts{
		containerd.WithImage(image),
		containerd.WithNewSnapshot(args.ID, image),
		containerd.WithNewSpec(specOpts...),
	}

	if opts.Type != runner.Forever {
		return append(base, opts.ContainerOpts...)
	}

	// Forever services carry the restart options in addition to the base set.
	base = append(
		base,
		restart.WithStatus(containerd.Running),
		restart.WithLogPath(logPath(args)),
	)

	return append(base, opts.ContainerOpts...)
}
// newOCISpecOpts assembles the OCI spec options for a service container.
//
// The fixed options come first: image config, the process arguments from
// args.ProcessArgs, the environment from opts.Env, the host network and PID
// namespaces, the host hosts file and resolv.conf, and privileged mode.
// Caller-supplied opts.OCISpecOpts follow, in the order given.
func newOCISpecOpts(image oci.Image, args runner.Args, opts *runner.Options) []oci.SpecOpts {
	const fixed = 8 // number of options added unconditionally below

	all := make([]oci.SpecOpts, 0, fixed+len(opts.OCISpecOpts))

	// Image configuration, command line and environment.
	all = append(all,
		oci.WithImageConfig(image),
		oci.WithProcessArgs(args.ProcessArgs...),
		oci.WithEnv(opts.Env),
	)

	// Host namespaces and host-provided files.
	all = append(all,
		oci.WithHostNamespace(specs.NetworkNamespace),
		oci.WithHostNamespace(specs.PIDNamespace),
		oci.WithHostHostsFile,
		oci.WithHostResolvconf,
	)

	all = append(all, oci.WithPrivileged)

	// Caller-supplied options go last.
	all = append(all, opts.OCISpecOpts...)

	return all
}
// logPath returns the log file location for the service identified by
// args.ID: the ID with "/var/log/" prepended and ".log" appended.
func logPath(args runner.Args) string {
	const (
		prefix = "/var/log/"
		suffix = ".log"
	)

	return prefix + args.ID + suffix
}