diff --git a/cmd/k8s-proxy/k8s-proxy.go b/cmd/k8s-proxy/k8s-proxy.go index 9b2bb6749..fbcde1564 100644 --- a/cmd/k8s-proxy/k8s-proxy.go +++ b/cmd/k8s-proxy/k8s-proxy.go @@ -10,6 +10,7 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "net" @@ -104,6 +105,7 @@ func run(logger *zap.SugaredLogger) error { if err != nil { return fmt.Errorf("error getting rest config: %w", err) } + clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { return fmt.Errorf("error creating Kubernetes clientset: %w", err) @@ -152,10 +154,12 @@ func run(logger *zap.SugaredLogger) error { // TODO(tomhjp): Pass this setting directly into the store instead of using // environment variables. - if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) { - os.Setenv("TS_CERT_SHARE_MODE", "rw") - } else { - os.Setenv("TS_CERT_SHARE_MODE", "ro") + if cfg.Parsed.APIServerProxy != nil { + if cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) { + os.Setenv("TS_CERT_SHARE_MODE", "rw") + } else { + os.Setenv("TS_CERT_SHARE_MODE", "ro") + } } st, err := getStateStore(cfg.Parsed.State, logger) @@ -275,43 +279,64 @@ func run(logger *zap.SugaredLogger) error { } var cm *certs.CertManager - if shouldIssueCerts(cfg) { - logger.Infof("Will issue TLS certs for Tailscale Service") - cm = certs.NewCertManager(klc.New(lc), logger.Infof) + + if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.L4Proxy != nil { + return fmt.Errorf("proxy configured for both api-server-proxy and l4-proxy") } - if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil { - return err + + if cfg.Parsed.APIServerProxy != nil { + // Setup for the API server proxy. + mode := kubetypes.APIServerProxyModeAuth + if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil { + mode = *cfg.Parsed.APIServerProxy.Mode + } + + ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false) + if err != nil { + return fmt.Errorf("error creating api server proxy: %w", err) + } + + group.Go(func() error { + if err := ap.Run(serveCtx); err != nil { + return fmt.Errorf("error running API server proxy: %w", err) + } + + return nil + }) + + if shouldIssueCerts(cfg) { + logger.Infof("Will issue TLS certs for Tailscale Service") + cm = certs.NewCertManager(klc.New(lc), logger.Infof) + } + if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil { + return err + } + } else if cfg.Parsed.L4Proxy != nil { + err := setupL4Proxies(serveCtx, ts, lc, logger, cfg, group) + if err != nil { + return fmt.Errorf("failed to setup l4 proxies: %w", err) + } + } else { + return fmt.Errorf("please configure proxy either as api-server-proxy or l4-proxy") } if cfg.Parsed.AdvertiseServices != nil { - if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{ + if prefs, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{ AdvertiseServicesSet: true, Prefs: ipn.Prefs{ AdvertiseServices: cfg.Parsed.AdvertiseServices, }, }); err != nil { return fmt.Errorf("error setting prefs AdvertiseServices: %w", err) + } else { + prefsJSON, _ := json.Marshal(prefs) + logger.Infof("new prefs: %q", string(prefsJSON)) } + logger.Infof("Successfully set AdvertiseServices") + } else { + logger.Infof("No AdvertiseServices configured") } - // Setup for the API server proxy. - mode := kubetypes.APIServerProxyModeAuth - if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil { - mode = *cfg.Parsed.APIServerProxy.Mode - } - ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false) - if err != nil { - return fmt.Errorf("error creating api server proxy: %w", err) - } - - group.Go(func() error { - if err := ap.Run(serveCtx); err != nil { - return fmt.Errorf("error running API server proxy: %w", err) - } - - return nil - }) - for { select { case <-ctx.Done(): @@ -325,6 +350,7 @@ func run(logger *zap.SugaredLogger) error { case cfg = <-cfgChan: // Handle config reload. // TODO(tomhjp): Make auth mode reloadable. + // TODO(ChaosInTheCRD): Make UDP and TCP forwarders reloadable. var prefs ipn.MaskedPrefs cfgLogger := logger currentPrefs, err := lc.GetPrefs(ctx) @@ -347,12 +373,16 @@ func run(logger *zap.SugaredLogger) error { prefs.Prefs.RouteAll = v } if !prefs.IsEmpty() { + logger.Infof("Advertising Service: %v", cfg.Parsed.AdvertiseServices) if _, err := lc.EditPrefs(ctx, &prefs); err != nil { return fmt.Errorf("error editing prefs: %w", err) } } - if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil { - return fmt.Errorf("error setting serve config: %w", err) + + if cfg.Parsed.APIServerProxy != nil { + if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil { + return fmt.Errorf("error setting serve config: %w", err) + } } cfgLogger.Infof("Config reloaded") @@ -441,6 +471,7 @@ func setServeConfig(ctx context.Context, lc *local.Client, cm *certs.CertManager if err != nil { return fmt.Errorf("error getting local client status: %w", err) } + serviceHostPort := ipn.HostPort(fmt.Sprintf("%s.%s:443", name.WithoutPrefix(), status.CurrentTailnet.MagicDNSSuffix)) serveConfig := ipn.ServeConfig{ diff --git a/cmd/k8s-proxy/l4-forwarder.go b/cmd/k8s-proxy/l4-forwarder.go new file mode 100644 index 000000000..008d7d64f --- /dev/null +++ b/cmd/k8s-proxy/l4-forwarder.go @@ -0,0 +1,269 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "context" + "fmt" + "net" + "net/netip" + "slices" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "tailscale.com/client/local" + "tailscale.com/ipn" + "tailscale.com/kube/ingressservices" + "tailscale.com/kube/k8s-proxy/conf" + "tailscale.com/tailcfg" + "tailscale.com/tsnet" +) + +type udpForwarder struct { + listener net.PacketConn + backend string + connMap map[netip.AddrPort]*natEntry + timeout time.Duration + l *zap.SugaredLogger + m sync.Mutex +} + +type natEntry struct { + conn net.Conn + timestamp atomic.Int64 + cancel context.CancelFunc +} + +func (f *udpForwarder) run(ctx context.Context) error { + buf := make([]byte, 65535) + + f.l.Infof("UDP forwarder started, listening on %s, forwarding to %s", f.listener.LocalAddr().String(), f.backend) + + // TODO: Cleanup goroutine + for { + select { + case <-ctx.Done(): + return nil + default: + } + + n, addr, err := f.listener.ReadFrom(buf) + if err != nil { + f.l.Errorf("failed to read from listener: %v", err) + return err + } + + f.l.Debugf("Received %d bytes from %s", n, addr.String()) + + addrp, err := netip.ParseAddrPort(addr.String()) + if err != nil { + f.l.Errorf("failed to parse address as address and port: %v", err) + return err + } + + f.m.Lock() + entry, ok := f.connMap[addrp] + if !ok { + c, err := net.Dial("udp", f.backend) + if err != nil { + f.l.Errorf("failed to dial: %v", err) + f.m.Unlock() + return err + } + + entryCtx, cancel := context.WithCancel(ctx) + + entry = &natEntry{ + conn: c, + cancel: cancel, + timestamp: atomic.Int64{}, + } + f.connMap[addrp] = entry + + go func(ctx context.Context, ne *natEntry) { + defer ne.conn.Close() + buf := make([]byte, 65535) + + for { + select { + case <-ctx.Done(): + f.l.Infof("context for relay with address %q done, exiting", addrp.String()) + return + default: + } + + n, err := ne.conn.Read(buf) + if err != nil { + f.l.Errorf("failed to read from connection with address %q: %v", addrp.String(), err) + return + } + + ne.timestamp.Store(time.Now().Unix()) + + _, err = f.listener.WriteTo(buf[:n], net.UDPAddrFromAddrPort(addrp)) + if err != nil { + f.l.Errorf("failed to write response to address %q: %v", addrp.String(), err) + return + } + } + }(entryCtx, entry) + } + f.m.Unlock() + + _, err = entry.conn.Write(buf[:n]) + if err != nil { + f.l.Errorf("failed to write bytes to %q: %v", f.backend, err) + return err + } + + entry.timestamp.Store(time.Now().Unix()) + } +} + +func setupL4Proxies(ctx context.Context, ts *tsnet.Server, lc *local.Client, logger *zap.SugaredLogger, cfg *conf.Config, group *errgroup.Group) (err error) { + sc := &ipn.ServeConfig{} + sc.Services = make(map[tailcfg.ServiceName]*ipn.ServiceConfig) + + // Store proxies to start later + udpProxies := []ingressservices.Config{} + + // Build up the ServeConfig + for _, p := range cfg.Parsed.L4Proxy.Ingress { + // Register empty service config to trigger IP assignment + for _, m := range p.Mappings() { + if sc.Services[tailcfg.ServiceName(m.TailscaleServiceName)] == nil { + sc.Services[tailcfg.ServiceName(m.TailscaleServiceName)] = &ipn.ServiceConfig{} + } + } + udpProxies = append(udpProxies, p) + + status, err := lc.StatusWithoutPeers(ctx) + if err != nil { + return fmt.Errorf("error getting local client status: %w", err) + } + err = setTCPForwardingForProxy(p, status.CurrentTailnet.MagicDNSSuffix, sc, lc, logger) + if err != nil { + return fmt.Errorf("failed to set tcp forwarding for services: %w", err) + } + } + + // Apply the ServeConfig + logger.Infof("Applying ServeConfig...") + err = lc.SetServeConfig(ctx, sc) + if err != nil { + logger.Errorf("Failed to set ServeConfig: %v", err) + return err + } + + // Setup the UDP Forwarders + for _, p := range udpProxies { + status, err := lc.StatusWithoutPeers(ctx) + if err != nil { + return fmt.Errorf("error getting status: %w", err) + } + + // We can validate that the Service IP is in this node's capmap, to ensure that the advertisement was successful + found := false + serviceIPMaps, err := tailcfg.UnmarshalNodeCapJSON[tailcfg.ServiceIPMappings](status.Self.CapMap, tailcfg.NodeAttrServiceHost) + if err != nil { + return fmt.Errorf("error unmarshaling service IP mappings: %w", err) + } + if len(serviceIPMaps) == 0 { + logger.Warnf("no service IP mappings found for this node") + } else { + for _, m := range p.Mappings() { + ipMatches := false + for serviceName, addrs := range serviceIPMaps[0] { + if string(serviceName) == m.TailscaleServiceName { + found = true + if len(addrs) == 0 { + logger.Warnf("service %s has no assigned VIP addresses", m.TailscaleServiceName) + break + } + // Check if the configured IP is in the capmap. There can be scenarios where it isn't (no autoapproval, tag problems) + if slices.Contains(addrs, m.TailscaleServiceIP) { + ipMatches = true + logger.Infof("Found matching VIP %s for service %s in capmap", m.TailscaleServiceIP, m.TailscaleServiceName) + } + if !ipMatches { + logger.Warnf("Service %s configured with IP %s, but capmap reports %v. Routing may not work.", + m.TailscaleServiceName, m.TailscaleServiceIP, addrs) + } + break + } + } + if !found { + logger.Warnf("Tailscale Service %q not found in capmap. Routing may not work.", m.TailscaleServiceName) + } + } + } + + fs, err := setupUDPForwardingForProxy(ts, p, logger) + if err != nil { + return fmt.Errorf("failed to setup udp forwarding: %w", err) + } + + for _, f := range fs { + group.Go(func() error { + logger.Infof("Starting UDP forwarder goroutine for %s (%v)", f.backend, f.listener.LocalAddr()) + return f.run(ctx) + }) + + logger.Infof("successfully created UDP listener on %s", f.listener.LocalAddr()) + } + + } + + logger.Infof("Successfully applied ServeConfig and started all L4 proxies") + return nil +} + +func setTCPForwardingForProxy(p ingressservices.Config, magicDNSSuffix string, serveConfig *ipn.ServeConfig, lc *local.Client, logger *zap.SugaredLogger) error { + for _, m := range p.Mappings() { + for _, port := range m.Ports { + svcName := tailcfg.ServiceName(m.TailscaleServiceName) + logger.Infof("Setting TCP forwarding for service=%s, port=%d, backend=%s", svcName, port, m.ClusterIP) + + serveConfig.SetTCPForwardingForService( + port, + m.ClusterIP.String(), + false, + svcName, + 0, + magicDNSSuffix, + ) + } + } + + return nil +} + +func setupUDPForwardingForProxy(ts *tsnet.Server, p ingressservices.Config, logger *zap.SugaredLogger) (fs []*udpForwarder, err error) { + for _, m := range p.Mappings() { + for _, port := range m.Ports { + f := &udpForwarder{ + l: logger.Named(fmt.Sprintf("udp-forwarder-%v", m.ClusterIP)), + backend: fmt.Sprintf("%s:%d", m.ClusterIP.String(), port), + connMap: make(map[netip.AddrPort]*natEntry), + } + listenAddr := fmt.Sprintf("%s:%d", m.TailscaleServiceIP, port) + logger.Infof("Attempting to listen on UDP address: %s", listenAddr) + + f.listener, err = ts.ListenPacket("udp", listenAddr) + if err != nil { + logger.Warnf("Failed to listen on %s: %v", listenAddr, err) + return nil, err + } + + fs = append(fs, f) + } + } + + return +} diff --git a/kube/egressservices/egressservices.go b/kube/egressservices/egressservices.go index 56c874f31..d61a9547d 100644 --- a/kube/egressservices/egressservices.go +++ b/kube/egressservices/egressservices.go @@ -47,7 +47,7 @@ type TailnetTarget struct { FQDN string `json:"fqdn"` } -// PorMap is a mapping between match port on which proxy receives cluster +// PortMap is a mapping between match port on which proxy receives cluster // traffic and target port where traffic received on match port should be // fowardded to. type PortMap struct { diff --git a/kube/ingressservices/ingressservices.go b/kube/ingressservices/ingressservices.go index f79410761..ba8d0f63e 100644 --- a/kube/ingressservices/ingressservices.go +++ b/kube/ingressservices/ingressservices.go @@ -48,6 +48,20 @@ type Config struct { // Mapping describes a rule that forwards traffic from Tailscale Service IP to a // Kubernetes Service IP. type Mapping struct { - TailscaleServiceIP netip.Addr `json:"TailscaleServiceIP"` - ClusterIP netip.Addr `json:"ClusterIP"` + TailscaleServiceName string `json:"TailscaleServiceName"` + TailscaleServiceIP netip.Addr `json:"TailscaleServiceIP"` + ClusterIP netip.Addr `json:"ClusterIP"` + Ports []uint16 `json:"ports"` +} + +// Mappings returns all non-nil mappings for this config +func (c *Config) Mappings() []*Mapping { + var mappings []*Mapping + if c.IPv4Mapping != nil { + mappings = append(mappings, c.IPv4Mapping) + } + if c.IPv6Mapping != nil { + mappings = append(mappings, c.IPv6Mapping) + } + return mappings } diff --git a/kube/k8s-proxy/conf/conf.go b/kube/k8s-proxy/conf/conf.go index 529495243..c854d7c13 100644 --- a/kube/k8s-proxy/conf/conf.go +++ b/kube/k8s-proxy/conf/conf.go @@ -14,6 +14,8 @@ import ( "net/netip" "github.com/tailscale/hujson" + "tailscale.com/kube/egressservices" + "tailscale.com/kube/ingressservices" "tailscale.com/kube/kubetypes" "tailscale.com/tailcfg" "tailscale.com/types/opt" @@ -66,6 +68,12 @@ type ConfigV1Alpha1 struct { AdvertiseServices []string `json:",omitempty"` // Tailscale Services to advertise. APIServerProxy *APIServerProxyConfig `json:",omitempty"` // Config specific to the API Server proxy. StaticEndpoints []netip.AddrPort `json:",omitempty"` // StaticEndpoints are additional, user-defined endpoints that this node should advertise amongst its wireguard endpoints. + L4Proxy *L4ProxyConfig `json:",omitempty"` +} + +type L4ProxyConfig struct { + Ingress []ingressservices.Config `json:",omitempty"` + Egress []egressservices.Config `json:",omitempty"` } type APIServerProxyConfig struct { diff --git a/wgengine/netstack/netstack.go b/wgengine/netstack/netstack.go index e05846e15..27aa39014 100644 --- a/wgengine/netstack/netstack.go +++ b/wgengine/netstack/netstack.go @@ -1109,6 +1109,34 @@ func (ns *Impl) shouldProcessInbound(p *packet.Parsed, t *tstun.Wrapper) bool { return true } } + // check if there's a registered UDP endpoint for this service VIP + // This allows userspace UDP listeners (e.g., via tsnet.ListenPacket) to + // receive traffic on service VIP addresses. + if p.IPProto == ipproto.UDP { + var netProto tcpip.NetworkProtocolNumber + var id stack.TransportEndpointID + if p.Dst.Addr().Is4() { + netProto = ipv4.ProtocolNumber + id = stack.TransportEndpointID{ + LocalAddress: tcpip.AddrFrom4(p.Dst.Addr().As4()), + LocalPort: p.Dst.Port(), + RemoteAddress: tcpip.AddrFrom4(p.Src.Addr().As4()), + RemotePort: p.Src.Port(), + } + } else { + netProto = ipv6.ProtocolNumber + id = stack.TransportEndpointID{ + LocalAddress: tcpip.AddrFrom16(p.Dst.Addr().As16()), + LocalPort: p.Dst.Port(), + RemoteAddress: tcpip.AddrFrom16(p.Src.Addr().As16()), + RemotePort: p.Src.Port(), + } + } + ep := ns.ipstack.FindTransportEndpoint(netProto, udp.ProtocolNumber, id, nicID) + if ep != nil { + return true + } + } return false } if p.IPVersion == 6 && !isLocal && viaRange.Contains(dstIP) {