From f696c514db03299a631c1a12dbef496d738e059f Mon Sep 17 00:00:00 2001 From: Timofey Titovets Date: Thu, 2 Mar 2023 22:01:11 +0100 Subject: [PATCH 1/2] fix: nodePort #2704 - publish ready endpoints fix: lowercase local functions chore: reuse nodes, cut logs --- source/service.go | 49 ++++++++++++++++++++++++++++++++++++++++- source/service_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/source/service.go b/source/service.go index b19ba4882..624d13c20 100644 --- a/source/service.go +++ b/source/service.go @@ -590,6 +590,30 @@ func extractLoadBalancerTargets(svc *v1.Service, resolveLoadBalancerHostname boo return targets } +func isPodStatusReady(status v1.PodStatus) bool { + _, condition := getPodCondition(&status, v1.PodReady) + return condition != nil && condition.Status == v1.ConditionTrue +} + +func getPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if status == nil { + return -1, nil + } + return getPodConditionFromList(status.Conditions, conditionType) +} + +func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if conditions == nil { + return -1, nil + } + for i := range conditions { + if conditions[i].Type == conditionType { + return i, &conditions[i] + } + } + return -1, nil +} + func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) { var ( internalIPs endpoint.Targets @@ -615,6 +639,8 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe return nil, err } + var nodesReady []*v1.Node + var nodesRunning []*v1.Node for _, v := range pods { if v.Status.Phase == v1.PodRunning { node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName) @@ -622,12 +648,33 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname) continue } + if _, ok := nodesMap[node]; !ok { nodesMap[node] = *new(struct{}) - nodes = append(nodes, node) + nodesRunning = append(nodesRunning, node) + + if isPodStatusReady(v.Status) { + nodesReady = append(nodesReady, node) + // Check pod not terminating + if v.GetDeletionTimestamp() == nil { + nodes = append(nodes, node) + } + } } } } + + if len(nodes) > 0 { + // Works same as service endpoints + } else if len(nodesReady) > 0 { + // 2 level of panic modes as safe guard, because old wrong behavior can be used by someone + // Publish all endpoints not always a bad thing + log.Debugf("All pods in terminating state, use ready") + nodes = nodesReady + } else { + log.Debugf("All pods not ready, use all running") + nodes = nodesRunning + } default: nodes, err = sc.nodeInformer.Lister().List(labels.Everything()) if err != nil { diff --git a/source/service_test.go b/source/service_test.go index a12bd2752..072a0bdc4 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -1630,6 +1630,7 @@ func TestServiceSourceNodePortServices(t *testing.T) { podNames []string nodeIndex []int phases []v1.PodPhase + conditions []v1.PodCondition labelSelector labels.Selector }{ { @@ -1817,6 +1818,7 @@ func TestServiceSourceNodePortServices(t *testing.T) { podNames: []string{"pod-0"}, nodeIndex: []int{1}, phases: []v1.PodPhase{v1.PodRunning}, + conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}, }, { title: "annotated NodePort services with ExternalTrafficPolicy=Local and multiple pods on a single node return an endpoint with unique IP addresses of the cluster's nodes where pods is running only", @@ -1859,6 +1861,53 @@ func TestServiceSourceNodePortServices(t *testing.T) { podNames: []string{"pod-0", "pod-1"}, nodeIndex: []int{1, 1}, phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning}, + conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue}, + {Type: v1.PodReady, Status: v1.ConditionTrue}, + }, + }, + { + title: "annotated NodePort services with ExternalTrafficPolicy=Local return pods in Ready & Running state", + svcNamespace: "testing", + svcName: "foo", + svcType: v1.ServiceTypeNodePort, + svcTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + labels: map[string]string{}, + annotations: map[string]string{ + hostnameAnnotationKey: "foo.example.org.", + }, + expected: []*endpoint.Endpoint{ + {DNSName: "_foo._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV}, + {DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1"}, RecordType: endpoint.RecordTypeA}, + }, + nodes: []*v1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.1"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.1"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.2"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.2"}, + }, + }, + }}, + podNames: []string{"pod-0", "pod-1"}, + nodeIndex: []int{0, 1}, + phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning}, + conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue}, + {Type: v1.PodReady, Status: v1.ConditionFalse}, + }, }, { title: "access=private annotation NodePort services return an endpoint with private IP addresses of the cluster's nodes", @@ -2153,6 +2202,7 @@ func TestServiceSourceNodePortServices(t *testing.T) { }, Status: v1.PodStatus{ Phase: tc.phases[i], + Conditions: []v1.PodCondition{tc.conditions[i]}, }, } From 2a1f7a4de75c29174613b37862e77d444d4ace8b Mon Sep 17 00:00:00 2001 From: Timofey Titovets Date: Mon, 26 Jun 2023 16:06:23 +0200 Subject: [PATCH 2/2] fix(source_test.go): add missing pod status tests --- source/service_test.go | 66 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 3 deletions(-) diff --git a/source/service_test.go b/source/service_test.go index 072a0bdc4..84d555cd8 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -1632,6 +1632,7 @@ func TestServiceSourceNodePortServices(t *testing.T) { phases []v1.PodPhase conditions []v1.PodCondition labelSelector labels.Selector + deletionTimestamp []*metav1.Time }{ { title: "annotated NodePort services return an endpoint with IP addresses of the cluster's nodes", @@ -1818,7 +1819,8 @@ func TestServiceSourceNodePortServices(t *testing.T) { podNames: []string{"pod-0"}, nodeIndex: []int{1}, phases: []v1.PodPhase{v1.PodRunning}, - conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}, + conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}}, + deletionTimestamp: []*metav1.Time{{}}, }, { title: "annotated NodePort services with ExternalTrafficPolicy=Local and multiple pods on a single node return an endpoint with unique IP addresses of the cluster's nodes where pods is running only", @@ -1862,9 +1864,10 @@ func TestServiceSourceNodePortServices(t *testing.T) { nodeIndex: []int{1, 1}, phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning}, conditions: []v1.PodCondition{ - {Type: v1.PodReady, Status: v1.ConditionTrue}, - {Type: v1.PodReady, Status: v1.ConditionTrue}, + {Type: v1.PodReady, Status: v1.ConditionFalse}, + {Type: v1.PodReady, Status: v1.ConditionFalse}, }, + deletionTimestamp: []*metav1.Time{{},{}}, }, { title: "annotated NodePort services with ExternalTrafficPolicy=Local return pods in Ready & Running state", @@ -1908,6 +1911,62 @@ func TestServiceSourceNodePortServices(t *testing.T) { {Type: v1.PodReady, Status: v1.ConditionTrue}, {Type: v1.PodReady, Status: v1.ConditionFalse}, }, + deletionTimestamp: []*metav1.Time{{},{}}, + }, + { + title: "annotated NodePort services with ExternalTrafficPolicy=Local return pods in Ready & Running state & not in Terminating", + svcNamespace: "testing", + svcName: "foo", + svcType: v1.ServiceTypeNodePort, + svcTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + labels: map[string]string{}, + annotations: map[string]string{ + hostnameAnnotationKey: "foo.example.org.", + }, + expected: []*endpoint.Endpoint{ + {DNSName: "_foo._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV}, + {DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1"}, RecordType: endpoint.RecordTypeA}, + }, + nodes: []*v1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.1"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.1"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.2"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.2"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.3"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.3"}, + }, + }, + }}, + podNames: []string{"pod-0", "pod-1", "pod-2"}, + nodeIndex: []int{0, 1, 2}, + phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning, v1.PodRunning}, + conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue}, + {Type: v1.PodReady, Status: v1.ConditionFalse}, + {Type: v1.PodReady, Status: v1.ConditionTrue}, + }, + deletionTimestamp: []*metav1.Time{nil, nil, {}}, }, { title: "access=private annotation NodePort services return an endpoint with private IP addresses of the cluster's nodes", @@ -2199,6 +2258,7 @@ func TestServiceSourceNodePortServices(t *testing.T) { Name: podname, Labels: tc.labels, Annotations: tc.annotations, + DeletionTimestamp: tc.deletionTimestamp[i], }, Status: v1.PodStatus{ Phase: tc.phases[i],