diff --git a/wgengine/monitor/monitor.go b/wgengine/monitor/monitor.go index ba099196d..c3a9804aa 100644 --- a/wgengine/monitor/monitor.go +++ b/wgengine/monitor/monitor.go @@ -10,13 +10,16 @@ package monitor import ( "encoding/json" "errors" + "net" "net/netip" "runtime" "sync" "time" + "golang.org/x/exp/slices" "tailscale.com/net/interfaces" "tailscale.com/types/logger" + "tailscale.com/util/mak" ) // pollWallTimeInterval is how often we check the time to check @@ -64,19 +67,21 @@ type Mon struct { change chan struct{} stop chan struct{} // closed on Stop - mu sync.Mutex // guards all following fields - cbs map[*callbackHandle]ChangeFunc - ruleDelCB map[*callbackHandle]RuleDeleteCallback - ifState *interfaces.State - gwValid bool // whether gw and gwSelfIP are valid - gw netip.Addr // our gateway's IP - gwSelfIP netip.Addr // our own IP address (that corresponds to gw) - started bool - closed bool - goroutines sync.WaitGroup - wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick - lastWall time.Time - timeJumped bool // whether we need to send a changed=true after a big time jump + mu sync.Mutex // guards all following fields + cbs map[*callbackHandle]ChangeFunc + ruleDelCB map[*callbackHandle]RuleDeleteCallback + linkChangedCB map[*callbackHandle]LinkChangedCallback + ifState *interfaces.State + ifState2 []*net.Interface + gwValid bool // whether gw and gwSelfIP are valid + gw netip.Addr // our gateway's IP + gwSelfIP netip.Addr // our own IP address (that corresponds to gw) + started bool + closed bool + goroutines sync.WaitGroup + wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick + lastWall time.Time + timeJumped bool // whether we need to send a changed=true after a big time jump } // New instantiates and starts a monitoring instance. @@ -179,6 +184,26 @@ func (m *Mon) RegisterRuleDeleteCallback(callback RuleDeleteCallback) (unregiste } } +// LinkChangedCallback is a callback when a network link changes. 
+type LinkChangedCallback func(iif *net.Interface, deleted bool) + +// RegisterLinkChangedCallback adds a callback to the set of parties to be +// notified (in their own goroutine) whenever a link (a.k.a. "interface") is +// changed; the callback's deleted argument says whether the link was removed. +// +// To remove this callback, call unregister or close the monitor. +func (m *Mon) RegisterLinkChangedCallback(callback LinkChangedCallback) (unregister func()) { + handle := new(callbackHandle) // map key identifying this registration + m.mu.Lock() + defer m.mu.Unlock() + mak.Set(&m.linkChangedCB, handle, callback) // presumably allocates the map when it is nil -- confirm against util/mak + return func() { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.linkChangedCB, handle) + } +} + // Start starts the monitor. // A monitor can only be started & closed once. func (m *Mon) Start() { @@ -256,6 +281,7 @@ func (m *Mon) stopped() bool { // the change channel of changes, and stopping when a stop is issued. func (m *Mon) pump() { defer m.goroutines.Done() +pumpLoop: for !m.stopped() { msg, err := m.om.Receive() if err != nil { @@ -265,14 +291,18 @@ func (m *Mon) pump() { // Keep retrying while we're not closed. 
m.logf("error from link monitor: %v", err) time.Sleep(time.Second) - continue + continue pumpLoop } - if rdm, ok := msg.(ipRuleDeletedMessage); ok { - m.notifyRuleDeleted(rdm) - continue + switch v := msg.(type) { + case ipRuleDeletedMessage: + m.notifyRuleDeleted(v) + continue pumpLoop + case *newLinkMessage: // Receive returns a *newLinkMessage, so match the pointer type + m.notifyLinkChanged(*v) + continue pumpLoop } if msg.ignore() { - continue + continue pumpLoop } m.InjectEvent() } @@ -286,6 +316,41 @@ func (m *Mon) notifyRuleDeleted(rdm ipRuleDeletedMessage) { } } +// notifyLinkChanged invokes every registered LinkChangedCallback, each in its +// own goroutine, and then applies the event to the m.ifState2 cache: a known +// Index is replaced or removed; an unknown, non-deleted link is appended. +func (m *Mon) notifyLinkChanged(nlm newLinkMessage) { + m.mu.Lock() + defer m.mu.Unlock() + for _, cb := range m.linkChangedCB { + go cb(nlm.Link, nlm.Delete) + } + + // Update our cached state + updated := false + for i, iif := range m.ifState2 { + if iif.Index == nlm.Link.Index { + if nlm.Delete { + m.ifState2 = slices.Delete(m.ifState2, i, i+1) // half-open range [i, i+1) removes exactly this entry + } else { + m.ifState2[i] = nlm.Link + } + updated = true + break + } + } + if updated || nlm.Delete { // done, or a delete for a link that was never cached + return + } + + // Need to append + // TODO(andrew): insert sorted instead of insert then sort? + m.ifState2 = append(m.ifState2, nlm.Link) + slices.SortFunc(m.ifState2, func(x, y *net.Interface) bool { + return x.Index < y.Index + }) +} + // isInterestingInterface reports whether the provided interface should be // considered when checking for network state changes. // The ips parameter should be the IPs of the provided interface. diff --git a/wgengine/monitor/monitor_linux.go b/wgengine/monitor/monitor_linux.go index b6d1188bb..5fb2aa31d 100644 --- a/wgengine/monitor/monitor_linux.go +++ b/wgengine/monitor/monitor_linux.go @@ -54,14 +54,18 @@ func newOSMon(logf logger.Logf, m *Mon) (osMon, error) { // but all reachability would. 
Groups: unix.RTMGRP_IPV4_IFADDR | unix.RTMGRP_IPV6_IFADDR | unix.RTMGRP_IPV4_ROUTE | unix.RTMGRP_IPV6_ROUTE | - unix.RTMGRP_IPV4_RULE, // no IPV6_RULE in x/sys/unix + unix.RTMGRP_IPV4_RULE | unix.RTMGRP_LINK, // no IPV6_RULE in x/sys/unix }) if err != nil { // Google Cloud Run does not implement NETLINK_ROUTE RTMGRP support logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling") return newPollingMon(logf, m) } - return &nlConn{logf: logf, conn: conn, addrCache: make(map[uint32]map[netip.Addr]bool)}, nil + return &nlConn{ + logf: logf, + conn: conn, + addrCache: make(map[uint32]map[netip.Addr]bool), + }, nil } func (c *nlConn) IsInterestingInterface(iface string) bool { return true } @@ -229,6 +233,53 @@ func (c *nlConn) Receive() (message, error) { c.logf("%+v", rdm) } return rdm, nil + case unix.RTM_NEWLINK, unix.RTM_DELLINK: + typeStr := "RTM_NEWLINK" + if msg.Header.Type == unix.RTM_DELLINK { + typeStr = "RTM_DELLINK" + } + + var lmsg rtnetlink.LinkMessage + if err := lmsg.UnmarshalBinary(msg.Data); err != nil { + c.logf("%s: failed to parse: %v", typeStr, err) + return unspecifiedMessage{}, nil + } + + // Make a *net.Interface + netif := &net.Interface{ + Index: int(lmsg.Index), + } + if attrs := lmsg.Attributes; attrs != nil { + netif.HardwareAddr = attrs.Address + netif.MTU = int(attrs.MTU) + netif.Name = attrs.Name + } + + // Handle flags + if lmsg.Flags&unix.IFF_UP != 0 { + netif.Flags |= net.FlagUp + } + if lmsg.Flags&unix.IFF_BROADCAST != 0 { + netif.Flags |= net.FlagBroadcast + } + if lmsg.Flags&unix.IFF_LOOPBACK != 0 { + netif.Flags |= net.FlagLoopback + } + if lmsg.Flags&unix.IFF_POINTOPOINT != 0 { + netif.Flags |= net.FlagPointToPoint + } + if lmsg.Flags&unix.IFF_MULTICAST != 0 { + netif.Flags |= net.FlagMulticast + } + + nlm := &newLinkMessage{ + Link: netif, + Delete: msg.Header.Type == unix.RTM_DELLINK, + } + if debugNetlinkMessages() { + c.logf("newLinkMessage{Link: %+v, Delete: %v}", nlm.Link, nlm.Delete) + } + return nlm, nil 
default: c.logf("unhandled netlink msg type %+v, %q", msg.Header, msg.Data) return unspecifiedMessage{}, nil @@ -286,3 +337,11 @@ func (m *newAddrMessage) ignore() bool { type ignoreMessage struct{} func (ignoreMessage) ignore() bool { return true } + +// newLinkMessage is a message for a link being added (RTM_NEWLINK) or, when Delete is set, removed (RTM_DELLINK). +type newLinkMessage struct { + Link *net.Interface // interface built from the netlink link message + Delete bool // true when the message type was RTM_DELLINK +} + +func (newLinkMessage) ignore() bool { return true }