mirror of
https://github.com/cloudnativelabs/kube-router.git
synced 2026-02-18 00:21:03 +01:00
280 lines
8.4 KiB
Go
280 lines
8.4 KiB
Go
package proxy
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/ccoveille/go-safecast/v2"
|
|
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/testhelpers"
|
|
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
|
|
"github.com/moby/ipvs"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/vishvananda/netlink"
|
|
"github.com/vishvananda/netns"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes/fake"
|
|
)
|
|
|
|
type stubNode struct {
|
|
name string
|
|
iface string
|
|
ipv4Addrs []net.IP
|
|
ipv6Addrs []net.IP
|
|
}
|
|
|
|
func (n *stubNode) SloppyTCP() *utils.SysctlConfig {
|
|
return &utils.SysctlConfig{}
|
|
}
|
|
|
|
func (n *stubNode) FindBestIPv4NodeAddress() net.IP {
|
|
if len(n.ipv4Addrs) > 0 {
|
|
return n.ipv4Addrs[0]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (n *stubNode) FindBestIPv6NodeAddress() net.IP {
|
|
if len(n.ipv6Addrs) > 0 {
|
|
return n.ipv6Addrs[0]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (n *stubNode) GetNodeIPv4Addrs() []net.IP {
|
|
return append([]net.IP(nil), n.ipv4Addrs...)
|
|
}
|
|
|
|
func (n *stubNode) GetNodeIPv6Addrs() []net.IP {
|
|
return append([]net.IP(nil), n.ipv6Addrs...)
|
|
}
|
|
|
|
func (n *stubNode) GetNodeIPAddrs() []net.IP {
|
|
var res []net.IP
|
|
res = append(res, n.ipv4Addrs...)
|
|
res = append(res, n.ipv6Addrs...)
|
|
return res
|
|
}
|
|
|
|
func (n *stubNode) GetPrimaryNodeIP() net.IP {
|
|
if len(n.ipv4Addrs) > 0 {
|
|
return n.ipv4Addrs[0]
|
|
}
|
|
if len(n.ipv6Addrs) > 0 {
|
|
return n.ipv6Addrs[0]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (n *stubNode) GetNodeInterfaceName() string {
|
|
return n.iface
|
|
}
|
|
|
|
func (n *stubNode) GetNodeMTU() (int, error) {
|
|
return 1500, nil
|
|
}
|
|
|
|
func (n *stubNode) IsIPv4Capable() bool {
|
|
return len(n.ipv4Addrs) > 0
|
|
}
|
|
|
|
func (n *stubNode) IsIPv6Capable() bool {
|
|
return len(n.ipv6Addrs) > 0
|
|
}
|
|
|
|
func (n *stubNode) GetNodeName() string {
|
|
return n.name
|
|
}
|
|
|
|
type stubLinuxNetworking struct {
|
|
services []*ipvs.Service
|
|
}
|
|
|
|
func (s *stubLinuxNetworking) ipvsNewService(_ *ipvs.Service) error { return nil }
|
|
func (s *stubLinuxNetworking) ipvsAddService(svcs []*ipvs.Service, _ net.IP, _ uint16, _ uint16, _ bool,
|
|
_ int32, _ string, _ schedFlags) ([]*ipvs.Service, *ipvs.Service, error) {
|
|
return svcs, nil, nil
|
|
}
|
|
func (s *stubLinuxNetworking) ipvsDelService(_ *ipvs.Service) error { return nil }
|
|
func (s *stubLinuxNetworking) ipvsUpdateService(_ *ipvs.Service) error { return nil }
|
|
func (s *stubLinuxNetworking) ipvsGetServices() ([]*ipvs.Service, error) {
|
|
return append([]*ipvs.Service(nil), s.services...), nil
|
|
}
|
|
func (s *stubLinuxNetworking) ipvsAddServer(_ *ipvs.Service, _ *ipvs.Destination) error { return nil }
|
|
func (s *stubLinuxNetworking) ipvsNewDestination(_ *ipvs.Service, _ *ipvs.Destination) error {
|
|
return nil
|
|
}
|
|
func (s *stubLinuxNetworking) ipvsUpdateDestination(_ *ipvs.Service, _ *ipvs.Destination) error {
|
|
return nil
|
|
}
|
|
func (s *stubLinuxNetworking) ipvsGetDestinations(_ *ipvs.Service) ([]*ipvs.Destination, error) {
|
|
return nil, nil
|
|
}
|
|
func (s *stubLinuxNetworking) ipvsDelDestination(_ *ipvs.Service, _ *ipvs.Destination) error {
|
|
return nil
|
|
}
|
|
func (s *stubLinuxNetworking) ipvsAddFWMarkService(_ []*ipvs.Service, _ uint32, _ uint16, _ uint16, _ uint16,
|
|
_ bool, _ int32, _ string, _ schedFlags) (*ipvs.Service, error) {
|
|
return nil, nil
|
|
}
|
|
func (s *stubLinuxNetworking) ipAddrAdd(_ netlink.Link, _ string, _ string, _ bool) error { return nil }
|
|
func (s *stubLinuxNetworking) ipAddrDel(_ netlink.Link, _ string, _ string) error { return nil }
|
|
func (s *stubLinuxNetworking) getContainerPidWithDocker(_ string) (int, error) { return 0, nil }
|
|
func (s *stubLinuxNetworking) getContainerPidWithCRI(_ string, _ string) (int, error) { return 0, nil }
|
|
func (s *stubLinuxNetworking) getKubeDummyInterface() (netlink.Link, error) { return nil, nil }
|
|
func (s *stubLinuxNetworking) setupRoutesForExternalIPForDSR(_ serviceInfoMap, _ bool, _ bool) error {
|
|
return nil
|
|
}
|
|
func (s *stubLinuxNetworking) configureContainerForDSR(_ string, _ string, _ string, _ int,
|
|
_ netns.NsHandle) error {
|
|
return nil
|
|
}
|
|
func (s *stubLinuxNetworking) setupPolicyRoutingForDSR(_ bool, _ bool) error { return nil }
|
|
func (s *stubLinuxNetworking) findIfaceLinkForPid(_ int) (int, error) { return 0, nil }
|
|
|
|
func buildIPVSServicesFromFixtures(t *testing.T, services *v1.ServiceList) []*ipvs.Service {
|
|
t.Helper()
|
|
var result []*ipvs.Service
|
|
for i := range services.Items {
|
|
svc := &services.Items[i]
|
|
for _, port := range svc.Spec.Ports {
|
|
proto := convertSvcProtoToSysCallProto(strings.ToLower(string(port.Protocol)))
|
|
|
|
require.GreaterOrEqualf(t, port.Port, int32(0), "service %s/%s has negative port %d", svc.Namespace, svc.Name, port.Port)
|
|
const maxServicePort = 1<<16 - 1
|
|
require.LessOrEqualf(t, port.Port, int32(maxServicePort), "service %s/%s port %d exceeds %d", svc.Namespace, svc.Name, port.Port, maxServicePort)
|
|
targetPort := safecast.RequireConvert[uint16](t, port.Port)
|
|
|
|
appendService := func(ipStr string) {
|
|
ip := net.ParseIP(ipStr)
|
|
if ip == nil {
|
|
return
|
|
}
|
|
result = append(result, &ipvs.Service{
|
|
Address: ip,
|
|
Protocol: proto,
|
|
Port: targetPort,
|
|
})
|
|
}
|
|
|
|
for _, clIP := range svc.Spec.ClusterIPs {
|
|
appendService(clIP)
|
|
}
|
|
for _, exIP := range svc.Spec.ExternalIPs {
|
|
appendService(exIP)
|
|
}
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func filterExpectations(src map[string]testhelpers.IPSetExpectation, include func(string) bool) map[string]testhelpers.IPSetExpectation {
|
|
dst := make(map[string]testhelpers.IPSetExpectation)
|
|
for k, v := range src {
|
|
if include(k) {
|
|
dst[k] = v
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func TestNetworkServicesFixtureIPSets(t *testing.T) {
|
|
fixtureDir := filepath.Join("..", "..", "..", "testdata", "ipset_test_1")
|
|
|
|
services := testhelpers.LoadServiceList(t, filepath.Join(fixtureDir, "services.yaml"))
|
|
|
|
client := fake.NewSimpleClientset()
|
|
for i := range services.Items {
|
|
_, err := client.CoreV1().Services(services.Items[i].Namespace).Create(
|
|
context.Background(), services.Items[i].DeepCopy(), metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
svcInformer := informerFactory.Core().V1().Services().Informer()
|
|
svcIndexer := svcInformer.GetIndexer()
|
|
for i := range services.Items {
|
|
svc := services.Items[i].DeepCopy()
|
|
svc.SetResourceVersion("1")
|
|
require.NoError(t, svcIndexer.Add(svc))
|
|
}
|
|
|
|
ipv4Handler := testhelpers.NewFakeIPSetHandler(false)
|
|
ipv6Handler := testhelpers.NewFakeIPSetHandler(true)
|
|
t.Cleanup(func() {
|
|
if t.Failed() {
|
|
t.Logf("ipv4 restore script:\n%s", ipv4Handler.Restored())
|
|
t.Logf("ipv6 restore script:\n%s", ipv6Handler.Restored())
|
|
}
|
|
})
|
|
|
|
node := &stubNode{
|
|
name: "kube-router-vm1",
|
|
iface: "eth0",
|
|
ipv4Addrs: []net.IP{net.ParseIP("10.241.0.21")},
|
|
ipv6Addrs: []net.IP{net.ParseIP("2001:db8:ca2:2::e7e5")},
|
|
}
|
|
|
|
ipvsServices := buildIPVSServicesFromFixtures(t, services)
|
|
ln := &stubLinuxNetworking{services: ipvsServices}
|
|
|
|
controller := &NetworkServicesController{
|
|
krNode: node,
|
|
ln: ln,
|
|
ipSetHandlers: map[v1.IPFamily]utils.IPSetHandler{
|
|
v1.IPv4Protocol: ipv4Handler,
|
|
v1.IPv6Protocol: ipv6Handler,
|
|
},
|
|
ipsetMutex: &sync.Mutex{},
|
|
svcLister: svcIndexer,
|
|
}
|
|
|
|
controller.setServiceMap(make(serviceInfoMap))
|
|
controller.setServiceMap(controller.buildServicesInfo())
|
|
|
|
err := controller.syncIpvsFirewall()
|
|
require.NoError(t, err)
|
|
|
|
actual := testhelpers.MergeExpectations(
|
|
testhelpers.ParseRestoreScript(ipv4Handler.Restored()),
|
|
testhelpers.ParseRestoreScript(ipv6Handler.Restored()),
|
|
)
|
|
|
|
include := func(name string) bool {
|
|
// Exclude netpol ipsets
|
|
if strings.Contains(name, "KUBE-DST") || strings.Contains(name, "KUBE-SRC") {
|
|
return false
|
|
}
|
|
// Exclude routing ipsets
|
|
if strings.Contains(name, "pod-subnets") || strings.Contains(name, "node-ips") {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
actual = filterExpectations(actual, include)
|
|
|
|
expected := testhelpers.ParseSnapshotWithFilter(
|
|
t,
|
|
filepath.Join(fixtureDir, "ipset_save.txt"),
|
|
include,
|
|
)
|
|
|
|
require.NotEmpty(t, expected, "expected snapshot should not be empty")
|
|
require.Equal(t, testhelpers.ExpectedKeys(expected), testhelpers.ExpectedKeys(actual))
|
|
|
|
for name, exp := range expected {
|
|
act := actual[name]
|
|
require.Equal(t, exp.SetType, act.SetType, "set type mismatch for %s", name)
|
|
// For now we can't compare the entries because this makes a hard call to netlink libraries via getAllLocalIPs()
|
|
// if we eventually abstract that out or refactor it in some way, then we can compare the entries
|
|
// require.Equal(t, exp.Entries, act.Entries, "entries mismatch for %s", name)
|
|
}
|
|
}
|