From 4f41229820a40bdeb5f180799f85415aad0e4bcc Mon Sep 17 00:00:00 2001 From: Andrey Lebedev Date: Sat, 22 Jan 2022 21:11:06 +0100 Subject: [PATCH] Pass stop channel to informer factory instances --- main.go | 2 +- source/ambassador_host.go | 5 ++--- source/contour_httpproxy.go | 5 ++--- source/contour_httpproxy_test.go | 4 ++++ source/ingress.go | 6 ++---- source/ingress_test.go | 3 +++ source/istio_gateway.go | 7 +++---- source/istio_gateway_test.go | 4 ++++ source/istio_virtualservice.go | 7 +++---- source/istio_virtualservice_test.go | 4 ++++ source/kong_tcpingress.go | 6 ++---- source/kong_tcpingress_test.go | 2 +- source/node.go | 6 ++---- source/node_test.go | 2 ++ source/openshift_route.go | 5 ++--- source/openshift_route_test.go | 3 +++ source/pod.go | 5 ++--- source/pod_test.go | 2 +- source/service.go | 6 ++---- source/service_test.go | 10 ++++++++++ source/store.go | 27 ++++++++++++++------------- source/store_test.go | 19 ++++++++++--------- 22 files changed, 79 insertions(+), 61 deletions(-) diff --git a/main.go b/main.go index 28d6b046c..0d73ef2ad 100644 --- a/main.go +++ b/main.go @@ -136,7 +136,7 @@ func main() { } // Lookup all the selected sources by names and pass them the desired configuration. - sources, err := source.ByNames(&source.SingletonClientGenerator{ + sources, err := source.ByNames(ctx, &source.SingletonClientGenerator{ KubeConfig: cfg.KubeConfig, APIServerURL: cfg.APIServerURL, // If update events are enabled, disable timeout. diff --git a/source/ambassador_host.go b/source/ambassador_host.go index 04e26b5ff..a2214dcd3 100644 --- a/source/ambassador_host.go +++ b/source/ambassador_host.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" @@ -64,6 +63,7 @@ type ambassadorHostSource struct { // NewAmbassadorHostSource creates a new ambassadorHostSource with the given config. func NewAmbassadorHostSource( + ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string) (Source, error) { @@ -82,8 +82,7 @@ func NewAmbassadorHostSource( }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { return nil, err diff --git a/source/contour_httpproxy.go b/source/contour_httpproxy.go index 7fa23acd0..109e50f8d 100644 --- a/source/contour_httpproxy.go +++ b/source/contour_httpproxy.go @@ -28,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" @@ -53,6 +52,7 @@ type httpProxySource struct { // NewContourHTTPProxySource creates a new contourHTTPProxySource with the given config. func NewContourHTTPProxySource( + ctx context.Context, dynamicKubeClient dynamic.Interface, namespace string, annotationFilter string, @@ -78,8 +78,7 @@ func NewContourHTTPProxySource( }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/contour_httpproxy_test.go b/source/contour_httpproxy_test.go index 227f95960..1905d0eee 100644 --- a/source/contour_httpproxy_test.go +++ b/source/contour_httpproxy_test.go @@ -88,6 +88,7 @@ func (suite *HTTPProxySuite) SetupTest() { var err error suite.source, err = NewContourHTTPProxySource( + context.TODO(), fakeDynamicClient, "default", "", @@ -184,6 +185,7 @@ func TestNewContourHTTPProxySource(t *testing.T) { fakeDynamicClient, _ := newDynamicKubernetesClient() _, err := NewContourHTTPProxySource( + context.TODO(), fakeDynamicClient, "", ti.annotationFilter, @@ -1033,6 +1035,7 @@ func testHTTPProxyEndpoints(t *testing.T) { } httpProxySource, err := NewContourHTTPProxySource( + context.TODO(), fakeDynamicClient, ti.targetNamespace, ti.annotationFilter, @@ -1059,6 +1062,7 @@ func newTestHTTPProxySource() (*httpProxySource, error) { fakeDynamicClient, _ := newDynamicKubernetesClient() src, err := NewContourHTTPProxySource( + context.TODO(), fakeDynamicClient, "default", "", diff --git a/source/ingress.go b/source/ingress.go index 9890f1fe0..76d5429e7 100644 --- a/source/ingress.go +++ b/source/ingress.go @@ -26,7 +26,6 @@ import ( log "github.com/sirupsen/logrus" networkv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" netinformers "k8s.io/client-go/informers/networking/v1" "k8s.io/client-go/kubernetes" @@ -64,7 +63,7 @@ type ingressSource struct { } // NewIngressSource creates a new ingressSource with the given config. -func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool, labelSelector labels.Selector) (Source, error) { +func NewIngressSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool, labelSelector labels.Selector) (Source, error) { tmpl, err := parseTemplate(fqdnTemplate) if err != nil { return nil, err @@ -83,8 +82,7 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/ingress_test.go b/source/ingress_test.go index 32805b7ad..75a970356 100644 --- a/source/ingress_test.go +++ b/source/ingress_test.go @@ -56,6 +56,7 @@ func (suite *IngressSuite) SetupTest() { suite.NoError(err, "should succeed") suite.sc, err = NewIngressSource( + context.TODO(), fakeClient, "", "", @@ -138,6 +139,7 @@ func TestNewIngressSource(t *testing.T) { t.Parallel() _, err := NewIngressSource( + context.TODO(), fake.NewSimpleClientset(), "", ti.annotationFilter, @@ -1225,6 +1227,7 @@ func testIngressEndpoints(t *testing.T) { } source, _ := NewIngressSource( + context.TODO(), fakeClient, ti.targetNamespace, ti.annotationFilter, diff --git a/source/istio_gateway.go b/source/istio_gateway.go index 9472c6750..ffd8b79d1 100644 --- a/source/istio_gateway.go +++ b/source/istio_gateway.go @@ -30,7 +30,6 @@ import ( networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -56,6 +55,7 @@ type gatewaySource struct { // NewIstioGatewaySource creates a new gatewaySource with the given config. func NewIstioGatewaySource( + ctx context.Context, kubeClient kubernetes.Interface, istioClient istioclient.Interface, namespace string, @@ -93,9 +93,8 @@ func NewIstioGatewaySource( }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) - istioInformerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) + istioInformerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/istio_gateway_test.go b/source/istio_gateway_test.go index c94a06690..fefa46c53 100644 --- a/source/istio_gateway_test.go +++ b/source/istio_gateway_test.go @@ -69,6 +69,7 @@ func (suite *GatewaySuite) SetupTest() { } suite.source, err = NewIstioGatewaySource( + context.TODO(), fakeKubernetesClient, fakeIstioClient, "", @@ -142,6 +143,7 @@ func TestNewIstioGatewaySource(t *testing.T) { t.Parallel() _, err := NewIstioGatewaySource( + context.TODO(), fake.NewSimpleClientset(), istiofake.NewSimpleClientset(), "", @@ -1165,6 +1167,7 @@ func testGatewayEndpoints(t *testing.T) { } gatewaySource, err := NewIstioGatewaySource( + context.TODO(), fakeKubernetesClient, fakeIstioClient, ti.targetNamespace, @@ -1201,6 +1204,7 @@ func newTestGatewaySource(loadBalancerList []fakeIngressGatewayService) (*gatewa } src, err := NewIstioGatewaySource( + context.TODO(), fakeKubernetesClient, fakeIstioClient, "", diff --git a/source/istio_virtualservice.go b/source/istio_virtualservice.go index 4a96ceb9b..d69414336 100644 --- a/source/istio_virtualservice.go +++ b/source/istio_virtualservice.go @@ -31,7 +31,6 @@ import ( networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -60,6 +59,7 @@ type virtualServiceSource struct { // NewIstioVirtualServiceSource creates a new virtualServiceSource with the given config. func NewIstioVirtualServiceSource( + ctx context.Context, kubeClient kubernetes.Interface, istioClient istioclient.Interface, namespace string, @@ -97,9 +97,8 @@ func NewIstioVirtualServiceSource( }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) - istioInformerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) + istioInformerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/istio_virtualservice_test.go b/source/istio_virtualservice_test.go index e7dc1ce4f..68e90aadb 100644 --- a/source/istio_virtualservice_test.go +++ b/source/istio_virtualservice_test.go @@ -89,6 +89,7 @@ func (suite *VirtualServiceSuite) SetupTest() { suite.NoError(err, "should succeed") suite.source, err = NewIstioVirtualServiceSource( + context.TODO(), fakeKubernetesClient, fakeIstioClient, "", @@ -165,6 +166,7 @@ func TestNewIstioVirtualServiceSource(t *testing.T) { t.Parallel() _, err := NewIstioVirtualServiceSource( + context.TODO(), fake.NewSimpleClientset(), istiofake.NewSimpleClientset(), "", @@ -1482,6 +1484,7 @@ func testVirtualServiceEndpoints(t *testing.T) { } virtualServiceSource, err := NewIstioVirtualServiceSource( + context.TODO(), fakeKubernetesClient, fakeIstioClient, ti.targetNamespace, @@ -1557,6 +1560,7 @@ func newTestVirtualServiceSource(loadBalancerList []fakeIngressGatewayService, g } src, err := NewIstioVirtualServiceSource( + context.TODO(), fakeKubernetesClient, fakeIstioClient, "", diff --git a/source/kong_tcpingress.go b/source/kong_tcpingress.go index 028f0fb32..a0e8de1f4 100644 --- a/source/kong_tcpingress.go +++ b/source/kong_tcpingress.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" @@ -57,7 +56,7 @@ type kongTCPIngressSource struct { } // NewKongTCPIngressSource creates a new kongTCPIngressSource with the given config. -func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) { +func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) { var err error // Use shared informer to listen for add/update/delete of Host in the specified namespace. @@ -73,8 +72,7 @@ func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kub }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/kong_tcpingress_test.go b/source/kong_tcpingress_test.go index bb9dcc4be..a304c5b5f 100644 --- a/source/kong_tcpingress_test.go +++ b/source/kong_tcpingress_test.go @@ -241,7 +241,7 @@ func TestKongTCPIngressEndpoints(t *testing.T) { _, err = fakeDynamicClient.Resource(kongGroupdVersionResource).Namespace(defaultKongNamespace).Create(context.Background(), &tcpi, metav1.CreateOptions{}) assert.NoError(t, err) - source, err := NewKongTCPIngressSource(fakeDynamicClient, fakeKubernetesClient, defaultKongNamespace, "kubernetes.io/ingress.class=kong") + source, err := NewKongTCPIngressSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, defaultKongNamespace, "kubernetes.io/ingress.class=kong") assert.NoError(t, err) assert.NotNil(t, source) diff --git a/source/node.go b/source/node.go index 62e83b1bb..b0e672d73 100644 --- a/source/node.go +++ b/source/node.go @@ -25,7 +25,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -42,7 +41,7 @@ type nodeSource struct { } // NewNodeSource creates a new nodeSource with the given config. -func NewNodeSource(kubeClient kubernetes.Interface, annotationFilter, fqdnTemplate string) (Source, error) { +func NewNodeSource(ctx context.Context, kubeClient kubernetes.Interface, annotationFilter, fqdnTemplate string) (Source, error) { tmpl, err := parseTemplate(fqdnTemplate) if err != nil { return nil, err @@ -62,8 +61,7 @@ func NewNodeSource(kubeClient kubernetes.Interface, annotationFilter, fqdnTempla }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/node_test.go b/source/node_test.go index 4d24dfbf0..901c1baa1 100644 --- a/source/node_test.go +++ b/source/node_test.go @@ -71,6 +71,7 @@ func testNodeSourceNewNodeSource(t *testing.T) { t.Parallel() _, err := NewNodeSource( + context.TODO(), fake.NewSimpleClientset(), ti.annotationFilter, ti.fqdnTemplate, @@ -353,6 +354,7 @@ func testNodeSourceEndpoints(t *testing.T) { // Create our object under test and get the endpoints. client, err := NewNodeSource( + context.TODO(), kubernetes, tc.annotationFilter, tc.fqdnTemplate, diff --git a/source/openshift_route.go b/source/openshift_route.go index 82cd7edff..a1bc08857 100644 --- a/source/openshift_route.go +++ b/source/openshift_route.go @@ -29,7 +29,6 @@ import ( log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "sigs.k8s.io/external-dns/endpoint" @@ -54,6 +53,7 @@ type ocpRouteSource struct { // NewOcpRouteSource creates a new ocpRouteSource with the given config. func NewOcpRouteSource( + ctx context.Context, ocpClient versioned.Interface, namespace string, annotationFilter string, @@ -81,8 +81,7 @@ func NewOcpRouteSource( }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/openshift_route_test.go b/source/openshift_route_test.go index 7dc851990..1c689c28d 100644 --- a/source/openshift_route_test.go +++ b/source/openshift_route_test.go @@ -43,6 +43,7 @@ func (suite *OCPRouteSuite) SetupTest() { var err error suite.sc, err = NewOcpRouteSource( + context.TODO(), fakeClient, "", "", @@ -141,6 +142,7 @@ func testOcpRouteSourceNewOcpRouteSource(t *testing.T) { t.Parallel() _, err := NewOcpRouteSource( + context.TODO(), fake.NewSimpleClientset(), "", ti.annotationFilter, @@ -439,6 +441,7 @@ func testOcpRouteSourceEndpoints(t *testing.T) { require.NoError(t, err) source, err := NewOcpRouteSource( + context.TODO(), fakeClient, "", "", diff --git a/source/pod.go b/source/pod.go index b2df23bfa..dae6a7402 100644 --- a/source/pod.go +++ b/source/pod.go @@ -24,7 +24,6 @@ import ( log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -40,7 +39,7 @@ type podSource struct { } // NewPodSource creates a new podSource with the given config. -func NewPodSource(kubeClient kubernetes.Interface, namespace string, compatibility string) (Source, error) { +func NewPodSource(ctx context.Context, kubeClient kubernetes.Interface, namespace string, compatibility string) (Source, error) { informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace)) podInformer := informerFactory.Core().V1().Pods() nodeInformer := informerFactory.Core().V1().Nodes() @@ -58,7 +57,7 @@ func NewPodSource(kubeClient kubernetes.Interface, namespace string, compatibili }, ) - informerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/pod_test.go b/source/pod_test.go index 675b8cfb8..c138aaf5f 100644 --- a/source/pod_test.go +++ b/source/pod_test.go @@ -412,7 +412,7 @@ func TestPodSource(t *testing.T) { } } - client, err := NewPodSource(kubernetes, tc.targetNamespace, tc.compatibility) + client, err := NewPodSource(context.TODO(), kubernetes, tc.targetNamespace, tc.compatibility) require.NoError(t, err) endpoints, err := client.Endpoints(ctx) diff --git a/source/service.go b/source/service.go index e49ff2f49..ef3acd97d 100644 --- a/source/service.go +++ b/source/service.go @@ -27,7 +27,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -67,7 +66,7 @@ type serviceSource struct { } // NewServiceSource creates a new serviceSource with the given config. -func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool, publishHostIP bool, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector) (Source, error) { +func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool, publishHostIP bool, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector) (Source, error) { tmpl, err := parseTemplate(fqdnTemplate) if err != nil { return nil, err @@ -107,8 +106,7 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt }, ) - // TODO informer is not explicitly stopped since controller is not passing in its channel. - informerFactory.Start(wait.NeverStop) + informerFactory.Start(ctx.Done()) // wait for the local cache to be populated. if err := waitForCacheSync(context.Background(), informerFactory); err != nil { diff --git a/source/service_test.go b/source/service_test.go index 57a2056e8..a13e15169 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -65,6 +65,7 @@ func (suite *ServiceSuite) SetupTest() { suite.NoError(err, "should successfully create service") suite.sc, err = NewServiceSource( + context.TODO(), fakeClient, "", "", @@ -144,6 +145,7 @@ func testServiceSourceNewServiceSource(t *testing.T) { t.Parallel() _, err := NewServiceSource( + context.TODO(), fake.NewSimpleClientset(), "", ti.annotationFilter, @@ -1039,6 +1041,7 @@ func testServiceSourceEndpoints(t *testing.T) { // Create our object under test and get the endpoints. client, err := NewServiceSource( + context.TODO(), kubernetes, tc.targetNamespace, tc.annotationFilter, @@ -1227,6 +1230,7 @@ func testMultipleServicesEndpoints(t *testing.T) { // Create our object under test and get the endpoints. client, err := NewServiceSource( + context.TODO(), kubernetes, tc.targetNamespace, tc.annotationFilter, @@ -1391,6 +1395,7 @@ func TestClusterIpServices(t *testing.T) { } // Create our object under test and get the endpoints. client, _ := NewServiceSource( + context.TODO(), kubernetes, tc.targetNamespace, tc.annotationFilter, @@ -1960,6 +1965,7 @@ func TestServiceSourceNodePortServices(t *testing.T) { // Create our object under test and get the endpoints. client, _ := NewServiceSource( + context.TODO(), kubernetes, tc.targetNamespace, tc.annotationFilter, @@ -2295,6 +2301,7 @@ func TestHeadlessServices(t *testing.T) { // Create our object under test and get the endpoints. client, _ := NewServiceSource( + context.TODO(), kubernetes, tc.targetNamespace, "", @@ -2651,6 +2658,7 @@ func TestHeadlessServicesHostIP(t *testing.T) { // Create our object under test and get the endpoints. client, _ := NewServiceSource( + context.TODO(), kubernetes, tc.targetNamespace, "", @@ -2762,6 +2770,7 @@ func TestExternalServices(t *testing.T) { // Create our object under test and get the endpoints. client, _ := NewServiceSource( + context.TODO(), kubernetes, tc.targetNamespace, "", @@ -2815,6 +2824,7 @@ func BenchmarkServiceEndpoints(b *testing.B) { require.NoError(b, err) client, err := NewServiceSource( + context.TODO(), kubernetes, v1.NamespaceAll, "", diff --git a/source/store.go b/source/store.go index 4e01d2d18..942400aef 100644 --- a/source/store.go +++ b/source/store.go @@ -17,6 +17,7 @@ limitations under the License. package source import ( + "context" "net/http" "os" "strings" @@ -158,10 +159,10 @@ func (p *SingletonClientGenerator) OpenShiftClient() (openshift.Interface, error } // ByNames returns multiple Sources given multiple names. -func ByNames(p ClientGenerator, names []string, cfg *Config) ([]Source, error) { +func ByNames(ctx context.Context, p ClientGenerator, names []string, cfg *Config) ([]Source, error) { sources := []Source{} for _, name := range names { - source, err := BuildWithConfig(name, p, cfg) + source, err := BuildWithConfig(ctx, name, p, cfg) if err != nil { return nil, err } @@ -172,32 +173,32 @@ func ByNames(p ClientGenerator, names []string, cfg *Config) ([]Source, error) { } // BuildWithConfig allows to generate a Source implementation from the shared config -func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, error) { +func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg *Config) (Source, error) { switch source { case "node": client, err := p.KubeClient() if err != nil { return nil, err } - return NewNodeSource(client, cfg.AnnotationFilter, cfg.FQDNTemplate) + return NewNodeSource(ctx, client, cfg.AnnotationFilter, cfg.FQDNTemplate) case "service": client, err := p.KubeClient() if err != nil { return nil, err } - return NewServiceSource(client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP, cfg.AlwaysPublishNotReadyAddresses, cfg.ServiceTypeFilter, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter) + return NewServiceSource(ctx, client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP, cfg.AlwaysPublishNotReadyAddresses, cfg.ServiceTypeFilter, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter) case "ingress": client, err := p.KubeClient() if err != nil { return nil, err } - return NewIngressSource(client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation, cfg.IgnoreIngressTLSSpec, cfg.IgnoreIngressRulesSpec, cfg.LabelFilter) + return NewIngressSource(ctx, client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation, cfg.IgnoreIngressTLSSpec, cfg.IgnoreIngressRulesSpec, cfg.LabelFilter) case "pod": client, err := p.KubeClient() if err != nil { return nil, err } - return NewPodSource(client, cfg.Namespace, cfg.Compatibility) + return NewPodSource(ctx, client, cfg.Namespace, cfg.Compatibility) case "istio-gateway": kubernetesClient, err := p.KubeClient() if err != nil { @@ -207,7 +208,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err if err != nil { return nil, err } - return NewIstioGatewaySource(kubernetesClient, istioClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation) + return NewIstioGatewaySource(ctx, kubernetesClient, istioClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation) case "istio-virtualservice": kubernetesClient, err := p.KubeClient() if err != nil { @@ -217,7 +218,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err if err != nil { return nil, err } - return NewIstioVirtualServiceSource(kubernetesClient, istioClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation) + return NewIstioVirtualServiceSource(ctx, kubernetesClient, istioClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation) case "cloudfoundry": cfClient, err := p.CloudFoundryClient(cfg.CFAPIEndpoint, cfg.CFUsername, cfg.CFPassword) if err != nil { @@ -233,13 +234,13 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err if err != nil { return nil, err } - return NewAmbassadorHostSource(dynamicClient, kubernetesClient, cfg.Namespace) + return NewAmbassadorHostSource(ctx, dynamicClient, kubernetesClient, cfg.Namespace) case "contour-httpproxy": dynamicClient, err := p.DynamicKubernetesClient() if err != nil { return nil, err } - return NewContourHTTPProxySource(dynamicClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation) + return NewContourHTTPProxySource(ctx, dynamicClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation) case "gloo-proxy": kubernetesClient, err := p.KubeClient() if err != nil { @@ -255,7 +256,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err if err != nil { return nil, err } - return NewOcpRouteSource(ocpClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter, cfg.OCPRouterName) + return NewOcpRouteSource(ctx, ocpClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter, cfg.OCPRouterName) case "fake": return NewFakeSource(cfg.FQDNTemplate) case "connector": @@ -290,7 +291,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err if err != nil { return nil, err } - return NewKongTCPIngressSource(dynamicClient, kubernetesClient, cfg.Namespace, cfg.AnnotationFilter) + return NewKongTCPIngressSource(ctx, dynamicClient, kubernetesClient, cfg.Namespace, cfg.AnnotationFilter) } return nil, ErrSourceNotFound } diff --git a/source/store_test.go b/source/store_test.go index bc4950559..bd09ed0e7 100644 --- a/source/store_test.go +++ b/source/store_test.go @@ -17,6 +17,7 @@ limitations under the License. package source import ( + "context" "errors" "testing" @@ -115,7 +116,7 @@ func (suite *ByNamesTestSuite) TestAllInitialized() { }: "TCPIngressesList", }), nil) - sources, err := ByNames(mockClientGenerator, []string{"service", "ingress", "istio-gateway", "contour-httpproxy", "kong-tcpingress", "fake"}, minimalConfig) + sources, err := ByNames(context.TODO(), mockClientGenerator, []string{"service", "ingress", "istio-gateway", "contour-httpproxy", "kong-tcpingress", "fake"}, minimalConfig) suite.NoError(err, "should not generate errors") suite.Len(sources, 6, "should generate all six sources") } @@ -124,7 +125,7 @@ func (suite *ByNamesTestSuite) TestOnlyFake() { mockClientGenerator := new(MockClientGenerator) mockClientGenerator.On("KubeClient").Return(fakeKube.NewSimpleClientset(), nil) - sources, err := ByNames(mockClientGenerator, []string{"fake"}, minimalConfig) + sources, err := ByNames(context.TODO(), mockClientGenerator, []string{"fake"}, minimalConfig) suite.NoError(err, "should not generate errors") suite.Len(sources, 1, "should generate fake source") suite.Nil(mockClientGenerator.kubeClient, "client should not be created") @@ -134,7 +135,7 @@ func (suite *ByNamesTestSuite) TestSourceNotFound() { mockClientGenerator := new(MockClientGenerator) mockClientGenerator.On("KubeClient").Return(fakeKube.NewSimpleClientset(), nil) - sources, err := ByNames(mockClientGenerator, []string{"foo"}, minimalConfig) + sources, err := ByNames(context.TODO(), mockClientGenerator, []string{"foo"}, minimalConfig) suite.Equal(err, ErrSourceNotFound, "should return source not found") suite.Len(sources, 0, "should not returns any source") } @@ -143,16 +144,16 @@ func (suite *ByNamesTestSuite) TestKubeClientFails() { mockClientGenerator := new(MockClientGenerator) mockClientGenerator.On("KubeClient").Return(nil, errors.New("foo")) - _, err := ByNames(mockClientGenerator, []string{"service"}, minimalConfig) + _, err := ByNames(context.TODO(), mockClientGenerator, []string{"service"}, minimalConfig) suite.Error(err, "should return an error if kubernetes client cannot be created") - _, err = ByNames(mockClientGenerator, []string{"ingress"}, minimalConfig) + _, err = ByNames(context.TODO(), mockClientGenerator, []string{"ingress"}, minimalConfig) suite.Error(err, "should return an error if kubernetes client cannot be created") - _, err = ByNames(mockClientGenerator, []string{"istio-gateway"}, minimalConfig) + _, err = ByNames(context.TODO(), mockClientGenerator, []string{"istio-gateway"}, minimalConfig) suite.Error(err, "should return an error if kubernetes client cannot be created") - _, err = ByNames(mockClientGenerator, []string{"kong-tcpingress"}, minimalConfig) + _, err = ByNames(context.TODO(), mockClientGenerator, []string{"kong-tcpingress"}, minimalConfig) suite.Error(err, "should return an error if kubernetes client cannot be created") } @@ -162,10 +163,10 @@ func (suite *ByNamesTestSuite) TestIstioClientFails() { mockClientGenerator.On("IstioClient").Return(nil, errors.New("foo")) mockClientGenerator.On("DynamicKubernetesClient").Return(nil, errors.New("foo")) - _, err := ByNames(mockClientGenerator, []string{"istio-gateway"}, minimalConfig) + _, err := ByNames(context.TODO(), mockClientGenerator, []string{"istio-gateway"}, minimalConfig) suite.Error(err, "should return an error if istio client cannot be created") - _, err = ByNames(mockClientGenerator, []string{"contour-httpproxy"}, minimalConfig) + _, err = ByNames(context.TODO(), mockClientGenerator, []string{"contour-httpproxy"}, minimalConfig) suite.Error(err, "should return an error if contour client cannot be created") }