refactor(annotation): rationalize filtering (#6065)

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

* chore(annotation): eliminate annotation filtering duplication

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>

---------

Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
Ivan Ka 2025-12-30 14:46:41 +00:00 committed by GitHub
parent c0d97e06c1
commit a4035f12a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 247 additions and 404 deletions

View File

@ -137,7 +137,7 @@ func (sc *ambassadorHostSource) Endpoints(ctx context.Context) ([]*endpoint.Endp
}
// Filter Ambassador Hosts
ambassadorHosts, err = sc.filterByAnnotations(ambassadorHosts)
ambassadorHosts, err = annotations.Filter(ambassadorHosts, sc.annotationFilter)
if err != nil {
return nil, fmt.Errorf("failed to filter Ambassador Hosts by annotation: %w", err)
}
@ -289,28 +289,3 @@ func newUnstructuredConverter() (*unstructuredConverter, error) {
return uc, nil
}
// Filter a list of Ambassador Host Resources to only return the ones that
// contain the required External-DNS annotation filter
func (sc *ambassadorHostSource) filterByAnnotations(ambassadorHosts []*ambassador.Host) ([]*ambassador.Host, error) {
selector, err := annotations.ParseFilter(sc.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list of Ambassador Hosts
if selector.Empty() {
return ambassadorHosts, nil
}
// Return a filtered list of Ambassador Hosts
filteredList := []*ambassador.Host{}
for _, host := range ambassadorHosts {
// include Ambassador Host if its annotations match the annotation filter
if selector.Matches(labels.Set(host.Annotations)) {
filteredList = append(filteredList, host)
}
}
return filteredList, nil
}

View File

@ -0,0 +1,50 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package annotations
import (
"strings"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
)
// AnnotatedObject represents any Kubernetes object with annotations
type AnnotatedObject interface {
GetAnnotations() map[string]string
}
// Filter filters a slice of objects by annotation selector.
// Returns all items if annotationFilter is empty.
func Filter[T AnnotatedObject](items []T, filter string) ([]T, error) {
if filter == "" || strings.TrimSpace(filter) == "" {
return items, nil
}
selector, err := ParseFilter(filter)
if err != nil {
return nil, err
}
if selector.Empty() {
return items, nil
}
filtered := make([]T, 0, len(items))
for _, item := range items {
if selector.Matches(labels.Set(item.GetAnnotations())) {
filtered = append(filtered, item)
}
}
log.Debugf("filtered '%d' services out of '%d' with annotation filter '%s'", len(filtered), len(items), filter)
return filtered, nil
}

View File

@ -0,0 +1,136 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package annotations
import (
"testing"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sigs.k8s.io/external-dns/internal/testutils"
)
// Mock object implementing AnnotatedObject
type mockObj struct {
annotations map[string]string
}
func (m mockObj) GetAnnotations() map[string]string {
return m.annotations
}
func TestFilter(t *testing.T) {
tests := []struct {
name string
items []mockObj
filter string
expected []mockObj
expectError bool
}{
{
name: "Empty filter returns all",
items: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"baz": "qux"}},
},
filter: "",
expected: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"baz": "qux"}},
},
},
{
name: "Matching items",
items: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"foo": "baz"}},
},
filter: "foo=bar",
expected: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
},
},
{
name: "No matching items",
items: []mockObj{
{annotations: map[string]string{"foo": "baz"}},
},
filter: "foo=bar",
expected: []mockObj{},
},
{
name: "Whitespace filter returns all",
items: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"baz": "qux"}},
},
filter: " ",
expected: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"baz": "qux"}},
},
},
{
name: "empty filter returns all",
items: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"baz": "qux"}},
},
filter: "",
expected: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"baz": "qux"}},
},
},
{
name: "invalid filter returns error",
items: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"baz": "qux"}},
},
filter: "=invalid",
expected: []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"baz": "qux"}},
},
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := Filter(tt.items, tt.filter)
if tt.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tt.expected, result)
}
})
}
}
func TestFilter_LogOutput(t *testing.T) {
hook := testutils.LogsUnderTestWithLogLevel(log.DebugLevel, t)
items := []mockObj{
{annotations: map[string]string{"foo": "bar"}},
{annotations: map[string]string{"foo": "baz"}},
}
filter := "foo=bar"
_, _ = Filter(items, filter)
testutils.TestHelperLogContains("filtered '1' services out of '2' with annotation filter 'foo=bar'", hook, t)
}

