cmd/k8s-proxy,kube/k8s-proxy: starting userspace proxy

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
This commit is contained in:
chaosinthecrd 2026-01-23 12:19:38 +00:00
parent 63d563e734
commit bf7508dbf6
No known key found for this signature in database
GPG Key ID: 52ED56820AF046EE
6 changed files with 383 additions and 33 deletions

View File

@ -10,6 +10,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
@ -104,6 +105,7 @@ func run(logger *zap.SugaredLogger) error {
if err != nil {
return fmt.Errorf("error getting rest config: %w", err)
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("error creating Kubernetes clientset: %w", err)
@ -152,10 +154,12 @@ func run(logger *zap.SugaredLogger) error {
// TODO(tomhjp): Pass this setting directly into the store instead of using
// environment variables.
if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) {
os.Setenv("TS_CERT_SHARE_MODE", "rw")
} else {
os.Setenv("TS_CERT_SHARE_MODE", "ro")
if cfg.Parsed.APIServerProxy != nil {
if cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) {
os.Setenv("TS_CERT_SHARE_MODE", "rw")
} else {
os.Setenv("TS_CERT_SHARE_MODE", "ro")
}
}
st, err := getStateStore(cfg.Parsed.State, logger)
@ -275,43 +279,64 @@ func run(logger *zap.SugaredLogger) error {
}
var cm *certs.CertManager
if shouldIssueCerts(cfg) {
logger.Infof("Will issue TLS certs for Tailscale Service")
cm = certs.NewCertManager(klc.New(lc), logger.Infof)
if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.L4Proxy != nil {
return fmt.Errorf("proxy configured for both api-server-proxy and l4-proxy")
}
if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
return err
if cfg.Parsed.APIServerProxy != nil {
// Setup for the API server proxy.
mode := kubetypes.APIServerProxyModeAuth
if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil {
mode = *cfg.Parsed.APIServerProxy.Mode
}
ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false)
if err != nil {
return fmt.Errorf("error creating api server proxy: %w", err)
}
group.Go(func() error {
if err := ap.Run(serveCtx); err != nil {
return fmt.Errorf("error running API server proxy: %w", err)
}
return nil
})
if shouldIssueCerts(cfg) {
logger.Infof("Will issue TLS certs for Tailscale Service")
cm = certs.NewCertManager(klc.New(lc), logger.Infof)
}
if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
return err
}
} else if cfg.Parsed.L4Proxy != nil {
err := setupL4Proxies(serveCtx, ts, lc, logger, cfg, group)
if err != nil {
return fmt.Errorf("failed to setup l4 proxies: %w", err)
}
} else {
return fmt.Errorf("please configure proxy either as api-server-proxy or l4-proxy")
}
if cfg.Parsed.AdvertiseServices != nil {
if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
if prefs, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
AdvertiseServicesSet: true,
Prefs: ipn.Prefs{
AdvertiseServices: cfg.Parsed.AdvertiseServices,
},
}); err != nil {
return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
} else {
prefsJSON, _ := json.Marshal(prefs)
logger.Infof("new prefs: %q", string(prefsJSON))
}
logger.Infof("Successfully set AdvertiseServices")
} else {
logger.Infof("No AdvertiseServices configured")
}
// Setup for the API server proxy.
mode := kubetypes.APIServerProxyModeAuth
if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil {
mode = *cfg.Parsed.APIServerProxy.Mode
}
ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false)
if err != nil {
return fmt.Errorf("error creating api server proxy: %w", err)
}
group.Go(func() error {
if err := ap.Run(serveCtx); err != nil {
return fmt.Errorf("error running API server proxy: %w", err)
}
return nil
})
for {
select {
case <-ctx.Done():
@ -325,6 +350,7 @@ func run(logger *zap.SugaredLogger) error {
case cfg = <-cfgChan:
// Handle config reload.
// TODO(tomhjp): Make auth mode reloadable.
// TODO(ChaosInTheCRD): Make UDP and TCP forwarders reloadable.
var prefs ipn.MaskedPrefs
cfgLogger := logger
currentPrefs, err := lc.GetPrefs(ctx)
@ -347,12 +373,16 @@ func run(logger *zap.SugaredLogger) error {
prefs.Prefs.RouteAll = v
}
if !prefs.IsEmpty() {
logger.Infof("Advertising Service: %v", cfg.Parsed.AdvertiseServices)
if _, err := lc.EditPrefs(ctx, &prefs); err != nil {
return fmt.Errorf("error editing prefs: %w", err)
}
}
if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
return fmt.Errorf("error setting serve config: %w", err)
if cfg.Parsed.APIServerProxy != nil {
if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
return fmt.Errorf("error setting serve config: %w", err)
}
}
cfgLogger.Infof("Config reloaded")
@ -441,6 +471,7 @@ func setServeConfig(ctx context.Context, lc *local.Client, cm *certs.CertManager
if err != nil {
return fmt.Errorf("error getting local client status: %w", err)
}
serviceHostPort := ipn.HostPort(fmt.Sprintf("%s.%s:443", name.WithoutPrefix(), status.CurrentTailnet.MagicDNSSuffix))
serveConfig := ipn.ServeConfig{

View File

@ -0,0 +1,269 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"fmt"
"net"
"net/netip"
"slices"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"tailscale.com/client/local"
"tailscale.com/ipn"
"tailscale.com/kube/ingressservices"
"tailscale.com/kube/k8s-proxy/conf"
"tailscale.com/tailcfg"
"tailscale.com/tsnet"
)
type udpForwarder struct {
listener net.PacketConn
backend string
connMap map[netip.AddrPort]*natEntry
timeout time.Duration
l *zap.SugaredLogger
m sync.Mutex
}
type natEntry struct {
conn net.Conn
timestamp atomic.Int64
cancel context.CancelFunc
}
func (f *udpForwarder) run(ctx context.Context) error {
buf := make([]byte, 65535)
f.l.Infof("UDP forwarder started, listening on %s, forwarding to %s", f.listener.LocalAddr().String(), f.backend)
// TODO: Cleanup goroutine
for {
select {
case <-ctx.Done():
return nil
default:
}
n, addr, err := f.listener.ReadFrom(buf)
if err != nil {
f.l.Errorf("failed to read from listener: %v", err)
return err
}
f.l.Debugf("Received %d bytes from %s", n, addr.String())
addrp, err := netip.ParseAddrPort(addr.String())
if err != nil {
f.l.Errorf("failed to parse address as address and port: %v", err)
return err
}
f.m.Lock()
entry, ok := f.connMap[addrp]
if !ok {
c, err := net.Dial("udp", f.backend)
if err != nil {
f.l.Errorf("failed to dial: %v", err)
f.m.Unlock()
return err
}
entryCtx, cancel := context.WithCancel(ctx)
entry = &natEntry{
conn: c,
cancel: cancel,
timestamp: atomic.Int64{},
}
f.connMap[addrp] = entry
go func(ctx context.Context, ne *natEntry) {
defer ne.conn.Close()
buf := make([]byte, 65535)
for {
select {
case <-ctx.Done():
f.l.Infof("context for relay with address %q done, exiting", addrp.String())
return
default:
}
n, err := ne.conn.Read(buf)
if err != nil {
f.l.Errorf("failed to read from connection with address %q: %v", addrp.String(), err)
return
}
ne.timestamp.Store(time.Now().Unix())
_, err = f.listener.WriteTo(buf[:n], net.UDPAddrFromAddrPort(addrp))
if err != nil {
f.l.Errorf("failed to write response to address %q: %v", addrp.String(), err)
return
}
}
}(entryCtx, entry)
}
f.m.Unlock()
_, err = entry.conn.Write(buf[:n])
if err != nil {
f.l.Errorf("failed to write bytes to %q: %v", f.backend, err)
return err
}
entry.timestamp.Store(time.Now().Unix())
}
}
func setupL4Proxies(ctx context.Context, ts *tsnet.Server, lc *local.Client, logger *zap.SugaredLogger, cfg *conf.Config, group *errgroup.Group) (err error) {
sc := &ipn.ServeConfig{}
sc.Services = make(map[tailcfg.ServiceName]*ipn.ServiceConfig)
// Store proxies to start later
udpProxies := []ingressservices.Config{}
// Build up the ServeConfig
for _, p := range cfg.Parsed.L4Proxy.Ingress {
// Register empty service config to trigger IP assignment
for _, m := range p.Mappings() {
if sc.Services[tailcfg.ServiceName(m.TailscaleServiceName)] == nil {
sc.Services[tailcfg.ServiceName(m.TailscaleServiceName)] = &ipn.ServiceConfig{}
}
}
udpProxies = append(udpProxies, p)
status, err := lc.StatusWithoutPeers(ctx)
if err != nil {
return fmt.Errorf("error getting local client status: %w", err)
}
err = setTCPForwardingForProxy(p, status.CurrentTailnet.MagicDNSSuffix, sc, lc, logger)
if err != nil {
return fmt.Errorf("failed to set tcp forwarding for services: %w", err)
}
}
// Apply the ServeConfig
logger.Infof("Applying ServeConfig...")
err = lc.SetServeConfig(ctx, sc)
if err != nil {
logger.Errorf("Failed to set ServeConfig: %v", err)
return err
}
// Setup the UDP Forwarders
for _, p := range udpProxies {
status, err := lc.StatusWithoutPeers(ctx)
if err != nil {
return fmt.Errorf("error getting status: %w", err)
}
// We can validate that the Service IP is in this node's capmap, to ensure that the advertisement was successful
found := false
serviceIPMaps, err := tailcfg.UnmarshalNodeCapJSON[tailcfg.ServiceIPMappings](status.Self.CapMap, tailcfg.NodeAttrServiceHost)
if err != nil {
return fmt.Errorf("error unmarshaling service IP mappings: %w", err)
}
if len(serviceIPMaps) == 0 {
logger.Warnf("no service IP mappings found for this node")
} else {
for _, m := range p.Mappings() {
ipMatches := false
for serviceName, addrs := range serviceIPMaps[0] {
if string(serviceName) == m.TailscaleServiceName {
found = true
if len(addrs) == 0 {
logger.Warnf("service %s has no assigned VIP addresses", m.TailscaleServiceName)
break
}
// Check if the configured IP is in the capmap. There can be scenarios where it isn't (no autoapproval, tag problems)
if slices.Contains(addrs, m.TailscaleServiceIP) {
ipMatches = true
logger.Infof("Found matching VIP %s for service %s in capmap", m.TailscaleServiceIP, m.TailscaleServiceName)
}
if !ipMatches {
logger.Warnf("Service %s configured with IP %s, but capmap reports %v. Routing may not work.",
m.TailscaleServiceName, m.TailscaleServiceIP, addrs)
}
break
}
}
if !found {
logger.Warnf("Tailscale Service %q not found in capmap. Routing may not work.", m.TailscaleServiceName)
}
}
}
fs, err := setupUDPForwardingForProxy(ts, p, logger)
if err != nil {
return fmt.Errorf("failed to setup udp forwarding: %w", err)
}
for _, f := range fs {
group.Go(func() error {
logger.Infof("Starting UDP forwarder goroutine for %s (%v)", f.backend, f.listener.LocalAddr())
return f.run(ctx)
})
logger.Infof("successfully created UDP listener on %s", f.listener.LocalAddr())
}
}
logger.Infof("Successfully applied ServeConfig and started all L4 proxies")
return nil
}
func setTCPForwardingForProxy(p ingressservices.Config, magicDNSSuffix string, serveConfig *ipn.ServeConfig, lc *local.Client, logger *zap.SugaredLogger) error {
for _, m := range p.Mappings() {
for _, port := range m.Ports {
svcName := tailcfg.ServiceName(m.TailscaleServiceName)
logger.Infof("Setting TCP forwarding for service=%s, port=%d, backend=%s", svcName, port, m.ClusterIP)
serveConfig.SetTCPForwardingForService(
port,
m.ClusterIP.String(),
false,
svcName,
0,
magicDNSSuffix,
)
}
}
return nil
}
func setupUDPForwardingForProxy(ts *tsnet.Server, p ingressservices.Config, logger *zap.SugaredLogger) (fs []*udpForwarder, err error) {
for _, m := range p.Mappings() {
for _, port := range m.Ports {
f := &udpForwarder{
l: logger.Named(fmt.Sprintf("udp-forwarder-%v", m.ClusterIP)),
backend: fmt.Sprintf("%s:%d", m.ClusterIP.String(), port),
connMap: make(map[netip.AddrPort]*natEntry),
}
listenAddr := fmt.Sprintf("%s:%d", m.TailscaleServiceIP, port)
logger.Infof("Attempting to listen on UDP address: %s", listenAddr)
f.listener, err = ts.ListenPacket("udp", listenAddr)
if err != nil {
logger.Warnf("Failed to listen on %s: %v", listenAddr, err)
return nil, err
}
fs = append(fs, f)
}
}
return
}

View File

@ -47,7 +47,7 @@ type TailnetTarget struct {
FQDN string `json:"fqdn"`
}
// PorMap is a mapping between match port on which proxy receives cluster
// PortMap is a mapping between match port on which proxy receives cluster
// traffic and target port where traffic received on match port should be
// fowardded to.
type PortMap struct {

View File

@ -48,6 +48,20 @@ type Config struct {
// Mapping describes a rule that forwards traffic from Tailscale Service IP to a
// Kubernetes Service IP.
type Mapping struct {
TailscaleServiceIP netip.Addr `json:"TailscaleServiceIP"`
ClusterIP netip.Addr `json:"ClusterIP"`
TailscaleServiceName string `json:"TailscaleServiceName"`
TailscaleServiceIP netip.Addr `json:"TailscaleServiceIP"`
ClusterIP netip.Addr `json:"ClusterIP"`
Ports []uint16 `json:"ports"`
}
// Mappings returns all non-nil mappings for this config
func (c *Config) Mappings() []*Mapping {
var mappings []*Mapping
if c.IPv4Mapping != nil {
mappings = append(mappings, c.IPv4Mapping)
}
if c.IPv6Mapping != nil {
mappings = append(mappings, c.IPv6Mapping)
}
return mappings
}

View File

@ -14,6 +14,8 @@ import (
"net/netip"
"github.com/tailscale/hujson"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/ingressservices"
"tailscale.com/kube/kubetypes"
"tailscale.com/tailcfg"
"tailscale.com/types/opt"
@ -66,6 +68,12 @@ type ConfigV1Alpha1 struct {
AdvertiseServices []string `json:",omitempty"` // Tailscale Services to advertise.
APIServerProxy *APIServerProxyConfig `json:",omitempty"` // Config specific to the API Server proxy.
StaticEndpoints []netip.AddrPort `json:",omitempty"` // StaticEndpoints are additional, user-defined endpoints that this node should advertise amongst its wireguard endpoints.
L4Proxy *L4ProxyConfig `json:",omitempty"`
}
type L4ProxyConfig struct {
Ingress []ingressservices.Config `json:",omitempty"`
Egress []egressservices.Config `json:",omitempty"`
}
type APIServerProxyConfig struct {

View File

@ -1109,6 +1109,34 @@ func (ns *Impl) shouldProcessInbound(p *packet.Parsed, t *tstun.Wrapper) bool {
return true
}
}
// check if there's a registered UDP endpoint for this service VIP
// This allows userspace UDP listeners (e.g., via tsnet.ListenPacket) to
// receive traffic on service VIP addresses.
if p.IPProto == ipproto.UDP {
var netProto tcpip.NetworkProtocolNumber
var id stack.TransportEndpointID
if p.Dst.Addr().Is4() {
netProto = ipv4.ProtocolNumber
id = stack.TransportEndpointID{
LocalAddress: tcpip.AddrFrom4(p.Dst.Addr().As4()),
LocalPort: p.Dst.Port(),
RemoteAddress: tcpip.AddrFrom4(p.Src.Addr().As4()),
RemotePort: p.Src.Port(),
}
} else {
netProto = ipv6.ProtocolNumber
id = stack.TransportEndpointID{
LocalAddress: tcpip.AddrFrom16(p.Dst.Addr().As16()),
LocalPort: p.Dst.Port(),
RemoteAddress: tcpip.AddrFrom16(p.Src.Addr().As16()),
RemotePort: p.Src.Port(),
}
}
ep := ns.ipstack.FindTransportEndpoint(netProto, udp.ProtocolNumber, id, nicID)
if ep != nil {
return true
}
}
return false
}
if p.IPVersion == 6 && !isLocal && viaRange.Contains(dstIP) {