kube-router/pkg/controllers/proxy/hairpin_controller.go
Aaron U'Ren ecaad2c6e4 fix(cleanup): add missing handlers for cleanup
kube-router v2.X introduced the idea of iptables and ipset handlers that
allow kube-router to be dual-stack capable. However, the cleanup logic
for the various controllers was not properly ported when this happened.
When the cleanup functions run, they often have not had their
controllers fully initialized as cleanup should not be dependant on
kube-router being able to reach a kube-apiserver.

As such, they were missing these handlers. And as such they either
silently ended up doing noops or worse, they would run into nil pointer
failures.

This corrects that, so that kube-router no longer fails this way and
cleans up as it had in v1.X.
2024-04-26 14:16:09 -05:00

121 lines
3.9 KiB
Go

package proxy
import (
"fmt"
"net"
"os"
"path"
"runtime"
"sync"
"time"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
"github.com/vishvananda/netns"
"k8s.io/klog/v2"
)
type hairpinController struct {
epC <-chan string
nsc *NetworkServicesController
}
func (hpc *hairpinController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup,
healthChan chan<- *healthcheck.ControllerHeartbeat) {
defer wg.Done()
klog.Infof("Starting hairping controller (handles setting hairpin_mode for veth interfaces)")
t := time.NewTicker(healthcheck.HPCSyncPeriod)
defer t.Stop()
for {
// Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first
select {
case <-stopCh:
klog.Info("Shutting down Hairpin Controller goroutine")
return
default:
}
select {
case <-stopCh:
klog.Info("Shutting down Hairpin Controller goroutine")
return
case endpointIP := <-hpc.epC:
klog.V(1).Infof("Received request for hairpin setup of endpoint %s, processing", endpointIP)
err := hpc.ensureHairpinEnabledForPodInterface(endpointIP)
if err != nil {
klog.Errorf("unable to set hairpin mode for endpoint %s, its possible that hairpinning will not "+
"work as expected. Error was: %v",
endpointIP, err)
}
case <-t.C:
healthcheck.SendHeartBeat(healthChan, "HPC")
}
}
}
func (hpc *hairpinController) ensureHairpinEnabledForPodInterface(endpointIP string) error {
klog.V(2).Infof("Attempting to enable hairpin mode for endpoint IP %s", endpointIP)
crRuntime, containerID, err := hpc.nsc.findContainerRuntimeReferences(endpointIP)
if err != nil {
return err
}
klog.V(2).Infof("Detected runtime %s and container ID %s for endpoint IP %s", crRuntime, containerID, endpointIP)
runtime.LockOSThread()
defer runtime.UnlockOSThread()
hostNetworkNSHandle, err := netns.Get()
if err != nil {
return fmt.Errorf("failed to get namespace due to %v", err)
}
defer utils.CloseCloserDisregardError(&hostNetworkNSHandle)
var pid int
if crRuntime == "docker" {
// WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet.
pid, err = hpc.nsc.ln.getContainerPidWithDocker(containerID)
if err != nil {
return fmt.Errorf("failed to get pod's (%s) pid for hairpinning due to %v", endpointIP, err)
}
} else {
// We expect CRI compliant runtimes here
// ugly workaround, refactoring of pkg/Proxy is required
pid, err = hpc.nsc.ln.getContainerPidWithCRI(hpc.nsc.dsr.runtimeEndpoint, containerID)
if err != nil {
return fmt.Errorf("failed to get pod's (%s) pid for hairpinning due to %v", endpointIP, err)
}
}
klog.V(2).Infof("Found PID %d for endpoint IP %s", pid, endpointIP)
// Get the interface link ID from inside the container so that we can link it to the veth on the host namespace
ifaceID, err := hpc.nsc.ln.findIfaceLinkForPid(pid)
if err != nil {
return fmt.Errorf("failed to find the interface ID inside the container NS for endpoint IP: %s, due to: %v",
endpointIP, err)
}
klog.V(2).Infof("Found Interface Link ID %d for endpoint IP %s", ifaceID, endpointIP)
ifaceName, err := net.InterfaceByIndex(ifaceID)
if err != nil {
return fmt.Errorf("failed to get the interface name from the link ID inside the container for endpoint IP: "+
"%s and Interface ID: %d due to: %v", endpointIP, ifaceID, err)
}
klog.V(1).Infof("Enabling hairpin for interface %s for endpoint IP %s", ifaceName.Name, endpointIP)
hpPath := path.Join(sysFSVirtualNetPath, ifaceName.Name, sysFSHairpinRelPath)
if _, err := os.Stat(hpPath); err != nil {
return fmt.Errorf("hairpin path %s doesn't appear to exist for us to set", hpPath)
}
return os.WriteFile(hpPath, []byte(hairpinEnable), 0644)
}
func NewHairpinController(nsc *NetworkServicesController, endpointCh <-chan string) *hairpinController {
hpc := hairpinController{
nsc: nsc,
epC: endpointCh,
}
return &hpc
}