diff --git a/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml b/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml index 637bdf793..7056ef42f 100644 --- a/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml +++ b/cmd/k8s-operator/deploy/chart/templates/operator-rbac.yaml @@ -63,7 +63,10 @@ rules: verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] - apiGroups: [""] resources: ["pods"] - verbs: ["get","list","watch"] + verbs: ["get","list","watch", "update"] +- apiGroups: [""] + resources: ["pods/status"] + verbs: ["update"] - apiGroups: ["apps"] resources: ["statefulsets", "deployments"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] diff --git a/cmd/k8s-operator/deploy/manifests/operator.yaml b/cmd/k8s-operator/deploy/manifests/operator.yaml index def5716f6..e966ef559 100644 --- a/cmd/k8s-operator/deploy/manifests/operator.yaml +++ b/cmd/k8s-operator/deploy/manifests/operator.yaml @@ -4854,6 +4854,13 @@ rules: - get - list - watch + - update + - apiGroups: + - "" + resources: + - pods/status + verbs: + - update - apiGroups: - apps resources: diff --git a/cmd/k8s-operator/egress-pod-readiness.go b/cmd/k8s-operator/egress-pod-readiness.go new file mode 100644 index 000000000..a6c57bf9d --- /dev/null +++ b/cmd/k8s-operator/egress-pod-readiness.go @@ -0,0 +1,274 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "context" + "errors" + "fmt" + "net/http" + "slices" + "strings" + "sync/atomic" + "time" + + "go.uber.org/zap" + xslices "golang.org/x/exp/slices" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + tsapi "tailscale.com/k8s-operator/apis/v1alpha1" + "tailscale.com/kube/kubetypes" + "tailscale.com/logtail/backoff" + "tailscale.com/tstime" + "tailscale.com/util/httpm" +) + +const tsEgressReadinessGate = "tailscale.com/egress-services" + +// egressPodsReconciler is responsible for setting tailscale.com/egress-services condition on egress ProxyGroup Pods. +// The condition is used as a readiness gate for the Pod, meaning that kubelet will not mark the Pod as ready before the +// condition is set. The ProxyGroup StatefulSet updates are rolled out in such a way that no Pod is restarted, before +// the previous Pod is marked as ready, so ensuring that the Pod does not get marked as ready when it is not yet able to +// route traffic for egress service prevents downtime during restarts caused by no available endpoints left because +// every Pod has been recreated and is not yet added to endpoints. +// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate +type egressPodsReconciler struct { + client.Client + logger *zap.SugaredLogger + tsNamespace string + clock tstime.Clock + httpClient doer // http client that can be set to a mock client in tests + maxBackoff time.Duration // max backoff period between health check calls +} + +// Reconcile reconciles an egress ProxyGroup Pods on changes to those Pods and ProxyGroup EndpointSlices. It ensures +// that for each Pod who is ready to route traffic to all egress services for the ProxyGroup, the Pod has a +// tailscale.com/egress-services condition to set, so that kubelet will mark the Pod as ready. 
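+//
+// Illustrative sketch of the wiring (corev1.Pod field names; the readiness gate itself is added to the
+// ProxyGroup StatefulSet's Pod template elsewhere in the operator):
+//
+//	Pod.Spec.ReadinessGates: [{ConditionType: "tailscale.com/egress-services"}]        // set on Pod creation
+//	Pod.Status.Conditions:   [{Type: "tailscale.com/egress-services", Status: "True"}] // set by this reconciler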
+// +// For the Pod to be ready +// to route traffic to the egress service, the kube proxy needs to have set up the Pod's IP as an endpoint for the +// ClusterIP Service corresponding to the egress service. +// +// Note that the endpoints for the ClusterIP Service are configured by the operator itself using custom +// EndpointSlices(egress-eps-reconciler), so the routing is not blocked on Pod's readiness. +// +// Each egress service has a corresponding ClusterIP Service, that exposes all user configured +// tailnet ports, as well as a health check port for the proxy. +// +// The reconciler calls the health check endpoint of each Service up to N number of times, where N is the number of +// replicas for the ProxyGroup x 3, and checks if the received response is healthy response from the Pod being reconciled. +// +// The health check response contains a header with the +// Pod's IP address- this is used to determine whether the response is received from this Pod. +// +// If the Pod does not appear to be serving the health check endpoint (pre-v1.80 proxies), the reconciler just sets the +// readiness condition for backwards compatibility reasons. +func (er *egressPodsReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { + l := er.logger.With("Pod", req.NamespacedName) + l.Debugf("starting reconcile") + defer l.Debugf("reconcile finished") + + pod := new(corev1.Pod) + err = er.Get(ctx, req.NamespacedName, pod) + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get Pod: %w", err) + } + if !pod.DeletionTimestamp.IsZero() { + l.Debugf("Pod is being deleted, do nothing") + return res, nil + } + if pod.Labels[LabelParentType] != proxyTypeProxyGroup { + l.Infof("[unexpected] reconciler called for a Pod that is not a ProxyGroup Pod") + return res, nil + } + + // If the Pod does not have the readiness gate set, there is no need to add the readiness condition. In practice + // this will happen if the user has configured custom TS_LOCAL_ADDR_PORT, thus disabling the graceful failover. + if !slices.ContainsFunc(pod.Spec.ReadinessGates, func(r corev1.PodReadinessGate) bool { + return r.ConditionType == tsEgressReadinessGate + }) { + l.Debug("Pod does not have egress readiness gate set, skipping") + return res, nil + } + + proxyGroupName := pod.Labels[LabelParentName] + pg := new(tsapi.ProxyGroup) + if err := er.Get(ctx, types.NamespacedName{Name: proxyGroupName}, pg); err != nil { + return res, fmt.Errorf("error getting ProxyGroup %q: %w", proxyGroupName, err) + } + if pg.Spec.Type != typeEgress { + l.Infof("[unexpected] reconciler called for %q ProxyGroup Pod", pg.Spec.Type) + return res, nil + } + // Get all ClusterIP Services for all egress targets exposed to cluster via this ProxyGroup. 
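+	// The ClusterIP Services for egress targets live in the operator's namespace and carry the ProxyGroup
+	// name and the egress Service type as labels, so a label selector is enough to retrieve all of them.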
+ lbls := map[string]string{ + LabelManaged: "true", + labelProxyGroup: proxyGroupName, + labelSvcType: typeEgress, + } + svcs := &corev1.ServiceList{} + if err := er.List(ctx, svcs, client.InNamespace(er.tsNamespace), client.MatchingLabels(lbls)); err != nil { + return res, fmt.Errorf("error listing ClusterIP Services") + } + + idx := xslices.IndexFunc(pod.Status.Conditions, func(c corev1.PodCondition) bool { + return c.Type == tsEgressReadinessGate + }) + if idx != -1 { + l.Debugf("Pod is already ready, do nothing") + return res, nil + } + + var routesMissing atomic.Bool + errChan := make(chan error, len(svcs.Items)) + for _, svc := range svcs.Items { + s := svc + go func() { + ll := l.With("service_name", s.Name) + d := retrieveClusterDomain(er.tsNamespace, ll) + healthCheckAddr := healthCheckForSvc(&s, d) + if healthCheckAddr == "" { + ll.Debugf("ClusterIP Service does not expose a health check endpoint, unable to verify if routing is set up") + errChan <- nil + return + } + + var routesSetup bool + bo := backoff.NewBackoff(s.Name, ll.Infof, er.maxBackoff) + for range numCalls(pgReplicas(pg)) { + if ctx.Err() != nil { + errChan <- nil + return + } + state, err := er.lookupPodRouteViaSvc(ctx, pod, healthCheckAddr, ll) + if err != nil { + errChan <- fmt.Errorf("error validating if routing has been set up for Pod: %w", err) + return + } + if state == healthy || state == cannotVerify { + routesSetup = true + break + } + if state == unreachable || state == unhealthy || state == podNotReady { + bo.BackOff(ctx, errors.New("backoff")) + } + } + if !routesSetup { + ll.Debugf("Pod is not yet configured as Service endpoint") + routesMissing.Store(true) + } + errChan <- nil + }() + } + for range len(svcs.Items) { + e := <-errChan + err = errors.Join(err, e) + } + if err != nil { + return res, fmt.Errorf("error verifying conectivity: %w", err) + } + if rm := routesMissing.Load(); rm { + l.Info("Pod is not yet added as an endpoint for all egress targets, waiting...") + return reconcile.Result{RequeueAfter: shortRequeue}, nil + } + if err := er.setPodReady(ctx, pod, l); err != nil { + return res, fmt.Errorf("error setting Pod as ready: %w", err) + } + return res, nil +} + +func (er *egressPodsReconciler) setPodReady(ctx context.Context, pod *corev1.Pod, l *zap.SugaredLogger) error { + if slices.ContainsFunc(pod.Status.Conditions, func(c corev1.PodCondition) bool { + return c.Type == tsEgressReadinessGate + }) { + return nil + } + l.Infof("Pod is ready to route traffic to all egress targets") + pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{ + Type: tsEgressReadinessGate, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: er.clock.Now()}, + }) + return er.Status().Update(ctx, pod) +} + +// healthCheckState is the result of a single request to an egress Service health check endpoint with a goal to hit a +// specific backend Pod. +type healthCheckState int8 + +const ( + cannotVerify healthCheckState = iota // not verifiable for this setup (i.e earlier proxy version) + unreachable // no backends or another network error + notFound // hit another backend + unhealthy // not 200 + podNotReady // Pod is not ready, i.e does not have an IP address yet + healthy // 200 +) + +// lookupPodRouteViaSvc attempts to reach a Pod using a health check endpoint served by a Service and returns the state of the health check. 
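+// Roughly, the mapping from observation to healthCheckState is (Pod IP read from the kubetypes.PodIPv4Header
+// response header):
+//
+//	health check not enabled on the Pod     -> cannotVerify
+//	Pod has no IP address yet               -> podNotReady
+//	request error (e.g. no backends)        -> unreachable
+//	response without the Pod IP header      -> cannotVerify (earlier proxy versions)
+//	header present, but another Pod's IP    -> notFound
+//	this Pod's IP, but a non-200 status     -> unhealthy
+//	this Pod's IP and a 200 status          -> healthy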
+func (er *egressPodsReconciler) lookupPodRouteViaSvc(ctx context.Context, pod *corev1.Pod, healthCheckAddr string, l *zap.SugaredLogger) (healthCheckState, error) { + if !slices.ContainsFunc(pod.Spec.Containers[0].Env, func(e corev1.EnvVar) bool { + return e.Name == "TS_ENABLE_HEALTH_CHECK" && e.Value == "true" + }) { + l.Debugf("Pod does not have health check enabled, unable to verify if it is currently routable via Service") + return cannotVerify, nil + } + wantsIP, err := podIPv4(pod) + if err != nil { + return -1, fmt.Errorf("error determining Pod's IP address: %w", err) + } + if wantsIP == "" { + return podNotReady, nil + } + + ctx, cancel := context.WithTimeout(ctx, time.Second*3) + defer cancel() + req, err := http.NewRequestWithContext(ctx, httpm.GET, healthCheckAddr, nil) + if err != nil { + return -1, fmt.Errorf("error creating new HTTP request: %w", err) + } + // Do not re-use the same connection for the next request so to maximize the chance of hitting all backends equally. + req.Close = true + resp, err := er.httpClient.Do(req) + if err != nil { + // This is most likely because this is the first Pod and is not yet added to Service endoints. Other + // error types are possible, but checking for those would likely make the system too fragile. + return unreachable, nil + } + defer resp.Body.Close() + gotIP := resp.Header.Get(kubetypes.PodIPv4Header) + if gotIP == "" { + l.Debugf("Health check does not return Pod's IP header, unable to verify if Pod is currently routable via Service") + return cannotVerify, nil + } + if !strings.EqualFold(wantsIP, gotIP) { + return notFound, nil + } + if resp.StatusCode != http.StatusOK { + return unhealthy, nil + } + return healthy, nil +} + +// numCalls return the number of times an endpoint on a ProxyGroup Service should be called till it can be safely +// assumed that, if none of the responses came back from a specific Pod then traffic for the Service is currently not +// being routed to that Pod. This assumes that traffic for the Service is routed via round robin, so +// InternalTrafficPolicy must be 'Cluster' and session affinity must be None. +func numCalls(replicas int32) int32 { + return replicas * 3 +} + +// doer is an interface for HTTP client that can be set to a mock client in tests. +type doer interface { + Do(*http.Request) (*http.Response, error) +} diff --git a/cmd/k8s-operator/egress-pod-readiness_test.go b/cmd/k8s-operator/egress-pod-readiness_test.go new file mode 100644 index 000000000..5e6fa2bb4 --- /dev/null +++ b/cmd/k8s-operator/egress-pod-readiness_test.go @@ -0,0 +1,525 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "bytes" + "errors" + "fmt" + "io" + "log" + "net/http" + "sync" + "testing" + "time" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + tsapi "tailscale.com/k8s-operator/apis/v1alpha1" + "tailscale.com/kube/kubetypes" + "tailscale.com/tstest" + "tailscale.com/types/ptr" +) + +func TestEgressPodReadiness(t *testing.T) { + // We need to pass a Pod object to WithStatusSubresource because of some quirks in how the fake client + // works. Without this code we would not be able to update Pod's status further down. + fc := fake.NewClientBuilder(). + WithScheme(tsapi.GlobalScheme). + WithStatusSubresource(&corev1.Pod{}). 
+ Build() + zl, _ := zap.NewDevelopment() + cl := tstest.NewClock(tstest.ClockOpts{}) + rec := &egressPodsReconciler{ + tsNamespace: "operator-ns", + Client: fc, + logger: zl.Sugar(), + clock: cl, + } + pg := &tsapi.ProxyGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dev", + }, + Spec: tsapi.ProxyGroupSpec{ + Type: "egress", + Replicas: ptr.To(int32(3)), + }, + } + mustCreate(t, fc, pg) + podIP := "10.0.0.2" + podTemplate := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "operator-ns", + Name: "pod", + Labels: map[string]string{ + LabelParentType: "proxygroup", + LabelParentName: "dev", + }, + }, + Spec: corev1.PodSpec{ + ReadinessGates: []corev1.PodReadinessGate{{ + ConditionType: tsEgressReadinessGate, + }}, + Containers: []corev1.Container{{ + Name: "tailscale", + Env: []corev1.EnvVar{{ + Name: "TS_ENABLE_HEALTH_CHECK", + Value: "true", + }}, + }}, + }, + Status: corev1.PodStatus{ + PodIPs: []corev1.PodIP{{IP: podIP}}, + }, + } + + t.Run("no_egress_services", func(t *testing.T) { + pod := podTemplate.DeepCopy() + mustCreate(t, fc, pod) + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod) + }) + t.Run("one_svc_already_routed_to", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + mustCreateAll(t, fc, svc, pod) + resp := readyResps(podIP, 1) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{hep: resp}, + } + rec.httpClient = &httpCl + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + + // A subsequent reconcile should not change the Pod. + expectReconciled(t, rec, "operator-ns", pod.Name) + expectEqual(t, fc, pod) + + mustDeleteAll(t, fc, pod, svc) + }) + t.Run("one_svc_many_backends_eventually_routed_to", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + mustCreateAll(t, fc, svc, pod) + // For a 3 replica ProxyGroup the healthcheck endpoint should be called 9 times, make the 9th time only + // return with the right Pod IP. + resps := append(readyResps("10.0.0.3", 4), append(readyResps("10.0.0.4", 4), readyResps(podIP, 1)...)...) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{hep: resps}, + } + rec.httpClient = &httpCl + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc) + }) + t.Run("one_svc_one_backend_eventually_healthy", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + mustCreateAll(t, fc, svc, pod) + // For a 3 replica ProxyGroup the healthcheck endpoint should be called 9 times, make the 9th time only + // return with 200 status code. + resps := append(unreadyResps(podIP, 8), readyResps(podIP, 1)...) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{hep: resps}, + } + rec.httpClient = &httpCl + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. 
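+		// podSetReady mirrors the condition the reconciler should have written onto the local copy, so that
+		// expectEqual can compare it against the Pod stored in the fake client.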
+ podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc) + }) + t.Run("one_svc_one_backend_never_routable", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + mustCreateAll(t, fc, svc, pod) + // For a 3 replica ProxyGroup the healthcheck endpoint should be called 9 times and Pod should be + // requeued if neither of those succeed. + resps := readyResps("10.0.0.3", 9) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{hep: resps}, + } + rec.httpClient = &httpCl + expectRequeue(t, rec, "operator-ns", pod.Name) + + // Pod should not have readiness gate condition set. + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc) + }) + t.Run("one_svc_many_backends_already_routable", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + svc2, hep2 := newSvc("svc-2", 9002) + svc3, hep3 := newSvc("svc-3", 9002) + mustCreateAll(t, fc, svc, svc2, svc3, pod) + resps := readyResps(podIP, 1) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{ + hep: resps, + hep2: resps, + hep3: resps, + }, + } + rec.httpClient = &httpCl + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should not have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc, svc2, svc3) + }) + t.Run("one_svc_many_backends_eventually_routable_and_healthy", func(t *testing.T) { + pod := podTemplate.DeepCopy() + svc, hep := newSvc("svc", 9002) + svc2, hep2 := newSvc("svc-2", 9002) + svc3, hep3 := newSvc("svc-3", 9002) + mustCreateAll(t, fc, svc, svc2, svc3, pod) + resps := append(readyResps("10.0.0.3", 7), readyResps(podIP, 1)...) + resps2 := append(readyResps("10.0.0.3", 5), readyResps(podIP, 1)...) + resps3 := append(unreadyResps(podIP, 4), readyResps(podIP, 1)...) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{ + hep: resps, + hep2: resps2, + hep3: resps3, + }, + } + rec.httpClient = &httpCl + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc, svc2, svc3) + }) + t.Run("one_svc_many_backends_never_routable_and_healthy", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + svc2, hep2 := newSvc("svc-2", 9002) + svc3, hep3 := newSvc("svc-3", 9002) + mustCreateAll(t, fc, svc, svc2, svc3, pod) + // For a ProxyGroup with 3 replicas, each Service's health endpoint will be tried 9 times and the Pod + // will be requeued if neither succeeds. + resps := readyResps("10.0.0.3", 9) + resps2 := append(readyResps("10.0.0.3", 5), readyResps("10.0.0.4", 4)...) + resps3 := unreadyResps(podIP, 9) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{ + hep: resps, + hep2: resps2, + hep3: resps3, + }, + } + rec.httpClient = &httpCl + expectRequeue(t, rec, "operator-ns", pod.Name) + + // Pod should not have readiness gate condition set. + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc, svc2, svc3) + }) + t.Run("one_svc_many_backends_one_never_routable", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + svc2, hep2 := newSvc("svc-2", 9002) + svc3, hep3 := newSvc("svc-3", 9002) + mustCreateAll(t, fc, svc, svc2, svc3, pod) + // For a ProxyGroup with 3 replicas, each Service's health endpoint will be tried 9 times and the Pod + // will be requeued if any one never succeeds. 
+ resps := readyResps(podIP, 9) + resps2 := readyResps(podIP, 9) + resps3 := append(readyResps("10.0.0.3", 5), readyResps("10.0.0.4", 4)...) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{ + hep: resps, + hep2: resps2, + hep3: resps3, + }, + } + rec.httpClient = &httpCl + expectRequeue(t, rec, "operator-ns", pod.Name) + + // Pod should not have readiness gate condition set. + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc, svc2, svc3) + }) + t.Run("one_svc_many_backends_one_never_healthy", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + svc2, hep2 := newSvc("svc-2", 9002) + svc3, hep3 := newSvc("svc-3", 9002) + mustCreateAll(t, fc, svc, svc2, svc3, pod) + // For a ProxyGroup with 3 replicas, each Service's health endpoint will be tried 9 times and the Pod + // will be requeued if any one never succeeds. + resps := readyResps(podIP, 9) + resps2 := unreadyResps(podIP, 9) + resps3 := readyResps(podIP, 9) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{ + hep: resps, + hep2: resps2, + hep3: resps3, + }, + } + rec.httpClient = &httpCl + expectRequeue(t, rec, "operator-ns", pod.Name) + + // Pod should not have readiness gate condition set. + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc, svc2, svc3) + }) + t.Run("one_svc_many_backends_different_ports_eventually_healthy_and_routable", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9003) + svc2, hep2 := newSvc("svc-2", 9004) + svc3, hep3 := newSvc("svc-3", 9010) + mustCreateAll(t, fc, svc, svc2, svc3, pod) + // For a ProxyGroup with 3 replicas, each Service's health endpoint will be tried up to 9 times and + // marked as success as soon as one try succeeds. + resps := append(readyResps("10.0.0.3", 7), readyResps(podIP, 1)...) + resps2 := append(readyResps("10.0.0.3", 5), readyResps(podIP, 1)...) + resps3 := append(unreadyResps(podIP, 4), readyResps(podIP, 1)...) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{ + hep: resps, + hep2: resps2, + hep3: resps3, + }, + } + rec.httpClient = &httpCl + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc, svc2, svc3) + }) + // Proxies of 1.78 and earlier did not set the Pod IP header. + t.Run("pod_does_not_return_ip_header", func(t *testing.T) { + pod := podTemplate.DeepCopy() + pod.Name = "foo-bar" + + svc, hep := newSvc("foo-bar", 9002) + mustCreateAll(t, fc, svc, pod) + // If a response does not contain Pod IP header, we assume that this is an earlier proxy version, + // readiness cannot be verified so the readiness gate is just set to true. + resps := unreadyResps("", 1) + httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{ + hep: resps, + }, + } + rec.httpClient = &httpCl + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc) + }) + t.Run("one_svc_one_backend_eventually_healthy_and_routable", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + svc, hep := newSvc("svc", 9002) + mustCreateAll(t, fc, svc, pod) + // If a response errors, it is probably because the Pod is not yet properly running, so retry. + resps := append(erroredResps(8), readyResps(podIP, 1)...) 
+ httpCl := fakeHTTPClient{ + t: t, + state: map[string][]fakeResponse{ + hep: resps, + }, + } + rec.httpClient = &httpCl + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc) + }) + t.Run("one_svc_one_backend_svc_does_not_have_health_port", func(t *testing.T) { + pod := podTemplate.DeepCopy() + + // If a Service does not have health port set, we assume that it is not possible to determine Pod's + // readiness and set it to ready. + svc, _ := newSvc("svc", -1) + mustCreateAll(t, fc, svc, pod) + rec.httpClient = nil + expectReconciled(t, rec, "operator-ns", pod.Name) + + // Pod should have readiness gate condition set. + podSetReady(pod, cl) + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc) + }) + t.Run("error_setting_up_healthcheck", func(t *testing.T) { + pod := podTemplate.DeepCopy() + // This is not a realistic reason for error, but we are just testing the behaviour of a healthcheck + // lookup failing. + pod.Status.PodIPs = []corev1.PodIP{{IP: "not-an-ip"}} + + svc, _ := newSvc("svc", 9002) + svc2, _ := newSvc("svc-2", 9002) + svc3, _ := newSvc("svc-3", 9002) + mustCreateAll(t, fc, svc, svc2, svc3, pod) + rec.httpClient = nil + expectError(t, rec, "operator-ns", pod.Name) + + // Pod should not have readiness gate condition set. + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc, svc2, svc3) + }) + t.Run("pod_does_not_have_an_ip_address", func(t *testing.T) { + pod := podTemplate.DeepCopy() + pod.Status.PodIPs = nil + + svc, _ := newSvc("svc", 9002) + svc2, _ := newSvc("svc-2", 9002) + svc3, _ := newSvc("svc-3", 9002) + mustCreateAll(t, fc, svc, svc2, svc3, pod) + rec.httpClient = nil + expectRequeue(t, rec, "operator-ns", pod.Name) + + // Pod should not have readiness gate condition set. + expectEqual(t, fc, pod) + mustDeleteAll(t, fc, pod, svc, svc2, svc3) + }) +} + +func readyResps(ip string, num int) (resps []fakeResponse) { + for range num { + resps = append(resps, fakeResponse{statusCode: 200, podIP: ip}) + } + return resps +} + +func unreadyResps(ip string, num int) (resps []fakeResponse) { + for range num { + resps = append(resps, fakeResponse{statusCode: 503, podIP: ip}) + } + return resps +} + +func erroredResps(num int) (resps []fakeResponse) { + for range num { + resps = append(resps, fakeResponse{err: errors.New("timeout")}) + } + return resps +} + +func newSvc(name string, port int32) (*corev1.Service, string) { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "operator-ns", + Name: name, + Labels: map[string]string{ + LabelManaged: "true", + labelProxyGroup: "dev", + labelSvcType: typeEgress, + }, + }, + Spec: corev1.ServiceSpec{}, + } + if port != -1 { + svc.Spec.Ports = []corev1.ServicePort{ + { + Name: tsHealthCheckPortName, + Port: port, + TargetPort: intstr.FromInt(9002), + Protocol: "TCP", + }, + } + } + return svc, fmt.Sprintf("http://%s.operator-ns.svc.cluster.local:%d/healthz", name, port) +} + +func podSetReady(pod *corev1.Pod, cl *tstest.Clock) { + pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{ + Type: tsEgressReadinessGate, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: cl.Now().Truncate(time.Second)}, + }) +} + +// fakeHTTPClient is a mock HTTP client with a preset map of request URLs to list of responses. When it receives a +// request for a specific URL, it returns the preset response for that URL. 
It errors if an unexpected request is +// received. +type fakeHTTPClient struct { + t *testing.T + mu sync.Mutex // protects following + state map[string][]fakeResponse +} + +func (f *fakeHTTPClient) Do(req *http.Request) (*http.Response, error) { + f.mu.Lock() + resps := f.state[req.URL.String()] + if len(resps) == 0 { + f.mu.Unlock() + log.Printf("\n\n\nURL %q\n\n\n", req.URL) + f.t.Fatalf("fakeHTTPClient received an unexpected request for %q", req.URL) + } + defer func() { + if len(resps) == 1 { + delete(f.state, req.URL.String()) + f.mu.Unlock() + return + } + f.state[req.URL.String()] = f.state[req.URL.String()][1:] + f.mu.Unlock() + }() + + resp := resps[0] + if resp.err != nil { + return nil, resp.err + } + r := http.Response{ + StatusCode: resp.statusCode, + Header: make(http.Header), + Body: io.NopCloser(bytes.NewReader([]byte{})), + } + r.Header.Add(kubetypes.PodIPv4Header, resp.podIP) + return &r, nil +} + +type fakeResponse struct { + err error + statusCode int + podIP string // for the Pod IP header +} diff --git a/cmd/k8s-operator/egress-services-readiness.go b/cmd/k8s-operator/egress-services-readiness.go index f1964d452..5e95a5279 100644 --- a/cmd/k8s-operator/egress-services-readiness.go +++ b/cmd/k8s-operator/egress-services-readiness.go @@ -48,11 +48,12 @@ type egressSvcsReadinessReconciler struct { // service to determine how many replicas are currently able to route traffic. func (esrr *egressSvcsReadinessReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { l := esrr.logger.With("Service", req.NamespacedName) - defer l.Info("reconcile finished") + l.Debugf("starting reconcile") + defer l.Debugf("reconcile finished") svc := new(corev1.Service) if err = esrr.Get(ctx, req.NamespacedName, svc); apierrors.IsNotFound(err) { - l.Info("Service not found") + l.Debugf("Service not found") return res, nil } else if err != nil { return res, fmt.Errorf("failed to get Service: %w", err) @@ -127,16 +128,16 @@ func (esrr *egressSvcsReadinessReconciler) Reconcile(ctx context.Context, req re return res, err } if pod == nil { - l.Infof("[unexpected] ProxyGroup is ready, but replica %d was not found", i) + l.Warnf("[unexpected] ProxyGroup is ready, but replica %d was not found", i) reason, msg = reasonClusterResourcesNotReady, reasonClusterResourcesNotReady return res, nil } - l.Infof("looking at Pod with IPs %v", pod.Status.PodIPs) + l.Debugf("looking at Pod with IPs %v", pod.Status.PodIPs) ready := false for _, ep := range eps.Endpoints { - l.Infof("looking at endpoint with addresses %v", ep.Addresses) + l.Debugf("looking at endpoint with addresses %v", ep.Addresses) if endpointReadyForPod(&ep, pod, l) { - l.Infof("endpoint is ready for Pod") + l.Debugf("endpoint is ready for Pod") ready = true break } @@ -165,7 +166,7 @@ func (esrr *egressSvcsReadinessReconciler) Reconcile(ctx context.Context, req re func endpointReadyForPod(ep *discoveryv1.Endpoint, pod *corev1.Pod, l *zap.SugaredLogger) bool { podIP, err := podIPv4(pod) if err != nil { - l.Infof("[unexpected] error retrieving Pod's IPv4 address: %v", err) + l.Warnf("[unexpected] error retrieving Pod's IPv4 address: %v", err) return false } // Currently we only ever set a single address on and Endpoint and nothing else is meant to modify this. 
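The doer seam is what ties the fake above to the production wiring below: the reconciler only depends on something with an http.Client-shaped Do method, so the tests can inject canned responses while operator.go passes http.DefaultClient. If desired, that could be asserted at compile time in the test file with the usual interface-satisfaction idiom:

	var (
		_ doer = (*http.Client)(nil)    // production client, wired up in operator.go below
		_ doer = (*fakeHTTPClient)(nil) // canned responses, used by the tests above
	)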
diff --git a/cmd/k8s-operator/operator.go b/cmd/k8s-operator/operator.go index 6631c4f98..8fa979094 100644 --- a/cmd/k8s-operator/operator.go +++ b/cmd/k8s-operator/operator.go @@ -9,6 +9,7 @@ package main import ( "context" + "net/http" "os" "regexp" "strconv" @@ -453,6 +454,24 @@ func runReconcilers(opts reconcilerOpts) { startlog.Fatalf("could not create egress EndpointSlices reconciler: %v", err) } + podsForEps := handler.EnqueueRequestsFromMapFunc(podsFromEgressEps(mgr.GetClient(), opts.log, opts.tailscaleNamespace)) + podsER := handler.EnqueueRequestsFromMapFunc(egressPodsHandler) + err = builder. + ControllerManagedBy(mgr). + Named("egress-pods-readiness-reconciler"). + Watches(&discoveryv1.EndpointSlice{}, podsForEps). + Watches(&corev1.Pod{}, podsER). + Complete(&egressPodsReconciler{ + Client: mgr.GetClient(), + tsNamespace: opts.tailscaleNamespace, + clock: tstime.DefaultClock{}, + logger: opts.log.Named("egress-pods-readiness-reconciler"), + httpClient: http.DefaultClient, + }) + if err != nil { + startlog.Fatalf("could not create egress Pods readiness reconciler: %v", err) + } + // ProxyClass reconciler gets triggered on ServiceMonitor CRD changes to ensure that any ProxyClasses, that // define that a ServiceMonitor should be created, were set to invalid because the CRD did not exist get // reconciled if the CRD is applied at a later point. @@ -906,6 +925,20 @@ func egressEpsHandler(_ context.Context, o client.Object) []reconcile.Request { } } +func egressPodsHandler(_ context.Context, o client.Object) []reconcile.Request { + if typ := o.GetLabels()[LabelParentType]; typ != proxyTypeProxyGroup { + return nil + } + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: o.GetNamespace(), + Name: o.GetName(), + }, + }, + } +} + // egressEpsFromEgressPods returns a Pod event handler that checks if Pod is a replica for a ProxyGroup and if it is, // returns reconciler requests for all egress EndpointSlices for that ProxyGroup. func egressEpsFromPGPods(cl client.Client, ns string) handler.MapFunc { @@ -1056,6 +1089,43 @@ func epsFromExternalNameService(cl client.Client, logger *zap.SugaredLogger, ns } } +func podsFromEgressEps(cl client.Client, logger *zap.SugaredLogger, ns string) handler.MapFunc { + return func(ctx context.Context, o client.Object) []reconcile.Request { + eps, ok := o.(*discoveryv1.EndpointSlice) + if !ok { + logger.Infof("[unexpected] EndpointSlice handler triggered for an object that is not a EndpointSlice") + return nil + } + if eps.Labels[labelProxyGroup] == "" { + return nil + } + if eps.Labels[labelSvcType] != "egress" { + return nil + } + podLabels := map[string]string{ + LabelManaged: "true", + LabelParentType: "proxygroup", + LabelParentName: eps.Labels[labelProxyGroup], + } + podList := &corev1.PodList{} + if err := cl.List(ctx, podList, client.InNamespace(ns), + client.MatchingLabels(podLabels)); err != nil { + logger.Infof("error listing EndpointSlices: %v, skipping a reconcile for event on EndpointSlice %s", err, eps.Name) + return nil + } + reqs := make([]reconcile.Request, 0) + for _, pod := range podList.Items { + reqs = append(reqs, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + }, + }) + } + return reqs + } +} + // proxyClassesWithServiceMonitor returns an event handler that, given that the event is for the Prometheus // ServiceMonitor CRD, returns all ProxyClasses that define that a ServiceMonitor should be created. 
func proxyClassesWithServiceMonitor(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc { diff --git a/cmd/k8s-operator/testutils_test.go b/cmd/k8s-operator/testutils_test.go index 160f24ec9..83c42cb76 100644 --- a/cmd/k8s-operator/testutils_test.go +++ b/cmd/k8s-operator/testutils_test.go @@ -583,6 +583,21 @@ func mustCreate(t *testing.T, client client.Client, obj client.Object) { t.Fatalf("creating %q: %v", obj.GetName(), err) } } +func mustCreateAll(t *testing.T, client client.Client, objs ...client.Object) { + t.Helper() + for _, obj := range objs { + mustCreate(t, client, obj) + } +} + +func mustDeleteAll(t *testing.T, client client.Client, objs ...client.Object) { + t.Helper() + for _, obj := range objs { + if err := client.Delete(context.Background(), obj); err != nil { + t.Fatalf("deleting %q: %v", obj.GetName(), err) + } + } +} func mustUpdate[T any, O ptrObject[T]](t *testing.T, client client.Client, ns, name string, update func(O)) { t.Helper() @@ -706,6 +721,19 @@ func expectRequeue(t *testing.T, sr reconcile.Reconciler, ns, name string) { t.Fatalf("expected timed requeue, got success") } } +func expectError(t *testing.T, sr reconcile.Reconciler, ns, name string) { + t.Helper() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: name, + Namespace: ns, + }, + } + _, err := sr.Reconcile(context.Background(), req) + if err == nil { + t.Error("Reconcile: expected error but did not get one") + } +} // expectEvents accepts a test recorder and a list of events, tests that expected // events are sent down the recorder's channel. Waits for 5s for each event.
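For reference, mustCreateAll, mustDeleteAll and the expect* helpers compose into the pattern the egress Pod readiness tests follow; each sub-test cleans up after itself because they all share one fake client. A hypothetical extra sub-test, reusing the fc, rec, cl, podTemplate and podIP fixtures from TestEgressPodReadiness, would look roughly like this:

	t.Run("one_svc_two_backends_eventually_routed_to", func(t *testing.T) {
		pod := podTemplate.DeepCopy()

		svc, hep := newSvc("svc", 9002)
		mustCreateAll(t, fc, svc, pod)
		// The first response comes from another backend, the second from this Pod, so the reconciler
		// should keep probing and then mark the Pod ready.
		resps := append(readyResps("10.0.0.3", 1), readyResps(podIP, 1)...)
		rec.httpClient = &fakeHTTPClient{t: t, state: map[string][]fakeResponse{hep: resps}}
		expectReconciled(t, rec, "operator-ns", pod.Name)

		// Pod should have readiness gate condition set.
		podSetReady(pod, cl)
		expectEqual(t, fc, pod)
		mustDeleteAll(t, fc, pod, svc)
	})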