mirror of
https://github.com/kubernetes-sigs/external-dns.git
synced 2026-05-08 16:16:10 +02:00
refactor(source): handle context in similar way (#6049)
* chore(source): sources to handle context in similar way Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * chore(source): sources to handle context in similar way Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> * chore(source): sources to handle context in similar way Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com> --------- Signed-off-by: ivan katliarchuk <ivan.katliarchuk@gmail.com>
This commit is contained in:
parent
b2390a9187
commit
2bdb8df7e2
@ -49,9 +49,10 @@ const (
|
||||
groupName = "getambassador.io"
|
||||
)
|
||||
|
||||
var schemeGroupVersion = schema.GroupVersion{Group: groupName, Version: "v2"}
|
||||
|
||||
var ambHostGVR = schemeGroupVersion.WithResource("hosts")
|
||||
var (
|
||||
schemeGroupVersion = schema.GroupVersion{Group: groupName, Version: "v2"}
|
||||
ambHostGVR = schemeGroupVersion.WithResource("hosts")
|
||||
)
|
||||
|
||||
// ambassadorHostSource is an implementation of Source for Ambassador Host objects.
|
||||
// The IngressRoute implementation uses the spec.virtualHost.fqdn value for the hostname.
|
||||
@ -75,15 +76,13 @@ func NewAmbassadorHostSource(
|
||||
annotationFilter string,
|
||||
labelSelector labels.Selector,
|
||||
) (Source, error) {
|
||||
var err error
|
||||
|
||||
// Use shared informer to listen for add/update/delete of Host in the specified namespace.
|
||||
// Set resync period to 0, to prevent processing when nothing has changed.
|
||||
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
|
||||
ambassadorHostInformer := informerFactory.ForResource(ambHostGVR)
|
||||
|
||||
// Add default resource event handlers to properly initialize informer.
|
||||
ambassadorHostInformer.Informer().AddEventHandler(
|
||||
_, _ = ambassadorHostInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
},
|
||||
@ -93,7 +92,7 @@ func NewAmbassadorHostSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@ -36,11 +36,11 @@ func NewCloudFoundrySource(cfClient *cfclient.Client) (Source, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (rs *cloudfoundrySource) AddEventHandler(ctx context.Context, handler func()) {
|
||||
func (rs *cloudfoundrySource) AddEventHandler(_ context.Context, handler func()) {
|
||||
}
|
||||
|
||||
// Endpoints returns endpoint objects
|
||||
func (rs *cloudfoundrySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
func (rs *cloudfoundrySource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
|
||||
endpoints := []*endpoint.Endpoint{}
|
||||
|
||||
u, err := url.Parse(rs.client.Config.ApiAddress)
|
||||
|
||||
@ -73,7 +73,7 @@ func NewContourHTTPProxySource(
|
||||
httpProxyInformer := informerFactory.ForResource(projectcontour.HTTPProxyGVR)
|
||||
|
||||
// Add default resource event handlers to properly initialize informer.
|
||||
httpProxyInformer.Informer().AddEventHandler(
|
||||
_, _ = httpProxyInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
},
|
||||
@ -83,7 +83,7 @@ func NewContourHTTPProxySource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -106,7 +106,7 @@ func NewContourHTTPProxySource(
|
||||
|
||||
// Endpoints returns endpoint objects for each host-target combination that should be processed.
|
||||
// Retrieves all HTTPProxy resources in the source's namespace(s).
|
||||
func (sc *httpProxySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
func (sc *httpProxySource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
|
||||
hps, err := sc.httpProxyInformer.Lister().ByNamespace(sc.namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -273,10 +273,10 @@ func (sc *httpProxySource) endpointsFromHTTPProxy(httpProxy *projectcontour.HTTP
|
||||
return endpoints, nil
|
||||
}
|
||||
|
||||
func (sc *httpProxySource) AddEventHandler(ctx context.Context, handler func()) {
|
||||
func (sc *httpProxySource) AddEventHandler(_ context.Context, handler func()) {
|
||||
log.Debug("Adding event handler for httpproxy")
|
||||
|
||||
// Right now there is no way to remove event handler from informer, see:
|
||||
// https://github.com/kubernetes/kubernetes/issues/79610
|
||||
sc.httpProxyInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
_, _ = sc.httpProxyInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
}
|
||||
|
||||
@ -68,7 +68,7 @@ func NewF5TransportServerSource(
|
||||
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
|
||||
transportServerInformer := informerFactory.ForResource(f5TransportServerGVR)
|
||||
|
||||
transportServerInformer.Informer().AddEventHandler(
|
||||
_, _ = transportServerInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
},
|
||||
@ -78,7 +78,7 @@ func NewF5TransportServerSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -99,7 +99,7 @@ func NewF5TransportServerSource(
|
||||
|
||||
// Endpoints returns endpoint objects for each host-target combination that should be processed.
|
||||
// Retrieves all TransportServers in the source's namespace(s).
|
||||
func (ts *f5TransportServerSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
func (ts *f5TransportServerSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
|
||||
transportServerObjects, err := ts.transportServerInformer.Lister().ByNamespace(ts.namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -133,10 +133,10 @@ func (ts *f5TransportServerSource) Endpoints(ctx context.Context) ([]*endpoint.E
|
||||
return endpoints, nil
|
||||
}
|
||||
|
||||
func (ts *f5TransportServerSource) AddEventHandler(ctx context.Context, handler func()) {
|
||||
func (ts *f5TransportServerSource) AddEventHandler(_ context.Context, handler func()) {
|
||||
log.Debug("Adding event handler for TransportServer")
|
||||
|
||||
ts.transportServerInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
_, _ = ts.transportServerInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
}
|
||||
|
||||
// endpointsFromTransportServers extracts the endpoints from a slice of TransportServers
|
||||
|
||||
@ -68,7 +68,7 @@ func NewF5VirtualServerSource(
|
||||
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
|
||||
virtualServerInformer := informerFactory.ForResource(f5VirtualServerGVR)
|
||||
|
||||
virtualServerInformer.Informer().AddEventHandler(
|
||||
_, _ = virtualServerInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
},
|
||||
@ -78,7 +78,7 @@ func NewF5VirtualServerSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -99,7 +99,7 @@ func NewF5VirtualServerSource(
|
||||
|
||||
// Endpoints returns endpoint objects for each host-target combination that should be processed.
|
||||
// Retrieves all VirtualServers in the source's namespace(s).
|
||||
func (vs *f5VirtualServerSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
func (vs *f5VirtualServerSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
|
||||
virtualServerObjects, err := vs.virtualServerInformer.Lister().ByNamespace(vs.namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -138,10 +138,10 @@ func (vs *f5VirtualServerSource) Endpoints(ctx context.Context) ([]*endpoint.End
|
||||
return endpoints, nil
|
||||
}
|
||||
|
||||
func (vs *f5VirtualServerSource) AddEventHandler(ctx context.Context, handler func()) {
|
||||
func (vs *f5VirtualServerSource) AddEventHandler(_ context.Context, handler func()) {
|
||||
log.Debug("Adding event handler for VirtualServer")
|
||||
|
||||
vs.virtualServerInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
_, _ = vs.virtualServerInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
}
|
||||
|
||||
// endpointsFromVirtualServers extracts the endpoints from a slice of VirtualServers
|
||||
|
||||
@ -59,7 +59,7 @@ func (sc *fakeSource) AddEventHandler(_ context.Context, handler func()) {
|
||||
}
|
||||
|
||||
// Endpoints returns endpoint objects.
|
||||
func (sc *fakeSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
func (sc *fakeSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
|
||||
endpoints := make([]*endpoint.Endpoint, 10)
|
||||
|
||||
for i := range 10 {
|
||||
|
||||
@ -105,9 +105,12 @@ type gatewayRouteSource struct {
|
||||
ignoreHostnameAnnotation bool
|
||||
}
|
||||
|
||||
func newGatewayRouteSource(clients ClientGenerator, config *Config, kind string, newInformerFn newGatewayRouteInformerFunc) (Source, error) {
|
||||
ctx := context.TODO()
|
||||
|
||||
func newGatewayRouteSource(
|
||||
ctx context.Context,
|
||||
clients ClientGenerator,
|
||||
config *Config,
|
||||
kind string,
|
||||
newInformerFn newGatewayRouteInformerFunc) (Source, error) {
|
||||
gwLabels, err := getLabelSelector(config.GatewayLabelFilter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -187,15 +190,15 @@ func newGatewayRouteSource(clients ClientGenerator, config *Config, kind string,
|
||||
return src, nil
|
||||
}
|
||||
|
||||
func (src *gatewayRouteSource) AddEventHandler(ctx context.Context, handler func()) {
|
||||
func (src *gatewayRouteSource) AddEventHandler(_ context.Context, handler func()) {
|
||||
log.Debugf("Adding event handlers for %s", src.rtKind)
|
||||
eventHandler := eventHandlerFunc(handler)
|
||||
src.gwInformer.Informer().AddEventHandler(eventHandler)
|
||||
src.rtInformer.Informer().AddEventHandler(eventHandler)
|
||||
src.nsInformer.Informer().AddEventHandler(eventHandler)
|
||||
_, _ = src.gwInformer.Informer().AddEventHandler(eventHandler)
|
||||
_, _ = src.rtInformer.Informer().AddEventHandler(eventHandler)
|
||||
_, _ = src.nsInformer.Informer().AddEventHandler(eventHandler)
|
||||
}
|
||||
|
||||
func (src *gatewayRouteSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
func (src *gatewayRouteSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
|
||||
var endpoints []*endpoint.Endpoint
|
||||
routes, err := src.rtInformer.List(src.rtNamespace, src.rtLabels)
|
||||
if err != nil {
|
||||
|
||||
@ -17,6 +17,8 @@ limitations under the License.
|
||||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
v1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
@ -25,8 +27,8 @@ import (
|
||||
)
|
||||
|
||||
// NewGatewayGRPCRouteSource creates a new Gateway GRPCRoute source with the given config.
|
||||
func NewGatewayGRPCRouteSource(clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(clients, config, "GRPCRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
func NewGatewayGRPCRouteSource(ctx context.Context, clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(ctx, clients, config, "GRPCRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
return &gatewayGRPCRouteInformer{factory.Gateway().V1().GRPCRoutes()}
|
||||
})
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package source
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@ -34,13 +35,15 @@ import (
|
||||
func TestGatewayGRPCRouteSourceEndpoints(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
gwClient := gatewayfake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewClientset()
|
||||
clients := new(MockClientGenerator)
|
||||
clients.On("GatewayClient").Return(gwClient, nil)
|
||||
clients.On("KubeClient").Return(kubeClient, nil)
|
||||
|
||||
ctx := context.Background()
|
||||
ns := &corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "default",
|
||||
@ -88,7 +91,7 @@ func TestGatewayGRPCRouteSourceEndpoints(t *testing.T) {
|
||||
_, err = gwClient.GatewayV1().GRPCRoutes(rt.Namespace).Create(ctx, rt, metav1.CreateOptions{})
|
||||
require.NoError(t, err, "failed to create GRPCRoute")
|
||||
|
||||
src, err := NewGatewayGRPCRouteSource(clients, &Config{
|
||||
src, err := NewGatewayGRPCRouteSource(ctx, clients, &Config{
|
||||
FQDNTemplate: "{{.Name}}-template.foobar.internal",
|
||||
CombineFQDNAndAnnotation: true,
|
||||
})
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
// TODO: refactor common DNS label functions into a shared package.
|
||||
// toLowerCaseASCII returns a lower-case version of in. See RFC 6125 6.4.1. We use
|
||||
// an explicitly ASCII function to avoid any sharp corners resulting from
|
||||
// performing Unicode operations on DNS labels.
|
||||
|
||||
@ -17,6 +17,8 @@ limitations under the License.
|
||||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
v1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
@ -26,8 +28,8 @@ import (
|
||||
)
|
||||
|
||||
// NewGatewayHTTPRouteSource creates a new Gateway HTTPRoute source with the given config.
|
||||
func NewGatewayHTTPRouteSource(clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(clients, config, "HTTPRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
func NewGatewayHTTPRouteSource(ctx context.Context, clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(ctx, clients, config, "HTTPRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
return &gatewayHTTPRouteInformer{factory.Gateway().V1beta1().HTTPRoutes()}
|
||||
})
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package source
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -1547,7 +1548,9 @@ func TestGatewayHTTPRouteSourceEndpoints(t *testing.T) {
|
||||
t.Parallel()
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
gwClient := gatewayfake.NewSimpleClientset()
|
||||
for _, gw := range tt.gateways {
|
||||
_, err := gwClient.GatewayV1beta1().Gateways(gw.Namespace).Create(ctx, gw, metav1.CreateOptions{})
|
||||
@ -1558,7 +1561,7 @@ func TestGatewayHTTPRouteSourceEndpoints(t *testing.T) {
|
||||
_, err := gwClient.GatewayV1beta1().HTTPRoutes(rt.Namespace).Create(ctx, rt, metav1.CreateOptions{})
|
||||
require.NoError(t, err, "failed to create HTTPRoute")
|
||||
}
|
||||
kubeClient := kubefake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewClientset()
|
||||
for _, ns := range tt.namespaces {
|
||||
_, err := kubeClient.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
|
||||
require.NoError(t, err, "failed to create Namespace")
|
||||
@ -1568,7 +1571,7 @@ func TestGatewayHTTPRouteSourceEndpoints(t *testing.T) {
|
||||
clients.On("GatewayClient").Return(gwClient, nil)
|
||||
clients.On("KubeClient").Return(kubeClient, nil)
|
||||
|
||||
src, err := NewGatewayHTTPRouteSource(clients, &tt.config)
|
||||
src, err := NewGatewayHTTPRouteSource(ctx, clients, &tt.config)
|
||||
require.NoError(t, err, "failed to create Gateway HTTPRoute Source")
|
||||
|
||||
hook := testutils.LogsUnderTestWithLogLevel(log.DebugLevel, t)
|
||||
|
||||
@ -17,6 +17,8 @@ limitations under the License.
|
||||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
v1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
@ -26,8 +28,8 @@ import (
|
||||
)
|
||||
|
||||
// NewGatewayTCPRouteSource creates a new Gateway TCPRoute source with the given config.
|
||||
func NewGatewayTCPRouteSource(clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(clients, config, "TCPRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
func NewGatewayTCPRouteSource(ctx context.Context, clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(ctx, clients, config, "TCPRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
return &gatewayTCPRouteInformer{factory.Gateway().V1alpha2().TCPRoutes()}
|
||||
})
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package source
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@ -35,13 +36,15 @@ import (
|
||||
func TestGatewayTCPRouteSourceEndpoints(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
gwClient := gatewayfake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewClientset()
|
||||
clients := new(MockClientGenerator)
|
||||
clients.On("GatewayClient").Return(gwClient, nil)
|
||||
clients.On("KubeClient").Return(kubeClient, nil)
|
||||
|
||||
ctx := context.Background()
|
||||
ns := &corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "default",
|
||||
@ -88,7 +91,7 @@ func TestGatewayTCPRouteSourceEndpoints(t *testing.T) {
|
||||
_, err = gwClient.GatewayV1alpha2().TCPRoutes(rt.Namespace).Create(ctx, rt, metav1.CreateOptions{})
|
||||
require.NoError(t, err, "failed to create TCPRoute")
|
||||
|
||||
src, err := NewGatewayTCPRouteSource(clients, &Config{
|
||||
src, err := NewGatewayTCPRouteSource(ctx, clients, &Config{
|
||||
FQDNTemplate: "{{.Name}}-template.foobar.internal",
|
||||
CombineFQDNAndAnnotation: true,
|
||||
})
|
||||
|
||||
@ -17,6 +17,8 @@ limitations under the License.
|
||||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
v1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
@ -26,8 +28,8 @@ import (
|
||||
)
|
||||
|
||||
// NewGatewayTLSRouteSource creates a new Gateway TLSRoute source with the given config.
|
||||
func NewGatewayTLSRouteSource(clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(clients, config, "TLSRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
func NewGatewayTLSRouteSource(ctx context.Context, clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(ctx, clients, config, "TLSRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
return &gatewayTLSRouteInformer{factory.Gateway().V1alpha2().TLSRoutes()}
|
||||
})
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package source
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@ -35,13 +36,15 @@ import (
|
||||
func TestGatewayTLSRouteSourceEndpoints(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
gwClient := gatewayfake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewClientset()
|
||||
clients := new(MockClientGenerator)
|
||||
clients.On("GatewayClient").Return(gwClient, nil)
|
||||
clients.On("KubeClient").Return(kubeClient, nil)
|
||||
|
||||
ctx := context.Background()
|
||||
ns := &corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "default",
|
||||
@ -89,7 +92,7 @@ func TestGatewayTLSRouteSourceEndpoints(t *testing.T) {
|
||||
_, err = gwClient.GatewayV1alpha2().TLSRoutes(rt.Namespace).Create(ctx, rt, metav1.CreateOptions{})
|
||||
require.NoError(t, err, "failed to create TLSRoute")
|
||||
|
||||
src, err := NewGatewayTLSRouteSource(clients, &Config{
|
||||
src, err := NewGatewayTLSRouteSource(ctx, clients, &Config{
|
||||
FQDNTemplate: "{{.Name}}-template.foobar.internal",
|
||||
CombineFQDNAndAnnotation: true,
|
||||
})
|
||||
|
||||
@ -17,6 +17,8 @@ limitations under the License.
|
||||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
v1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
@ -26,8 +28,8 @@ import (
|
||||
)
|
||||
|
||||
// NewGatewayUDPRouteSource creates a new Gateway UDPRoute source with the given config.
|
||||
func NewGatewayUDPRouteSource(clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(clients, config, "UDPRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
func NewGatewayUDPRouteSource(ctx context.Context, clients ClientGenerator, config *Config) (Source, error) {
|
||||
return newGatewayRouteSource(ctx, clients, config, "UDPRoute", func(factory informers.SharedInformerFactory) gatewayRouteInformer {
|
||||
return &gatewayUDPRouteInformer{factory.Gateway().V1alpha2().UDPRoutes()}
|
||||
})
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package source
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@ -35,13 +36,15 @@ import (
|
||||
func TestGatewayUDPRouteSourceEndpoints(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
gwClient := gatewayfake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewClientset()
|
||||
clients := new(MockClientGenerator)
|
||||
clients.On("GatewayClient").Return(gwClient, nil)
|
||||
clients.On("KubeClient").Return(kubeClient, nil)
|
||||
|
||||
ctx := context.Background()
|
||||
ns := &corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "default",
|
||||
@ -88,7 +91,7 @@ func TestGatewayUDPRouteSourceEndpoints(t *testing.T) {
|
||||
_, err = gwClient.GatewayV1alpha2().UDPRoutes(rt.Namespace).Create(ctx, rt, metav1.CreateOptions{})
|
||||
require.NoError(t, err, "failed to create UDPRoute")
|
||||
|
||||
src, err := NewGatewayUDPRouteSource(clients, &Config{
|
||||
src, err := NewGatewayUDPRouteSource(ctx, clients, &Config{
|
||||
FQDNTemplate: "{{.Name}}-template.foobar.internal",
|
||||
CombineFQDNAndAnnotation: true,
|
||||
})
|
||||
|
||||
@ -104,7 +104,7 @@ func NewIngressSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -349,7 +349,7 @@ func targetsFromIngressStatus(status networkv1.IngressStatus) endpoint.Targets {
|
||||
return targets
|
||||
}
|
||||
|
||||
func (sc *ingressSource) AddEventHandler(ctx context.Context, handler func()) {
|
||||
func (sc *ingressSource) AddEventHandler(_ context.Context, handler func()) {
|
||||
log.Debug("Adding event handler for ingress")
|
||||
|
||||
// Right now there is no way to remove event handler from informer, see:
|
||||
|
||||
@ -59,16 +59,18 @@ type kongTCPIngressSource struct {
|
||||
}
|
||||
|
||||
// NewKongTCPIngressSource creates a new kongTCPIngressSource with the given config.
|
||||
func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string, ignoreHostnameAnnotation bool) (Source, error) {
|
||||
var err error
|
||||
|
||||
func NewKongTCPIngressSource(
|
||||
ctx context.Context,
|
||||
dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface,
|
||||
namespace, annotationFilter string, ignoreHostnameAnnotation bool,
|
||||
) (Source, error) {
|
||||
// Use shared informer to listen for add/update/delete of Host in the specified namespace.
|
||||
// Set resync period to 0, to prevent processing when nothing has changed.
|
||||
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
|
||||
kongTCPIngressInformer := informerFactory.ForResource(kongGroupdVersionResource)
|
||||
|
||||
// Add default resource event handlers to properly initialize informer.
|
||||
kongTCPIngressInformer.Informer().AddEventHandler(
|
||||
_, _ = kongTCPIngressInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
},
|
||||
@ -78,7 +80,7 @@ func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Inte
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -100,7 +102,7 @@ func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Inte
|
||||
|
||||
// Endpoints returns endpoint objects for each host-target combination that should be processed.
|
||||
// Retrieves all TCPIngresses in the source's namespace(s).
|
||||
func (sc *kongTCPIngressSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
func (sc *kongTCPIngressSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
|
||||
tis, err := sc.kongTCPIngressInformer.Lister().ByNamespace(sc.namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -214,12 +216,12 @@ func (sc *kongTCPIngressSource) endpointsFromTCPIngress(tcpIngress *TCPIngress,
|
||||
return endpoints, nil
|
||||
}
|
||||
|
||||
func (sc *kongTCPIngressSource) AddEventHandler(ctx context.Context, handler func()) {
|
||||
func (sc *kongTCPIngressSource) AddEventHandler(_ context.Context, handler func()) {
|
||||
log.Debug("Adding event handler for TCPIngress")
|
||||
|
||||
// Right now there is no way to remove event handler from informer, see:
|
||||
// https://github.com/kubernetes/kubernetes/issues/79610
|
||||
sc.kongTCPIngressInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
_, _ = sc.kongTCPIngressInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
}
|
||||
|
||||
// newUnstructuredConverter returns a new unstructuredConverter initialized
|
||||
|
||||
@ -73,7 +73,7 @@ func NewNodeSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@ -74,11 +74,11 @@ func NewOcpRouteSource(
|
||||
|
||||
// Use a shared informer to listen for add/update/delete of Routes in the specified namespace.
|
||||
// Set resync period to 0, to prevent processing when nothing has changed.
|
||||
informerFactory := extInformers.NewFilteredSharedInformerFactory(ocpClient, 0*time.Second, namespace, nil)
|
||||
informerFactory := extInformers.NewSharedInformerFactoryWithOptions(ocpClient, 0*time.Second, extInformers.WithNamespace(namespace))
|
||||
informer := informerFactory.Route().V1().Routes()
|
||||
|
||||
// Add default resource event handlers to properly initialize informer.
|
||||
informer.Informer().AddEventHandler(
|
||||
_, _ = informer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
},
|
||||
@ -88,7 +88,7 @@ func NewOcpRouteSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -105,18 +105,18 @@ func NewOcpRouteSource(
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ors *ocpRouteSource) AddEventHandler(ctx context.Context, handler func()) {
|
||||
func (ors *ocpRouteSource) AddEventHandler(_ context.Context, handler func()) {
|
||||
log.Debug("Adding event handler for openshift route")
|
||||
|
||||
// Right now there is no way to remove event handler from informer, see:
|
||||
// https://github.com/kubernetes/kubernetes/issues/79610
|
||||
ors.routeInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
_, _ = ors.routeInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
|
||||
}
|
||||
|
||||
// Endpoints returns endpoint objects for each host-target combination that should be processed.
|
||||
// Retrieves all OpenShift Route resources on all namespaces, unless an explicit namespace
|
||||
// is specified in ocpRouteSource.
|
||||
func (ors *ocpRouteSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
|
||||
func (ors *ocpRouteSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
|
||||
ocpRoutes, err := ors.routeInformer.Lister().Routes(ors.namespace).List(ors.labelSelector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -83,7 +83,7 @@ func NewPodSource(
|
||||
// The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster.
|
||||
// If watchList is not used it will not prevent memory bursts on the initial informer sync.
|
||||
// When fqdnTemplate is used the entire pod needs to be provided to the rendering call, but the informer itself becomes unneeded.
|
||||
podInformer.Informer().SetTransform(func(i any) (any, error) {
|
||||
_ = podInformer.Informer().SetTransform(func(i any) (any, error) {
|
||||
pod, ok := i.(*corev1.Pod)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("object is not a pod")
|
||||
@ -116,7 +116,7 @@ func NewPodSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@ -201,7 +201,7 @@ func NewServiceSource(
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@ -341,15 +341,15 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg
|
||||
case types.Pod:
|
||||
return buildPodSource(ctx, p, cfg)
|
||||
case types.GatewayHttpRoute:
|
||||
return NewGatewayHTTPRouteSource(p, cfg)
|
||||
return NewGatewayHTTPRouteSource(ctx, p, cfg)
|
||||
case types.GatewayGrpcRoute:
|
||||
return NewGatewayGRPCRouteSource(p, cfg)
|
||||
return NewGatewayGRPCRouteSource(ctx, p, cfg)
|
||||
case types.GatewayTlsRoute:
|
||||
return NewGatewayTLSRouteSource(p, cfg)
|
||||
return NewGatewayTLSRouteSource(ctx, p, cfg)
|
||||
case types.GatewayTcpRoute:
|
||||
return NewGatewayTCPRouteSource(p, cfg)
|
||||
return NewGatewayTCPRouteSource(ctx, p, cfg)
|
||||
case types.GatewayUdpRoute:
|
||||
return NewGatewayUDPRouteSource(p, cfg)
|
||||
return NewGatewayUDPRouteSource(ctx, p, cfg)
|
||||
case types.IstioGateway:
|
||||
return buildIstioGatewaySource(ctx, p, cfg)
|
||||
case types.IstioVirtualService:
|
||||
|
||||
@ -150,10 +150,10 @@ func NewTraefikSource(
|
||||
)
|
||||
}
|
||||
|
||||
informerFactory.Start((ctx.Done()))
|
||||
informerFactory.Start(ctx.Done())
|
||||
|
||||
// wait for the local cache to be populated.
|
||||
if err := informers.WaitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
|
||||
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user