From c01fbf0682395236a88c5f95cb54d894160cc609 Mon Sep 17 00:00:00 2001
From: Lino Layani
Date: Wed, 21 May 2025 10:23:28 -0400
Subject: [PATCH] source/service: extract externalTrafficPolicy=Local node
 selection into helpers

---
 source/service.go | 135 ++++++++++++++++++++++++++--------------------
 1 file changed, 78 insertions(+), 57 deletions(-)

diff --git a/source/service.go b/source/service.go
index 0fc6eb642..96764d347 100644
--- a/source/service.go
+++ b/source/service.go
@@ -586,68 +586,87 @@ func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodC
 	return -1, nil
 }
 
+// nodesExternalTrafficPolicyTypeLocal filters for nodes that have running pods belonging to the given NodePort
+// service with externalTrafficPolicy=Local. It returns a prioritized slice of nodes, favoring those with ready,
+// non-terminating pods.
+func (sc *serviceSource) nodesExternalTrafficPolicyTypeLocal(svc *v1.Service) []*v1.Node {
+	var nodesReady []*v1.Node
+	var nodesRunning []*v1.Node
+	var nodes []*v1.Node
+	nodesMap := map[*v1.Node]struct{}{}
+
+	pods := sc.pods(svc)
+
+	for _, v := range pods {
+		if v.Status.Phase == v1.PodRunning {
+			node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
+			if err != nil {
+				log.Debugf("Unable to find node where Pod %s is running", v.Name)
+				continue
+			}
+
+			if _, ok := nodesMap[node]; !ok {
+				nodesMap[node] = struct{}{}
+				nodesRunning = append(nodesRunning, node)
+
+				if isPodStatusReady(v.Status) {
+					nodesReady = append(nodesReady, node)
+					// Check that the pod is not terminating
+					if v.GetDeletionTimestamp() == nil {
+						nodes = append(nodes, node)
+					}
+				}
+			}
+		}
+	}
+
+	// Prioritize nodes with non-terminating ready pods.
+	// If none are available, fall back to nodes with ready pods.
+	// If there are still none, use nodes with any running pods.
+	if len(nodes) > 0 {
+		// Works the same way as service endpoints
+	} else if len(nodesReady) > 0 {
+		// The two fallback levels below act as a safeguard: the old, incorrect
+		// behavior may be relied upon by someone, and publishing all endpoints
+		// is not always a bad thing.
+		log.Debugf("All pods in terminating state, use ready")
+		nodes = nodesReady
+	} else {
+		log.Debugf("All pods not ready, use all running")
+		nodes = nodesRunning
+	}
+
+	return nodes
+}
+
+// pods retrieves the pods selected by the given Service's label selector in the
+// Service's namespace; selector or lister errors yield a nil slice.
+func (sc *serviceSource) pods(svc *v1.Service) []*v1.Pod {
+	labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
+	if err != nil {
+		return nil
+	}
+	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
+	if err != nil {
+		return nil
+	}
+	pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
+	if err != nil {
+		return nil
+	}
+
+	return pods
+}
+
 func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) {
 	var (
 		internalIPs endpoint.Targets
 		externalIPs endpoint.Targets
 		ipv6IPs     endpoint.Targets
 		nodes       []*v1.Node
-		err         error
 	)
 
-	switch svc.Spec.ExternalTrafficPolicy {
-	case v1.ServiceExternalTrafficPolicyTypeLocal:
-		nodesMap := map[*v1.Node]struct{}{}
-		labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
-		if err != nil {
-			return nil, err
-		}
-		selector, err := metav1.LabelSelectorAsSelector(labelSelector)
-		if err != nil {
-			return nil, err
-		}
-		pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
-		if err != nil {
-			return nil, err
-		}
-
-		var nodesReady []*v1.Node
-		var nodesRunning []*v1.Node
-		for _, v := range pods {
-			if v.Status.Phase == v1.PodRunning {
-				node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
-				if err != nil {
-					log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
-					continue
-				}
-
-				if _, ok := nodesMap[node]; !ok {
-					nodesMap[node] = *new(struct{})
-					nodesRunning = append(nodesRunning, node)
-
-					if isPodStatusReady(v.Status) {
-						nodesReady = append(nodesReady, node)
-						// Check pod not terminating
-						if v.GetDeletionTimestamp() == nil {
-							nodes = append(nodes, node)
-						}
-					}
-				}
-			}
-		}
-
-		if len(nodes) > 0 {
-			// Works same as service endpoints
-		} else if len(nodesReady) > 0 {
-			// 2 level of panic modes as safe guard, because old wrong behavior can be used by someone
-			// Publish all endpoints not always a bad thing
-			log.Debugf("All pods in terminating state, use ready")
-			nodes = nodesReady
-		} else {
-			log.Debugf("All pods not ready, use all running")
-			nodes = nodesRunning
-		}
-	default:
+	if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
+		nodes = sc.nodesExternalTrafficPolicyTypeLocal(svc)
+	} else {
+		var err error
 		nodes, err = sc.nodeInformer.Lister().List(labels.Everything())
 		if err != nil {
 			return nil, err
@@ -669,15 +688,17 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe
 	}
 
 	access := getAccessFromAnnotations(svc.Annotations)
-	if access == "public" {
+	switch access {
+	case "public":
 		return append(externalIPs, ipv6IPs...), nil
-	}
-	if access == "private" {
+	case "private":
 		return internalIPs, nil
 	}
+
 	if len(externalIPs) > 0 {
 		return append(externalIPs, ipv6IPs...), nil
 	}
+
 	return internalIPs, nil
 }
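
The extracted helper's three-tier fallback is the behavioral heart of this refactor. As a sanity check for reviewers, the following is a minimal, dependency-free Go sketch of the same logic; podState and selectNodes are illustrative stand-ins invented for this note, not types or functions from the patch or the Kubernetes API.

package main

import "fmt"

// podState is a stand-in for the subset of pod state the helper inspects.
type podState struct {
	node        string
	running     bool
	ready       bool
	terminating bool
}

// selectNodes sketches the same three-tier fallback as
// nodesExternalTrafficPolicyTypeLocal: prefer nodes backing ready,
// non-terminating pods; fall back to nodes with ready pods; finally fall
// back to nodes with any running pod.
func selectNodes(pods []podState) []string {
	seen := map[string]struct{}{}
	var preferred, ready, running []string

	for _, p := range pods {
		if !p.running {
			continue
		}
		// Each node is classified by the first of its pods encountered,
		// mirroring the dedup via nodesMap in the patch.
		if _, ok := seen[p.node]; ok {
			continue
		}
		seen[p.node] = struct{}{}
		running = append(running, p.node)
		if p.ready {
			ready = append(ready, p.node)
			if !p.terminating {
				preferred = append(preferred, p.node)
			}
		}
	}

	switch {
	case len(preferred) > 0:
		return preferred
	case len(ready) > 0:
		return ready
	default:
		return running
	}
}

func main() {
	// node-a's only ready pod is terminating and node-b's pod is not ready,
	// so the first fallback tier (ready nodes) wins and [node-a] is printed.
	pods := []podState{
		{node: "node-a", running: true, ready: true, terminating: true},
		{node: "node-b", running: true, ready: false},
	}
	fmt.Println(selectNodes(pods))
}

The sketch makes the safeguard visible in isolation: a node whose only ready pod is terminating is still published rather than dropping all targets.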