diff --git a/cmd/containerboot/egressservices.go b/cmd/containerboot/egressservices.go index cadad832c..abde12523 100644 --- a/cmd/containerboot/egressservices.go +++ b/cmd/containerboot/egressservices.go @@ -24,10 +24,10 @@ import ( "github.com/fsnotify/fsnotify" "tailscale.com/client/local" - "tailscale.com/ipn" "tailscale.com/kube/egressservices" "tailscale.com/kube/kubeclient" "tailscale.com/kube/kubetypes" + "tailscale.com/types/netmap" "tailscale.com/util/httpm" "tailscale.com/util/linuxfw" "tailscale.com/util/mak" @@ -55,7 +55,7 @@ type egressProxy struct { tsClient *local.Client // never nil - netmapChan chan ipn.Notify // chan to receive netmap updates on + netmapChan chan *netmap.NetworkMap // chan to receive netmap updates on podIPv4 string // never empty string, currently only IPv4 is supported @@ -87,7 +87,7 @@ type httpClient interface { // - the mounted egress config has changed // - the proxy's tailnet IP addresses have changed // - tailnet IPs have changed for any backend targets specified by tailnet FQDN -func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRunOpts) error { +func (ep *egressProxy) run(ctx context.Context, nm *netmap.NetworkMap, opts egressProxyRunOpts) error { ep.configure(opts) var tickChan <-chan time.Time var eventChan <-chan fsnotify.Event @@ -106,7 +106,7 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRu eventChan = w.Events } - if err := ep.sync(ctx, n); err != nil { + if err := ep.sync(ctx, nm); err != nil { return err } for { @@ -117,14 +117,14 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRu log.Printf("periodic sync, ensuring firewall config is up to date...") case <-eventChan: log.Printf("config file change detected, ensuring firewall config is up to date...") - case n = <-ep.netmapChan: - shouldResync := ep.shouldResync(n) + case nm = <-ep.netmapChan: + shouldResync := ep.shouldResync(nm) if !shouldResync { continue } 
log.Printf("netmap change detected, ensuring firewall config is up to date...") } - if err := ep.sync(ctx, n); err != nil { + if err := ep.sync(ctx, nm); err != nil { return fmt.Errorf("error syncing egress service config: %w", err) } } @@ -136,7 +136,7 @@ type egressProxyRunOpts struct { kc kubeclient.Client tsClient *local.Client stateSecret string - netmapChan chan ipn.Notify + netmapChan chan *netmap.NetworkMap podIPv4 string tailnetAddrs []netip.Prefix } @@ -165,7 +165,7 @@ func (ep *egressProxy) configure(opts egressProxyRunOpts) { // any firewall rules need to be updated. Currently using status in state Secret as a reference for what is the current // firewall configuration is good enough because - the status is keyed by the Pod IP - we crash the Pod on errors such // as failed firewall update -func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error { +func (ep *egressProxy) sync(ctx context.Context, nm *netmap.NetworkMap) error { cfgs, err := ep.getConfigs() if err != nil { return fmt.Errorf("error retrieving egress service configs: %w", err) @@ -174,12 +174,12 @@ func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error { if err != nil { return fmt.Errorf("error retrieving current egress proxy status: %w", err) } - newStatus, err := ep.syncEgressConfigs(cfgs, status, n) + newStatus, err := ep.syncEgressConfigs(cfgs, status, nm) if err != nil { return fmt.Errorf("error syncing egress service configs: %w", err) } if !servicesStatusIsEqual(newStatus, status) { - if err := ep.setStatus(ctx, newStatus, n); err != nil { + if err := ep.setStatus(ctx, newStatus, nm); err != nil { return fmt.Errorf("error setting egress proxy status: %w", err) } } @@ -188,14 +188,14 @@ func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error { // addrsHaveChanged returns true if the provided netmap update contains tailnet address change for this proxy node. // Netmap must not be nil. 
-func (ep *egressProxy) addrsHaveChanged(n ipn.Notify) bool { - return !reflect.DeepEqual(ep.tailnetAddrs, n.NetMap.SelfNode.Addresses()) +func (ep *egressProxy) addrsHaveChanged(nm *netmap.NetworkMap) bool { + return !reflect.DeepEqual(ep.tailnetAddrs, nm.SelfNode.Addresses()) } // syncEgressConfigs adds and deletes firewall rules to match the desired // configuration. It uses the provided status to determine what is currently // applied and updates the status after a successful sync. -func (ep *egressProxy) syncEgressConfigs(cfgs egressservices.Configs, status *egressservices.Status, n ipn.Notify) (*egressservices.Status, error) { +func (ep *egressProxy) syncEgressConfigs(cfgs egressservices.Configs, status *egressservices.Status, nm *netmap.NetworkMap) (*egressservices.Status, error) { if !(wantsServicesConfigured(cfgs) || hasServicesConfigured(status)) { return nil, nil } @@ -214,7 +214,7 @@ func (ep *egressProxy) syncEgressConfigs(cfgs egressservices.Configs, status *eg rulesPerSvcToAdd := make(map[string][]rule, 0) rulesPerSvcToDelete := make(map[string][]rule, 0) for svcName, cfg := range cfgs { - tailnetTargetIPs, err := ep.tailnetTargetIPsForSvc(cfg, n) + tailnetTargetIPs, err := ep.tailnetTargetIPsForSvc(cfg, nm) if err != nil { return nil, fmt.Errorf("error determining tailnet target IPs: %w", err) } @@ -229,12 +229,12 @@ func (ep *egressProxy) syncEgressConfigs(cfgs egressservices.Configs, status *eg if len(rulesToDelete) != 0 { mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete) } - if len(rulesToAdd) != 0 || ep.addrsHaveChanged(n) { + if len(rulesToAdd) != 0 || ep.addrsHaveChanged(nm) { // For each tailnet target, set up SNAT from the local tailnet device address of the matching // family. 
for _, t := range tailnetTargetIPs { var local netip.Addr - for _, pfx := range n.NetMap.SelfNode.Addresses().All() { + for _, pfx := range nm.SelfNode.Addresses().All() { if !pfx.IsSingleIP() { continue } @@ -424,7 +424,7 @@ func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, e // setStatus writes egress proxy's currently configured firewall to the state // Secret and updates proxy's tailnet addresses. -func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, n ipn.Notify) error { +func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, nm *netmap.NetworkMap) error { // Pod IP is used to determine if a stored status applies to THIS proxy Pod. if status == nil { status = &egressservices.Status{} @@ -447,7 +447,7 @@ func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Sta if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil { return fmt.Errorf("error patching state Secret: %w", err) } - ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice() + ep.tailnetAddrs = nm.SelfNode.Addresses().AsSlice() return nil } @@ -457,7 +457,7 @@ func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Sta // FQDN, resolve the FQDN and return the resolved IPs. It checks if the // netfilter runner supports IPv6 NAT and skips any IPv6 addresses if it // doesn't. 
-func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.Notify) (addrs []netip.Addr, err error) { +func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, nm *netmap.NetworkMap) (addrs []netip.Addr, err error) { if svc.TailnetTarget.IP != "" { addr, err := netip.ParseAddr(svc.TailnetTarget.IP) if err != nil { @@ -473,11 +473,11 @@ func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.N if svc.TailnetTarget.FQDN == "" { return nil, errors.New("unexpected egress service config- neither tailnet target IP nor FQDN is set") } - if n.NetMap == nil { + if nm == nil { log.Printf("netmap is not available, unable to determine backend addresses for %s", svc.TailnetTarget.FQDN) return addrs, nil } - egressAddrs, err := resolveTailnetFQDN(n.NetMap, svc.TailnetTarget.FQDN) + egressAddrs, err := resolveTailnetFQDN(nm, svc.TailnetTarget.FQDN) if err != nil { log.Printf("error fetching backend addresses for %q: %v", svc.TailnetTarget.FQDN, err) return addrs, nil @@ -503,22 +503,22 @@ func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.N // shouldResync parses netmap update and returns true if the update contains // changes for which the egress proxy's firewall should be reconfigured. -func (ep *egressProxy) shouldResync(n ipn.Notify) bool { - if n.NetMap == nil { +func (ep *egressProxy) shouldResync(nm *netmap.NetworkMap) bool { + if nm == nil { return false } // If proxy's tailnet addresses have changed, resync. - if !reflect.DeepEqual(n.NetMap.SelfNode.Addresses().AsSlice(), ep.tailnetAddrs) { + if !reflect.DeepEqual(nm.SelfNode.Addresses().AsSlice(), ep.tailnetAddrs) { log.Printf("node addresses have changed, trigger egress config resync") - ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice() + ep.tailnetAddrs = nm.SelfNode.Addresses().AsSlice() return true } // If the IPs for any of the egress services configured via FQDN have // changed, resync. 
for fqdn, ips := range ep.targetFQDNs { - for _, nn := range n.NetMap.Peers { + for _, nn := range nm.Peers { if equalFQDNs(nn.Name(), fqdn) { if !reflect.DeepEqual(ips, nn.Addresses().AsSlice()) { log.Printf("backend addresses for egress target %q have changed old IPs %v, new IPs %v trigger egress config resync", nn.Name(), ips, nn.Addresses().AsSlice()) diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index 12a274507..1a11c3150 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -137,6 +137,7 @@ import ( "golang.org/x/sys/unix" + "tailscale.com/client/local" "tailscale.com/health" "tailscale.com/ipn" kubeutils "tailscale.com/k8s-operator" @@ -536,7 +537,7 @@ authLoop: failedResolveAttempts++ } - var egressSvcsNotify chan ipn.Notify + var egressSvcsNotify chan *netmap.NetworkMap notifyChan := make(chan ipn.Notify) errChan := make(chan error) go func() { @@ -550,10 +551,17 @@ authLoop: } } }() + // Peer set changes (Add/Remove) no longer ride on the IPN bus; poll + // periodically so egress FQDN resolution and peer-aware work picks + // them up. SelfChange covers prompt self changes. + const peerPollInterval = 15 * time.Second + peerPoll := time.NewTicker(peerPollInterval) + defer peerPoll.Stop() var wg sync.WaitGroup runLoop: for { + var processNetmap bool select { case <-ctx.Done(): // Although killTailscaled() is deferred earlier, if we @@ -566,6 +574,8 @@ runLoop: return fmt.Errorf("failed to read from tailscaled: %w", err) case err := <-cfgWatchErrChan: return fmt.Errorf("failed to watch tailscaled config: %w", err) + case <-peerPoll.C: + processNetmap = true case n := <-notifyChan: // TODO: (ChaosInTheCRD) Add node removed check when supported by ipn if n.State != nil && *n.State != ipn.Running { @@ -576,235 +586,8 @@ runLoop: // whereupon we'll go through initial auth again. 
return fmt.Errorf("tailscaled left running state (now in state %q), exiting", *n.State) } - if n.NetMap != nil { - addrs = n.NetMap.SelfNode.Addresses().AsSlice() - newCurrentIPs := deephash.Hash(&addrs) - ipsHaveChanged := newCurrentIPs != currentIPs - - // Store device ID in a Kubernetes Secret before - // setting up any routing rules. This ensures - // that, for containerboot instances that are - // Kubernetes operator proxies, the operator is - // able to retrieve the device ID from the - // Kubernetes Secret to clean up tailnet nodes - // for proxies whose route setup continuously - // fails. - deviceID := n.NetMap.SelfNode.StableID() - if hasKubeStateStore(cfg) && deephash.Update(¤tDeviceID, &deviceID) { - if err := kc.storeDeviceID(ctx, n.NetMap.SelfNode.StableID()); err != nil { - return fmt.Errorf("storing device ID in Kubernetes Secret: %w", err) - } - } - if cfg.TailnetTargetFQDN != "" { - egressAddrs, err := resolveTailnetFQDN(n.NetMap, cfg.TailnetTargetFQDN) - if err != nil { - log.Print(err.Error()) - break - } - - newCurentEgressIPs := deephash.Hash(&egressAddrs) - egressIPsHaveChanged := newCurentEgressIPs != currentEgressIPs - // The firewall rules get (re-)installed: - // - on startup - // - when the tailnet IPs of the tailnet target have changed - // - when the tailnet IPs of this node have changed - if (egressIPsHaveChanged || ipsHaveChanged) && len(egressAddrs) != 0 { - var rulesInstalled bool - for _, egressAddr := range egressAddrs { - ea := egressAddr.Addr() - if ea.Is4() || (ea.Is6() && nfr.HasIPV6NAT()) { - rulesInstalled = true - log.Printf("Installing forwarding rules for destination %v", ea.String()) - if err := installEgressForwardingRule(ctx, ea.String(), addrs, nfr); err != nil { - return fmt.Errorf("installing egress proxy rules for destination %s: %v", ea.String(), err) - } - } - } - if !rulesInstalled { - return fmt.Errorf("no forwarding rules for egress addresses %v, host supports IPv6: %v", egressAddrs, nfr.HasIPV6NAT()) - } - } 
- currentEgressIPs = newCurentEgressIPs - } - if cfg.ProxyTargetIP != "" && len(addrs) != 0 && ipsHaveChanged { - log.Printf("Installing proxy rules") - if err := installIngressForwardingRule(ctx, cfg.ProxyTargetIP, addrs, nfr); err != nil { - return fmt.Errorf("installing ingress proxy rules: %w", err) - } - } - if cfg.ProxyTargetDNSName != "" && len(addrs) != 0 && ipsHaveChanged { - newBackendAddrs, err := resolveDNS(ctx, cfg.ProxyTargetDNSName) - if err != nil { - log.Printf("[unexpected] error resolving DNS name %s: %v", cfg.ProxyTargetDNSName, err) - resetTimer(true) - continue - } - backendsHaveChanged := !(slices.EqualFunc(backendAddrs, newBackendAddrs, func(ip1 net.IP, ip2 net.IP) bool { - return slices.ContainsFunc(newBackendAddrs, func(ip net.IP) bool { return ip.Equal(ip1) }) - })) - if backendsHaveChanged { - log.Printf("installing ingress proxy rules for backends %v", newBackendAddrs) - if err := installIngressForwardingRuleForDNSTarget(ctx, newBackendAddrs, addrs, nfr); err != nil { - return fmt.Errorf("error installing ingress proxy rules: %w", err) - } - } - resetTimer(false) - backendAddrs = newBackendAddrs - } - if cfg.ServeConfigPath != "" { - cd := certDomainFromNetmap(n.NetMap) - if cd == "" { - cd = kubetypes.ValueNoHTTPS - } - prev := certDomain.Swap(new(cd)) - if prev == nil || *prev != cd { - select { - case certDomainChanged <- true: - default: - } - } - } - if cfg.TailnetTargetIP != "" && ipsHaveChanged && len(addrs) != 0 { - log.Printf("Installing forwarding rules for destination %v", cfg.TailnetTargetIP) - if err := installEgressForwardingRule(ctx, cfg.TailnetTargetIP, addrs, nfr); err != nil { - return fmt.Errorf("installing egress proxy rules: %w", err) - } - } - // If this is a L7 cluster ingress proxy (set up - // by Kubernetes operator) and proxying of - // cluster traffic to the ingress target is - // enabled, set up proxy rule each time the - // tailnet IPs of this node change (including - // the first time they become 
available). - if cfg.AllowProxyingClusterTrafficViaIngress && cfg.ServeConfigPath != "" && ipsHaveChanged && len(addrs) != 0 { - log.Printf("installing rules to forward traffic for %s to node's tailnet IP", cfg.PodIP) - if err := installTSForwardingRuleForDestination(ctx, cfg.PodIP, addrs, nfr); err != nil { - return fmt.Errorf("installing rules to forward traffic to node's tailnet IP: %w", err) - } - } - currentIPs = newCurrentIPs - - // Only store device FQDN and IP addresses to - // Kubernetes Secret when any required proxy - // route setup has succeeded. IPs and FQDN are - // read from the Secret by the Tailscale - // Kubernetes operator and, for some proxy - // types, such as Tailscale Ingress, advertized - // on the Ingress status. Writing them to the - // Secret only after the proxy routing has been - // set up ensures that the operator does not - // advertize endpoints of broken proxies. - // TODO (irbekrm): instead of using the IP and FQDN, have some other mechanism for the proxy signal that it is 'Ready'. 
- deviceEndpoints := []any{n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses()} - if hasKubeStateStore(cfg) && deephash.Update(¤tDeviceEndpoints, &deviceEndpoints) { - if err := kc.storeDeviceEndpoints(ctx, n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses().AsSlice()); err != nil { - return fmt.Errorf("storing device IPs and FQDN in Kubernetes Secret: %w", err) - } - } - - if healthCheck != nil { - healthCheck.Update(len(addrs) != 0) - } - - var prevServeConfig *ipn.ServeConfig - if getAutoAdvertiseBool() { - prevServeConfig, err = client.GetServeConfig(ctx) - if err != nil { - return fmt.Errorf("autoadvertisement: failed to get serve config: %w", err) - } - - err = refreshAdvertiseServices(ctx, prevServeConfig, klc.New(client)) - if err != nil { - return fmt.Errorf("autoadvertisement: failed to refresh advertise services: %w", err) - } - } - - if cfg.ServeConfigPath != "" { - triggerWatchServeConfigChanges.Do(func() { - go watchServeConfigChanges(ctx, certDomainChanged, certDomain, client, kc, cfg, prevServeConfig) - }) - } - - if egressSvcsNotify != nil { - egressSvcsNotify <- n - } - } - if !startupTasksDone { - // For containerboot instances that act as TCP proxies (proxying traffic to an endpoint - // passed via one of the env vars that containerboot reads) and store state in a - // Kubernetes Secret, we consider startup tasks done at the point when device info has - // been successfully stored to state Secret. For all other containerboot instances, if - // we just get to this point the startup tasks can be considered done. - if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) { - // This log message is used in tests to detect when all - // post-auth configuration is done. - log.Println("Startup complete, waiting for shutdown signal") - startupTasksDone = true - - // Configure egress proxy. 
Egress proxy will set up firewall rules to proxy - // traffic to tailnet targets configured in the provided configuration file. It - // will then continuously monitor the config file and netmap updates and - // reconfigure the firewall rules as needed. If any of its operations fail, it - // will crash this node. - if cfg.EgressProxiesCfgPath != "" { - log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressProxiesCfgPath) - egressSvcsNotify = make(chan ipn.Notify) - opts := egressProxyRunOpts{ - cfgPath: cfg.EgressProxiesCfgPath, - nfr: nfr, - kc: kc, - tsClient: client, - stateSecret: cfg.KubeSecret, - netmapChan: egressSvcsNotify, - podIPv4: cfg.PodIPv4, - tailnetAddrs: addrs, - } - go func() { - if err := ep.run(ctx, n, opts); err != nil { - egressSvcsErrorChan <- err - } - }() - } - ip := ingressProxy{} - if cfg.IngressProxiesCfgPath != "" { - log.Printf("configuring ingress proxy using configuration file at %s", cfg.IngressProxiesCfgPath) - opts := ingressProxyOpts{ - cfgPath: cfg.IngressProxiesCfgPath, - nfr: nfr, - kc: kc, - stateSecret: cfg.KubeSecret, - podIPv4: cfg.PodIPv4, - podIPv6: cfg.PodIPv6, - } - go func() { - if err := ip.run(ctx, opts); err != nil { - ingressSvcsErrorChan <- err - } - }() - } - - // Wait on tailscaled process. It won't be cleaned up by default when the - // container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the - // reaper by a running cmd.Wait in a goroutine immediately after starting - // tailscaled? 
- reaper := func() { - defer wg.Done() - for { - var status unix.WaitStatus - _, err := unix.Wait4(daemonProcess.Pid, &status, 0, nil) - if errors.Is(err, unix.EINTR) { - continue - } - if err != nil { - log.Fatalf("Waiting for tailscaled to exit: %v", err) - } - log.Print("tailscaled exited") - os.Exit(0) - } - } - wg.Add(1) - go reaper() - } + if n.SelfChange != nil { + processNetmap = true } case <-tc: newBackendAddrs, err := resolveDNS(ctx, cfg.ProxyTargetDNSName) @@ -824,11 +607,250 @@ runLoop: } backendAddrs = newBackendAddrs resetTimer(false) + continue case e := <-egressSvcsErrorChan: return fmt.Errorf("egress proxy failed: %v", e) case e := <-ingressSvcsErrorChan: return fmt.Errorf("ingress proxy failed: %v", e) } + if !processNetmap { + continue + } + nm, err := fetchNetMap(ctx, client) + if err != nil { + log.Printf("error fetching netmap: %v", err) + continue + } + if nm != nil { + addrs = nm.SelfNode.Addresses().AsSlice() + newCurrentIPs := deephash.Hash(&addrs) + ipsHaveChanged := newCurrentIPs != currentIPs + + // Store device ID in a Kubernetes Secret before + // setting up any routing rules. This ensures + // that, for containerboot instances that are + // Kubernetes operator proxies, the operator is + // able to retrieve the device ID from the + // Kubernetes Secret to clean up tailnet nodes + // for proxies whose route setup continuously + // fails. 
+ deviceID := nm.SelfNode.StableID() + if hasKubeStateStore(cfg) && deephash.Update(¤tDeviceID, &deviceID) { + if err := kc.storeDeviceID(ctx, nm.SelfNode.StableID()); err != nil { + return fmt.Errorf("storing device ID in Kubernetes Secret: %w", err) + } + } + if cfg.TailnetTargetFQDN != "" { + egressAddrs, err := resolveTailnetFQDN(nm, cfg.TailnetTargetFQDN) + if err != nil { + log.Print(err.Error()) + break + } + + newCurentEgressIPs := deephash.Hash(&egressAddrs) + egressIPsHaveChanged := newCurentEgressIPs != currentEgressIPs + // The firewall rules get (re-)installed: + // - on startup + // - when the tailnet IPs of the tailnet target have changed + // - when the tailnet IPs of this node have changed + if (egressIPsHaveChanged || ipsHaveChanged) && len(egressAddrs) != 0 { + var rulesInstalled bool + for _, egressAddr := range egressAddrs { + ea := egressAddr.Addr() + if ea.Is4() || (ea.Is6() && nfr.HasIPV6NAT()) { + rulesInstalled = true + log.Printf("Installing forwarding rules for destination %v", ea.String()) + if err := installEgressForwardingRule(ctx, ea.String(), addrs, nfr); err != nil { + return fmt.Errorf("installing egress proxy rules for destination %s: %v", ea.String(), err) + } + } + } + if !rulesInstalled { + return fmt.Errorf("no forwarding rules for egress addresses %v, host supports IPv6: %v", egressAddrs, nfr.HasIPV6NAT()) + } + } + currentEgressIPs = newCurentEgressIPs + } + if cfg.ProxyTargetIP != "" && len(addrs) != 0 && ipsHaveChanged { + log.Printf("Installing proxy rules") + if err := installIngressForwardingRule(ctx, cfg.ProxyTargetIP, addrs, nfr); err != nil { + return fmt.Errorf("installing ingress proxy rules: %w", err) + } + } + if cfg.ProxyTargetDNSName != "" && len(addrs) != 0 && ipsHaveChanged { + newBackendAddrs, err := resolveDNS(ctx, cfg.ProxyTargetDNSName) + if err != nil { + log.Printf("[unexpected] error resolving DNS name %s: %v", cfg.ProxyTargetDNSName, err) + resetTimer(true) + continue + } + backendsHaveChanged := 
!(slices.EqualFunc(backendAddrs, newBackendAddrs, func(ip1 net.IP, ip2 net.IP) bool { + return slices.ContainsFunc(newBackendAddrs, func(ip net.IP) bool { return ip.Equal(ip1) }) + })) + if backendsHaveChanged { + log.Printf("installing ingress proxy rules for backends %v", newBackendAddrs) + if err := installIngressForwardingRuleForDNSTarget(ctx, newBackendAddrs, addrs, nfr); err != nil { + return fmt.Errorf("error installing ingress proxy rules: %w", err) + } + } + resetTimer(false) + backendAddrs = newBackendAddrs + } + if cfg.ServeConfigPath != "" { + cd := certDomainFromNetmap(nm) + if cd == "" { + cd = kubetypes.ValueNoHTTPS + } + prev := certDomain.Swap(new(cd)) + if prev == nil || *prev != cd { + select { + case certDomainChanged <- true: + default: + } + } + } + if cfg.TailnetTargetIP != "" && ipsHaveChanged && len(addrs) != 0 { + log.Printf("Installing forwarding rules for destination %v", cfg.TailnetTargetIP) + if err := installEgressForwardingRule(ctx, cfg.TailnetTargetIP, addrs, nfr); err != nil { + return fmt.Errorf("installing egress proxy rules: %w", err) + } + } + // If this is a L7 cluster ingress proxy (set up + // by Kubernetes operator) and proxying of + // cluster traffic to the ingress target is + // enabled, set up proxy rule each time the + // tailnet IPs of this node change (including + // the first time they become available). + if cfg.AllowProxyingClusterTrafficViaIngress && cfg.ServeConfigPath != "" && ipsHaveChanged && len(addrs) != 0 { + log.Printf("installing rules to forward traffic for %s to node's tailnet IP", cfg.PodIP) + if err := installTSForwardingRuleForDestination(ctx, cfg.PodIP, addrs, nfr); err != nil { + return fmt.Errorf("installing rules to forward traffic to node's tailnet IP: %w", err) + } + } + currentIPs = newCurrentIPs + + // Only store device FQDN and IP addresses to + // Kubernetes Secret when any required proxy + // route setup has succeeded. 
IPs and FQDN are + // read from the Secret by the Tailscale + // Kubernetes operator and, for some proxy + // types, such as Tailscale Ingress, advertized + // on the Ingress status. Writing them to the + // Secret only after the proxy routing has been + // set up ensures that the operator does not + // advertize endpoints of broken proxies. + // TODO (irbekrm): instead of using the IP and FQDN, have some other mechanism for the proxy signal that it is 'Ready'. + deviceEndpoints := []any{nm.SelfNode.Name(), nm.SelfNode.Addresses()} + if hasKubeStateStore(cfg) && deephash.Update(¤tDeviceEndpoints, &deviceEndpoints) { + if err := kc.storeDeviceEndpoints(ctx, nm.SelfNode.Name(), nm.SelfNode.Addresses().AsSlice()); err != nil { + return fmt.Errorf("storing device IPs and FQDN in Kubernetes Secret: %w", err) + } + } + + if healthCheck != nil { + healthCheck.Update(len(addrs) != 0) + } + + var prevServeConfig *ipn.ServeConfig + if getAutoAdvertiseBool() { + prevServeConfig, err = client.GetServeConfig(ctx) + if err != nil { + return fmt.Errorf("autoadvertisement: failed to get serve config: %w", err) + } + + err = refreshAdvertiseServices(ctx, prevServeConfig, klc.New(client)) + if err != nil { + return fmt.Errorf("autoadvertisement: failed to refresh advertise services: %w", err) + } + } + + if cfg.ServeConfigPath != "" { + triggerWatchServeConfigChanges.Do(func() { + go watchServeConfigChanges(ctx, certDomainChanged, certDomain, client, kc, cfg, prevServeConfig) + }) + } + + if egressSvcsNotify != nil { + egressSvcsNotify <- nm + } + } + if !startupTasksDone { + // For containerboot instances that act as TCP proxies (proxying traffic to an endpoint + // passed via one of the env vars that containerboot reads) and store state in a + // Kubernetes Secret, we consider startup tasks done at the point when device info has + // been successfully stored to state Secret. 
For all other containerboot instances, if + // we just get to this point the startup tasks can be considered done. + if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) { + // This log message is used in tests to detect when all + // post-auth configuration is done. + log.Println("Startup complete, waiting for shutdown signal") + startupTasksDone = true + + // Configure egress proxy. Egress proxy will set up firewall rules to proxy + // traffic to tailnet targets configured in the provided configuration file. It + // will then continuously monitor the config file and netmap updates and + // reconfigure the firewall rules as needed. If any of its operations fail, it + // will crash this node. + if cfg.EgressProxiesCfgPath != "" { + log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressProxiesCfgPath) + egressSvcsNotify = make(chan *netmap.NetworkMap) + opts := egressProxyRunOpts{ + cfgPath: cfg.EgressProxiesCfgPath, + nfr: nfr, + kc: kc, + tsClient: client, + stateSecret: cfg.KubeSecret, + netmapChan: egressSvcsNotify, + podIPv4: cfg.PodIPv4, + tailnetAddrs: addrs, + } + go func() { + if err := ep.run(ctx, nm, opts); err != nil { + egressSvcsErrorChan <- err + } + }() + } + ip := ingressProxy{} + if cfg.IngressProxiesCfgPath != "" { + log.Printf("configuring ingress proxy using configuration file at %s", cfg.IngressProxiesCfgPath) + opts := ingressProxyOpts{ + cfgPath: cfg.IngressProxiesCfgPath, + nfr: nfr, + kc: kc, + stateSecret: cfg.KubeSecret, + podIPv4: cfg.PodIPv4, + podIPv6: cfg.PodIPv6, + } + go func() { + if err := ip.run(ctx, opts); err != nil { + ingressSvcsErrorChan <- err + } + }() + } + + // Wait on tailscaled process. It won't be cleaned up by default when the + // container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the + // reaper by a running cmd.Wait in a goroutine immediately after starting + // tailscaled? 
+ reaper := func() { + defer wg.Done() + for { + var status unix.WaitStatus + _, err := unix.Wait4(daemonProcess.Pid, &status, 0, nil) + if errors.Is(err, unix.EINTR) { + continue + } + if err != nil { + log.Fatalf("Waiting for tailscaled to exit: %v", err) + } + log.Print("tailscaled exited") + os.Exit(0) + } + } + wg.Add(1) + go reaper() + } + } } wg.Wait() @@ -963,6 +985,15 @@ func runHTTPServer(mux *http.ServeMux, addr string) (close func() error) { } } +// fetchNetMap fetches the current netmap from tailscaled via the +// "current-netmap" localapi debug action. The debug action's payload +// shape is intentionally not part of any stable API; containerboot +// reads its own internal-package types out of it. New external consumers +// should not rely on this — see [local.Client.Status] and friends. +func fetchNetMap(ctx context.Context, lc *local.Client) (*netmap.NetworkMap, error) { + return local.GetDebugResultJSON[*netmap.NetworkMap](ctx, lc, "current-netmap") +} + // resolveTailnetFQDN resolves a tailnet FQDN to a list of IP prefixes, which // can be either a peer device or a Tailscale Service. func resolveTailnetFQDN(nm *netmap.NetworkMap, fqdn string) ([]netip.Prefix, error) { diff --git a/cmd/containerboot/main_test.go b/cmd/containerboot/main_test.go index e9a677b3d..40f575250 100644 --- a/cmd/containerboot/main_test.go +++ b/cmd/containerboot/main_test.go @@ -71,6 +71,12 @@ func TestContainerBoot(t *testing.T) { // Waits below to be true before proceeding to the next phase. Notify *ipn.Notify + // If non-nil, install this NetMap on the fake LocalAPI before + // sending Notify. This is the replacement for the old + // Notify.NetMap field; reactive consumers fetch the current + // netmap via /localapi/v0/netmap on their own. + NetMap *netmap.NetworkMap + // WantCmds is the commands that containerboot should run in this phase. 
WantCmds []string @@ -105,12 +111,10 @@ func TestContainerBoot(t *testing.T) { } runningNotify := &ipn.Notify{ State: new(ipn.Running), - NetMap: &netmap.NetworkMap{ - SelfNode: (&tailcfg.Node{ - StableID: tailcfg.StableNodeID("myID"), - Name: "test-node.test.ts.net.", - Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, - }).View(), + SelfChange: &tailcfg.Node{ + StableID: tailcfg.StableNodeID("myID"), + Name: "test-node.test.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, }, } type testCase struct { @@ -383,19 +387,24 @@ func TestContainerBoot(t *testing.T) { { Notify: &ipn.Notify{ State: new(ipn.Running), - NetMap: &netmap.NetworkMap{ - SelfNode: (&tailcfg.Node{ - StableID: tailcfg.StableNodeID("myID"), - Name: "test-node.test.ts.net.", - Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, + SelfChange: &tailcfg.Node{ + StableID: tailcfg.StableNodeID("myID"), + Name: "test-node.test.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, + }, + }, + NetMap: &netmap.NetworkMap{ + SelfNode: (&tailcfg.Node{ + StableID: tailcfg.StableNodeID("myID"), + Name: "test-node.test.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, + }).View(), + Peers: []tailcfg.NodeView{ + (&tailcfg.Node{ + StableID: tailcfg.StableNodeID("ipv6ID"), + Name: "ipv6-node.test.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("::1/128")}, }).View(), - Peers: []tailcfg.NodeView{ - (&tailcfg.Node{ - StableID: tailcfg.StableNodeID("ipv6ID"), - Name: "ipv6-node.test.ts.net.", - Addresses: []netip.Prefix{netip.MustParsePrefix("::1/128")}, - }).View(), - }, }, }, WantLog: "no forwarding rules for egress addresses [::1/128], host supports IPv6: false", @@ -631,14 +640,19 @@ func TestContainerBoot(t *testing.T) { { Notify: &ipn.Notify{ State: new(ipn.Running), - NetMap: &netmap.NetworkMap{ - SelfNode: (&tailcfg.Node{ - StableID: tailcfg.StableNodeID("newID"), - Name: 
"new-name.test.ts.net.", - Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, - }).View(), + SelfChange: &tailcfg.Node{ + StableID: tailcfg.StableNodeID("newID"), + Name: "new-name.test.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, }, }, + NetMap: &netmap.NetworkMap{ + SelfNode: (&tailcfg.Node{ + StableID: tailcfg.StableNodeID("newID"), + Name: "new-name.test.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, + }).View(), + }, WantKubeSecret: map[string]string{ "authkey": "tskey-key", "device_fqdn": "new-name.test.ts.net.", @@ -1095,19 +1109,24 @@ func TestContainerBoot(t *testing.T) { { Notify: &ipn.Notify{ State: new(ipn.Running), - NetMap: &netmap.NetworkMap{ - SelfNode: (&tailcfg.Node{ - StableID: tailcfg.StableNodeID("myID"), - Name: "test-node.test.ts.net.", - Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, + SelfChange: &tailcfg.Node{ + StableID: tailcfg.StableNodeID("myID"), + Name: "test-node.test.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, + }, + }, + NetMap: &netmap.NetworkMap{ + SelfNode: (&tailcfg.Node{ + StableID: tailcfg.StableNodeID("myID"), + Name: "test-node.test.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32")}, + }).View(), + Peers: []tailcfg.NodeView{ + (&tailcfg.Node{ + StableID: tailcfg.StableNodeID("fooID"), + Name: "foo.tailnetxyz.ts.net.", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.2/32")}, }).View(), - Peers: []tailcfg.NodeView{ - (&tailcfg.Node{ - StableID: tailcfg.StableNodeID("fooID"), - Name: "foo.tailnetxyz.ts.net.", - Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.2/32")}, - }).View(), - }, }, }, WantKubeSecret: map[string]string{ @@ -1276,6 +1295,18 @@ func TestContainerBoot(t *testing.T) { t.Fatalf("phase %d: updating mtime for %q: %v", i, path, err) } } + nmForFake := p.NetMap + if nmForFake == nil && p.Notify != nil && p.Notify.SelfChange != nil 
{ + // Synthesize a minimal netmap from SelfChange so + // containerboot's NetMap() fetch returns + // something usable when the test only set Notify. + nmForFake = &netmap.NetworkMap{ + SelfNode: p.Notify.SelfChange.View(), + } + } + if nmForFake != nil { + env.lapi.SetNetMap(nmForFake) + } env.lapi.Notify(p.Notify) if p.Signal != nil { cmd.Process.Signal(*p.Signal) @@ -1468,6 +1499,7 @@ type localAPI struct { sync.Mutex cond *sync.Cond notify *ipn.Notify + netmap *netmap.NetworkMap // served by /localapi/v0/netmap } func (lc *localAPI) Start() error { @@ -1504,8 +1536,44 @@ func (lc *localAPI) Notify(n *ipn.Notify) { lc.cond.Broadcast() } +// SetNetMap installs the netmap that the fake /localapi/v0/netmap endpoint +// will return. +func (lc *localAPI) SetNetMap(nm *netmap.NetworkMap) { + lc.Lock() + defer lc.Unlock() + lc.netmap = nm +} + func (lc *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { + case "/localapi/v0/netmap": + w.Header().Set("Content-Type", "application/json") + lc.Lock() + nm := lc.netmap + lc.Unlock() + if nm == nil { + http.Error(w, "no netmap", http.StatusServiceUnavailable) + return + } + json.NewEncoder(w).Encode(nm) + return + case "/localapi/v0/debug": + // containerboot fetches the netmap via the "current-netmap" + // debug action; serve it like /localapi/v0/netmap above. 
+ if r.URL.Query().Get("action") != "current-netmap" { + http.Error(w, "unsupported debug action", http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + lc.Lock() + nm := lc.netmap + lc.Unlock() + if nm == nil { + http.Error(w, "no netmap", http.StatusServiceUnavailable) + return + } + json.NewEncoder(w).Encode(nm) + return case "/localapi/v0/serve-config": switch r.Method { case "GET": diff --git a/cmd/sniproxy/sniproxy.go b/cmd/sniproxy/sniproxy.go index bd95cc113..f7ebc6aba 100644 --- a/cmd/sniproxy/sniproxy.go +++ b/cmd/sniproxy/sniproxy.go @@ -138,9 +138,9 @@ func run(ctx context.Context, ts *tsnet.Server, wgPort int, hostname string, pro } // Finally, start mainloop to configure app connector based on information - // in the netmap. - // We set the NotifyInitialNetMap flag so we will always get woken with the - // current netmap, before only being woken on changes. + // in the self node's CapMap. We set NotifyInitialNetMap so the first + // Notify carries the current self node (now via Notify.SelfChange); + // subsequent self changes wake us up too. 
bus, err := lc.WatchIPNBus(ctx, ipn.NotifyWatchEngineUpdates|ipn.NotifyInitialNetMap) if err != nil { log.Fatalf("watching IPN bus: %v", err) @@ -155,28 +155,30 @@ func run(ctx context.Context, ts *tsnet.Server, wgPort int, hostname string, pro log.Fatalf("reading IPN bus: %v", err) } - // NetMap contains app-connector configuration - if nm := msg.NetMap; nm != nil && nm.SelfNode.Valid() { - var c appctype.AppConnectorConfig - nmConf, err := tailcfg.UnmarshalNodeCapViewJSON[appctype.AppConnectorConfig](nm.SelfNode.CapMap(), configCapKey) - if err != nil { - log.Printf("failed to read app connector configuration from coordination server: %v", err) - } else if len(nmConf) > 0 { - c = nmConf[0] - } - - if c.AdvertiseRoutes { - if err := s.advertiseRoutesFromConfig(ctx, &c); err != nil { - log.Printf("failed to advertise routes: %v", err) - } - } - - // Backwards compatibility: combine any configuration from control with flags specified - // on the command line. This is intentionally done after we advertise any routes - // because its never correct to advertise the nodes native IP addresses. - s.mergeConfigFromFlags(&c, ports, forwards) - s.srv.Configure(&c) + self := msg.SelfChange + if self == nil { + continue } + var c appctype.AppConnectorConfig + // View() lets us reuse the existing CapView decoder. + nmConf, err := tailcfg.UnmarshalNodeCapViewJSON[appctype.AppConnectorConfig](self.View().CapMap(), configCapKey) + if err != nil { + log.Printf("failed to read app connector configuration from coordination server: %v", err) + } else if len(nmConf) > 0 { + c = nmConf[0] + } + + if c.AdvertiseRoutes { + if err := s.advertiseRoutesFromConfig(ctx, &c); err != nil { + log.Printf("failed to advertise routes: %v", err) + } + } + + // Backwards compatibility: combine any configuration from control with flags specified + // on the command line. This is intentionally done after we advertise any routes + // because its never correct to advertise the nodes native IP addresses. 
+ s.mergeConfigFromFlags(&c, ports, forwards) + s.srv.Configure(&c) } } diff --git a/cmd/tailscale/cli/file.go b/cmd/tailscale/cli/file.go index a5c39b13d..489c83deb 100644 --- a/cmd/tailscale/cli/file.go +++ b/cmd/tailscale/cli/file.go @@ -287,9 +287,9 @@ func runCp(ctx context.Context, args []string) error { // caller's progress display stays at 0 — exactly the right degradation, // since the warning timer will then fire on its normal 3-second deadline. func watchOutgoingFiles(ctx context.Context, peer tailcfg.StableNodeID, onUpdate func(name string, sent int64)) { - // NotifyPeerChanges asks the broadcaster to deliver incremental peer - // updates as small PeerChanges blobs in place of the full NetMap, which - // we don't read anyway. (See ipn/ipnlocal/local.go's notify-elision.) + // NotifyPeerChanges opts in to per-peer add/remove notifications so the + // bus stays responsive without us also subscribing to the full NetMap, + // which we don't read here. w, err := localClient.WatchIPNBus(ctx, ipn.NotifyInitialOutgoingFiles|ipn.NotifyPeerChanges) if err != nil { return diff --git a/cmd/tailscale/cli/serve_legacy.go b/cmd/tailscale/cli/serve_legacy.go index 837d88513..635bcfa3d 100644 --- a/cmd/tailscale/cli/serve_legacy.go +++ b/cmd/tailscale/cli/serve_legacy.go @@ -848,10 +848,10 @@ func (e *serveEnv) enableFeatureInteractive(ctx context.Context, feature string, e.lc.IncrementCounter(ctx, fmt.Sprintf("%s_enablement_lost_connection", feature), 1) return err } - if nm := n.NetMap; nm != nil && nm.SelfNode.Valid() { + if self := n.SelfChange; self != nil { gotAll := true for _, c := range caps { - if !nm.SelfNode.HasCap(c) { + if _, has := self.CapMap[c]; !has { // The feature is not yet enabled. // Continue blocking until it is. 
gotAll = false diff --git a/cmd/tailscale/cli/up.go b/cmd/tailscale/cli/up.go index 419d55020..2638490b9 100644 --- a/cmd/tailscale/cli/up.go +++ b/cmd/tailscale/cli/up.go @@ -732,7 +732,7 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE if s := n.State; s != nil { ipnIsRunning = *s == ipn.Running } - if n.NetMap != nil && n.NetMap.NodeKey != origNodeKey { + if n.SelfChange != nil && n.SelfChange.Key != origNodeKey { waitingForKeyChange = false } if ipnIsRunning && !waitingForKeyChange { diff --git a/cmd/tsconnect/wasm/wasm_js.go b/cmd/tsconnect/wasm/wasm_js.go index 71e8476a0..f58e4201a 100644 --- a/cmd/tsconnect/wasm/wasm_js.go +++ b/cmd/tsconnect/wasm/wasm_js.go @@ -258,44 +258,50 @@ func (i *jsIPN) run(jsCallbacks js.Value) { if n.State != nil { notifyState(*n.State) } - if nm := n.NetMap; nm != nil { - jsNetMap := jsNetMap{ - Self: jsNetMapSelfNode{ - jsNetMapNode: jsNetMapNode{ - Name: nm.SelfName(), - Addresses: mapSliceView(nm.GetAddresses(), func(a netip.Prefix) string { return a.Addr().String() }), - NodeKey: nm.NodeKey.String(), - MachineKey: nm.MachineKey.String(), - }, - MachineStatus: jsMachineStatus[nm.GetMachineStatus()], - }, - Peers: mapSlice(nm.Peers, func(p tailcfg.NodeView) jsNetMapPeerNode { - name := p.Name() - if name == "" { - // In practice this should only happen for Hello. - name = p.Hostinfo().Hostname() - } - addrs := make([]string, p.Addresses().Len()) - for i, ap := range p.Addresses().All() { - addrs[i] = ap.Addr().String() - } - return jsNetMapPeerNode{ + if n.SelfChange != nil { + // Self changed: rebuild the JS-side NetMap snapshot. Peers + // don't ride on the bus anymore, so fetch them on demand + // from LocalBackend. 
+ nm := i.lb.NetMapWithPeers() + if nm != nil { + jsNetMap := jsNetMap{ + Self: jsNetMapSelfNode{ jsNetMapNode: jsNetMapNode{ - Name: name, - Addresses: addrs, - MachineKey: p.Machine().String(), - NodeKey: p.Key().String(), + Name: nm.SelfName(), + Addresses: mapSliceView(nm.GetAddresses(), func(a netip.Prefix) string { return a.Addr().String() }), + NodeKey: nm.NodeKey.String(), + MachineKey: nm.MachineKey.String(), }, - Online: p.Online().Clone(), - TailscaleSSHEnabled: p.Hostinfo().TailscaleSSHEnabled(), - } - }), - LockedOut: nm.TKAEnabled && nm.SelfNode.KeySignature().Len() == 0, - } - if jsonNetMap, err := json.Marshal(jsNetMap); err == nil { - jsCallbacks.Call("notifyNetMap", string(jsonNetMap)) - } else { - log.Printf("Could not generate JSON netmap: %v", err) + MachineStatus: jsMachineStatus[nm.GetMachineStatus()], + }, + Peers: mapSlice(nm.Peers, func(p tailcfg.NodeView) jsNetMapPeerNode { + name := p.Name() + if name == "" { + // In practice this should only happen for Hello. 
+							name = p.Hostinfo().Hostname()
+						}
+						addrs := make([]string, p.Addresses().Len())
+						for i, ap := range p.Addresses().All() {
+							addrs[i] = ap.Addr().String()
+						}
+						return jsNetMapPeerNode{
+							jsNetMapNode: jsNetMapNode{
+								Name:       name,
+								Addresses:  addrs,
+								MachineKey: p.Machine().String(),
+								NodeKey:    p.Key().String(),
+							},
+							Online:              p.Online().Clone(),
+							TailscaleSSHEnabled: p.Hostinfo().TailscaleSSHEnabled(),
+						}
+					}),
+					LockedOut: nm.TKAEnabled && nm.SelfNode.KeySignature().Len() == 0,
+				}
+				if jsonNetMap, err := json.Marshal(jsNetMap); err == nil {
+					jsCallbacks.Call("notifyNetMap", string(jsonNetMap))
+				} else {
+					log.Printf("Could not generate JSON netmap: %v", err)
+				}
 			}
 		}
 		if n.BrowseToURL != nil {
diff --git a/kube/certs/certs.go b/kube/certs/certs.go
index f139c0759..fd7c82a10 100644
--- a/kube/certs/certs.go
+++ b/kube/certs/certs.go
@@ -171,14 +171,9 @@ func (cm *CertManager) runCertLoop(ctx context.Context, domain string) {
 	}
 }
-// waitForCertDomain ensures the requested domain is in the list of allowed
-// domains before issuing the cert for the first time.
-// It uses the IPN bus only as a wake-up trigger and queries the current cert domains explicitly via [LocalClient.CertDomains].
-//
-// TODO(bradfitz): once Notify.SelfChange lands upstream, switch this to
-// watch for SelfChange events instead of NotifyInitialNetMap, and drop the
-// netmap dependency on the bus entirely.
+// waitForCertDomain ensures the requested domain is allowed before issuing
+// the cert for the first time. It uses the IPN bus only as a wake-up trigger
+// (Notify.SelfChange) and queries the current cert domains via [LocalClient.CertDomains].
func (cm *CertManager) waitForCertDomain(ctx context.Context, domain string) error { w, err := cm.lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap) if err != nil { @@ -191,7 +186,7 @@ func (cm *CertManager) waitForCertDomain(ctx context.Context, domain string) err if err != nil { return err } - if n.NetMap == nil { + if n.SelfChange == nil { continue } domains, err := cm.lc.CertDomains(ctx) diff --git a/kube/certs/certs_test.go b/kube/certs/certs_test.go index f8de11d71..27fe12752 100644 --- a/kube/certs/certs_test.go +++ b/kube/certs/certs_test.go @@ -12,7 +12,6 @@ import ( "tailscale.com/ipn" "tailscale.com/kube/localclient" "tailscale.com/tailcfg" - "tailscale.com/types/netmap" ) // TestEnsureCertLoops tests that the certManager correctly starts and stops @@ -201,12 +200,12 @@ func TestEnsureCertLoops(t *testing.T) { notifyChan := make(chan ipn.Notify) go func() { - // Drive waitForCertDomain by sending notifications - // with empty netmaps as wake-up triggers; the cert - // manager queries CertDomains via the local - // client and not by reading the bus payload. + // SelfChange wakes the cert manager; cert domains are + // then fetched via FakeLocalClient.CertDomainsResult. 
for { - notifyChan <- ipn.Notify{NetMap: &netmap.NetworkMap{}} + notifyChan <- ipn.Notify{ + SelfChange: &tailcfg.Node{StableID: "test"}, + } } }() cm := &CertManager{ diff --git a/kube/health/healthz.go b/kube/health/healthz.go index 53888922b..e9b459fc1 100644 --- a/kube/health/healthz.go +++ b/kube/health/healthz.go @@ -65,8 +65,8 @@ func (h *Healthz) MonitorHealth(ctx context.Context, lc *local.Client) error { return err } - if n.NetMap != nil { - h.Update(n.NetMap.SelfNode.Addresses().Len() != 0) + if self := n.SelfChange; self != nil { + h.Update(len(self.Addresses) != 0) } } } diff --git a/kube/state/state.go b/kube/state/state.go index a7f00b7f2..220eb439f 100644 --- a/kube/state/state.go +++ b/kube/state/state.go @@ -44,9 +44,9 @@ func SetInitialKeys(store ipn.StateStore, podUID string) error { // KeepKeysUpdated sets state store keys consistent with containerboot to // signal proxy readiness to the operator. It runs until its context is -// cancelled or it hits an error. The passed in next function is expected to be -// from a local.IPNBusWatcher that is at least subscribed to -// ipn.NotifyInitialNetMap. +// cancelled or it hits an error. It watches the IPN bus for SelfChange +// notifications (which fire whenever the self node changes) and reads +// the new self node directly from the notify. 
func KeepKeysUpdated(ctx context.Context, store ipn.StateStore, lc klc.LocalClient) error {
 	w, err := lc.WatchIPNBus(ctx, ipn.NotifyInitialNetMap)
 	if err != nil {
@@ -63,25 +63,26 @@
 			}
 			return err
 		}
-		if n.NetMap == nil {
+		self := n.SelfChange
+		if self == nil {
 			continue
 		}
-		if deviceID := n.NetMap.SelfNode.StableID(); deephash.Update(&currentDeviceID, &deviceID) {
+		if deviceID := self.StableID; deephash.Update(&currentDeviceID, &deviceID) {
 			if err := store.WriteState(keyDeviceID, []byte(deviceID)); err != nil {
 				return fmt.Errorf("failed to store device ID in state: %w", err)
 			}
 		}
-		if fqdn := n.NetMap.SelfNode.Name(); deephash.Update(&currentDeviceFQDN, &fqdn) {
+		if fqdn := self.Name; deephash.Update(&currentDeviceFQDN, &fqdn) {
 			if err := store.WriteState(keyDeviceFQDN, []byte(fqdn)); err != nil {
 				return fmt.Errorf("failed to store device FQDN in state: %w", err)
 			}
 		}
-		if addrs := n.NetMap.SelfNode.Addresses(); deephash.Update(&currentDeviceIPs, &addrs) {
+		if addrs := self.Addresses; deephash.Update(&currentDeviceIPs, &addrs) {
 			var deviceIPs []string
-			for _, addr := range addrs.AsSlice() {
+			for _, addr := range addrs {
 				deviceIPs = append(deviceIPs, addr.Addr().String())
 			}
 			deviceIPsValue, err := json.Marshal(deviceIPs)
diff --git a/kube/state/state_test.go b/kube/state/state_test.go
index b5603acb5..5c438377e 100644
--- a/kube/state/state_test.go
+++ b/kube/state/state_test.go
@@ -18,7 +18,6 @@ import (
 	klc "tailscale.com/kube/localclient"
 	"tailscale.com/tailcfg"
 	"tailscale.com/types/logger"
-	"tailscale.com/types/netmap"
 )
 
 func TestSetInitialStateKeys(t *testing.T) {
@@ -133,12 +132,10 @@ func TestKeepStateKeysUpdated(t *testing.T) {
 		{
 			name: "authed",
 			notify: ipn.Notify{
-				NetMap: &netmap.NetworkMap{
-					SelfNode: (&tailcfg.Node{
-						StableID:  "TESTCTRL00000001",
-						Name:      "test-node.test.ts.net",
-						Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32"),
netip.MustParsePrefix("fd7a:115c:a1e0:ab12:4843:cd96:0:1/128")}, - }).View(), + SelfChange: &tailcfg.Node{ + StableID: "TESTCTRL00000001", + Name: "test-node.test.ts.net", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32"), netip.MustParsePrefix("fd7a:115c:a1e0:ab12:4843:cd96:0:1/128")}, }, }, expected: []string{ @@ -150,12 +147,10 @@ func TestKeepStateKeysUpdated(t *testing.T) { { name: "updated_fields", notify: ipn.Notify{ - NetMap: &netmap.NetworkMap{ - SelfNode: (&tailcfg.Node{ - StableID: "TESTCTRL00000001", - Name: "updated.test.ts.net", - Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.250/32")}, - }).View(), + SelfChange: &tailcfg.Node{ + StableID: "TESTCTRL00000001", + Name: "updated.test.ts.net", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.250/32")}, }, }, expected: []string{ diff --git a/tsconsensus/monitor.go b/tsconsensus/monitor.go index b937926a6..bf7410d0d 100644 --- a/tsconsensus/monitor.go +++ b/tsconsensus/monitor.go @@ -12,7 +12,6 @@ import ( "net/http" "slices" - "tailscale.com/ipn" "tailscale.com/ipn/ipnstate" "tailscale.com/tsnet" "tailscale.com/util/dnsname" @@ -108,24 +107,16 @@ func (m *monitor) handleNetmap(w http.ResponseWriter, r *http.Request) { http.Error(w, "", http.StatusInternalServerError) return } - watcher, err := lc.WatchIPNBus(r.Context(), ipn.NotifyInitialNetMap) + st, err := lc.Status(r.Context()) if err != nil { - log.Printf("monitor: error WatchIPNBus: %v", err) - http.Error(w, "", http.StatusInternalServerError) - return - } - defer watcher.Close() - - n, err := watcher.Next() - if err != nil { - log.Printf("monitor: error watcher.Next: %v", err) + log.Printf("monitor: error fetching status: %v", err) http.Error(w, "", http.StatusInternalServerError) return } encoder := json.NewEncoder(w) encoder.SetIndent("", "\t") - if err := encoder.Encode(n); err != nil { - log.Printf("monitor: error encoding netmap: %v", err) + if err := encoder.Encode(st); err != nil { + log.Printf("monitor: 
error encoding status: %v", err) return } }