From ee10f9881c86e5f230c2af34ed0bfaa009697b03 Mon Sep 17 00:00:00 2001
From: Tom Meadows
Date: Fri, 1 May 2026 18:26:55 +0100
Subject: [PATCH] cmd/k8s-operator: add authkey reissuing to recorder reconciler (#19556)

Also fixes a memory leak in the ProxyGroup reconciler: entries added to its
authKeyReissuing map during auth key reissue were never deleted on ProxyGroup
cleanup.

Updates #19311

Signed-off-by: chaosinthecrd
---
 cmd/k8s-operator/operator.go        |  14 +--
 cmd/k8s-operator/proxygroup.go      |   3 +
 cmd/k8s-operator/tsrecorder.go      | 160 +++++++++++++++++++++++-----
 cmd/k8s-operator/tsrecorder_test.go |  15 +--
 4 files changed, 155 insertions(+), 37 deletions(-)

diff --git a/cmd/k8s-operator/operator.go b/cmd/k8s-operator/operator.go
index c0ef96a68..9f9c71997 100644
--- a/cmd/k8s-operator/operator.go
+++ b/cmd/k8s-operator/operator.go
@@ -692,12 +692,14 @@ func runReconcilers(opts reconcilerOpts) {
 		Watches(&rbacv1.Role{}, recorderFilter).
 		Watches(&rbacv1.RoleBinding{}, recorderFilter).
 		Complete(&RecorderReconciler{
-			recorder:    eventRecorder,
-			tsNamespace: opts.tailscaleNamespace,
-			Client:      mgr.GetClient(),
-			log:         opts.log.Named("recorder-reconciler"),
-			clock:       tstime.DefaultClock{},
-			clients:     clients,
+			recorder:          eventRecorder,
+			tsNamespace:       opts.tailscaleNamespace,
+			Client:            mgr.GetClient(),
+			log:               opts.log.Named("recorder-reconciler"),
+			clock:             tstime.DefaultClock{},
+			clients:           clients,
+			authKeyRateLimits: make(map[string]*rate.Limiter),
+			authKeyReissuing:  make(map[string]bool),
 		})
 	if err != nil {
 		startlog.Fatalf("could not create Recorder reconciler: %v", err)
diff --git a/cmd/k8s-operator/proxygroup.go b/cmd/k8s-operator/proxygroup.go
index 4bd015701..9df8460b7 100644
--- a/cmd/k8s-operator/proxygroup.go
+++ b/cmd/k8s-operator/proxygroup.go
@@ -1160,6 +1160,9 @@ func (r *ProxyGroupReconciler) ensureStateRemovedForProxyGroup(pg *tsapi.ProxyGr
 	gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
 	gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
 	delete(r.authKeyRateLimits, pg.Name)
+	for i := range pgReplicas(pg) {
+		delete(r.authKeyReissuing, pgStateSecretName(pg.Name, i))
+	}
 }
 
 func pgTailscaledConfig(pg *tsapi.ProxyGroup, loginServer string, pc *tsapi.ProxyClass, idx int32, authKey *string, staticEndpoints []netip.AddrPort, oldAdvertiseServices []string) (tailscaledConfigs, error) {
diff --git a/cmd/k8s-operator/tsrecorder.go b/cmd/k8s-operator/tsrecorder.go
index 881d82354..86669d212 100644
--- a/cmd/k8s-operator/tsrecorder.go
+++ b/cmd/k8s-operator/tsrecorder.go
@@ -14,9 +14,11 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"go.uber.org/zap"
 	xslices "golang.org/x/exp/slices"
+	"golang.org/x/time/rate"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
@@ -57,14 +59,15 @@ var gaugeRecorderResources = clientmetric.NewGauge(kubetypes.MetricRecorderCount
 // Recorder CRs.
 type RecorderReconciler struct {
 	client.Client
-	log         *zap.SugaredLogger
-	recorder    record.EventRecorder
-	clock       tstime.Clock
-	clients     ClientProvider
-	tsNamespace string
-
-	mu        sync.Mutex           // protects following
-	recorders set.Slice[types.UID] // for recorders gauge
+	log               *zap.SugaredLogger
+	recorder          record.EventRecorder
+	clock             tstime.Clock
+	clients           ClientProvider
+	tsNamespace       string
+	authKeyRateLimits map[string]*rate.Limiter // per-Recorder rate limiters for auth key re-issuance.
+	authKeyReissuing  map[string]bool          // state Secret names with an auth key re-issue in flight.
+	mu                sync.Mutex               // protects following
+	recorders         set.Slice[types.UID]     // for recorders gauge
 }
 
 func (r *RecorderReconciler) logger(name string) *zap.SugaredLogger {
@@ -164,9 +167,23 @@ func (r *RecorderReconciler) Reconcile(ctx context.Context, req reconcile.Reques
 func (r *RecorderReconciler) maybeProvision(ctx context.Context, tsClient tsclient.Client, tsr *tsapi.Recorder) error {
 	logger := r.logger(tsr.Name)
 
+	var replicas int32 = 1
+	if tsr.Spec.Replicas != nil {
+		replicas = *tsr.Spec.Replicas
+	}
+
 	r.mu.Lock()
 	r.recorders.Add(tsr.UID)
 	gaugeRecorderResources.Set(int64(r.recorders.Len()))
+	if _, ok := r.authKeyRateLimits[tsr.Name]; !ok {
+		r.authKeyRateLimits[tsr.Name] = rate.NewLimiter(rate.Every(30*time.Second), int(replicas))
+	}
+	for replica := range replicas {
+		name := fmt.Sprintf("%s-%d", tsr.Name, replica)
+		if _, ok := r.authKeyReissuing[name]; !ok {
+			r.authKeyReissuing[name] = false
+		}
+	}
 	r.mu.Unlock()
 
 	if err := r.ensureAuthSecretsCreated(ctx, tsClient, tsr); err != nil {
@@ -174,11 +191,6 @@ func (r *RecorderReconciler) maybeProvision(ctx context.Context, tsClient tsclie
 	}
 
 	// State Secrets are pre-created so we can use the Recorder CR as its owner ref.
-	var replicas int32 = 1
-	if tsr.Spec.Replicas != nil {
-		replicas = *tsr.Spec.Replicas
-	}
-
 	for replica := range replicas {
 		sec := tsrStateSecret(tsr, r.tsNamespace, replica)
 		_, err := createOrUpdate(ctx, r.Client, r.tsNamespace, sec, func(s *corev1.Secret) {
@@ -423,6 +435,10 @@ func (r *RecorderReconciler) maybeCleanup(ctx context.Context, tsr *tsapi.Record
 	r.mu.Lock()
 	r.recorders.Remove(tsr.UID)
 	gaugeRecorderResources.Set(int64(r.recorders.Len()))
+	delete(r.authKeyRateLimits, tsr.Name)
+	for replica := range replicas {
+		delete(r.authKeyReissuing, fmt.Sprintf("%s-%d", tsr.Name, replica))
+	}
 	r.mu.Unlock()
 
 	return true, nil
@@ -447,28 +463,122 @@ func (r *RecorderReconciler) ensureAuthSecretsCreated(ctx context.Context, tsCli
 			Name: fmt.Sprintf("%s-auth-%d", tsr.Name, replica),
 		}
 
-		err := r.Get(ctx, key, &corev1.Secret{})
+		existingSecret := &corev1.Secret{}
+		err := r.Get(ctx, key, existingSecret)
 		switch {
 		case err == nil:
-			logger.Debugf("auth Secret %q already exists", key.Name)
+			reissue, err := r.shouldReissueAuthKey(ctx, tsClient, tsr, replica, existingSecret)
+			if err != nil {
+				return fmt.Errorf("error checking auth key reissue for replica %d: %w", replica, err)
+			}
+			if !reissue {
+				logger.Debugf("auth Secret %q already exists, no reissue needed", key.Name)
+				continue
+			}
+			authKey, err := newAuthKey(ctx, tsClient, tags.Stringify())
+			if err != nil {
+				return err
+			}
+			existingSecret.Data["authkey"] = []byte(authKey)
+			if err = r.Update(ctx, existingSecret); err != nil {
+				return err
+			}
 			continue
-		case !apierrors.IsNotFound(err):
+		case apierrors.IsNotFound(err):
+			authKey, err := newAuthKey(ctx, tsClient, tags.Stringify())
+			if err != nil {
+				return err
+			}
+			if err := r.Create(ctx, tsrAuthSecret(tsr, r.tsNamespace, authKey, replica)); err != nil {
+				return err
+			}
+		default:
 			return fmt.Errorf("failed to get Secret %q: %w", key.Name, err)
 		}
-
-		authKey, err := newAuthKey(ctx, tsClient, tags.Stringify())
-		if err != nil {
-			return err
-		}
-
-		if err = r.Create(ctx, tsrAuthSecret(tsr, r.tsNamespace, authKey, replica)); err != nil {
-			return err
-		}
 	}
 
 	return nil
 }
 
+// shouldReissueAuthKey returns true if the Recorder replica needs a new auth
+// key. It tracks in-flight reissues via authKeyReissuing to avoid duplicate
+// API calls across reconciles.
+func (r *RecorderReconciler) shouldReissueAuthKey(ctx context.Context, tsClient tsclient.Client, tsr *tsapi.Recorder, replica int32, authSecret *corev1.Secret) (shouldReissue bool, err error) {
+	stateSecret, err := r.getStateSecret(ctx, tsr.Name, replica)
+	if err != nil || stateSecret == nil {
+		return false, err
+	}
+
+	stateSecretName := fmt.Sprintf("%s-%d", tsr.Name, replica)
+
+	r.mu.Lock()
+	reissuing := r.authKeyReissuing[stateSecretName]
+	r.mu.Unlock()
+
+	if reissuing {
+		_, requestStillPresent := stateSecret.Data[kubetypes.KeyReissueAuthkey]
+		if !requestStillPresent {
+			r.mu.Lock()
+			r.authKeyReissuing[stateSecretName] = false
+			r.mu.Unlock()
+			r.log.Debugf("auth key reissue completed for %q", stateSecretName)
+			return false, nil
+		}
+		r.log.Debugf("auth key already in process of re-issuance for %q, waiting", stateSecretName)
+		return false, nil
+	}
+
+	defer func() {
+		r.mu.Lock()
+		r.authKeyReissuing[stateSecretName] = shouldReissue
+		r.mu.Unlock()
+	}()
+
+	brokenAuthkey, ok := stateSecret.Data[kubetypes.KeyReissueAuthkey]
+	if !ok {
+		return false, nil
+	}
+
+	cfgAuthKey := string(authSecret.Data["authkey"])
+	empty := cfgAuthKey == ""
+	broken := cfgAuthKey == string(brokenAuthkey)
+
+	if !empty && !broken {
+		return false, nil
+	}
+
+	lim := r.authKeyRateLimits[tsr.Name]
+	if !lim.Allow() {
+		r.log.Debugf("auth key re-issuance rate limit exceeded, limit: %.2f, burst: %d, tokens: %.2f",
+			lim.Limit(), lim.Burst(), lim.Tokens())
+		return false, fmt.Errorf("auth key re-issuance rate limit exceeded for Recorder %q, will retry with backoff", tsr.Name)
+	}
+
+	r.log.Infof("Recorder replica %s failing to auth; attempting cleanup and new key", stateSecretName)
+	if tsID := stateSecret.Data[kubetypes.KeyDeviceID]; len(tsID) > 0 {
+		id := tailcfg.StableNodeID(tsID)
+		if err := r.ensureDeviceDeleted(ctx, tsClient, id, r.log); err != nil {
+			return false, err
+		}
+	}
+
+	return true, nil
+}
+
+func (r *RecorderReconciler) ensureDeviceDeleted(ctx context.Context, tsClient tsclient.Client, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error {
+	logger.Debugf("deleting device %s from control", string(id))
+	err := tsClient.Devices().Delete(ctx, string(id))
+	switch {
+	case tailscale.IsNotFound(err):
+		logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
+	case err != nil:
+		return fmt.Errorf("error deleting device: %w", err)
+	default:
+		logger.Debugf("device %s deleted from control", string(id))
+	}
+	return nil
+}
+
 func (r *RecorderReconciler) validate(ctx context.Context, tsr *tsapi.Recorder) error {
 	if !tsr.Spec.EnableUI && tsr.Spec.Storage.S3 == nil {
 		return errors.New("must either enable UI or use S3 storage to ensure recordings are accessible")
diff --git a/cmd/k8s-operator/tsrecorder_test.go b/cmd/k8s-operator/tsrecorder_test.go
index 6bd47e07b..8f189728c 100644
--- a/cmd/k8s-operator/tsrecorder_test.go
+++ b/cmd/k8s-operator/tsrecorder_test.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/google/go-cmp/cmp"
 	"go.uber.org/zap"
+	"golang.org/x/time/rate"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
@@ -55,12 +56,14 @@ func TestRecorder(t *testing.T) {
 	fr := record.NewFakeRecorder(2)
 	cl := tstest.NewClock(tstest.ClockOpts{})
 	reconciler := &RecorderReconciler{
-		tsNamespace: tsNamespace,
-		Client:      fc,
-		clients:     tsclient.NewProvider(tsClient),
-		recorder:    fr,
-		log:         zl.Sugar(),
-		clock:       cl,
+		tsNamespace:       tsNamespace,
+		Client:            fc,
+		clients:           tsclient.NewProvider(tsClient),
+		recorder:          fr,
+		log:               zl.Sugar(),
+		clock:             cl,
+		authKeyRateLimits: make(map[string]*rate.Limiter),
+		authKeyReissuing:  make(map[string]bool),
 	}
 
 	t.Run("invalid_spec_gives_an_error_condition", func(t *testing.T) {
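
The throttling added above relies on golang.org/x/time/rate token-bucket semantics: the limiter created in maybeProvision has a burst equal to the Recorder's replica count and refills one token every 30 seconds, so each replica gets one immediate re-issue before further attempts are deferred to a later reconcile. A minimal standalone sketch of that behaviour (not part of the patch; the replica count of 2 is only an example):

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Mirrors the reconciler's limiter: burst = number of replicas,
	// refilled at one token per 30 seconds.
	replicas := 2
	lim := rate.NewLimiter(rate.Every(30*time.Second), replicas)

	for i := 0; i < replicas+1; i++ {
		// The first `replicas` calls succeed immediately; the next one
		// is rejected until a token refills (~30s later).
		fmt.Printf("reissue attempt %d allowed: %v\n", i+1, lim.Allow())
	}
}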