diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go index c23d976af..7f763753c 100644 --- a/cmd/tailscale/cli/debug.go +++ b/cmd/tailscale/cli/debug.go @@ -93,6 +93,11 @@ var debugCmd = &ffcli.Command{ Exec: localAPIAction("rebind"), ShortHelp: "force a magicsock rebind", }, + { + Name: "kick-all-tcp-in", + Exec: localAPIAction("kick-all-tcp-in"), + ShortHelp: "test TCP flow kick [incoming]", + }, { Name: "prefs", Exec: runPrefs, diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 2d2d0f90d..d6b1d9daa 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "io" + "log" "net" "net/http" "os" @@ -3283,6 +3284,17 @@ func (b *LocalBackend) DebugReSTUN() error { return nil } +func (b *LocalBackend) DebugKickAllTCPIn() error { + filt, ok := b.filterAtomic.Load().(*filter.Filter) + if !ok { + return errors.New("no filter") + } + for _, flow := range filt.OpenTCPFlows() { + log.Printf("XXX: flow open: %+v", flow) + } + return nil +} + func (b *LocalBackend) magicConn() (*magicsock.Conn, error) { ig, ok := b.e.(wgengine.InternalsGetter) if !ok { diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index 699758d3e..286a54754 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -278,6 +278,8 @@ func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) { err = h.b.DebugRebind() case "restun": err = h.b.DebugReSTUN() + case "kick-all-tcp-in": + err = h.b.DebugKickAllTCPIn() case "": err = fmt.Errorf("missing parameter 'action'") default: diff --git a/net/packet/packet.go b/net/packet/packet.go index 48633b89d..f9d912d03 100644 --- a/net/packet/packet.go +++ b/net/packet/packet.go @@ -390,6 +390,16 @@ func (q *Parsed) IsTCPSyn() bool { return (q.TCPFlags & TCPSynAck) == TCPSyn } +// IsTCPRst reports whether q is a TCP RST packet. +func (q *Parsed) IsTCPRst() bool { + return (q.TCPFlags & TCPRst) != 0 +} + +// IsTCPFin reports whether q is a TCP FIN packet. +func (q *Parsed) IsTCPFin() bool { + return (q.TCPFlags & TCPFin) != 0 +} + // IsError reports whether q is an ICMP "Error" packet. func (q *Parsed) IsError() bool { switch q.IPProto { diff --git a/wgengine/filter/filter.go b/wgengine/filter/filter.go index cda28eb30..8aea09e12 100644 --- a/wgengine/filter/filter.go +++ b/wgengine/filter/filter.go @@ -7,6 +7,7 @@ package filter import ( "fmt" + "log" "sync" "time" @@ -17,6 +18,7 @@ import ( "tailscale.com/tstime/rate" "tailscale.com/types/ipproto" "tailscale.com/types/logger" + "tailscale.com/util/mak" ) // Filter is a stateful packet filter. @@ -56,8 +58,106 @@ type Filter struct { // filterState is a state cache of past seen packets. type filterState struct { - mu sync.Mutex + mu sync.Mutex // guards following + + // lru is the flow track cached used by UDP & SCTP. lru *flowtrack.Cache // from flowtrack.Tuple -> nil + + // tcpFlows tracks open TCP flows, both inbound and outbound. Regardless of + // which direction initiated it, the Tuple's Src is always the remote side + // and Dst is the local side. + tcpFlows map[flowtrack.Tuple]*TCPFlow +} + +// OpenTCPFlows returns the set of open TCP flows in an unsorted order. +func (f *Filter) OpenTCPFlows() []*TCPFlow { + st := f.state + st.mu.Lock() + defer st.mu.Unlock() + ret := make([]*TCPFlow, 0, len(st.tcpFlows)) + for _, f := range st.tcpFlows { + ret = append(ret, f) + } + return ret +} + +type TCPFlow struct { + flowtrack.Tuple + Out bool // was an outbound connection (from local tailscale) + Created time.Time + // TODO(bradfitz): lastActivity mono.Time, once we can do it fast enough + // to update on all packets. + + mu sync.Mutex // guards finIn/finOut; lock order: filterState.mu, then TCPFlow.mu + // finOut and finIn record whether we've seen a FIN packet in or out + // for this flow. + finOut bool + finIn bool +} + +func (s *filterState) addOutgoingTCPFlow(t flowtrack.Tuple) { + s.mu.Lock() + defer s.mu.Unlock() + log.Printf("XXX adding out flow %v", t) + mak.Set(&s.tcpFlows, t, &TCPFlow{ + Tuple: t, + Out: true, + Created: time.Now(), + }) +} + +func (s *filterState) addIncomingTCPFlow(t flowtrack.Tuple) { + s.mu.Lock() + defer s.mu.Unlock() + log.Printf("XXX adding in flow %v", t) + mak.Set(&s.tcpFlows, t, &TCPFlow{ + Tuple: t, + Out: false, + Created: time.Now(), + }) +} + +func (s *filterState) removeTCPFlow(t flowtrack.Tuple) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.tcpFlows, t) + log.Printf("XXX removing flow %v", t) +} + +func (s *filterState) setFinOut(t flowtrack.Tuple) { + s.mu.Lock() + defer s.mu.Unlock() + if f, ok := s.tcpFlows[t]; ok { + f.mu.Lock() + f.finOut = true + done := f.finOut && f.finIn + f.mu.Unlock() + log.Printf("XXX FIN out for flow %v", t) + if done { + delete(s.tcpFlows, t) + log.Printf("XXX due to FINs, removing flow %v", t) + } + } else { + log.Printf("XXX FIN out for unknown flow %v", t) + } +} + +func (s *filterState) setFinIn(t flowtrack.Tuple) { + s.mu.Lock() + defer s.mu.Unlock() + if f, ok := s.tcpFlows[t]; ok { + f.mu.Lock() + f.finIn = true + done := f.finOut && f.finIn + f.mu.Unlock() + log.Printf("XXX FIN in for flow %v", t) + if done { + delete(s.tcpFlows, t) + log.Printf("XXX due to FINs, removing flow %v", t) + } + } else { + log.Printf("XXX FIN in for unknown flow %v", t) + } } // lruMax is the size of the LRU cache in filterState. @@ -416,12 +516,25 @@ func (f *Filter) runIn4(q *packet.Parsed) (r Response, why string) { // can't be initiated without first sending a SYN. // It happens to also be much faster. // TODO(apenwarr): Skip the rest of decoding in this path? - if !q.IsTCPSyn() { - return Accept, "tcp non-syn" + if q.IsTCPSyn() { + if f.matches4.match(q) { + t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst} + f.state.addIncomingTCPFlow(t) + return Accept, "tcp ok" + } + return Drop, "no rules matched" } - if f.matches4.match(q) { - return Accept, "tcp ok" + isFin := q.IsTCPFin() + isRst := q.IsTCPRst() + if isFin || isRst { + t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst} + if isFin { + f.state.setFinIn(t) + } else if isRst { + f.state.removeTCPFlow(t) + } } + return Accept, "tcp non-syn" case ipproto.UDP, ipproto.SCTP: t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst} @@ -476,12 +589,25 @@ func (f *Filter) runIn6(q *packet.Parsed) (r Response, why string) { // can't be initiated without first sending a SYN. // It happens to also be much faster. // TODO(apenwarr): Skip the rest of decoding in this path? - if q.IPProto == ipproto.TCP && !q.IsTCPSyn() { - return Accept, "tcp non-syn" + if q.IsTCPSyn() { + if f.matches6.match(q) { + t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst} + f.state.addIncomingTCPFlow(t) + return Accept, "tcp ok" + } + return Drop, "no rules matched" } - if f.matches6.match(q) { - return Accept, "tcp ok" + isFin := q.IsTCPFin() + isRst := q.IsTCPRst() + if isFin || isRst { + t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst} + if isFin { + f.state.setFinIn(t) + } else if isRst { + f.state.removeTCPFlow(t) + } } + return Accept, "tcp non-syn" case ipproto.UDP, ipproto.SCTP: t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst} @@ -517,6 +643,23 @@ func (f *Filter) runOut(q *packet.Parsed) (r Response, why string) { f.state.mu.Lock() f.state.lru.Add(tuple, nil) f.state.mu.Unlock() + case ipproto.TCP: + isSyn := q.IsTCPSyn() + isRst := q.IsTCPRst() + isFin := q.IsTCPFin() + if isSyn || isRst || isFin { + tuple := flowtrack.Tuple{ + Proto: q.IPProto, + Src: q.Dst, Dst: q.Src, // src/dst reversed + } + if isSyn { + f.state.addOutgoingTCPFlow(tuple) + } else if isRst { + f.state.removeTCPFlow(tuple) + } else if isFin { + f.state.setFinOut(tuple) + } + } } return Accept, "ok out" }