cmd/k8s-operator: add authkey reissuing to recorder reconciler (#19556)

Also fixes a memory leak in the ProxyGroup reconciler's authKeyReissuing map,
whose per-replica entries were never deleted after auth key reissue.

Updates #19311

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
Tom Meadows 2026-05-01 18:26:55 +01:00 committed by GitHub
parent 3ced30b0b6
commit ee10f9881c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 155 additions and 37 deletions

View File

@ -692,12 +692,14 @@ func runReconcilers(opts reconcilerOpts) {
Watches(&rbacv1.Role{}, recorderFilter).
Watches(&rbacv1.RoleBinding{}, recorderFilter).
Complete(&RecorderReconciler{
recorder: eventRecorder,
tsNamespace: opts.tailscaleNamespace,
Client: mgr.GetClient(),
log: opts.log.Named("recorder-reconciler"),
clock: tstime.DefaultClock{},
clients: clients,
recorder: eventRecorder,
tsNamespace: opts.tailscaleNamespace,
Client: mgr.GetClient(),
log: opts.log.Named("recorder-reconciler"),
clock: tstime.DefaultClock{},
clients: clients,
authKeyRateLimits: make(map[string]*rate.Limiter),
authKeyReissuing: make(map[string]bool),
})
if err != nil {
startlog.Fatalf("could not create Recorder reconciler: %v", err)

View File

@ -1160,6 +1160,9 @@ func (r *ProxyGroupReconciler) ensureStateRemovedForProxyGroup(pg *tsapi.ProxyGr
gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
delete(r.authKeyRateLimits, pg.Name)
for i := range pgReplicas(pg) {
delete(r.authKeyReissuing, pgStateSecretName(pg.Name, i))
}
}
func pgTailscaledConfig(pg *tsapi.ProxyGroup, loginServer string, pc *tsapi.ProxyClass, idx int32, authKey *string, staticEndpoints []netip.AddrPort, oldAdvertiseServices []string) (tailscaledConfigs, error) {

View File

@ -14,9 +14,11 @@ import (
"strconv"
"strings"
"sync"
"time"
"go.uber.org/zap"
xslices "golang.org/x/exp/slices"
"golang.org/x/time/rate"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
@ -57,14 +59,15 @@ var gaugeRecorderResources = clientmetric.NewGauge(kubetypes.MetricRecorderCount
// Recorder CRs.
type RecorderReconciler struct {
client.Client
log *zap.SugaredLogger
recorder record.EventRecorder
clock tstime.Clock
clients ClientProvider
tsNamespace string
mu sync.Mutex // protects following
recorders set.Slice[types.UID] // for recorders gauge
log *zap.SugaredLogger
recorder record.EventRecorder
clock tstime.Clock
clients ClientProvider
tsNamespace string
authKeyRateLimits map[string]*rate.Limiter // per-Recorder rate limiters for auth key re-issuance.
authKeyReissuing map[string]bool
mu sync.Mutex // protects following
recorders set.Slice[types.UID] // for recorders gauge
}
func (r *RecorderReconciler) logger(name string) *zap.SugaredLogger {
@ -164,9 +167,23 @@ func (r *RecorderReconciler) Reconcile(ctx context.Context, req reconcile.Reques
func (r *RecorderReconciler) maybeProvision(ctx context.Context, tsClient tsclient.Client, tsr *tsapi.Recorder) error {
logger := r.logger(tsr.Name)
var replicas int32 = 1
if tsr.Spec.Replicas != nil {
replicas = *tsr.Spec.Replicas
}
r.mu.Lock()
r.recorders.Add(tsr.UID)
gaugeRecorderResources.Set(int64(r.recorders.Len()))
if _, ok := r.authKeyRateLimits[tsr.Name]; !ok {
r.authKeyRateLimits[tsr.Name] = rate.NewLimiter(rate.Every(30*time.Second), int(replicas))
}
for replica := range replicas {
name := fmt.Sprintf("%s-%d", tsr.Name, replica)
if _, ok := r.authKeyReissuing[name]; !ok {
r.authKeyReissuing[name] = false
}
}
r.mu.Unlock()
if err := r.ensureAuthSecretsCreated(ctx, tsClient, tsr); err != nil {
@ -174,11 +191,6 @@ func (r *RecorderReconciler) maybeProvision(ctx context.Context, tsClient tsclie
}
// State Secrets are pre-created so we can use the Recorder CR as its owner ref.
var replicas int32 = 1
if tsr.Spec.Replicas != nil {
replicas = *tsr.Spec.Replicas
}
for replica := range replicas {
sec := tsrStateSecret(tsr, r.tsNamespace, replica)
_, err := createOrUpdate(ctx, r.Client, r.tsNamespace, sec, func(s *corev1.Secret) {
@ -423,6 +435,10 @@ func (r *RecorderReconciler) maybeCleanup(ctx context.Context, tsr *tsapi.Record
r.mu.Lock()
r.recorders.Remove(tsr.UID)
gaugeRecorderResources.Set(int64(r.recorders.Len()))
delete(r.authKeyRateLimits, tsr.Name)
for replica := range replicas {
delete(r.authKeyReissuing, fmt.Sprintf("%s-%d", tsr.Name, replica))
}
r.mu.Unlock()
return true, nil
@ -447,28 +463,122 @@ func (r *RecorderReconciler) ensureAuthSecretsCreated(ctx context.Context, tsCli
Name: fmt.Sprintf("%s-auth-%d", tsr.Name, replica),
}
err := r.Get(ctx, key, &corev1.Secret{})
existingSecret := &corev1.Secret{}
err := r.Get(ctx, key, existingSecret)
switch {
case err == nil:
logger.Debugf("auth Secret %q already exists", key.Name)
reissue, err := r.shouldReissueAuthKey(ctx, tsClient, tsr, replica, existingSecret)
if err != nil {
return fmt.Errorf("error checking auth key reissue for replica %d: %w", replica, err)
}
if !reissue {
logger.Debugf("auth Secret %q already exists, no reissue needed", key.Name)
continue
}
authKey, err := newAuthKey(ctx, tsClient, tags.Stringify())
if err != nil {
return err
}
existingSecret.Data["authkey"] = []byte(authKey)
if err = r.Update(ctx, existingSecret); err != nil {
return err
}
continue
case !apierrors.IsNotFound(err):
case apierrors.IsNotFound(err):
authKey, err := newAuthKey(ctx, tsClient, tags.Stringify())
if err != nil {
return err
}
if err := r.Create(ctx, tsrAuthSecret(tsr, r.tsNamespace, authKey, replica)); err != nil {
return err
}
default:
return fmt.Errorf("failed to get Secret %q: %w", key.Name, err)
}
authKey, err := newAuthKey(ctx, tsClient, tags.Stringify())
if err != nil {
return err
}
if err = r.Create(ctx, tsrAuthSecret(tsr, r.tsNamespace, authKey, replica)); err != nil {
return err
}
}
return nil
}
// shouldReissueAuthKey reports whether the Recorder replica needs a new auth
// key. It tracks in-flight reissues via authKeyReissuing to avoid duplicate
// API calls across reconciles, and rate-limits reissues per Recorder via
// authKeyRateLimits. Both maps are shared reconciler state and must only be
// accessed while holding r.mu.
//
// It returns (false, nil) when no reissue is needed or one is already in
// flight, and a non-nil error when the rate limit is exceeded or device
// cleanup fails.
func (r *RecorderReconciler) shouldReissueAuthKey(ctx context.Context, tsClient tsclient.Client, tsr *tsapi.Recorder, replica int32, authSecret *corev1.Secret) (shouldReissue bool, err error) {
	stateSecret, err := r.getStateSecret(ctx, tsr.Name, replica)
	if err != nil || stateSecret == nil {
		// No state Secret yet (or a transient error); nothing to decide.
		return false, err
	}

	stateSecretName := fmt.Sprintf("%s-%d", tsr.Name, replica)

	r.mu.Lock()
	reissuing := r.authKeyReissuing[stateSecretName]
	r.mu.Unlock()
	if reissuing {
		_, requestStillPresent := stateSecret.Data[kubetypes.KeyReissueAuthkey]
		if !requestStillPresent {
			// The reissue request has been cleared from the state Secret, so
			// the previously issued key was accepted; reset our in-flight flag.
			r.mu.Lock()
			r.authKeyReissuing[stateSecretName] = false
			r.mu.Unlock()
			r.log.Debugf("auth key reissue completed for %q", stateSecretName)
			return false, nil
		}
		r.log.Debugf("auth key already in process of re-issuance for %q, waiting", stateSecretName)
		return false, nil
	}

	// Record the final decision so that subsequent reconciles observe an
	// in-flight reissue (or its absence) for this replica.
	defer func() {
		r.mu.Lock()
		r.authKeyReissuing[stateSecretName] = shouldReissue
		r.mu.Unlock()
	}()

	brokenAuthkey, ok := stateSecret.Data[kubetypes.KeyReissueAuthkey]
	if !ok {
		// The replica has not flagged its auth key as broken.
		return false, nil
	}

	cfgAuthKey := string(authSecret.Data["authkey"])
	empty := cfgAuthKey == ""
	broken := cfgAuthKey == string(brokenAuthkey)
	if !empty && !broken {
		// The auth Secret already holds a different key than the one the
		// replica reported as broken; a reissue is already in place.
		return false, nil
	}

	// Fix: the original read authKeyRateLimits without holding r.mu, racing
	// with the locked writes in maybeProvision/maybeCleanup; it would also
	// panic on a nil limiter if the entry was missing. Read under the lock
	// and lazily create a limiter matching the 30s refill used elsewhere.
	r.mu.Lock()
	lim := r.authKeyRateLimits[tsr.Name]
	if lim == nil {
		lim = rate.NewLimiter(rate.Every(30*time.Second), 1)
		r.authKeyRateLimits[tsr.Name] = lim
	}
	r.mu.Unlock()
	if !lim.Allow() {
		r.log.Debugf("auth key re-issuance rate limit exceeded, limit: %.2f, burst: %d, tokens: %.2f",
			lim.Limit(), lim.Burst(), lim.Tokens())
		return false, fmt.Errorf("auth key re-issuance rate limit exceeded for Recorder %q, will retry with backoff", tsr.Name)
	}

	r.log.Infof("Recorder replica %s failing to auth; attempting cleanup and new key", stateSecretName)
	// If the replica already registered a device with control, delete it so
	// the new auth key produces a fresh, non-conflicting node.
	if tsID := stateSecret.Data[kubetypes.KeyDeviceID]; len(tsID) > 0 {
		id := tailcfg.StableNodeID(tsID)
		if err := r.ensureDeviceDeleted(ctx, tsClient, id, r.log); err != nil {
			return false, err
		}
	}

	return true, nil
}
// ensureDeviceDeleted removes the device with the given stable node ID from
// the tailnet via the control-plane API. A "not found" response is treated as
// success, since it means the device was already deleted.
func (r *RecorderReconciler) ensureDeviceDeleted(ctx context.Context, tsClient tsclient.Client, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error {
	logger.Debugf("deleting device %s from control", string(id))
	if err := tsClient.Devices().Delete(ctx, string(id)); err != nil {
		if !tailscale.IsNotFound(err) {
			return fmt.Errorf("error deleting device: %w", err)
		}
		// Already gone from control; nothing left to do.
		logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
		return nil
	}
	logger.Debugf("device %s deleted from control", string(id))
	return nil
}
func (r *RecorderReconciler) validate(ctx context.Context, tsr *tsapi.Recorder) error {
if !tsr.Spec.EnableUI && tsr.Spec.Storage.S3 == nil {
return errors.New("must either enable UI or use S3 storage to ensure recordings are accessible")

View File

@ -14,6 +14,7 @@ import (
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"golang.org/x/time/rate"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
@ -55,12 +56,14 @@ func TestRecorder(t *testing.T) {
fr := record.NewFakeRecorder(2)
cl := tstest.NewClock(tstest.ClockOpts{})
reconciler := &RecorderReconciler{
tsNamespace: tsNamespace,
Client: fc,
clients: tsclient.NewProvider(tsClient),
recorder: fr,
log: zl.Sugar(),
clock: cl,
tsNamespace: tsNamespace,
Client: fc,
clients: tsclient.NewProvider(tsClient),
recorder: fr,
log: zl.Sugar(),
clock: cl,
authKeyRateLimits: make(map[string]*rate.Limiter),
authKeyReissuing: make(map[string]bool),
}
t.Run("invalid_spec_gives_an_error_condition", func(t *testing.T) {