View File

@ -127,7 +127,7 @@ func (sc *httpProxySource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, e
httpProxies = append(httpProxies, hpConverted)
}
httpProxies, err = sc.filterByAnnotations(httpProxies)
httpProxies, err = annotations.Filter(httpProxies, sc.annotationFilter)
if err != nil {
return nil, fmt.Errorf("failed to filter HTTPProxies: %w", err)
}
@ -209,30 +209,6 @@ func (sc *httpProxySource) endpointsFromTemplate(httpProxy *projectcontour.HTTPP
return endpoints, nil
}
// filterByAnnotations filters a list of configs by a given annotation selector.
func (sc *httpProxySource) filterByAnnotations(httpProxies []*projectcontour.HTTPProxy) ([]*projectcontour.HTTPProxy, error) {
selector, err := annotations.ParseFilter(sc.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return httpProxies, nil
}
var filteredList []*projectcontour.HTTPProxy
for _, httpProxy := range httpProxies {
// include HTTPProxy if its annotations match the selector
if selector.Matches(labels.Set(httpProxy.Annotations)) {
filteredList = append(filteredList, httpProxy)
}
}
return filteredList, nil
}
// endpointsFromHTTPProxyConfig extracts the endpoints from a Contour HTTPProxy object
func (sc *httpProxySource) endpointsFromHTTPProxy(httpProxy *projectcontour.HTTPProxy) ([]*endpoint.Endpoint, error) {
resource := fmt.Sprintf("HTTPProxy/%s/%s", httpProxy.Namespace, httpProxy.Name)

View File

@ -111,7 +111,12 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS
}
// NewCRDSource creates a new crdSource with the given config.
func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme, startInformer bool) (Source, error) {
func NewCRDSource(
crdClient rest.Interface,
namespace, kind, annotationFilter string,
labelSelector labels.Selector,
scheme *runtime.Scheme,
startInformer bool) (Source, error) {
sourceCrd := crdSource{
crdResource: strings.ToLower(kind) + "s",
namespace: namespace,
@ -174,12 +179,17 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
return nil, err
}
result, err = cs.filterByAnnotations(result)
itemPtrs := make([]*apiv1alpha1.DNSEndpoint, len(result.Items))
for i := range result.Items {
itemPtrs[i] = &result.Items[i]
}
filtered, err := annotations.Filter(itemPtrs, cs.annotationFilter)
if err != nil {
return nil, err
}
for _, dnsEndpoint := range result.Items {
for _, dnsEndpoint := range filtered {
var crdEndpoints []*endpoint.Endpoint
for _, ep := range dnsEndpoint.Spec.Endpoints {
if (ep.RecordType == endpoint.RecordTypeCNAME || ep.RecordType == endpoint.RecordTypeA || ep.RecordType == endpoint.RecordTypeAAAA) && len(ep.Targets) < 1 {
@ -214,7 +224,7 @@ func (cs *crdSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
dnsEndpoint.Status.ObservedGeneration = dnsEndpoint.Generation
// Update the ObservedGeneration
_, err = cs.UpdateStatus(ctx, &dnsEndpoint)
_, err = cs.UpdateStatus(ctx, dnsEndpoint)
if err != nil {
log.Warnf("Could not update ObservedGeneration of the CRD: %v", err)
}
@ -253,26 +263,3 @@ func (cs *crdSource) UpdateStatus(ctx context.Context, dnsEndpoint *apiv1alpha1.
Do(ctx).
Into(result)
}
// filterByAnnotations filters a list of dnsendpoints by a given annotation selector.
func (cs *crdSource) filterByAnnotations(dnsendpoints *apiv1alpha1.DNSEndpointList) (*apiv1alpha1.DNSEndpointList, error) {
selector, err := annotations.ParseFilter(cs.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return dnsendpoints, nil
}
filteredList := apiv1alpha1.DNSEndpointList{}
for _, dnsendpoint := range dnsendpoints.Items {
// include dnsendpoint if its annotations match the selector
if selector.Matches(labels.Set(dnsendpoint.Annotations)) {
filteredList.Items = append(filteredList.Items, dnsendpoint)
}
}
return &filteredList, nil
}

View File

@ -120,7 +120,7 @@ func (ts *f5TransportServerSource) Endpoints(_ context.Context) ([]*endpoint.End
transportServers = append(transportServers, transportServer)
}
transportServers, err = ts.filterByAnnotations(transportServers)
transportServers, err = annotations.Filter(transportServers, ts.annotationFilter)
if err != nil {
return nil, fmt.Errorf("failed to filter TransportServers: %w", err)
}
@ -183,30 +183,6 @@ func newTSUnstructuredConverter() (*unstructuredConverter, error) {
return uc, nil
}
// filterByAnnotations filters a list of TransportServers by a given annotation selector.
func (ts *f5TransportServerSource) filterByAnnotations(transportServers []*f5.TransportServer) ([]*f5.TransportServer, error) {
selector, err := annotations.ParseFilter(ts.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return transportServers, nil
}
filteredList := []*f5.TransportServer{}
for _, ts := range transportServers {
// include TransportServer if its annotations match the selector
if selector.Matches(labels.Set(ts.Annotations)) {
filteredList = append(filteredList, ts)
}
}
return filteredList, nil
}
func hasValidTransportServerIP(vs *f5.TransportServer) bool {
normalizedAddress := strings.ToLower(vs.Status.VSAddress)
return normalizedAddress != "none" && normalizedAddress != ""

View File

@ -120,7 +120,7 @@ func (vs *f5VirtualServerSource) Endpoints(_ context.Context) ([]*endpoint.Endpo
virtualServers = append(virtualServers, virtualServer)
}
virtualServers, err = vs.filterByAnnotations(virtualServers)
virtualServers, err = annotations.Filter(virtualServers, vs.annotationFilter)
if err != nil {
return nil, fmt.Errorf("failed to filter VirtualServers: %w", err)
}
@ -195,30 +195,6 @@ func newVSUnstructuredConverter() (*unstructuredConverter, error) {
return uc, nil
}
// filterByAnnotations filters a list of VirtualServers by a given annotation selector.
func (vs *f5VirtualServerSource) filterByAnnotations(virtualServers []*f5.VirtualServer) ([]*f5.VirtualServer, error) {
selector, err := annotations.ParseFilter(vs.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return virtualServers, nil
}
filteredList := []*f5.VirtualServer{}
for _, vs := range virtualServers {
// include VirtualServer if its annotations match the selector
if selector.Matches(labels.Set(vs.Annotations)) {
filteredList = append(filteredList, vs)
}
}
return filteredList, nil
}
func hasValidVirtualServerIP(vs *f5.VirtualServer) bool {
normalizedAddress := strings.ToLower(vs.Status.VSAddress)
return normalizedAddress != "none" && normalizedAddress != ""

View File

@ -131,7 +131,7 @@ func (sc *ingressSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
if err != nil {
return nil, err
}
ingresses, err = sc.filterByAnnotations(ingresses)
ingresses, err = annotations.Filter(ingresses, sc.annotationFilter)
if err != nil {
return nil, err
}
@ -203,30 +203,6 @@ func (sc *ingressSource) endpointsFromTemplate(ing *networkv1.Ingress) ([]*endpo
return endpoints, nil
}
// filterByAnnotations filters a list of ingresses by a given annotation selector.
func (sc *ingressSource) filterByAnnotations(ingresses []*networkv1.Ingress) ([]*networkv1.Ingress, error) {
selector, err := getLabelSelector(sc.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return ingresses, nil
}
filteredList := []*networkv1.Ingress{}
for _, ingress := range ingresses {
// include ingress if its annotations match the selector
if matchLabelSelector(selector, ingress.Annotations) {
filteredList = append(filteredList, ingress)
}
}
return filteredList, nil
}
// filterByIngressClass filters a list of ingresses based on a required ingress
// class
func (sc *ingressSource) filterByIngressClass(ingresses []*networkv1.Ingress) ([]*networkv1.Ingress, error) {

View File

@ -30,7 +30,6 @@ import (
networkingv1beta1informer "istio.io/client-go/pkg/informers/externalversions/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
netinformers "k8s.io/client-go/informers/networking/v1"
@ -136,7 +135,7 @@ func (sc *gatewaySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, e
}
gateways := gwList.Items
gateways, err = sc.filterByAnnotations(gateways)
gateways, err = annotations.Filter(gateways, sc.annotationFilter)
if err != nil {
return nil, err
}
@ -209,30 +208,6 @@ func (sc *gatewaySource) AddEventHandler(_ context.Context, handler func()) {
_, _ = sc.gatewayInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
}
// filterByAnnotations filters a list of configs by a given annotation selector.
func (sc *gatewaySource) filterByAnnotations(gateways []*networkingv1beta1.Gateway) ([]*networkingv1beta1.Gateway, error) {
selector, err := annotations.ParseFilter(sc.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return gateways, nil
}
var filteredList []*networkingv1beta1.Gateway
for _, gw := range gateways {
// include if the annotations match the selector
if selector.Matches(labels.Set(gw.Annotations)) {
filteredList = append(filteredList, gw)
}
}
return filteredList, nil
}
func (sc *gatewaySource) targetsFromIngress(ingressStr string, gateway *networkingv1beta1.Gateway) (endpoint.Targets, error) {
namespace, name, err := ParseIngress(ingressStr)
if err != nil {

View File

@ -139,7 +139,7 @@ func (sc *virtualServiceSource) Endpoints(ctx context.Context) ([]*endpoint.Endp
if err != nil {
return nil, err
}
virtualServices, err = sc.filterByAnnotations(virtualServices)
virtualServices, err = annotations.Filter(virtualServices, sc.annotationFilter)
if err != nil {
return nil, err
}
@ -251,30 +251,6 @@ func (sc *virtualServiceSource) endpointsFromTemplate(ctx context.Context, virtu
return endpoints, nil
}
// filterByAnnotations filters a list of configs by a given annotation selector.
func (sc *virtualServiceSource) filterByAnnotations(vServices []*v1beta1.VirtualService) ([]*v1beta1.VirtualService, error) {
selector, err := annotations.ParseFilter(sc.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return vServices, nil
}
var filteredList []*v1beta1.VirtualService
for _, vs := range vServices {
// include if the annotations match the selector
if selector.Matches(labels.Set(vs.Annotations)) {
filteredList = append(filteredList, vs)
}
}
return filteredList, nil
}
// append a target to the list of targets unless it's already in the list
func appendUnique(targets []string, target string) []string {
if slices.Contains(targets, target) {

View File

@ -123,7 +123,7 @@ func (sc *kongTCPIngressSource) Endpoints(_ context.Context) ([]*endpoint.Endpoi
tcpIngresses = append(tcpIngresses, tcpIngress)
}
tcpIngresses, err = sc.filterByAnnotations(tcpIngresses)
tcpIngresses, err = annotations.Filter(tcpIngresses, sc.annotationFilter)
if err != nil {
return nil, fmt.Errorf("failed to filter TCPIngresses: %w", err)
}
@ -164,30 +164,6 @@ func (sc *kongTCPIngressSource) Endpoints(_ context.Context) ([]*endpoint.Endpoi
return endpoints, nil
}
// filterByAnnotations filters a list of TCPIngresses by a given annotation selector.
func (sc *kongTCPIngressSource) filterByAnnotations(tcpIngresses []*TCPIngress) ([]*TCPIngress, error) {
selector, err := annotations.ParseFilter(sc.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return tcpIngresses, nil
}
var filteredList []*TCPIngress
for _, tcpIngress := range tcpIngresses {
// include TCPIngress if its annotations match the selector
if selector.Matches(labels.Set(tcpIngress.Annotations)) {
filteredList = append(filteredList, tcpIngress)
}
}
return filteredList, nil
}
// endpointsFromTCPIngress extracts the endpoints from a TCPIngress object
func (sc *kongTCPIngressSource) endpointsFromTCPIngress(tcpIngress *TCPIngress, targets endpoint.Targets) ([]*endpoint.Endpoint, error) {
var endpoints []*endpoint.Endpoint

View File

@ -96,7 +96,7 @@ func (ns *nodeSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error)
return nil, err
}
nodes, err = ns.filterByAnnotations(nodes)
nodes, err = annotations.Filter(nodes, ns.annotationFilter)
if err != nil {
return nil, err
}
@ -202,30 +202,6 @@ func (ns *nodeSource) nodeAddresses(node *v1.Node) ([]string, error) {
return nil, fmt.Errorf("could not find node address for %s", node.Name)
}
// filterByAnnotations filters a list of nodes by a given annotation selector.
func (ns *nodeSource) filterByAnnotations(nodes []*v1.Node) ([]*v1.Node, error) {
selector, err := annotations.ParseFilter(ns.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return nodes, nil
}
var filteredList []*v1.Node
for _, node := range nodes {
// include a node if its annotations match the selector
if selector.Matches(labels.Set(node.Annotations)) {
filteredList = append(filteredList, node)
}
}
return filteredList, nil
}
// collectDNSNames returns a set of DNS names associated with the given Kubernetes Node.
// If an FQDN template is configured, it renders the template using the Node object
// to generate one or more DNS names.

View File

@ -122,7 +122,7 @@ func (ors *ocpRouteSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, e
return nil, err
}
ocpRoutes, err = ors.filterByAnnotations(ocpRoutes)
ocpRoutes, err = annotations.Filter(ocpRoutes, ors.annotationFilter)
if err != nil {
return nil, err
}
@ -195,29 +195,6 @@ func (ors *ocpRouteSource) endpointsFromTemplate(ocpRoute *routev1.Route) ([]*en
return endpoints, nil
}
func (ors *ocpRouteSource) filterByAnnotations(ocpRoutes []*routev1.Route) ([]*routev1.Route, error) {
selector, err := annotations.ParseFilter(ors.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return ocpRoutes, nil
}
var filteredList []*routev1.Route
for _, ocpRoute := range ocpRoutes {
// include ocpRoute if its annotations match the selector
if selector.Matches(labels.Set(ocpRoute.Annotations)) {
filteredList = append(filteredList, ocpRoute)
}
}
return filteredList, nil
}
// endpointsFromOcpRoute extracts the endpoints from a OpenShift Route object
func (ors *ocpRouteSource) endpointsFromOcpRoute(ocpRoute *routev1.Route, ignoreHostnameAnnotation bool) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint

View File

@ -239,7 +239,7 @@ func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, err
// filter on service types if at least one has been provided
services = sc.filterByServiceType(services)
services, err = sc.filterByAnnotations(services)
services, err = annotations.Filter(services, sc.annotationFilter)
if err != nil {
return nil, err
}
@ -572,30 +572,6 @@ func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
return endpoints
}
// filterByAnnotations filters a list of services by a given annotation selector.
func (sc *serviceSource) filterByAnnotations(services []*v1.Service) ([]*v1.Service, error) {
selector, err := annotations.ParseFilter(sc.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return services, nil
}
var filteredList []*v1.Service
for _, service := range services {
// include service if its annotations match the selector
if selector.Matches(labels.Set(service.Annotations)) {
filteredList = append(filteredList, service)
}
}
log.Debugf("filtered %d services out of %d with annotation filter", len(filteredList), len(services))
return filteredList, nil
}
// filterByServiceType filters services according to their types
func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Service {
if !sc.serviceTypeFilter.enabled || len(services) == 0 {

View File

@ -246,13 +246,14 @@ func (sc *routeGroupSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint
log.Errorf("Failed to get RouteGroup list: %v", err)
return nil, err
}
rgList, err = sc.filterByAnnotations(rgList)
filtered, err := annotations.Filter(rgList.Items, sc.annotationFilter)
if err != nil {
return nil, err
}
endpoints := []*endpoint.Endpoint{}
for _, rg := range rgList.Items {
for _, rg := range filtered {
// Check controller annotation to see if we are responsible.
controller, ok := rg.Metadata.Annotations[annotations.ControllerKey]
if ok && controller != annotations.ControllerValue {
@ -364,30 +365,6 @@ func (sc *routeGroupSource) endpointsFromRouteGroup(rg *routeGroup) []*endpoint.
return endpoints
}
// filterByAnnotations filters a list of routeGroupList by a given annotation selector.
func (sc *routeGroupSource) filterByAnnotations(rgs *routeGroupList) (*routeGroupList, error) {
selector, err := getLabelSelector(sc.annotationFilter)
if err != nil {
return nil, err
}
// empty filter returns original list
if selector.Empty() {
return rgs, nil
}
var filteredList []*routeGroup
for _, rg := range rgs.Items {
// include ingress if its annotations match the selector
if matchLabelSelector(selector, rg.Metadata.Annotations) {
filteredList = append(filteredList, rg)
}
}
rgs.Items = filteredList
return rgs, nil
}
func targetsFromRouteGroupStatus(status routeGroupStatus) endpoint.Targets {
var targets endpoint.Targets
@ -443,3 +420,7 @@ type routeGroupLoadBalancer struct {
IP string `json:"ip,omitempty"`
Hostname string `json:"hostname,omitempty"`
}
func (rg *routeGroup) GetAnnotations() map[string]string {
return rg.Metadata.Annotations
}

View File

@ -240,7 +240,7 @@ func (ts *traefikSource) ingressRouteEndpoints() ([]*endpoint.Endpoint, error) {
typed := &IngressRoute{}
return typed, ts.unstructuredConverter.scheme.Convert(u, typed, nil)
},
ts.filterIngressRouteByAnnotation,
ts.annotationFilter,
func(r *IngressRoute, targets endpoint.Targets) []*endpoint.Endpoint {
return ts.endpointsFromIngressRoute(r, targets)
},
@ -271,7 +271,7 @@ func (ts *traefikSource) ingressRouteTCPEndpoints() ([]*endpoint.Endpoint, error
ingressRouteTCPs = append(ingressRouteTCPs, ingressRouteTCP)
}
ingressRouteTCPs, err = ts.filterIngressRouteTcpByAnnotations(ingressRouteTCPs)
ingressRouteTCPs, err = annotations.Filter(ingressRouteTCPs, ts.annotationFilter)
if err != nil {
return nil, fmt.Errorf("failed to filter IngressRouteTCP: %w", err)
}
@ -305,7 +305,7 @@ func (ts *traefikSource) ingressRouteUDPEndpoints() ([]*endpoint.Endpoint, error
typed := &IngressRouteUDP{}
return typed, ts.unstructuredConverter.scheme.Convert(u, typed, nil)
},
ts.filterIngressRouteUdpByAnnotations,
ts.annotationFilter,
ts.endpointsFromIngressRouteUDP,
)
}
@ -319,7 +319,7 @@ func (ts *traefikSource) oldIngressRouteEndpoints() ([]*endpoint.Endpoint, error
typed := &IngressRoute{}
return typed, ts.unstructuredConverter.scheme.Convert(u, typed, nil)
},
ts.filterIngressRouteByAnnotation,
ts.annotationFilter,
func(r *IngressRoute, targets endpoint.Targets) []*endpoint.Endpoint {
return ts.endpointsFromIngressRoute(r, targets)
},
@ -335,7 +335,7 @@ func (ts *traefikSource) oldIngressRouteTCPEndpoints() ([]*endpoint.Endpoint, er
typed := &IngressRouteTCP{}
return typed, ts.unstructuredConverter.scheme.Convert(u, typed, nil)
},
ts.filterIngressRouteTcpByAnnotations,
ts.annotationFilter,
ts.endpointsFromIngressRouteTCP,
)
}
@ -349,32 +349,11 @@ func (ts *traefikSource) oldIngressRouteUDPEndpoints() ([]*endpoint.Endpoint, er
typed := &IngressRouteUDP{}
return typed, ts.unstructuredConverter.scheme.Convert(u, typed, nil)
},
ts.filterIngressRouteUdpByAnnotations,
ts.annotationFilter,
ts.endpointsFromIngressRouteUDP,
)
}
// filterIngressRouteByAnnotation filters a list of IngressRoute by a given annotation selector.
func (ts *traefikSource) filterIngressRouteByAnnotation(input []*IngressRoute) ([]*IngressRoute, error) {
return filterResourcesByAnnotations(input, ts.annotationFilter, func(ir *IngressRoute) map[string]string {
return ir.Annotations
})
}
// filterIngressRouteTcpByAnnotations filters a list of IngressRouteTCP by a given annotation selector.
func (ts *traefikSource) filterIngressRouteTcpByAnnotations(input []*IngressRouteTCP) ([]*IngressRouteTCP, error) {
return filterResourcesByAnnotations(input, ts.annotationFilter, func(ir *IngressRouteTCP) map[string]string {
return ir.Annotations
})
}
// filterIngressRouteUdpByAnnotations filters a list of IngressRoute by a given annotation selector.
func (ts *traefikSource) filterIngressRouteUdpByAnnotations(input []*IngressRouteUDP) ([]*IngressRouteUDP, error) {
return filterResourcesByAnnotations(input, ts.annotationFilter, func(ir *IngressRouteUDP) map[string]string {
return ir.Annotations
})
}
// endpointsFromIngressRoute extracts the endpoints from a IngressRoute object
func (ts *traefikSource) endpointsFromIngressRoute(ingressRoute *IngressRoute, targets endpoint.Targets) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint
@ -842,19 +821,34 @@ func (in *IngressRouteUDPList) DeepCopyObject() runtime.Object {
return nil
}
// GetAnnotations returns the annotations of the IngressRoute.
func (in *IngressRoute) GetAnnotations() map[string]string {
return in.Annotations
}
// GetAnnotations returns the annotations of the IngressRouteTCP.
func (in *IngressRouteTCP) GetAnnotations() map[string]string {
return in.Annotations
}
// GetAnnotations returns the annotations of the IngressRouteUDP.
func (in *IngressRouteUDP) GetAnnotations() map[string]string {
return in.Annotations
}
// extractEndpoints is a generic function that extracts endpoints from Kubernetes resources.
// It performs the following steps:
// 1. Lists all objects in the specified namespace using the provided informer.
// 2. Converts the unstructured objects to the desired type using the convertFunc.
// 3. Filters the converted objects based on the provided filterFunc.
// 3. Filters the converted objects based on the annotation filter.
// 4. Generates endpoints for each filtered object using the generateEndpoints function.
// Returns a list of generated endpoints or an error if any step fails.
func extractEndpoints[T any](
func extractEndpoints[T annotations.AnnotatedObject](
informer cache.GenericLister,
namespace string,
convertFunc func(*unstructured.Unstructured) (*T, error),
filterFunc func([]*T) ([]*T, error),
generateEndpoints func(*T, endpoint.Targets) []*endpoint.Endpoint,
convertFunc func(*unstructured.Unstructured) (T, error),
annotationFilter string,
generateEndpoints func(T, endpoint.Targets) []*endpoint.Endpoint,
) ([]*endpoint.Endpoint, error) {
var endpoints []*endpoint.Endpoint
@ -863,7 +857,7 @@ func extractEndpoints[T any](
return nil, err
}
var typedObjs []*T
var typedObjs []T
for _, obj := range objs {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
@ -877,13 +871,13 @@ func extractEndpoints[T any](
typedObjs = append(typedObjs, typed)
}
typedObjs, err = filterFunc(typedObjs)
typedObjs, err = annotations.Filter(typedObjs, annotationFilter)
if err != nil {
return nil, err
}
for _, item := range typedObjs {
targets := annotations.TargetsFromTargetAnnotation(getAnnotations(item))
targets := annotations.TargetsFromTargetAnnotation(item.GetAnnotations())
name := getObjectFullName(item)
ingressEndpoints := generateEndpoints(item, targets)
@ -900,46 +894,6 @@ func extractEndpoints[T any](
return endpoints, nil
}
// filterResourcesByAnnotations filters a list of resources based on a given annotation selector.
// It performs the following steps:
// 1. Parses the annotation filter into a label selector.
// 2. Converts the label selector into a Kubernetes selector.
// 3. If the selector is empty, returns the original list of resources.
// 4. Iterates through the resources and matches their annotations against the selector.
// 5. Returns the filtered list of resources or an error if any step fails.
func filterResourcesByAnnotations[T any](resources []*T, annotationFilter string, getAnnotations func(*T) map[string]string) ([]*T, error) {
selector, err := annotations.ParseFilter(annotationFilter)
if err != nil {
return nil, err
}
if selector.Empty() {
return resources, nil
}
var filteredList []*T
for _, resource := range resources {
if selector.Matches(labels.Set(getAnnotations(resource))) {
filteredList = append(filteredList, resource)
}
}
return filteredList, nil
}
func getAnnotations(obj any) map[string]string {
switch o := obj.(type) {
case *IngressRouteUDP:
return o.Annotations
case *IngressRoute:
return o.Annotations
case *IngressRouteTCP:
return o.Annotations
default:
return nil
}
}
func getObjectFullName(obj any) string {
switch o := obj.(type) {
case *IngressRouteUDP: