mirror of
				https://github.com/tailscale/tailscale.git
				synced 2025-10-31 16:22:03 +01:00 
			
		
		
		
	Adds a new reconciler for ProxyGroups of type kube-apiserver that will provision a Tailscale Service for each replica to advertise. Adds two new condition types to the ProxyGroup, TailscaleServiceValid and TailscaleServiceConfigured, to post updates on the state of that reconciler in a way that's consistent with the service-pg reconciler. The created Tailscale Service name is configurable via a new ProxyGroup field spec.kubeAPISserver.ServiceName, which expects a string of the form "svc:<dns-label>". Lots of supporting changes were needed to implement this in a way that's consistent with other operator workflows, including: * Pulled containerboot's ensureServicesUnadvertised and certManager into kube/ libraries to be shared with k8s-proxy. Use those in k8s-proxy to aid Service cert sharing between replicas and graceful Service shutdown. * For certManager, add an initial wait to the cert loop to wait until the domain appears in the devices's netmap to avoid a guaranteed error on the first issue attempt when it's quick to start. * Made several methods in ingress-for-pg.go and svc-for-pg.go into functions to share with the new reconciler * Added a Resource struct to the owner refs stored in Tailscale Service annotations to be able to distinguish between Ingress- and ProxyGroup- based Services that need cleaning up in the Tailscale API. * Added a ListVIPServices method to the internal tailscale client to aid cleaning up orphaned Services * Support for reading config from a kube Secret, and partial support for config reloading, to prevent us having to force Pod restarts when config changes. * Fixed up the zap logger so it's possible to set debug log level. Updates #13358 Change-Id: Ia9607441157dd91fb9b6ecbc318eecbef446e116 Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
		
			
				
	
	
		
			265 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			265 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) Tailscale Inc & AUTHORS
 | |
| // SPDX-License-Identifier: BSD-3-Clause
 | |
| 
 | |
| //go:build !plan9
 | |
| 
 | |
| // Package config provides watchers for the various supported ways to load a
 | |
| // config file for k8s-proxy; currently file or Kubernetes Secret.
 | |
| package config
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/fsnotify/fsnotify"
 | |
| 	"go.uber.org/zap"
 | |
| 	corev1 "k8s.io/api/core/v1"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	"k8s.io/apimachinery/pkg/watch"
 | |
| 	clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 | |
| 	"tailscale.com/kube/k8s-proxy/conf"
 | |
| 	"tailscale.com/kube/kubetypes"
 | |
| 	"tailscale.com/types/ptr"
 | |
| 	"tailscale.com/util/testenv"
 | |
| )
 | |
| 
 | |
| type configLoader struct {
 | |
| 	logger *zap.SugaredLogger
 | |
| 	client clientcorev1.CoreV1Interface
 | |
| 
 | |
| 	cfgChan  chan<- *conf.Config
 | |
| 	previous []byte
 | |
| 
 | |
| 	once       sync.Once     // For use in tests. To close cfgIgnored.
 | |
| 	cfgIgnored chan struct{} // For use in tests.
 | |
| }
 | |
| 
 | |
| func NewConfigLoader(logger *zap.SugaredLogger, client clientcorev1.CoreV1Interface, cfgChan chan<- *conf.Config) *configLoader {
 | |
| 	return &configLoader{
 | |
| 		logger:  logger,
 | |
| 		client:  client,
 | |
| 		cfgChan: cfgChan,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *configLoader) WatchConfig(ctx context.Context, path string) error {
 | |
| 	secretNamespacedName, isKubeSecret := strings.CutPrefix(path, "kube:")
 | |
| 	if isKubeSecret {
 | |
| 		secretNamespace, secretName, ok := strings.Cut(secretNamespacedName, string(types.Separator))
 | |
| 		if !ok {
 | |
| 			return fmt.Errorf("invalid Kubernetes Secret reference %q, expected format <namespace>/<name>", path)
 | |
| 		}
 | |
| 		if err := l.watchConfigSecretChanges(ctx, secretNamespace, secretName); err != nil && !errors.Is(err, context.Canceled) {
 | |
| 			return fmt.Errorf("error watching config Secret %q: %w", secretNamespacedName, err)
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if err := l.watchConfigFileChanges(ctx, path); err != nil && !errors.Is(err, context.Canceled) {
 | |
| 		return fmt.Errorf("error watching config file %q: %w", path, err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (l *configLoader) reloadConfig(ctx context.Context, raw []byte) error {
 | |
| 	if bytes.Equal(raw, l.previous) {
 | |
| 		if l.cfgIgnored != nil && testenv.InTest() {
 | |
| 			l.once.Do(func() {
 | |
| 				close(l.cfgIgnored)
 | |
| 			})
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	cfg, err := conf.Load(raw)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("error loading config: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 		return ctx.Err()
 | |
| 	case l.cfgChan <- &cfg:
 | |
| 	}
 | |
| 
 | |
| 	l.previous = raw
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (l *configLoader) watchConfigFileChanges(ctx context.Context, path string) error {
 | |
| 	var (
 | |
| 		tickChan  <-chan time.Time
 | |
| 		eventChan <-chan fsnotify.Event
 | |
| 		errChan   <-chan error
 | |
| 	)
 | |
| 
 | |
| 	if w, err := fsnotify.NewWatcher(); err != nil {
 | |
| 		// Creating a new fsnotify watcher would fail for example if inotify was not able to create a new file descriptor.
 | |
| 		// See https://github.com/tailscale/tailscale/issues/15081
 | |
| 		l.logger.Infof("Failed to create fsnotify watcher on config file %q; watching for changes on 5s timer: %v", path, err)
 | |
| 		ticker := time.NewTicker(5 * time.Second)
 | |
| 		defer ticker.Stop()
 | |
| 		tickChan = ticker.C
 | |
| 	} else {
 | |
| 		dir := filepath.Dir(path)
 | |
| 		file := filepath.Base(path)
 | |
| 		l.logger.Infof("Watching directory %q for changes to config file %q", dir, file)
 | |
| 		defer w.Close()
 | |
| 		if err := w.Add(dir); err != nil {
 | |
| 			return fmt.Errorf("failed to add fsnotify watch: %w", err)
 | |
| 		}
 | |
| 		eventChan = w.Events
 | |
| 		errChan = w.Errors
 | |
| 	}
 | |
| 
 | |
| 	// Read the initial config file, but after the watcher is already set up to
 | |
| 	// avoid an unlucky race condition if the config file is edited in between.
 | |
| 	b, err := os.ReadFile(path)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("error reading config file %q: %w", path, err)
 | |
| 	}
 | |
| 	if err := l.reloadConfig(ctx, b); err != nil {
 | |
| 		return fmt.Errorf("error loading initial config file %q: %w", path, err)
 | |
| 	}
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return ctx.Err()
 | |
| 		case err, ok := <-errChan:
 | |
| 			if !ok {
 | |
| 				// Watcher was closed.
 | |
| 				return nil
 | |
| 			}
 | |
| 			return fmt.Errorf("watcher error: %w", err)
 | |
| 		case <-tickChan:
 | |
| 		case ev, ok := <-eventChan:
 | |
| 			if !ok {
 | |
| 				// Watcher was closed.
 | |
| 				return nil
 | |
| 			}
 | |
| 			if ev.Name != path || ev.Op&fsnotify.Write == 0 {
 | |
| 				// Ignore irrelevant events.
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 		b, err := os.ReadFile(path)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("error reading config file: %w", err)
 | |
| 		}
 | |
| 		// Writers such as os.WriteFile may truncate the file before writing
 | |
| 		// new contents, so it's possible to read an empty file if we read before
 | |
| 		// the write has completed.
 | |
| 		if len(b) == 0 {
 | |
| 			continue
 | |
| 		}
 | |
| 		if err := l.reloadConfig(ctx, b); err != nil {
 | |
| 			return fmt.Errorf("error reloading config file %q: %v", path, err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *configLoader) watchConfigSecretChanges(ctx context.Context, secretNamespace, secretName string) error {
 | |
| 	secrets := l.client.Secrets(secretNamespace)
 | |
| 	w, err := secrets.Watch(ctx, metav1.ListOptions{
 | |
| 		TypeMeta: metav1.TypeMeta{
 | |
| 			Kind:       "Secret",
 | |
| 			APIVersion: "v1",
 | |
| 		},
 | |
| 		// Re-watch regularly to avoid relying on long-lived connections.
 | |
| 		// See https://github.com/kubernetes-client/javascript/issues/596#issuecomment-786419380
 | |
| 		TimeoutSeconds: ptr.To(int64(600)),
 | |
| 		FieldSelector:  fmt.Sprintf("metadata.name=%s", secretName),
 | |
| 		Watch:          true,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to watch config Secret %q: %w", secretName, err)
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		// May not be the original watcher by the time we exit.
 | |
| 		if w != nil {
 | |
| 			w.Stop()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// Get the initial config Secret now we've got the watcher set up.
 | |
| 	secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{})
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to get config Secret %q: %w", secretName, err)
 | |
| 	}
 | |
| 
 | |
| 	if err := l.configFromSecret(ctx, secret); err != nil {
 | |
| 		return fmt.Errorf("error loading initial config: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	l.logger.Infof("Watching config Secret %q for changes", secretName)
 | |
| 	for {
 | |
| 		var secret *corev1.Secret
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return ctx.Err()
 | |
| 		case ev, ok := <-w.ResultChan():
 | |
| 			if !ok {
 | |
| 				w.Stop()
 | |
| 				w, err = secrets.Watch(ctx, metav1.ListOptions{
 | |
| 					TypeMeta: metav1.TypeMeta{
 | |
| 						Kind:       "Secret",
 | |
| 						APIVersion: "v1",
 | |
| 					},
 | |
| 					TimeoutSeconds: ptr.To(int64(600)),
 | |
| 					FieldSelector:  fmt.Sprintf("metadata.name=%s", secretName),
 | |
| 					Watch:          true,
 | |
| 				})
 | |
| 				if err != nil {
 | |
| 					return fmt.Errorf("failed to re-watch config Secret %q: %w", secretName, err)
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			switch ev.Type {
 | |
| 			case watch.Added, watch.Modified:
 | |
| 				// New config available to load.
 | |
| 				var ok bool
 | |
| 				secret, ok = ev.Object.(*corev1.Secret)
 | |
| 				if !ok {
 | |
| 					return fmt.Errorf("unexpected object type %T in watch event for config Secret %q", ev.Object, secretName)
 | |
| 				}
 | |
| 				if secret == nil || secret.Data == nil {
 | |
| 					continue
 | |
| 				}
 | |
| 				if err := l.configFromSecret(ctx, secret); err != nil {
 | |
| 					return fmt.Errorf("error reloading config Secret %q: %v", secret.Name, err)
 | |
| 				}
 | |
| 			case watch.Error:
 | |
| 				return fmt.Errorf("error watching config Secret %q: %v", secretName, ev.Object)
 | |
| 			default:
 | |
| 				// Ignore, no action required.
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *configLoader) configFromSecret(ctx context.Context, s *corev1.Secret) error {
 | |
| 	b := s.Data[kubetypes.KubeAPIServerConfigFile]
 | |
| 	if len(b) == 0 {
 | |
| 		return fmt.Errorf("config Secret %q does not contain expected config in key %q", s.Name, kubetypes.KubeAPIServerConfigFile)
 | |
| 	}
 | |
| 
 | |
| 	if err := l.reloadConfig(ctx, b); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |