fix: introduce 'routed' NodeAddresses and use them in kubelet

Same change will be done for the etcd in a separate PR.

The idea is to introduce a subset of `current` addresses: `routed`
addresses don't include external IPs (like AWS), as they are not on the
node, and excludes SideroLink IPs (as these are not routeable).

Reimplement `kubelet` nodeIP selection based on the new resources
removing the reliance on `net.IPAddrs`.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
Andrey Smirnov 2022-08-11 19:29:55 +04:00
parent f1de478943
commit 20a5640857
No known key found for this signature in database
GPG Key ID: 7B26396447AB6DFD
7 changed files with 125 additions and 41 deletions

View File

@ -10,12 +10,15 @@ import (
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/go-pointer"
"github.com/talos-systems/net"
"go.uber.org/zap"
"inet.af/netaddr"
"github.com/talos-systems/talos/pkg/machinery/generic/slices"
"github.com/talos-systems/talos/pkg/machinery/nethelpers"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
"github.com/talos-systems/talos/pkg/machinery/resources/network"
)
@ -40,7 +43,7 @@ func (ctrl *NodeIPController) Inputs() []controller.Input {
{
Namespace: network.NamespaceName,
Type: network.NodeAddressType,
ID: pointer.To(network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s)),
ID: pointer.To(network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s)),
Kind: controller.InputWeak,
},
}
@ -67,7 +70,7 @@ func (ctrl *NodeIPController) Run(ctx context.Context, r controller.Runtime, log
case <-r.EventCh():
}
cfg, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.NodeIPConfigType, k8s.KubeletID, resource.VersionUndefined))
cfg, err := safe.ReaderGet[*k8s.NodeIPConfig](ctx, r, resource.NewMetadata(k8s.NamespaceName, k8s.NodeIPConfigType, k8s.KubeletID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -76,34 +79,40 @@ func (ctrl *NodeIPController) Run(ctx context.Context, r controller.Runtime, log
return fmt.Errorf("error getting config: %w", err)
}
cfgSpec := cfg.(*k8s.NodeIPConfig).TypedSpec()
cfgSpec := cfg.TypedSpec()
nodeAddrs, err := safe.ReaderGet[*network.NodeAddress](
ctx,
r,
resource.NewMetadata(
network.NamespaceName,
network.NodeAddressType,
network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s),
resource.VersionUndefined,
),
)
if err != nil {
if state.IsNotFoundError(err) {
continue
}
return fmt.Errorf("error getting addresses: %w", err)
}
addrs := nodeAddrs.TypedSpec().IPs()
cidrs := make([]string, 0, len(cfgSpec.ValidSubnets)+len(cfgSpec.ExcludeSubnets))
cidrs = append(cidrs, cfgSpec.ValidSubnets...)
cidrs = append(cidrs, slices.Map(cfgSpec.ExcludeSubnets, func(cidr string) string { return "!" + cidr })...)
for _, subnet := range cfgSpec.ExcludeSubnets {
cidrs = append(cidrs, "!"+subnet)
}
// we have trigger on NodeAddresses, but we don't use them directly as they contain
// some addresses which are not assigned to the node (like AWS ExternalIP).
// we need to find solution for that later, for now just pull addresses directly
ips, err := net.IPAddrs()
if err != nil {
return fmt.Errorf("error listing IPs: %w", err)
}
// we use stdnet.IP here to re-use already existing functions in talos-systems/net
// once talos-systems/net is migrated to netaddr or netip, we can use it here
ips = net.IPFilter(ips, network.NotSideroLinkStdIP)
ips, err = net.FilterIPs(ips, cidrs)
// TODO: this should eventually be rewritten with `net.FilterIPs` on netaddrs, but for now we'll keep same code and do the conversion.
stdIPs, err := net.FilterIPs(nethelpers.MapNetAddrToStd(addrs), cidrs)
if err != nil {
return fmt.Errorf("error filtering IPs: %w", err)
}
ips := nethelpers.MapStdToNetAddr(stdIPs)
// filter down to make sure only one IPv4 and one IPv6 address stays
var hasIPv4, hasIPv6 bool
@ -111,18 +120,16 @@ func (ctrl *NodeIPController) Run(ctx context.Context, r controller.Runtime, log
for _, ip := range ips {
switch {
case ip.To4() != nil:
case ip.Is4():
if !hasIPv4 {
addr, _ := netaddr.FromStdIP(ip)
nodeIPs = append(nodeIPs, addr)
nodeIPs = append(nodeIPs, ip)
hasIPv4 = true
} else {
logger.Warn("node IP skipped, please use .machine.kubelet.nodeIP to provide explicit subnet for the node IP", zap.Stringer("address", ip))
}
case ip.To16() != nil:
case ip.Is6():
if !hasIPv6 {
addr, _ := netaddr.FromStdIP(ip)
nodeIPs = append(nodeIPs, addr)
nodeIPs = append(nodeIPs, ip)
hasIPv6 = true
} else {
logger.Warn("node IP skipped, please use .machine.kubelet.nodeIP to provide explicit subnet for the node IP", zap.Stringer("address", ip))

View File

@ -66,8 +66,6 @@ func (suite *NodeIPSuite) startRuntime() {
}
func (suite *NodeIPSuite) TestReconcileIPv4() {
suite.T().Skip("skipping as the code relies on net.IPAddrs")
cfg := k8s.NewNodeIPConfig(k8s.NamespaceName, k8s.KubeletID)
cfg.TypedSpec().ValidSubnets = []string{"10.0.0.0/24", "::/0"}
@ -77,13 +75,12 @@ func (suite *NodeIPSuite) TestReconcileIPv4() {
addresses := network.NewNodeAddress(
network.NamespaceName,
network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s),
network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s),
)
addresses.TypedSpec().Addresses = []netaddr.IPPrefix{
netaddr.MustParseIPPrefix("10.0.0.2/32"), // excluded explicitly
netaddr.MustParseIPPrefix("10.0.0.5/24"),
netaddr.MustParseIPPrefix("fdae:41e4:649b:9303::1/64"), // SideroLink IP
}
suite.Require().NoError(suite.state.Create(suite.ctx, addresses))
@ -114,8 +111,6 @@ func (suite *NodeIPSuite) TestReconcileIPv4() {
}
func (suite *NodeIPSuite) TestReconcileDefaultSubnets() {
suite.T().Skip("skipping as the code relies on net.IPAddrs")
cfg := k8s.NewNodeIPConfig(k8s.NamespaceName, k8s.KubeletID)
cfg.TypedSpec().ValidSubnets = []string{"0.0.0.0/0", "::/0"}
@ -124,7 +119,7 @@ func (suite *NodeIPSuite) TestReconcileDefaultSubnets() {
addresses := network.NewNodeAddress(
network.NamespaceName,
network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterNoK8s),
network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s),
)
addresses.TypedSpec().Addresses = []netaddr.IPPrefix{

View File

@ -108,6 +108,7 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime
defaultAddress netaddr.IPPrefix
defaultAddrLinkName string
current []netaddr.IPPrefix
routed []netaddr.IPPrefix
accumulative []netaddr.IPPrefix
)
@ -137,14 +138,23 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime
current = append(current, ip)
}
// routed: filter out external addresses and addresses from SideroLink
if _, up := linksUp[addr.TypedSpec().LinkIndex]; up && addr.TypedSpec().LinkName != externalLink {
if network.NotSideroLinkIP(ip.IP()) {
routed = append(routed, ip)
}
}
accumulative = append(accumulative, ip)
}
// sort current addresses
sort.Slice(current, func(i, j int) bool { return current[i].IP().Compare(current[j].IP()) < 0 })
sort.Slice(routed, func(i, j int) bool { return routed[i].IP().Compare(routed[j].IP()) < 0 })
// remove duplicates from current addresses
current = deduplicateIPPrefixes(current)
routed = deduplicateIPPrefixes(routed)
touchedIDs := make(map[resource.ID]struct{})
@ -177,6 +187,12 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime
touchedIDs[network.NodeAddressCurrentID] = struct{}{}
if err = updateCurrentAddresses(ctx, r, network.NodeAddressRoutedID, routed); err != nil {
return err
}
touchedIDs[network.NodeAddressRoutedID] = struct{}{}
if err = updateAccumulativeAddresses(ctx, r, network.NodeAddressAccumulativeID, accumulative); err != nil {
return err
}
@ -189,17 +205,23 @@ func (ctrl *NodeAddressController) Run(ctx context.Context, r controller.Runtime
filter := res.(*network.NodeAddressFilter).TypedSpec()
filteredCurrent := filterIPs(current, filter.IncludeSubnets, filter.ExcludeSubnets)
filteredRouted := filterIPs(routed, filter.IncludeSubnets, filter.ExcludeSubnets)
filteredAccumulative := filterIPs(accumulative, filter.IncludeSubnets, filter.ExcludeSubnets)
if err = updateCurrentAddresses(ctx, r, network.FilteredNodeAddressID(network.NodeAddressCurrentID, filterID), filteredCurrent); err != nil {
return err
}
if err = updateCurrentAddresses(ctx, r, network.FilteredNodeAddressID(network.NodeAddressRoutedID, filterID), filteredRouted); err != nil {
return err
}
if err = updateAccumulativeAddresses(ctx, r, network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filterID), filteredAccumulative); err != nil {
return err
}
touchedIDs[network.FilteredNodeAddressID(network.NodeAddressCurrentID, filterID)] = struct{}{}
touchedIDs[network.FilteredNodeAddressID(network.NodeAddressRoutedID, filterID)] = struct{}{}
touchedIDs[network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filterID)] = struct{}{}
}

View File

@ -114,6 +114,7 @@ func (suite *NodeAddressSuite) TestDefaults() {
[]string{
network.NodeAddressDefaultID,
network.NodeAddressCurrentID,
network.NodeAddressRoutedID,
network.NodeAddressAccumulativeID,
}, func(r *network.NodeAddress) error {
addrs := r.TypedSpec().Addresses
@ -146,7 +147,7 @@ func (suite *NodeAddressSuite) TestDefaults() {
)
}
//nolint:gocyclo
//nolint:gocyclo,cyclop
func (suite *NodeAddressSuite) TestFilters() {
var (
addressStatusController netctrl.AddressStatusController
@ -197,6 +198,7 @@ func (suite *NodeAddressSuite) TestFilters() {
"25.3.7.9/32",
"2001:470:6d:30e:4a62:b3ba:180b:b5b8/64",
"127.0.0.1/8",
"fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128",
} {
newAddress(netaddr.MustParseIPPrefix(addr), linkUp)
}
@ -227,10 +229,13 @@ func (suite *NodeAddressSuite) TestFilters() {
[]string{
network.NodeAddressDefaultID,
network.NodeAddressCurrentID,
network.NodeAddressRoutedID,
network.NodeAddressAccumulativeID,
network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter1.Metadata().ID()),
network.FilteredNodeAddressID(network.NodeAddressRoutedID, filter1.Metadata().ID()),
network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filter1.Metadata().ID()),
network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter2.Metadata().ID()),
network.FilteredNodeAddressID(network.NodeAddressRoutedID, filter2.Metadata().ID()),
network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filter2.Metadata().ID()),
}, func(r *network.NodeAddress) error {
addrs := r.TypedSpec().Addresses
@ -243,32 +248,47 @@ func (suite *NodeAddressSuite) TestFilters() {
case network.NodeAddressCurrentID:
if !reflect.DeepEqual(
addrs,
ipList("1.2.3.4/32 10.0.0.1/8 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"),
ipList("1.2.3.4/32 10.0.0.1/8 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64 fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128"),
) {
return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs)
}
case network.NodeAddressRoutedID:
if !reflect.DeepEqual(
addrs,
ipList("10.0.0.1/8 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"),
) {
return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs)
}
case network.NodeAddressAccumulativeID:
if !reflect.DeepEqual(
addrs,
ipList("1.2.3.4/32 10.0.0.1/8 10.0.0.2/8 25.3.7.9/32 192.168.3.7/24 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"),
ipList("1.2.3.4/32 10.0.0.1/8 10.0.0.2/8 25.3.7.9/32 192.168.3.7/24 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64 fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128"),
) {
return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs)
}
case network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter1.Metadata().ID()):
if !reflect.DeepEqual(
addrs,
ipList("1.2.3.4/32 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"),
ipList("1.2.3.4/32 25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64 fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128"),
) {
return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs)
}
case network.FilteredNodeAddressID(network.NodeAddressRoutedID, filter1.Metadata().ID()):
if !reflect.DeepEqual(
addrs,
ipList("25.3.7.9/32 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"),
) {
return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs)
}
case network.FilteredNodeAddressID(network.NodeAddressAccumulativeID, filter1.Metadata().ID()):
if !reflect.DeepEqual(
addrs,
ipList("1.2.3.4/32 25.3.7.9/32 192.168.3.7/24 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64"),
ipList("1.2.3.4/32 25.3.7.9/32 192.168.3.7/24 2001:470:6d:30e:4a62:b3ba:180b:b5b8/64 fdae:41e4:649b:9303:7886:731d:1ce9:4d4/128"),
) {
return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs)
}
case network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter2.Metadata().ID()):
case network.FilteredNodeAddressID(network.NodeAddressCurrentID, filter2.Metadata().ID()),
network.FilteredNodeAddressID(network.NodeAddressRoutedID, filter2.Metadata().ID()):
if !reflect.DeepEqual(addrs, ipList("10.0.0.1/8")) {
return fmt.Errorf("unexpected %q: %s", r.Metadata().ID(), addrs)
}

