From 20a564085792d2d4817071a2c352e9e253ada45d Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Thu, 11 Aug 2022 19:29:55 +0400 Subject: [PATCH] fix: introduce 'routed' NodeAddresses and use them in kubelet Same change will be done for the etcd in a separate PR. The idea is to introduce a subset of `current` addresses: `routed` addresses don't include external IPs (like AWS), as they are not on the node, and excludes SideroLink IPs (as these are not routeable). Reimplement `kubelet` nodeIP selection based on the new resources removing the reliance on `net.IPAddrs`. Signed-off-by: Andrey Smirnov --- .../machined/pkg/controllers/k8s/nodeip.go | 63 ++++++++++--------- .../pkg/controllers/k8s/nodeip_test.go | 9 +-- .../pkg/controllers/network/node_address.go | 22 +++++++ .../controllers/network/node_address_test.go | 32 ++++++++-- pkg/machinery/nethelpers/std_netaddr.go | 29 +++++++++ .../resources/network/node_address.go | 6 ++ pkg/machinery/resources/network/ula.go | 5 ++ 7 files changed, 125 insertions(+), 41 deletions(-) create mode 100644 pkg/machinery/nethelpers/std_netaddr.go diff --git a/internal/app/machined/pkg/controllers/k8s/nodeip.go b/internal/app/machined/pkg/controllers/k8s/nodeip.go index ff4cc72be..3bc78a484 100644 --- a/internal/app/machined/pkg/controllers/k8s/nodeip.go +++ b/internal/app/machined/pkg/controllers/k8s/nodeip.go @@ -10,12 +10,15 @@ import ( "github.com/cosi-project/runtime/pkg/controller" "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/safe" "github.com/cosi-project/runtime/pkg/state" "github.com/siderolabs/go-pointer" "github.com/talos-systems/net" "go.uber.org/zap" "inet.af/netaddr" + "github.com/talos-systems/talos/pkg/machinery/generic/slices" + "github.com/talos-systems/talos/pkg/machinery/nethelpers" "github.com/talos-systems/talos/pkg/machinery/resources/k8s" "github.com/talos-systems/talos/pkg/machinery/resources/network" ) @@ -40,7 +43,7 @@ func (ctrl *NodeIPController) Inputs() []controller.Input { { Namespace: network.NamespaceName, Type: network.NodeAddressType, - ID: pointer.To(network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s)), + ID: pointer.To(network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s)), Kind: controller.InputWeak, }, } @@ -67,7 +70,7 @@ func (ctrl *NodeIPController) Run(ctx context.Context, r controller.Runtime, log case <-r.EventCh(): } - cfg, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.NodeIPConfigType, k8s.KubeletID, resource.VersionUndefined)) + cfg, err := safe.ReaderGet[*k8s.NodeIPConfig](ctx, r, resource.NewMetadata(k8s.NamespaceName, k8s.NodeIPConfigType, k8s.KubeletID, resource.VersionUndefined)) if err != nil { if state.IsNotFoundError(err) { continue @@ -76,34 +79,40 @@ func (ctrl *NodeIPController) Run(ctx context.Context, r controller.Runtime, log return fmt.Errorf("error getting config: %w", err) } - cfgSpec := cfg.(*k8s.NodeIPConfig).TypedSpec() + cfgSpec := cfg.TypedSpec() + + nodeAddrs, err := safe.ReaderGet[*network.NodeAddress]( + ctx, + r, + resource.NewMetadata( + network.NamespaceName, + network.NodeAddressType, + network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s), + resource.VersionUndefined, + ), + ) + if err != nil { + if state.IsNotFoundError(err) { + continue + } + + return fmt.Errorf("error getting addresses: %w", err) + } + + addrs := nodeAddrs.TypedSpec().IPs() cidrs := make([]string, 0, len(cfgSpec.ValidSubnets)+len(cfgSpec.ExcludeSubnets)) - cidrs = append(cidrs, cfgSpec.ValidSubnets...) + cidrs = append(cidrs, slices.Map(cfgSpec.ExcludeSubnets, func(cidr string) string { return "!" + cidr })...) - for _, subnet := range cfgSpec.ExcludeSubnets { - cidrs = append(cidrs, "!"+subnet) - } - - // we have trigger on NodeAddresses, but we don't use them directly as they contain - // some addresses which are not assigned to the node (like AWS ExternalIP). - // we need to find solution for that later, for now just pull addresses directly - - ips, err := net.IPAddrs() - if err != nil { - return fmt.Errorf("error listing IPs: %w", err) - } - - // we use stdnet.IP here to re-use already existing functions in talos-systems/net - // once talos-systems/net is migrated to netaddr or netip, we can use it here - ips = net.IPFilter(ips, network.NotSideroLinkStdIP) - - ips, err = net.FilterIPs(ips, cidrs) + // TODO: this should eventually be rewritten with `net.FilterIPs` on netaddrs, but for now we'll keep same code and do the conversion. + stdIPs, err := net.FilterIPs(nethelpers.MapNetAddrToStd(addrs), cidrs) if err != nil { return fmt.Errorf("error filtering IPs: %w", err) } + ips := nethelpers.MapStdToNetAddr(stdIPs) + // filter down to make sure only one IPv4 and one IPv6 address stays var hasIPv4, hasIPv6 bool @@ -111,18 +120,16 @@ func (ctrl *NodeIPController) Run(ctx context.Context, r controller.Runtime, log for _, ip := range ips { switch { - case ip.To4() != nil: + case ip.Is4(): if !hasIPv4 { - addr, _ := netaddr.FromStdIP(ip) - nodeIPs = append(nodeIPs, addr) + nodeIPs = append(nodeIPs, ip) hasIPv4 = true } else { logger.Warn("node IP skipped, please use .machine.kubelet.nodeIP to provide explicit subnet for the node IP", zap.Stringer("address", ip)) } - case ip.To16() != nil: + case ip.Is6(): if !hasIPv6 { - addr, _ := netaddr.FromStdIP(ip) - nodeIPs = append(nodeIPs, addr) + nodeIPs = append(nodeIPs, ip) hasIPv6 = true } else { logger.Warn("node IP skipped, please use .machine.kubelet.nodeIP to provide explicit subnet for the node IP", zap.Stringer("address", ip)) diff --git a/internal/app/machined/pkg/controllers/k8s/nodeip_test.go b/internal/app/machined/pkg/controllers/k8s/nodeip_test.go index 2776c6d62..58fada10f 100644 --- a/internal/app/machined/pkg/controllers/k8s/nodeip_test.go +++ b/internal/app/machined/pkg/controllers/k8s/nodeip_test.go @@ -66,8 +66,6 @@ func (suite *NodeIPSuite) startRuntime() { } func (suite *NodeIPSuite) TestReconcileIPv4() { - suite.T().Skip("skipping as the code relies on net.IPAddrs") - cfg := k8s.NewNodeIPConfig(k8s.NamespaceName, k8s.KubeletID) cfg.TypedSpec().ValidSubnets = []string{"10.0.0.0/24", "::/0"} @@ -77,13 +75,12 @@ func (suite *NodeIPSuite) TestReconcileIPv4() { addresses := network.NewNodeAddress( network.NamespaceName, - network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s), + network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s), ) addresses.TypedSpec().Addresses = []netaddr.IPPrefix{ netaddr.MustParseIPPrefix("10.0.0.2/32"), // excluded explicitly netaddr.MustParseIPPrefix("10.0.0.5/24"), - netaddr.MustParseIPPrefix("fdae:41e4:649b:9303::1/64"), // SideroLink IP } suite.Require().NoError(suite.state.Create(suite.ctx, addresses)) @@ -114,8 +111,6 @@ func (suite *NodeIPSuite) TestReconcileIPv4() { } func (suite *NodeIPSuite) TestReconcileDefaultSubnets() { - suite.T().Skip("skipping as the code relies on net.IPAddrs") - cfg := k8s.NewNodeIPConfig(k8s.NamespaceName, k8s.KubeletID) cfg.TypedSpec().ValidSubnets = []string{"0.0.0.0/0", "::/0"} @@ -124,7 +119,7 @@ func (suite *NodeIPSuite) TestReconcileDefaultSubnets() { addresses := network.NewNodeAddress( network.NamespaceName, - network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s), + network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s), ) addresses.TypedSpec().Addresses = []netaddr.IPPrefix{ diff --git a/internal/app/machined/pkg/controllers/network/node_address.go b/internal/app/machined/pkg/controllers/network/node_address.go index 151ffc906..509b93390 100644 --- a/internal/app/machined/pkg/controllers/network/node_address.go +++ b/internal/app/machined/pkg/controllers/network/node_address.go @@ -108,6 +108,7 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime defaultAddress netaddr.IPPrefix defaultAddrLinkName string current []netaddr.IPPrefix + routed []netaddr.IPPrefix accumulative []netaddr.IPPrefix ) @@ -137,14 +138,23 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime current = append(current, ip) } + // routed: filter out external addresses and addresses from SideroLink + if _, up := linksUp[addr.TypedSpec().LinkIndex]; up && addr.TypedSpec().LinkName != externalLink { + if network.NotSideroLinkIP(ip.IP()) { + routed = append(routed, ip) + } + } + accumulative = append(accumulative, ip) } // sort current addresses sort.Slice(current, func(i, j int) bool { return current[i].IP().Compare(current[j].IP()) < 0 }) + sort.Slice(routed, func(i, j int) bool { return routed[i].IP().Compare(routed[j].IP()) < 0 }) // remove duplicates from current addresses current = deduplicateIPPrefixes(current) + routed = deduplicateIPPrefixes(routed) touchedIDs := make(map[resource.ID]struct{}) @@ -177,6 +187,12 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime touchedIDs[network.NodeAddressCurrentID] = struct{}{} + if err = updateCurrentAddresses(ctx, r, network.NodeAddressRoutedID, routed); err != nil { + return err + } + + touchedIDs[network.NodeAddressRoutedID] = struct{}{} + if err = updateAccumulativeAddresses(ctx, r, network.NodeAddressAccumulativeID, accumulative); err != nil { return err } @@ -189,17 +205,23 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime filter := res.(*network.NodeAddressFilter).TypedSpec() filteredCurrent := filterIPs(current, filter.IncludeSubnets, filter.ExcludeSubnets) + filteredRouted := filterIPs(routed, filter.IncludeSubnets, filter.ExcludeSubnets) filteredAccumulative := filterIPs(accumulative, filter.IncludeSubnets, filter.ExcludeSubnets) if err = updateCurrentAddresses(ctx, r, network.FilteredNodeAddressID(network.NodeAddressCurrentID, filterID), filteredCurrent); err != nil { return err } + if err = updateCurrentAddresses(ctx, r, network.FilteredNodeAddressID(network.NodeAddressRoutedID, filterID), filteredRouted); err != nil { + return err + } + if err = updateAccumulativeAddresses(ctx, r, network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filterID), filteredAccumulative); err != nil { return err } touchedIDs[network.FilteredNodeAddressID(network.NodeAddressCurrentID, filterID)] = struct{}{} + touchedIDs[network.FilteredNodeAddressID(network.NodeAddressRoutedID, filterID)] = struct{}{} touchedIDs[network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filterID)] = struct{}{} } diff --git a/internal/app/machined/pkg/controllers/network/node_address_test.go b/internal/app/machined/pkg/controllers/network/node_address_test.go index e6ad2855f..2564ad4fe 100644 --- a/internal/app/machined/pkg/controllers/network/node_address_test.go +++ b/internal/app/machined/pkg/controllers/network/node_address_test.go @@ -114,6 +114,7 @@ func (suite *NodeAddressSuite) TestDefaults() { []string{ network.NodeAddressDefaultID, network.NodeAddressCurrentID, + network.NodeAddressRoutedID, network.NodeAddressAccumulativeID, }, func(r *network.NodeAddress) error { addrs := r.TypedSpec().Addresses @@ -146,7 +147,7 @@ func (suite *NodeAddressSuite) TestDefaults() { ) } -//nolint:gocyclo +//nolint:gocyclo,cyclop func (suite *NodeAddressSuite) TestFilters() { var ( addressStatusController netctrl.AddressStatusController @@ -197,6 +198,7 @@ func (suite *NodeAddressSuite) TestFilters() { "25.3.7.9/32", "2001:470:6d:30e:4a62:b3ba:180b:b5b8/64", "127.0.0.1/8", + "fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128", } { newAddress(netaddr.MustParseIPPrefix(addr), linkUp) } @@ -227,10 +229,13 @@ func (suite *NodeAddressSuite) TestFilters() { []string{ network.NodeAddressDefaultID, network.NodeAddressCurrentID, + network.NodeAddressRoutedID, network.NodeAddressAccumulativeID, network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter1.Metadata().ID()), + network.FilteredNodeAddressID(network.NodeAddressRoutedID, filter1.Metadata().ID()), network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filter1.Metadata().ID()), network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter2.Metadata().ID()), + network.FilteredNodeAddressID(network.NodeAddressRoutedID, filter2.Metadata().ID()), network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filter2.Metadata().ID()), }, func(r *network.NodeAddress) error { addrs := r.TypedSpec().Addresses @@ -243,32 +248,47 @@ func (suite *NodeAddressSuite) TestFilters() { case network.NodeAddressCurrentID: if !reflect.DeepEqual( addrs, - ipList("1.2.3.4/32 10.0.0.1/8 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"), + ipList("1.2.3.4/32 10.0.0.1/8 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64 fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128"), + ) { + return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs) + } + case network.NodeAddressRoutedID: + if !reflect.DeepEqual( + addrs, + ipList("10.0.0.1/8 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"), ) { return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs) } case network.NodeAddressAccumulativeID: if !reflect.DeepEqual( addrs, - ipList("1.2.3.4/32 10.0.0.1/8 10.0.0.2/8 25.3.7.9/32 192.168.3.7/24 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"), + ipList("1.2.3.4/32 10.0.0.1/8 10.0.0.2/8 25.3.7.9/32 192.168.3.7/24 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64 fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128"), ) { return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs) } case network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter1.Metadata().ID()): if !reflect.DeepEqual( addrs, - ipList("1.2.3.4/32 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"), + ipList("1.2.3.4/32 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64 fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128"), + ) { + return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs) + } + case network.FilteredNodeAddressID(network.NodeAddressRoutedID, filter1.Metadata().ID()): + if !reflect.DeepEqual( + addrs, + ipList("25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"), ) { return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs) } case network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filter1.Metadata().ID()): if !reflect.DeepEqual( addrs, - ipList("1.2.3.4/32 25.3.7.9/32 192.168.3.7/24 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"), + ipList("1.2.3.4/32 25.3.7.9/32 192.168.3.7/24 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64 fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128"), ) { return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs) } - case network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter2.Metadata().ID()): + case network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter2.Metadata().ID()), + network.FilteredNodeAddressID(network.NodeAddressRoutedID, filter2.Metadata().ID()): if !reflect.DeepEqual(addrs, ipList("10.0.0.1/8")) { return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs) } diff --git a/pkg/machinery/nethelpers/std_netaddr.go b/pkg/machinery/nethelpers/std_netaddr.go new file mode 100644 index 000000000..fdfef50e0 --- /dev/null +++ b/pkg/machinery/nethelpers/std_netaddr.go @@ -0,0 +1,29 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package nethelpers + +import ( + "net" + + "inet.af/netaddr" + + "github.com/talos-systems/talos/pkg/machinery/generic/slices" +) + +// MapStdToNetAddr converts a slice of net.IP to a slice of netaddr.Addr. +func MapStdToNetAddr(in []net.IP) []netaddr.IP { + return slices.Map(in, func(std net.IP) netaddr.IP { + addr, _ := netaddr.FromStdIP(std) + + return addr.Unmap() + }) +} + +// MapNetAddrToStd converts a slice of netaddr.Addr to a slice of net.IP. +func MapNetAddrToStd(in []netaddr.IP) []net.IP { + return slices.Map(in, func(addr netaddr.IP) net.IP { + return addr.Unmap().IPAddr().IP + }) +} diff --git a/pkg/machinery/resources/network/node_address.go b/pkg/machinery/resources/network/node_address.go index 536c7bdf3..a877b6c90 100644 --- a/pkg/machinery/resources/network/node_address.go +++ b/pkg/machinery/resources/network/node_address.go @@ -37,6 +37,12 @@ const ( // // If some address is no longer present, it will be still kept in the accumulative list. NodeAddressAccumulativeID = "accumulative" + // Routed current node addresses (as seen at the moment). + // + // This is current addresses minus 'external' IPs, and SideroLink IPs. + // + // This list is used to pick advertised/listen addresses for different services. + NodeAddressRoutedID = "routed" ) // NodeAddressSpec describes a set of node addresses. diff --git a/pkg/machinery/resources/network/ula.go b/pkg/machinery/resources/network/ula.go index 0d3f71277..9d5078b47 100644 --- a/pkg/machinery/resources/network/ula.go +++ b/pkg/machinery/resources/network/ula.go @@ -69,6 +69,11 @@ func IsStdULA(ip net.IP, purpose ULAPurpose) bool { return IsULA(addr, purpose) } +// NotSideroLinkIP is a shorthand for !IsULA(ip, ULASideroLink). +func NotSideroLinkIP(ip netaddr.IP) bool { + return !IsULA(ip, ULASideroLink) +} + // NotSideroLinkStdIP is a shorthand for !IsStdULA(ip, ULASideroLink). func NotSideroLinkStdIP(ip net.IP) bool { return !IsStdULA(ip, ULASideroLink)