diff --git a/source/service.go b/source/service.go index b19ba4882..624d13c20 100644 --- a/source/service.go +++ b/source/service.go @@ -590,6 +590,30 @@ func extractLoadBalancerTargets(svc *v1.Service, resolveLoadBalancerHostname boo return targets } +func isPodStatusReady(status v1.PodStatus) bool { + _, condition := getPodCondition(&status, v1.PodReady) + return condition != nil && condition.Status == v1.ConditionTrue +} + +func getPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if status == nil { + return -1, nil + } + return getPodConditionFromList(status.Conditions, conditionType) +} + +func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) { + if conditions == nil { + return -1, nil + } + for i := range conditions { + if conditions[i].Type == conditionType { + return i, &conditions[i] + } + } + return -1, nil +} + func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) { var ( internalIPs endpoint.Targets @@ -615,6 +639,8 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe return nil, err } + var nodesReady []*v1.Node + var nodesRunning []*v1.Node for _, v := range pods { if v.Status.Phase == v1.PodRunning { node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName) @@ -622,12 +648,33 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname) continue } + if _, ok := nodesMap[node]; !ok { nodesMap[node] = *new(struct{}) - nodes = append(nodes, node) + nodesRunning = append(nodesRunning, node) + + if isPodStatusReady(v.Status) { + nodesReady = append(nodesReady, node) + // Check pod not terminating + if v.GetDeletionTimestamp() == nil { + nodes = append(nodes, node) + } + } } } } + + if len(nodes) > 0 { + // Works same as service endpoints + } else if len(nodesReady) > 0 { + // 2 level of panic modes as safe guard, because old wrong behavior can be used by someone + // Publish all endpoints not always a bad thing + log.Debugf("All pods in terminating state, use ready") + nodes = nodesReady + } else { + log.Debugf("All pods not ready, use all running") + nodes = nodesRunning + } default: nodes, err = sc.nodeInformer.Lister().List(labels.Everything()) if err != nil { diff --git a/source/service_test.go b/source/service_test.go index a12bd2752..84d555cd8 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -1630,7 +1630,9 @@ func TestServiceSourceNodePortServices(t *testing.T) { podNames []string nodeIndex []int phases []v1.PodPhase + conditions []v1.PodCondition labelSelector labels.Selector + deletionTimestamp []*metav1.Time }{ { title: "annotated NodePort services return an endpoint with IP addresses of the cluster's nodes", @@ -1817,6 +1819,8 @@ func TestServiceSourceNodePortServices(t *testing.T) { podNames: []string{"pod-0"}, nodeIndex: []int{1}, phases: []v1.PodPhase{v1.PodRunning}, + conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}}, + deletionTimestamp: []*metav1.Time{{}}, }, { title: "annotated NodePort services with ExternalTrafficPolicy=Local and multiple pods on a single node return an endpoint with unique IP addresses of the cluster's nodes where pods is running only", @@ -1859,6 +1863,110 @@ func TestServiceSourceNodePortServices(t *testing.T) { podNames: []string{"pod-0", "pod-1"}, nodeIndex: []int{1, 1}, phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning}, + conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionFalse}, + {Type: v1.PodReady, Status: v1.ConditionFalse}, + }, + deletionTimestamp: []*metav1.Time{{},{}}, + }, + { + title: "annotated NodePort services with ExternalTrafficPolicy=Local return pods in Ready & Running state", + svcNamespace: "testing", + svcName: "foo", + svcType: v1.ServiceTypeNodePort, + svcTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + labels: map[string]string{}, + annotations: map[string]string{ + hostnameAnnotationKey: "foo.example.org.", + }, + expected: []*endpoint.Endpoint{ + {DNSName: "_foo._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV}, + {DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1"}, RecordType: endpoint.RecordTypeA}, + }, + nodes: []*v1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.1"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.1"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.2"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.2"}, + }, + }, + }}, + podNames: []string{"pod-0", "pod-1"}, + nodeIndex: []int{0, 1}, + phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning}, + conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue}, + {Type: v1.PodReady, Status: v1.ConditionFalse}, + }, + deletionTimestamp: []*metav1.Time{{},{}}, + }, + { + title: "annotated NodePort services with ExternalTrafficPolicy=Local return pods in Ready & Running state & not in Terminating", + svcNamespace: "testing", + svcName: "foo", + svcType: v1.ServiceTypeNodePort, + svcTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal, + labels: map[string]string{}, + annotations: map[string]string{ + hostnameAnnotationKey: "foo.example.org.", + }, + expected: []*endpoint.Endpoint{ + {DNSName: "_foo._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV}, + {DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1"}, RecordType: endpoint.RecordTypeA}, + }, + nodes: []*v1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.1"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.1"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.2"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.2"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.3"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.3"}, + }, + }, + }}, + podNames: []string{"pod-0", "pod-1", "pod-2"}, + nodeIndex: []int{0, 1, 2}, + phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning, v1.PodRunning}, + conditions: []v1.PodCondition{ + {Type: v1.PodReady, Status: v1.ConditionTrue}, + {Type: v1.PodReady, Status: v1.ConditionFalse}, + {Type: v1.PodReady, Status: v1.ConditionTrue}, + }, + deletionTimestamp: []*metav1.Time{nil, nil, {}}, }, { title: "access=private annotation NodePort services return an endpoint with private IP addresses of the cluster's nodes", @@ -2150,9 +2258,11 @@ func TestServiceSourceNodePortServices(t *testing.T) { Name: podname, Labels: tc.labels, Annotations: tc.annotations, + DeletionTimestamp: tc.deletionTimestamp[i], }, Status: v1.PodStatus{ Phase: tc.phases[i], + Conditions: []v1.PodCondition{tc.conditions[i]}, }, }