diff --git a/internal/app/machined/pkg/system/services/apid.go b/internal/app/machined/pkg/system/services/apid.go index 32711b64a..6ed24d002 100644 --- a/internal/app/machined/pkg/system/services/apid.go +++ b/internal/app/machined/pkg/system/services/apid.go @@ -9,12 +9,15 @@ import ( "bytes" "context" "fmt" + "log" "net" "os" "path/filepath" "strings" + "sync" "github.com/containerd/containerd/oci" + "github.com/fsnotify/fsnotify" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" @@ -25,13 +28,18 @@ import ( "github.com/talos-systems/talos/internal/app/machined/pkg/system/runner/restart" "github.com/talos-systems/talos/internal/pkg/containers/image" "github.com/talos-systems/talos/pkg/conditions" + "github.com/talos-systems/talos/pkg/copy" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" ) // APID implements the Service interface. It serves as the concrete type with // the required methods. -type APID struct{} +type APID struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} // ID implements the Service interface. func (o *APID) ID(r runtime.Runtime) string { @@ -40,11 +48,18 @@ func (o *APID) ID(r runtime.Runtime) string { // PreFunc implements the Service interface. func (o *APID) PreFunc(ctx context.Context, r runtime.Runtime) error { + if r.Config().Machine().Type() == machine.TypeJoin { + o.syncKubeletPKI() + } + return image.Import(ctx, "/usr/images/apid.tar", "talos/apid") } // PostFunc implements the Service interface. func (o *APID) PostFunc(r runtime.Runtime, state events.ServiceState) (err error) { + o.cancel() + o.wg.Wait() + return nil } @@ -101,8 +116,8 @@ func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) { if isWorker { // worker requires kubelet config to refresh the certs via Kubernetes mounts = append(mounts, - specs.Mount{Type: "bind", Destination: filepath.Dir(constants.KubeletKubeconfig), Source: filepath.Dir(constants.KubeletKubeconfig), Options: []string{"rbind", "ro"}}, - specs.Mount{Type: "bind", Destination: constants.KubeletPKIDir, Source: constants.KubeletPKIDir, Options: []string{"rbind", "ro"}}, + specs.Mount{Type: "bind", Destination: filepath.Dir(constants.KubeletKubeconfig), Source: constants.SystemKubeletPKIDir, Options: []string{"rbind", "ro"}}, + specs.Mount{Type: "bind", Destination: constants.KubeletPKIDir, Source: constants.SystemKubeletPKIDir, Options: []string{"rbind", "ro"}}, ) } @@ -164,3 +179,55 @@ func (o *APID) HealthFunc(runtime.Runtime) health.Check { func (o *APID) HealthSettings(runtime.Runtime) *health.Settings { return &health.DefaultSettings } + +func (o *APID) syncKubeletPKI() { + copyAll := func() { + if err := copy.Dir(constants.KubeletPKIDir, constants.SystemKubeletPKIDir, copy.WithMode(0o700)); err != nil { + log.Printf("failed to sync %s dir contents into %s: %s", constants.KubeletPKIDir, constants.SystemKubeletPKIDir, err) + + return + } + + if err := copy.File(constants.KubeletKubeconfig, filepath.Join(constants.SystemKubeletPKIDir, filepath.Base(constants.KubeletKubeconfig)), copy.WithMode(0o700)); err != nil { + log.Printf("failed to sync %s into %s: %s", constants.KubeletKubeconfig, constants.SystemKubeletPKIDir, err) + + return + } + } + + copyAll() + + o.ctx, o.cancel = context.WithCancel(context.Background()) + o.wg.Add(1) + + go func() { + defer o.wg.Done() + + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Printf("failed to create directory watcher %s", err) + + return + } + + defer watcher.Close() //nolint:errcheck + + err = watcher.Add(constants.KubeletPKIDir) + if err != nil { + log.Printf("failed to watch dir %s %s", constants.KubeletPKIDir, err) + + return + } + + for { + select { + case <-o.ctx.Done(): + return + case <-watcher.Events: + copyAll() + case err = <-watcher.Errors: + log.Printf("directory watch error %s", err) + } + } + }() +} diff --git a/pkg/copy/copy.go b/pkg/copy/copy.go index d7229a324..44495caf0 100644 --- a/pkg/copy/copy.go +++ b/pkg/copy/copy.go @@ -12,14 +12,19 @@ import ( ) // File copies the `src` file to the `dst` file. -func File(src, dst string) error { +func File(src, dst string, setters ...Option) error { var ( - err error - s *os.File - d *os.File - info os.FileInfo + err error + s *os.File + d *os.File + info os.FileInfo + options Options ) + for _, setter := range setters { + setter(&options) + } + if s, err = os.Open(src); err != nil { return err } @@ -45,22 +50,38 @@ func File(src, dst string) error { return err } - return os.Chmod(dst, info.Mode()) + mode := info.Mode() + if options.Mode != 0 { + mode = options.Mode + } + + return os.Chmod(dst, mode) } // Dir copies the `src` directory to the `dst` directory. -func Dir(src, dst string) error { +func Dir(src, dst string, setters ...Option) error { var ( - err error - files []os.FileInfo - info os.FileInfo + err error + files []os.FileInfo + info os.FileInfo + options Options ) + for _, setter := range setters { + setter(&options) + } + if info, err = os.Stat(src); err != nil { return err } - if err = os.MkdirAll(dst, info.Mode()); err != nil { + mode := info.Mode() + + if options.Mode != 0 { + mode = options.Mode + } + + if err = os.MkdirAll(dst, mode); err != nil { return err } @@ -73,11 +94,11 @@ func Dir(src, dst string) error { d := path.Join(dst, file.Name()) if file.IsDir() { - if err = Dir(s, d); err != nil { + if err = Dir(s, d, setters...); err != nil { return err } } else { - if err = File(s, d); err != nil { + if err = File(s, d, setters...); err != nil { return err } } @@ -85,3 +106,18 @@ func Dir(src, dst string) error { return nil } + +// Option represents copy option. +type Option func(o *Options) + +// Options represents copy options. +type Options struct { + Mode os.FileMode +} + +// WithMode sets destination files filemode. +func WithMode(m os.FileMode) Option { + return func(o *Options) { + o.Mode = m + } +} diff --git a/pkg/machinery/constants/constants.go b/pkg/machinery/constants/constants.go index d1ee3ce8d..065693cb3 100644 --- a/pkg/machinery/constants/constants.go +++ b/pkg/machinery/constants/constants.go @@ -181,9 +181,12 @@ const ( // KubeletPort is the kubelet port for secure API. KubeletPort = 10250 - // KubeletPKIDir is the path to the directory where kubelet stores issues certificates and keys. + // KubeletPKIDir is the path to the directory where kubelet stores issued certificates and keys. KubeletPKIDir = "/var/lib/kubelet/pki" + // SystemKubeletPKIDir is the path to the directory where Talos copies kubelet issued certificates and keys. + SystemKubeletPKIDir = "/system/secrets/kubelet" + // DefaultKubernetesVersion is the default target version of the control plane. DefaultKubernetesVersion = "1.20.2"