This commit is contained in:
Lino Layani 2025-05-21 10:23:28 -04:00
parent 4276764b86
commit c01fbf0682

View File

@ -586,68 +586,87 @@ func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodC
return -1, nil
}
// nodesExternalTrafficPolicyTypeLocal filters nodes that have running pods belonging to the given NodePort service
// with externalTrafficPolicy=Local. Returns a prioritized slice of nodes, favoring those with ready, non-terminating pods.
func (sc *serviceSource) nodesExternalTrafficPolicyTypeLocal(svc *v1.Service) []*v1.Node {
var nodesReady []*v1.Node
var nodesRunning []*v1.Node
var nodes []*v1.Node
nodesMap := map[*v1.Node]struct{}{}
pods := sc.pods(svc)
for _, v := range pods {
if v.Status.Phase == v1.PodRunning {
node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
if err != nil {
log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
continue
}
if _, ok := nodesMap[node]; !ok {
nodesMap[node] = *new(struct{})
nodesRunning = append(nodesRunning, node)
if isPodStatusReady(v.Status) {
nodesReady = append(nodesReady, node)
// Check pod not terminating
if v.GetDeletionTimestamp() == nil {
nodes = append(nodes, node)
}
}
}
}
}
// Prioritize nodes with non-terminating ready pods
// If none available, fall back to nodes with ready pods
// If still none, use nodes with any running pods
if len(nodes) > 0 {
// Works same as service endpoints
} else if len(nodesReady) > 0 {
// 2 level of panic modes as safe guard, because old wrong behavior can be used by someone
// Publish all endpoints not always a bad thing
log.Debugf("All pods in terminating state, use ready")
nodes = nodesReady
} else {
log.Debugf("All pods not ready, use all running")
nodes = nodesRunning
}
return nodes
}
// pods retrieves a slice of pods associated with the given Service
func (sc *serviceSource) pods(svc *v1.Service) []*v1.Pod {
labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
if err != nil {
return nil
}
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil
}
pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
return nil
}
return pods
}
func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) {
var (
internalIPs endpoint.Targets
externalIPs endpoint.Targets
ipv6IPs endpoint.Targets
nodes []*v1.Node
err error
)
switch svc.Spec.ExternalTrafficPolicy {
case v1.ServiceExternalTrafficPolicyTypeLocal:
nodesMap := map[*v1.Node]struct{}{}
labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
if err != nil {
return nil, err
}
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil, err
}
pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
return nil, err
}
var nodesReady []*v1.Node
var nodesRunning []*v1.Node
for _, v := range pods {
if v.Status.Phase == v1.PodRunning {
node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
if err != nil {
log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
continue
}
if _, ok := nodesMap[node]; !ok {
nodesMap[node] = *new(struct{})
nodesRunning = append(nodesRunning, node)
if isPodStatusReady(v.Status) {
nodesReady = append(nodesReady, node)
// Check pod not terminating
if v.GetDeletionTimestamp() == nil {
nodes = append(nodes, node)
}
}
}
}
}
if len(nodes) > 0 {
// Works same as service endpoints
} else if len(nodesReady) > 0 {
// 2 level of panic modes as safe guard, because old wrong behavior can be used by someone
// Publish all endpoints not always a bad thing
log.Debugf("All pods in terminating state, use ready")
nodes = nodesReady
} else {
log.Debugf("All pods not ready, use all running")
nodes = nodesRunning
}
default:
if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
nodes = sc.nodesExternalTrafficPolicyTypeLocal(svc)
} else {
var err error
nodes, err = sc.nodeInformer.Lister().List(labels.Everything())
if err != nil {
return nil, err
@ -669,15 +688,17 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe
}
access := getAccessFromAnnotations(svc.Annotations)
if access == "public" {
switch access {
case "public":
return append(externalIPs, ipv6IPs...), nil
}
if access == "private" {
case "private":
return internalIPs, nil
}
if len(externalIPs) > 0 {
return append(externalIPs, ipv6IPs...), nil
}
return internalIPs, nil
}