From f5db29e36dae93dcad528373fa9f420fe3c6bfed Mon Sep 17 00:00:00 2001 From: bumyongchoi Date: Thu, 23 Apr 2020 04:40:30 -0700 Subject: [PATCH] honor the ClientIP session affinity timeout when set. (#882) * honor the ClientIP session affinity timeout * update moq file * Fix unit test failure due to adding a new arg to ipvsAddService Co-authored-by: Bumyong Choi --- .../proxy/network_services_controller.go | 64 +++++---- .../proxy/network_services_controller_moq.go | 132 ++++++++++-------- .../proxy/network_services_controller_test.go | 6 +- .../proxy/service_endpoints_sync.go | 10 +- 4 files changed, 115 insertions(+), 97 deletions(-) diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index de73b766..8ffc1e86 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -69,7 +69,7 @@ var ( type ipvsCalls interface { ipvsNewService(ipvsSvc *ipvs.Service) error - ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) + ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) ipvsDelService(ipvsSvc *ipvs.Service) error ipvsUpdateService(ipvsSvc *ipvs.Service) error ipvsGetServices() ([]*ipvs.Service, error) @@ -78,7 +78,7 @@ type ipvsCalls interface { ipvsUpdateDestination(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error ipvsGetDestinations(ipvsSvc *ipvs.Service) ([]*ipvs.Destination, error) ipvsDelDestination(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error - ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) + ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) } type netlinkCalls interface { @@ -237,23 +237,24 @@ type NetworkServicesController struct { // internal representation of kubernetes service type serviceInfo struct { - name string - namespace string - clusterIP net.IP - port int - targetPort string - protocol string - nodePort int - sessionAffinity bool - directServerReturn bool - scheduler string - directServerReturnMethod string - hairpin bool - skipLbIps bool - externalIPs []string - loadBalancerIPs []string - local bool - flags schedFlags + name string + namespace string + clusterIP net.IP + port int + targetPort string + protocol string + nodePort int + sessionAffinity bool + sessionAffinityTimeoutSeconds int32 + directServerReturn bool + scheduler string + directServerReturnMethod string + hairpin bool + skipLbIps bool + externalIPs []string + loadBalancerIPs []string + local bool + flags schedFlags } // IPVS scheduler flags @@ -1070,7 +1071,13 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { svcInfo.loadBalancerIPs = append(svcInfo.loadBalancerIPs, lbIngress.IP) } } - svcInfo.sessionAffinity = svc.Spec.SessionAffinity == "ClientIP" + svcInfo.sessionAffinity = svc.Spec.SessionAffinity == api.ServiceAffinityClientIP + + if svcInfo.sessionAffinity { + // Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP" + // https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/v1/defaults.go#L106 + svcInfo.sessionAffinityTimeoutSeconds = *svc.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds + } _, svcInfo.hairpin = svc.ObjectMeta.Annotations[svcHairpinAnnotation] _, svcInfo.local = svc.ObjectMeta.Annotations[svcLocalAnnotation] _, svcInfo.skipLbIps = svc.ObjectMeta.Annotations[svcSkipLbIpsAnnotation] @@ -1473,12 +1480,11 @@ func ipvsDestinationString(d *ipvs.Destination) string { return fmt.Sprintf("%s:%v (Weight: %v)", d.Address, d.Port, d.Weight) } -func ipvsSetPersistence(svc *ipvs.Service, p bool) { +func ipvsSetPersistence(svc *ipvs.Service, p bool, timeout int32) { if p { svc.Flags |= 0x0001 svc.Netmask |= 0xFFFFFFFF - // TODO: once service manifest supports timeout time remove hardcoding - svc.Timeout = 180 * 60 + svc.Timeout = uint32(timeout) } else { svc.Flags &^= 0x0001 svc.Netmask &^= 0xFFFFFFFF @@ -1530,13 +1536,13 @@ func changedIpvsSchedFlags(svc *ipvs.Service, s schedFlags) bool { return false } -func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) { +func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { var err error for _, svc := range svcs { if vip.Equal(svc.Address) && protocol == svc.Protocol && port == svc.Port { if (persistent && (svc.Flags&0x0001) == 0) || (!persistent && (svc.Flags&0x0001) != 0) { - ipvsSetPersistence(svc, persistent) + ipvsSetPersistence(svc, persistent, persistentTimeout) if changedIpvsSchedFlags(svc, flags) { ipvsSetSchedFlags(svc, flags) @@ -1583,7 +1589,7 @@ func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, prot SchedName: scheduler, } - ipvsSetPersistence(&svc, persistent) + ipvsSetPersistence(&svc, persistent, persistentTimeout) ipvsSetSchedFlags(&svc, flags) err = ln.ipvsNewService(&svc) @@ -1605,7 +1611,7 @@ func generateFwmark(ip, protocol, port string) uint32 { } // ipvsAddFWMarkService: creates a IPVS service using FWMARK -func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) { +func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { var protocolStr string if protocol == syscall.IPPROTO_TCP { @@ -1627,7 +1633,7 @@ func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint1 for _, svc := range svcs { if fwmark == svc.FWMark { if (persistent && (svc.Flags&0x0001) == 0) || (!persistent && (svc.Flags&0x0001) != 0) { - ipvsSetPersistence(svc, persistent) + ipvsSetPersistence(svc, persistent, persistentTimeout) if changedIpvsSchedFlags(svc, flags) { ipvsSetSchedFlags(svc, flags) @@ -1674,7 +1680,7 @@ func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint1 SchedName: ipvs.RoundRobin, } - ipvsSetPersistence(&svc, persistent) + ipvsSetPersistence(&svc, persistent, persistentTimeout) ipvsSetSchedFlags(&svc, flags) err = ln.ipvsNewService(&svc) diff --git a/pkg/controllers/proxy/network_services_controller_moq.go b/pkg/controllers/proxy/network_services_controller_moq.go index 6c7256f3..2afaa1a3 100644 --- a/pkg/controllers/proxy/network_services_controller_moq.go +++ b/pkg/controllers/proxy/network_services_controller_moq.go @@ -53,13 +53,13 @@ var _ LinuxNetworking = &LinuxNetworkingMock{} // ipAddrDelFunc: func(iface netlink.Link, ip string) error { // panic("mock out the ipAddrDel method") // }, -// ipvsAddFWMarkServiceFunc: func(vip net.IP, protocol uint16, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) { +// ipvsAddFWMarkServiceFunc: func(vip net.IP, protocol uint16, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { // panic("mock out the ipvsAddFWMarkService method") // }, // ipvsAddServerFunc: func(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error { // panic("mock out the ipvsAddServer method") // }, -// ipvsAddServiceFunc: func(svcs []*ipvs.Service, vip net.IP, protocol uint16, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) { +// ipvsAddServiceFunc: func(svcs []*ipvs.Service, vip net.IP, protocol uint16, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { // panic("mock out the ipvsAddService method") // }, // ipvsDelDestinationFunc: func(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error { @@ -115,13 +115,13 @@ type LinuxNetworkingMock struct { ipAddrDelFunc func(iface netlink.Link, ip string) error // ipvsAddFWMarkServiceFunc mocks the ipvsAddFWMarkService method. - ipvsAddFWMarkServiceFunc func(vip net.IP, protocol uint16, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) + ipvsAddFWMarkServiceFunc func(vip net.IP, protocol uint16, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) // ipvsAddServerFunc mocks the ipvsAddServer method. ipvsAddServerFunc func(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error // ipvsAddServiceFunc mocks the ipvsAddService method. - ipvsAddServiceFunc func(svcs []*ipvs.Service, vip net.IP, protocol uint16, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) + ipvsAddServiceFunc func(svcs []*ipvs.Service, vip net.IP, protocol uint16, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) // ipvsDelDestinationFunc mocks the ipvsDelDestination method. ipvsDelDestinationFunc func(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error @@ -198,6 +198,8 @@ type LinuxNetworkingMock struct { Port uint16 // Persistent is the persistent argument value. Persistent bool + // PersistentTimeout is the persistentTimeout argument value. + PersistentTimeout int32 // Scheduler is the scheduler argument value. Scheduler string // Flags is the flags argument value. @@ -222,6 +224,8 @@ type LinuxNetworkingMock struct { Port uint16 // Persistent is the persistent argument value. Persistent bool + // PersistentTimeout is the persistentTimeout argument value. + PersistentTimeout int32 // Scheduler is the scheduler argument value. Scheduler string // Flags is the flags argument value. @@ -435,49 +439,53 @@ func (mock *LinuxNetworkingMock) ipAddrDelCalls() []struct { } // ipvsAddFWMarkService calls ipvsAddFWMarkServiceFunc. -func (mock *LinuxNetworkingMock) ipvsAddFWMarkService(vip net.IP, protocol uint16, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) { +func (mock *LinuxNetworkingMock) ipvsAddFWMarkService(vip net.IP, protocol uint16, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { if mock.ipvsAddFWMarkServiceFunc == nil { panic("LinuxNetworkingMock.ipvsAddFWMarkServiceFunc: method is nil but LinuxNetworking.ipvsAddFWMarkService was just called") } callInfo := struct { - Vip net.IP - Protocol uint16 - Port uint16 - Persistent bool - Scheduler string - Flags schedFlags + Vip net.IP + Protocol uint16 + Port uint16 + Persistent bool + PersistentTimeout int32 + Scheduler string + Flags schedFlags }{ - Vip: vip, - Protocol: protocol, - Port: port, - Persistent: persistent, - Scheduler: scheduler, - Flags: flags, + Vip: vip, + Protocol: protocol, + Port: port, + Persistent: persistent, + PersistentTimeout: persistentTimeout, + Scheduler: scheduler, + Flags: flags, } lockLinuxNetworkingMockipvsAddFWMarkService.Lock() mock.calls.ipvsAddFWMarkService = append(mock.calls.ipvsAddFWMarkService, callInfo) lockLinuxNetworkingMockipvsAddFWMarkService.Unlock() - return mock.ipvsAddFWMarkServiceFunc(vip, protocol, port, persistent, scheduler, flags) + return mock.ipvsAddFWMarkServiceFunc(vip, protocol, port, persistent, persistentTimeout, scheduler, flags) } // ipvsAddFWMarkServiceCalls gets all the calls that were made to ipvsAddFWMarkService. // Check the length with: // len(mockedLinuxNetworking.ipvsAddFWMarkServiceCalls()) func (mock *LinuxNetworkingMock) ipvsAddFWMarkServiceCalls() []struct { - Vip net.IP - Protocol uint16 - Port uint16 - Persistent bool - Scheduler string - Flags schedFlags + Vip net.IP + Protocol uint16 + Port uint16 + Persistent bool + PersistentTimeout int32 + Scheduler string + Flags schedFlags } { var calls []struct { - Vip net.IP - Protocol uint16 - Port uint16 - Persistent bool - Scheduler string - Flags schedFlags + Vip net.IP + Protocol uint16 + Port uint16 + Persistent bool + PersistentTimeout int32 + Scheduler string + Flags schedFlags } lockLinuxNetworkingMockipvsAddFWMarkService.RLock() calls = mock.calls.ipvsAddFWMarkService @@ -521,53 +529,57 @@ func (mock *LinuxNetworkingMock) ipvsAddServerCalls() []struct { } // ipvsAddService calls ipvsAddServiceFunc. -func (mock *LinuxNetworkingMock) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol uint16, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) { +func (mock *LinuxNetworkingMock) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol uint16, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { if mock.ipvsAddServiceFunc == nil { panic("LinuxNetworkingMock.ipvsAddServiceFunc: method is nil but LinuxNetworking.ipvsAddService was just called") } callInfo := struct { - Svcs []*ipvs.Service - Vip net.IP - Protocol uint16 - Port uint16 - Persistent bool - Scheduler string - Flags schedFlags + Svcs []*ipvs.Service + Vip net.IP + Protocol uint16 + Port uint16 + Persistent bool + PersistentTimeout int32 + Scheduler string + Flags schedFlags }{ - Svcs: svcs, - Vip: vip, - Protocol: protocol, - Port: port, - Persistent: persistent, - Scheduler: scheduler, - Flags: flags, + Svcs: svcs, + Vip: vip, + Protocol: protocol, + Port: port, + Persistent: persistent, + PersistentTimeout: persistentTimeout, + Scheduler: scheduler, + Flags: flags, } lockLinuxNetworkingMockipvsAddService.Lock() mock.calls.ipvsAddService = append(mock.calls.ipvsAddService, callInfo) lockLinuxNetworkingMockipvsAddService.Unlock() - return mock.ipvsAddServiceFunc(svcs, vip, protocol, port, persistent, scheduler, flags) + return mock.ipvsAddServiceFunc(svcs, vip, protocol, port, persistent, persistentTimeout, scheduler, flags) } // ipvsAddServiceCalls gets all the calls that were made to ipvsAddService. // Check the length with: // len(mockedLinuxNetworking.ipvsAddServiceCalls()) func (mock *LinuxNetworkingMock) ipvsAddServiceCalls() []struct { - Svcs []*ipvs.Service - Vip net.IP - Protocol uint16 - Port uint16 - Persistent bool - Scheduler string - Flags schedFlags + Svcs []*ipvs.Service + Vip net.IP + Protocol uint16 + Port uint16 + Persistent bool + PersistentTimeout int32 + Scheduler string + Flags schedFlags } { var calls []struct { - Svcs []*ipvs.Service - Vip net.IP - Protocol uint16 - Port uint16 - Persistent bool - Scheduler string - Flags schedFlags + Svcs []*ipvs.Service + Vip net.IP + Protocol uint16 + Port uint16 + Persistent bool + PersistentTimeout int32 + Scheduler string + Flags schedFlags } lockLinuxNetworkingMockipvsAddService.RLock() calls = mock.calls.ipvsAddService diff --git a/pkg/controllers/proxy/network_services_controller_test.go b/pkg/controllers/proxy/network_services_controller_test.go index 6ceb2616..276b4e99 100644 --- a/pkg/controllers/proxy/network_services_controller_test.go +++ b/pkg/controllers/proxy/network_services_controller_test.go @@ -52,7 +52,7 @@ func (lnm *LinuxNetworkingMockImpl) ipAddrAdd(iface netlink.Link, addr string, a func (lnm *LinuxNetworkingMockImpl) ipvsAddServer(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error { return nil } -func (lnm *LinuxNetworkingMockImpl) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, scheduler string, flags schedFlags) (*ipvs.Service, error) { +func (lnm *LinuxNetworkingMockImpl) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { svc := &ipvs.Service{ Address: vip, Protocol: protocol, @@ -179,8 +179,8 @@ var _ = Describe("NetworkServicesController", func() { }) JustBeforeEach(func() { // pre-inject some foo ipvs Service to verify its deletion - fooSvc1, _ = lnm.ipvsAddService(lnm.ipvsSvcs, net.ParseIP("1.2.3.4"), 6, 1234, false, "rr", schedFlags{}) - fooSvc2, _ = lnm.ipvsAddService(lnm.ipvsSvcs, net.ParseIP("5.6.7.8"), 6, 5678, false, "rr", schedFlags{true, true, false}) + fooSvc1, _ = lnm.ipvsAddService(lnm.ipvsSvcs, net.ParseIP("1.2.3.4"), 6, 1234, false, 0, "rr", schedFlags{}) + fooSvc2, _ = lnm.ipvsAddService(lnm.ipvsSvcs, net.ParseIP("5.6.7.8"), 6, 5678, false, 0, "rr", schedFlags{true, true, false}) syncErr = nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap) }) It("Should have called syncIpvsServices OK", func() { diff --git a/pkg/controllers/proxy/service_endpoints_sync.go b/pkg/controllers/proxy/service_endpoints_sync.go index fe46fa88..bc6dfe2a 100644 --- a/pkg/controllers/proxy/service_endpoints_sync.go +++ b/pkg/controllers/proxy/service_endpoints_sync.go @@ -109,7 +109,7 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv } // create IPVS service for the service to be exposed through the cluster ip - ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags) + ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { glog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error()) continue @@ -196,7 +196,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi nodeServiceIds = make([]string, len(addrs)) for i, addr := range addrs { - ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler, svc.flags) + ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error()) continue @@ -207,7 +207,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi } } else { ipvsNodeportSvcs = make([]*ipvs.Service, 1) - ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.scheduler, svc.flags) + ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error()) continue @@ -287,7 +287,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser for _, externalIP := range extIPSet.List() { var externalIpServiceId string if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" { - ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags) + ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { glog.Errorf("Failed to create ipvs service for External IP: %s due to: %s", externalIP, err.Error()) continue @@ -321,7 +321,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser } // create IPVS service for the service to be exposed through the external ip - ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.scheduler, svc.flags) + ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { glog.Errorf("Failed to create ipvs service for external ip: %s due to %s", externalIP, err.Error()) continue