View File

@ -0,0 +1,29 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package nethelpers
import (
"net"
"inet.af/netaddr"
"github.com/talos-systems/talos/pkg/machinery/generic/slices"
)
// MapStdToNetAddr converts a slice of net.IP to a slice of netaddr.Addr.
func MapStdToNetAddr(in []net.IP) []netaddr.IP {
return slices.Map(in, func(std net.IP) netaddr.IP {
addr, _ := netaddr.FromStdIP(std)
return addr.Unmap()
})
}
// MapNetAddrToStd converts a slice of netaddr.Addr to a slice of net.IP.
func MapNetAddrToStd(in []netaddr.IP) []net.IP {
return slices.Map(in, func(addr netaddr.IP) net.IP {
return addr.Unmap().IPAddr().IP
})
}

View File

@ -37,6 +37,12 @@ const (
//
// If some address is no longer present, it will be still kept in the accumulative list.
NodeAddressAccumulativeID = "accumulative"
// Routed current node addresses (as seen at the moment).
//
// This is current addresses minus 'external' IPs, and SideroLink IPs.
//
// This list is used to pick advertised/listen addresses for different services.
NodeAddressRoutedID = "routed"
)
// NodeAddressSpec describes a set of node addresses.

View File

@ -69,6 +69,11 @@ func IsStdULA(ip net.IP, purpose ULAPurpose) bool {
return IsULA(addr, purpose)
}
// NotSideroLinkIP is a shorthand for !IsULA(ip, ULASideroLink).
func NotSideroLinkIP(ip netaddr.IP) bool {
return !IsULA(ip, ULASideroLink)
}
// NotSideroLinkStdIP is a shorthand for !IsStdULA(ip, ULASideroLink).
func NotSideroLinkStdIP(ip net.IP) bool {
return !IsStdULA(ip, ULASideroLink)