From c8d3e16e1d83274061d50c72e94fc87a2865a10e Mon Sep 17 00:00:00 2001
From: Irbe Krumina
Date: Fri, 27 Sep 2024 19:43:46 +0100
Subject: [PATCH] Code review feedback

Signed-off-by: Irbe Krumina
---
 cmd/containerboot/main.go         | 65 ++++++++++++++-----------
 cmd/containerboot/services.go     | 36 +++++++++--------
 cmd/containerboot/settings.go     |  4 +-
 kube/kubeclient/client.go         |  2 +-
 util/linuxfw/iptables_for_svcs.go |  8 ++--
 5 files changed, 56 insertions(+), 59 deletions(-)

diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go
index 09b76c296..6d2ad6cfc 100644
--- a/cmd/containerboot/main.go
+++ b/cmd/containerboot/main.go
@@ -158,7 +158,7 @@ func main() {
 		PodIP:                         defaultEnv("POD_IP", ""),
 		EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false),
 		HealthCheckAddrPort:           defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""),
-		EgressServicesPath:            defaultEnv("TS_EGRESS_SERVICES_PATH", ""),
+		EgressSvcsCfgPath:             defaultEnv("TS_EGRESS_SERVICES_CONFIG_PATH", ""),
 	}

 	if err := cfg.validate(); err != nil {
@@ -276,10 +276,8 @@ authLoop:
 		switch *n.State {
 		case ipn.NeedsLogin:
 			if isOneStepConfig(cfg) {
-				// This could happen if this is the
-				// first time tailscaled was run for
-				// this device and the auth key was not
-				// passed via the configfile.
+				// This could happen if this is the first time tailscaled was run for this
+				// device and the auth key was not passed via the configfile.
 				log.Fatalf("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file.")
 			}
 			if err := authTailscale(); err != nil {
@@ -377,6 +375,9 @@ authLoop:
 			}
 		})
 	)
+	// egressSvcsErrorChan will get an error sent to it if this containerboot instance is configured to expose 1+
+	// egress services in HA mode and the egress proxy errors at runtime.
+	var egressSvcsErrorChan = make(chan error)
 	defer t.Stop()
 	// resetTimer resets timer for when to next attempt to resolve the DNS
 	// name for the proxy configured with TS_EXPERIMENTAL_DEST_DNS_NAME. The
@@ -582,36 +583,27 @@ runLoop:
 			}
 		}
 		if !startupTasksDone {
-			// For containerboot instances that act as TCP
-			// proxies (proxying traffic to an endpoint
-			// passed via one of the env vars that
-			// containerboot reads) and store state in a
-			// Kubernetes Secret, we consider startup tasks
-			// done at the point when device info has been
-			// successfully stored to state Secret.
-			// For all other containerboot instances, if we
-			// just get to this point the startup tasks can
-			// be considered done.
+			// For containerboot instances that act as TCP proxies (proxying traffic to an endpoint
+			// passed via one of the env vars that containerboot reads) and store state in a
+			// Kubernetes Secret, we consider startup tasks done at the point when device info has
+			// been successfully stored to state Secret. For all other containerboot instances, if
+			// we just get to this point the startup tasks can be considered done.
 			if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) {
 				// This log message is used in tests to detect when all
 				// post-auth configuration is done.
 				log.Println("Startup complete, waiting for shutdown signal")
 				startupTasksDone = true

-				// Configure egress proxy. Egress proxy
-				// will set up firewall rules to proxy
-				// traffic to tailnet targets configured
-				// in the provided configuration file.
-				// It will then continuously monitor the
-				// config file and netmap updates and
-				// reconfigure the firewall rules as
-				// needed. If any of its operations
-				// fail, it will crash this node.
-				if cfg.EgressServicesPath != "" {
-					log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressServicesPath)
+				// Configure egress proxy. Egress proxy will set up firewall rules to proxy
+				// traffic to tailnet targets configured in the provided configuration file. It
+				// will then continuously monitor the config file and netmap updates and
+				// reconfigure the firewall rules as needed. If any of its operations fail, it
+				// will crash this node.
+				if cfg.EgressSvcsCfgPath != "" {
+					log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressSvcsCfgPath)
 					egressSvcsNotify = make(chan ipn.Notify)
 					ep := egressProxy{
-						cfgPath:      cfg.EgressServicesPath,
+						cfgPath:      cfg.EgressSvcsCfgPath,
 						nfr:          nfr,
 						kc:           kc,
 						stateSecret:  cfg.KubeSecret,
@@ -619,16 +611,17 @@ runLoop:
 						podIP:        cfg.PodIP,
 						tailnetAddrs: addrs,
 					}
-					go ep.run(ctx, n)
+					go func() {
+						if err := ep.run(ctx, n); err != nil {
+							egressSvcsErrorChan <- err
+						}
+					}()
 				}

-				// Wait on tailscaled process. It won't
-				// be cleaned up by default when the
-				// container exits as it is not PID1.
-				// TODO (irbekrm): perhaps we can
-				// replace the reaper by a running
-				// cmd.Wait in a goroutine immediately
-				// after starting tailscaled?
+				// Wait on tailscaled process. It won't be cleaned up by default when the
+				// container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the
+				// reaper by a running cmd.Wait in a goroutine immediately after starting
+				// tailscaled?
 				reaper := func() {
 					defer wg.Done()
 					for {
@@ -666,6 +659,8 @@ runLoop:
 			}
 			backendAddrs = newBackendAddrs
 			resetTimer(false)
+		case e := <-egressSvcsErrorChan:
+			log.Fatalf("egress proxy failed: %v", e)
 		}
 	}
 	wg.Wait()
diff --git a/cmd/containerboot/services.go b/cmd/containerboot/services.go
index b26eda9dd..b9c2cd45f 100644
--- a/cmd/containerboot/services.go
+++ b/cmd/containerboot/services.go
@@ -63,7 +63,7 @@ type egressProxy struct {
 //   - the mounted egress config has changed
 //   - the proxy's tailnet IP addresses have changed
 //   - tailnet IPs have changed for any backend targets specified by tailnet FQDN
-func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) {
+func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error {
 	var tickChan <-chan time.Time
 	var eventChan <-chan fsnotify.Event
 	// TODO (irbekrm): take a look if this can be pulled into a single func
@@ -76,19 +76,19 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) {
 	} else {
 		defer w.Close()
 		if err := w.Add(filepath.Dir(ep.cfgPath)); err != nil {
-			log.Fatalf("failed to add fsnotify watch: %v", err)
+			return fmt.Errorf("failed to add fsnotify watch: %w", err)
 		}
 		eventChan = w.Events
 	}

 	if err := ep.sync(ctx, n); err != nil {
-		log.Fatal(err)
+		return err
 	}
 	for {
 		var err error
 		select {
 		case <-ctx.Done():
-			return
+			return nil
 		case <-tickChan:
 			err = ep.sync(ctx, n)
 		case <-eventChan:
@@ -102,14 +102,15 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) {
 			}
 		}
 		if err != nil {
-			log.Fatalf("error syncing egress service config: %v", err)
+			return fmt.Errorf("error syncing egress service config: %w", err)
 		}
 	}
 }

-// sync triggers an egress proxy config resync. The resync calculates the diff
-// between config and status to determine if any firewall rules need to be
-// updated.
+// sync triggers an egress proxy config resync. The resync calculates the diff between config and status
+// to determine if any firewall rules need to be updated. Currently, using the status in the state Secret
+// as a reference for the current firewall configuration is good enough because the status is keyed by the
+// Pod IP and we crash the Pod on errors such as a failed firewall update.
 func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error {
 	cfgs, err := ep.getConfigs()
 	if err != nil {
@@ -175,8 +176,8 @@ func (ep *egressProxy) syncEgressConfigs(cfgs *egressservices.Configs, status *e
 		mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete)
 	}
 	if len(rulesToAdd) != 0 || ep.addrsHaveChanged(n) {
-		// For each tailnet target, set up SNAT from the local
-		// tailnet device address of the matching family.
+		// For each tailnet target, set up SNAT from the local tailnet device address of the matching
+		// family.
 		for _, t := range tailnetTargetIPs {
 			if t.Is6() && !ep.nfr.HasIPV6NAT() {
 				continue
@@ -358,13 +359,14 @@ func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, e
 	}
 	status := &egressservices.Status{}
 	raw, ok := secret.Data[egressservices.KeyEgressServices]
-	if ok {
-		if err := json.Unmarshal([]byte(raw), status); err != nil {
-			return nil, fmt.Errorf("error unmarshalling previous config: %w", err)
-		}
-		if reflect.DeepEqual(status.PodIP, ep.podIP) {
-			return status, nil
-		}
+	if !ok {
+		return nil, nil
+	}
+	if err := json.Unmarshal([]byte(raw), status); err != nil {
+		return nil, fmt.Errorf("error unmarshalling previous config: %w", err)
+	}
+	if reflect.DeepEqual(status.PodIP, ep.podIP) {
+		return status, nil
 	}
 	return nil, nil
 }
diff --git a/cmd/containerboot/settings.go b/cmd/containerboot/settings.go
index 635426b40..d72aefbdf 100644
--- a/cmd/containerboot/settings.go
+++ b/cmd/containerboot/settings.go
@@ -64,7 +64,7 @@ type settings struct {
 	// target.
 	PodIP               string
 	HealthCheckAddrPort string
-	EgressServicesPath  string
+	EgressSvcsCfgPath   string
 }

 func (s *settings) validate() error {
@@ -199,7 +199,7 @@ func isOneStepConfig(cfg *settings) bool {
 // as an L3 proxy, proxying to an endpoint provided via one of the config env
 // vars.
 func isL3Proxy(cfg *settings) bool {
-	return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressServicesPath != ""
+	return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressSvcsCfgPath != ""
 }

 // hasKubeStateStore returns true if the state must be stored in a Kubernetes
diff --git a/kube/kubeclient/client.go b/kube/kubeclient/client.go
index c0337acad..e8ddec75d 100644
--- a/kube/kubeclient/client.go
+++ b/kube/kubeclient/client.go
@@ -258,7 +258,7 @@ type JSONPatch struct {
 func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error {
 	for _, p := range patch {
 		if p.Op != "remove" && p.Op != "add" && p.Op != "replace" {
-			panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op))
+			return fmt.Errorf("unsupported JSON patch operation: %q", p.Op)
 		}
 	}
 	return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json"))
diff --git a/util/linuxfw/iptables_for_svcs.go b/util/linuxfw/iptables_for_svcs.go
index 8f20b5552..8e0f5d48d 100644
--- a/util/linuxfw/iptables_for_svcs.go
+++ b/util/linuxfw/iptables_for_svcs.go
@@ -33,9 +33,9 @@ func (i *iptablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip
 // DeletePortMapRuleForSvc constructs a prerouting rule as would be created by
 // EnsurePortMapRuleForSvc with the provided args and, if such a rule exists,
 // deletes it.
-func (i *iptablesRunner) DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error {
+func (i *iptablesRunner) DeletePortMapRuleForSvc(svc, excludeI string, targetIP netip.Addr, pm PortMap) error {
 	table := i.getIPTByAddr(targetIP)
-	args := argsForPortMapRule(svc, tun, targetIP, pm)
+	args := argsForPortMapRule(svc, excludeI, targetIP, pm)
 	exists, err := table.Exists("nat", "PREROUTING", args...)
 	if err != nil {
 		return fmt.Errorf("error checking if rule exists: %w", err)
@@ -60,10 +60,10 @@ func (i *iptablesRunner) DeleteSvc(svc, tun string, targetIPs []netip.Addr, pms
 	return nil
 }

-func argsForPortMapRule(svc, tun string, targetIP netip.Addr, pm PortMap) []string {
+func argsForPortMapRule(svc, excludeI string, targetIP netip.Addr, pm PortMap) []string {
 	c := commentForSvc(svc, pm)
 	return []string{
-		"!", "-i", tun,
+		"!", "-i", excludeI,
 		"-p", pm.Protocol,
 		"--dport", fmt.Sprintf("%d", pm.MatchPort),
 		"-m", "comment", "--comment", c,
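
Note for reviewers: the shape of the main.go change above is "the worker returns its error, a wrapper goroutine forwards it over a channel, and the main select loop decides to crash the process". Below is a minimal, self-contained sketch of that pattern; the names (run, errChan) and the simulated failure are illustrative stand-ins, not the patch's actual API:

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// run stands in for (*egressProxy).run: a long-lived worker that now
// returns its terminal error instead of calling log.Fatal itself.
func run(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return nil
	case <-time.After(time.Second):
		return errors.New("simulated firewall sync failure")
	}
}

func main() {
	ctx := context.Background()
	errChan := make(chan error) // counterpart of egressSvcsErrorChan

	go func() {
		// Forward the error to the main loop rather than exiting
		// from inside the goroutine.
		if err := run(ctx); err != nil {
			errChan <- err
		}
	}()

	select {
	case <-ctx.Done():
	case err := <-errChan:
		// Only the main loop terminates the process, mirroring the
		// patch's `case e := <-egressSvcsErrorChan` branch.
		log.Fatalf("egress proxy failed: %v", err)
	}
}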