From 28aab6ea204cfc09ccce2f14e487a17eb067d23b Mon Sep 17 00:00:00 2001 From: Aaron U'Ren Date: Fri, 4 Feb 2022 15:51:59 -0600 Subject: [PATCH] fact(service_endpoints_sync): simplify external IP logic This is an attempt to make the external IP logic easier to follow and more straight forward for future changes like consolidating the iptables logic. --- .../proxy/network_services_controller.go | 5 - .../proxy/service_endpoints_sync.go | 322 +++++++++--------- pkg/controllers/proxy/utils.go | 63 ++++ 3 files changed, 224 insertions(+), 166 deletions(-) diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index 05b24154..32136b1f 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -994,11 +994,6 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *api.Service) { } } -type externalIPService struct { - ipvsSvc *ipvs.Service - externalIP string -} - func hasActiveEndpoints(endpoints []endpointsInfo) bool { for _, endpoint := range endpoints { if endpoint.isLocal { diff --git a/pkg/controllers/proxy/service_endpoints_sync.go b/pkg/controllers/proxy/service_endpoints_sync.go index f9cf8e46..93e3adae 100644 --- a/pkg/controllers/proxy/service_endpoints_sync.go +++ b/pkg/controllers/proxy/service_endpoints_sync.go @@ -10,7 +10,6 @@ import ( "syscall" "time" - "github.com/cloudnativelabs/kube-router/pkg/cri" "github.com/cloudnativelabs/kube-router/pkg/metrics" "github.com/cloudnativelabs/kube-router/pkg/utils" "github.com/moby/ipvs" @@ -261,35 +260,9 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error { - ipvsSvcs, err := nsc.ln.ipvsGetServices() - if err != nil { - return errors.New("Failed get list of IPVS services 
due to: " + err.Error()) - } for k, svc := range serviceInfoMap { - var protocol uint16 - - switch svc.protocol { - case tcpProtocol: - protocol = syscall.IPPROTO_TCP - case udpProtocol: - protocol = syscall.IPPROTO_UDP - default: - protocol = syscall.IPPROTO_NONE - } - endpoints := endpointsInfoMap[k] - dummyVipInterface, err := nsc.ln.getKubeDummyInterface() - if err != nil { - return errors.New("Failed creating dummy interface: " + err.Error()) - } - - externalIPServices := make([]externalIPService, 0) - // create IPVS service for the service to be exposed through the external IP's - // For external IP (which are meant for ingress traffic) Kube-router setsup IPVS services - // based on FWMARK to enable Direct server return functionality. DSR requires a director - // without a VIP http://www.austintek.com/LVS/LVS-HOWTO/HOWTO/LVS-HOWTO.routing_to_VIP-less_director.html - // to avoid martian packets extIPSet := sets.NewString(svc.externalIPs...) if !svc.skipLbIps { extIPSet = extIPSet.Union(sets.NewString(svc.loadBalancerIPs...)) @@ -308,78 +281,26 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser for _, externalIP := range extIPSet.List() { var externalIPServiceID string if svc.directServerReturn && svc.directServerReturnMethod == tunnelInterfaceType { - fwMark, err := nsc.generateUniqueFWMark(externalIP, svc.protocol, strconv.Itoa(svc.port)) - if err != nil { - klog.Errorf("failed to generate FW mark") - continue + // for a DSR service, do the work necessary to set up the IPVS service for DSR, then use the FW mark + // that was generated to add this external IP to the activeServiceEndpointMap + if err := nsc.setupExternalIPForDSRService(svc, externalIP, endpoints); err != nil { + return fmt.Errorf("failed to setup DSR endpoint %s: %v", externalIP, err) } - ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(ipvsSvcs, fwMark, protocol, uint16(svc.port), - svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, 
svc.flags) - if err != nil { - klog.Errorf("failed to create IPVS service for External IP: %s due to: %s", - externalIP, err.Error()) - continue - } - externalIPServices = append(externalIPServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, - externalIP: externalIP}) - + fwMark := nsc.lookupFWMarkByService(externalIP, svc.protocol, fmt.Sprint(svc.port)) externalIPServiceID = fmt.Sprint(fwMark) - - // ensure there is iptables mangle table rule to FWMARK the packet - err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIPServiceID, - nsc.dsrTCPMSS) - if err != nil { - klog.Errorf("failed to setup mangle table rule to forward the traffic to external IP") - continue - } - - // ensure VIP less director. we dont assign VIP to any interface - err = nsc.ln.ipAddrDel(dummyVipInterface, externalIP) - if err != nil && err.Error() != IfaceHasNoAddr { - klog.Errorf("failed to delete external ip address from dummyVipInterface due to %v", err) - continue - } - // do policy routing to deliver the packet locally so that IPVS can pick the packet - err = routeVIPTrafficToDirector("0x" + fmt.Sprintf("%x", fwMark)) - if err != nil { - klog.Errorf("failed to setup ip rule to lookup traffic to external IP: %s through custom "+ - "route table due to %v", externalIP, err) - continue - } } else { - // ensure director with vip assigned - err := nsc.ln.ipAddrAdd(dummyVipInterface, externalIP, true) - if err != nil && err.Error() != IfaceHasAddr { - klog.Errorf("failed to assign external ip %s to dummy interface %s due to %v", - externalIP, KubeDummyIf, err) + // for a non-DSR service, do the work necessary to setup the IPVS service, then use its IP, protocol, + // and port to add this external IP to the activeServiceEndpointMap + if err := nsc.setupExternalIPForService(svc, externalIP, endpoints); err != nil { + return fmt.Errorf("failed to setup service endpoint %s: %v", externalIP, err) } - - // create IPVS service for the service to be exposed through 
the external ip - ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, - uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) - if err != nil { - klog.Errorf("failed to create ipvs service for external ip: %s due to %v", - externalIP, err) - continue - } - externalIPServices = append(externalIPServices, externalIPService{ - ipvsSvc: ipvsExternalIPSvc, externalIP: externalIP}) externalIPServiceID = generateIPPortID(externalIP, svc.protocol, strconv.Itoa(svc.port)) - - // ensure there is NO iptables mangle table rule to FW mark the packet - fwMark := nsc.lookupFWMarkByService(externalIP, svc.protocol, strconv.Itoa(svc.port)) - switch { - case fwMark == 0: - klog.V(2).Infof("no FW mark found for service, nothing to cleanup") - case fwMark != 0: - klog.V(2).Infof("the following service '%s:%s:%d' had fwMark associated with it: %d doing "+ - "additional cleanup", externalIP, svc.protocol, svc.port, fwMark) - if err = nsc.cleanupDSRService(fwMark); err != nil { - klog.Errorf("failed to cleanup DSR service: %v", err) - } - } } + // add external service to the activeServiceEndpointMap by its externalIPServiceID. 
In this case, + // externalIPServiceID is a little confusing because in the case of DSR services it is the FW Mark that is + // generated for it, and for non-DSR services it is the combination of: ip + "-" + protocol + "-" + port + // TODO: remove the difference between DSR and non-DSR services and make a standard activeServiceEndpointMap[externalIPServiceID] = make([]string, 0) for _, endpoint := range endpoints { if !svc.local || (svc.local && endpoint.isLocal) { @@ -389,82 +310,161 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser } } } + } - // add IPVS remote server to the IPVS service - for _, endpoint := range endpoints { - dst := ipvs.Destination{ - Address: net.ParseIP(endpoint.ip), - AddressFamily: syscall.AF_INET, - Port: uint16(endpoint.port), - Weight: 1, - } + return nil +} - for _, externalIPService := range externalIPServices { - if svc.local && !endpoint.isLocal { - continue - } +// setupExternalIPForService does the basic work to setup a non-DSR based external IP for service. This includes adding +// the IPVS service to the host if it is missing, and setting up the dummy interface to be able to receive traffic on +// the node. 
+func (nsc *NetworkServicesController) setupExternalIPForService(svc *serviceInfo, externalIP string, + endpoints []endpointsInfo) error { + // Get everything we need to get setup to process the external IP + protocol := convertSvcProtoToSysCallProto(svc.protocol) + dummyVipInterface, err := nsc.ln.getKubeDummyInterface() + if err != nil { + return fmt.Errorf("failed creating dummy interface: %v", err) + } + ipvsSvcs, err := nsc.ln.ipvsGetServices() + if err != nil { + return fmt.Errorf("failed get list of IPVS services due to: %v", err) + } - if svc.directServerReturn && svc.directServerReturnMethod == tunnelInterfaceType { - dst.ConnectionFlags = ipvs.ConnectionFlagTunnel - } + // ensure director with vip assigned + err = nsc.ln.ipAddrAdd(dummyVipInterface, externalIP, true) + if err != nil && err.Error() != IfaceHasAddr { + return fmt.Errorf("failed to assign external ip %s to dummy interface %s due to %v", + externalIP, KubeDummyIf, err) + } - // add server to IPVS service - err := nsc.ln.ipvsAddServer(externalIPService.ipvsSvc, &dst) - if err != nil { - klog.Errorf(err.Error()) - } + // create IPVS service for the service to be exposed through the external ip + ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, + uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) + if err != nil { + return fmt.Errorf("failed to create ipvs service for external ip: %s due to %v", + externalIP, err) + } - // For now just support IPVS tunnel mode, we can add other ways of DSR in future - if svc.directServerReturn && svc.directServerReturnMethod == tunnelInterfaceType { - - podObj, err := nsc.getPodObjectForEndpoint(endpoint.ip) - if err != nil { - klog.Errorf("Failed to find endpoint with ip: " + endpoint.ip + ". 
so skipping " + - "preparing endpoint for DSR") - continue - } - - // we are only concerned with endpoint pod running on current node - if strings.Compare(podObj.Status.HostIP, nsc.nodeIP.String()) != 0 { - continue - } - - containerURL := podObj.Status.ContainerStatuses[0].ContainerID - runtime, containerID, err := cri.EndpointParser(containerURL) - if err != nil { - klog.Errorf("couldn't get containerID (container=%s, pod=%s). Skipping DSR endpoint "+ - "set up", podObj.Spec.Containers[0].Name, podObj.Name) - continue - } - - if containerID == "" { - klog.Errorf("Failed to find container id for the endpoint with ip: %s so skipping "+ - "preparing endpoint for DSR", endpoint.ip) - continue - } - - if runtime == "docker" { - // WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet. - err = nsc.ln.prepareEndpointForDsrWithDocker(containerID, endpoint.ip, - externalIPService.externalIP) - if err != nil { - klog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", - endpoint.ip, err.Error()) - } - } else { - // We expect CRI compliant runtimes here - // ugly workaround, refactoring of pkg/Proxy is required - err = nsc.ln.(*linuxNetworking).prepareEndpointForDsrWithCRI(nsc.dsr.runtimeEndpoint, - containerID, endpoint.ip, externalIPService.externalIP) - if err != nil { - klog.Errorf("Failed to prepare endpoint %s to do DSR due to: %s", - endpoint.ip, err.Error()) - } - } - } - } + // ensure there is NO iptables mangle table rule to FW mark the packet + fwMark := nsc.lookupFWMarkByService(externalIP, svc.protocol, strconv.Itoa(svc.port)) + switch { + case fwMark == 0: + klog.V(2).Infof("no FW mark found for service, nothing to cleanup") + case fwMark != 0: + klog.V(2).Infof("the following service '%s:%s:%d' had fwMark associated with it: %d doing "+ + "additional cleanup", externalIP, svc.protocol, svc.port, fwMark) + if err = nsc.cleanupDSRService(fwMark); err != nil { + return fmt.Errorf("failed to cleanup 
DSR service: %v", err) } } + + // add pod endpoints to the IPVS service + for _, endpoint := range endpoints { + // if this specific endpoint isn't local, there is nothing for us to do and we can go to the next record + if svc.local && !endpoint.isLocal { + continue + } + + // create the basic IPVS destination record + dst := ipvs.Destination{ + Address: net.ParseIP(endpoint.ip), + AddressFamily: syscall.AF_INET, + Port: uint16(endpoint.port), + Weight: 1, + } + + if err = nsc.ln.ipvsAddServer(ipvsExternalIPSvc, &dst); err != nil { + return fmt.Errorf("unable to add destination %s to externalIP service %s: %v", + endpoint.ip, externalIP, err) + } + } + + return nil +} + +// setupExternalIPForDSRService does the basic setup necessary to set up an External IP service for DSR. This includes +// generating a unique FW mark for the service, setting up the mangle rules to apply the FW mark, setting up IPVS to +// work with the FW mark, and ensuring that the IP doesn't exist on the dummy interface so that the traffic doesn't +// accidentally ingress the packet and change it. +// +// For external IPs (which are meant for ingress traffic) configured for DSR, kube-router sets up IPVS services +// based on FWMARK to enable direct server return functionality. 
DSR requires a director without a VIP +// http://www.austintek.com/LVS/LVS-HOWTO/HOWTO/LVS-HOWTO.routing_to_VIP-less_director.html to avoid martian packets +func (nsc *NetworkServicesController) setupExternalIPForDSRService(svc *serviceInfo, externalIP string, + endpoints []endpointsInfo) error { + // Get everything we need to get setup to process the external IP + protocol := convertSvcProtoToSysCallProto(svc.protocol) + dummyVipInterface, err := nsc.ln.getKubeDummyInterface() + if err != nil { + return errors.New("Failed creating dummy interface: " + err.Error()) + } + ipvsSvcs, err := nsc.ln.ipvsGetServices() + if err != nil { + return errors.New("Failed get list of IPVS services due to: " + err.Error()) + } + + fwMark, err := nsc.generateUniqueFWMark(externalIP, svc.protocol, strconv.Itoa(svc.port)) + if err != nil { + return fmt.Errorf("failed to generate FW mark") + } + ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(ipvsSvcs, fwMark, protocol, uint16(svc.port), + svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) + if err != nil { + return fmt.Errorf("failed to create IPVS service for External IP: %s due to: %s", + externalIP, err.Error()) + } + + externalIPServiceID := fmt.Sprint(fwMark) + + // ensure there is iptables mangle table rule to FWMARK the packet + err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIPServiceID, + nsc.dsrTCPMSS) + if err != nil { + return fmt.Errorf("failed to setup mangle table rule to forward the traffic to external IP") + } + + // ensure VIP less director. 
we dont assign VIP to any interface + err = nsc.ln.ipAddrDel(dummyVipInterface, externalIP) + if err != nil && err.Error() != IfaceHasNoAddr { + return fmt.Errorf("failed to delete external ip address from dummyVipInterface due to %v", err) + } + + // do policy routing to deliver the packet locally so that IPVS can pick the packet + err = routeVIPTrafficToDirector("0x" + fmt.Sprintf("%x", fwMark)) + if err != nil { + return fmt.Errorf("failed to setup ip rule to lookup traffic to external IP: %s through custom "+ + "route table due to %v", externalIP, err) + } + + // add pod endpoints to the IPVS service + for _, endpoint := range endpoints { + // if this specific endpoint isn't local, there is nothing for us to do and we can go to the next record + if svc.local && !endpoint.isLocal { + continue + } + + // create the basic IPVS destination record + dst := ipvs.Destination{ + Address: net.ParseIP(endpoint.ip), + AddressFamily: syscall.AF_INET, + ConnectionFlags: ipvs.ConnectionFlagTunnel, + Port: uint16(endpoint.port), + Weight: 1, + } + + // add the destination for the IPVS service for this external IP + if err = nsc.ln.ipvsAddServer(ipvsExternalIPSvc, &dst); err != nil { + return fmt.Errorf("unable to add destination %s to externalIP service %s: %v", + endpoint.ip, externalIP, err) + } + + // add the external IP to a virtual interface inside the pod so that the pod can receive it + if err = nsc.addDSRIPInsidePodNetNamespace(externalIP, endpoint.ip); err != nil { + return fmt.Errorf("unable to setup DSR receiver inside pod: %v", err) + } + } + return nil } diff --git a/pkg/controllers/proxy/utils.go b/pkg/controllers/proxy/utils.go index 164e048c..996d16bd 100644 --- a/pkg/controllers/proxy/utils.go +++ b/pkg/controllers/proxy/utils.go @@ -6,8 +6,10 @@ import ( "net" "strconv" "strings" + "syscall" "time" + "github.com/cloudnativelabs/kube-router/pkg/cri" "github.com/cloudnativelabs/kube-router/pkg/utils" "github.com/vishvananda/netlink" 
"github.com/vishvananda/netns" @@ -282,3 +284,64 @@ func endpointsMapsEquivalent(a, b endpointsInfoMap) bool { return true } + +// convertSvcProtoToSysCallProto converts a string based protocol that we receive from Kubernetes via something like the +// serviceInfo object into the uint16 syscall version of the protocol that is capable of interfacing with aspects of the +// Linux sub-system like IPVS +func convertSvcProtoToSysCallProto(svcProtocol string) uint16 { + switch svcProtocol { + case tcpProtocol: + return syscall.IPPROTO_TCP + case udpProtocol: + return syscall.IPPROTO_UDP + default: + return syscall.IPPROTO_NONE + } +} + +// addDSRIPInsidePodNetNamespace takes a given external IP and endpoint IP for a DSR service and then uses the container +// runtime to add the external IP to a virtual interface inside the pod so that it can receive DSR traffic inside its +// network namespace. +func (nsc *NetworkServicesController) addDSRIPInsidePodNetNamespace(externalIP, endpointIP string) error { + podObj, err := nsc.getPodObjectForEndpoint(endpointIP) + if err != nil { + return fmt.Errorf("failed to find endpoint with ip: %s. so skipping preparing endpoint for DSR", + endpointIP) + } + + // we are only concerned with endpoint pod running on current node + if strings.Compare(podObj.Status.HostIP, nsc.nodeIP.String()) != 0 { + return nil + } + + containerURL := podObj.Status.ContainerStatuses[0].ContainerID + runtime, containerID, err := cri.EndpointParser(containerURL) + if err != nil { + return fmt.Errorf("couldn't get containerID (container=%s, pod=%s). Skipping DSR endpoint set up", + podObj.Spec.Containers[0].Name, podObj.Name) + } + + if containerID == "" { + return fmt.Errorf("failed to find container id for the endpoint with ip: %s so skipping preparing "+ + "endpoint for DSR", endpointIP) + } + + if runtime == "docker" { + // WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet. 
+ err = nsc.ln.prepareEndpointForDsrWithDocker(containerID, endpointIP, externalIP) + if err != nil { + return fmt.Errorf("failed to prepare endpoint %s to do direct server return due to %v", + endpointIP, err) + } + } else { + // We expect CRI compliant runtimes here + // ugly workaround, refactoring of pkg/Proxy is required + err = nsc.ln.(*linuxNetworking).prepareEndpointForDsrWithCRI(nsc.dsr.runtimeEndpoint, + containerID, endpointIP, externalIP) + if err != nil { + return fmt.Errorf("failed to prepare endpoint %s to do DSR due to: %v", endpointIP, err) + } + } + + return nil +}