mirror of
https://github.com/tailscale/tailscale.git
synced 2026-05-05 12:16:44 +02:00
wgengine/filter: do stateful TCP connection tracking
WIP Fixes #859 Change-Id: I34c077825248dcebf4283d63081e5bc152b7a58b Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
parent
025867fd07
commit
2396a87871
@ -93,6 +93,11 @@ var debugCmd = &ffcli.Command{
|
||||
Exec: localAPIAction("rebind"),
|
||||
ShortHelp: "force a magicsock rebind",
|
||||
},
|
||||
{
|
||||
Name: "kick-all-tcp-in",
|
||||
Exec: localAPIAction("kick-all-tcp-in"),
|
||||
ShortHelp: "test TCP flow kick [incoming]",
|
||||
},
|
||||
{
|
||||
Name: "prefs",
|
||||
Exec: runPrefs,
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -3283,6 +3284,17 @@ func (b *LocalBackend) DebugReSTUN() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *LocalBackend) DebugKickAllTCPIn() error {
|
||||
filt, ok := b.filterAtomic.Load().(*filter.Filter)
|
||||
if !ok {
|
||||
return errors.New("no filter")
|
||||
}
|
||||
for _, flow := range filt.OpenTCPFlows() {
|
||||
log.Printf("XXX: flow open: %+v", flow)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *LocalBackend) magicConn() (*magicsock.Conn, error) {
|
||||
ig, ok := b.e.(wgengine.InternalsGetter)
|
||||
if !ok {
|
||||
|
||||
@ -278,6 +278,8 @@ func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) {
|
||||
err = h.b.DebugRebind()
|
||||
case "restun":
|
||||
err = h.b.DebugReSTUN()
|
||||
case "kick-all-tcp-in":
|
||||
err = h.b.DebugKickAllTCPIn()
|
||||
case "":
|
||||
err = fmt.Errorf("missing parameter 'action'")
|
||||
default:
|
||||
|
||||
@ -390,6 +390,16 @@ func (q *Parsed) IsTCPSyn() bool {
|
||||
return (q.TCPFlags & TCPSynAck) == TCPSyn
|
||||
}
|
||||
|
||||
// IsTCPRst reports whether q is a TCP RST packet.
|
||||
func (q *Parsed) IsTCPRst() bool {
|
||||
return (q.TCPFlags & TCPRst) != 0
|
||||
}
|
||||
|
||||
// IsTCPFin reports whether q is a TCP FIN packet.
|
||||
func (q *Parsed) IsTCPFin() bool {
|
||||
return (q.TCPFlags & TCPFin) != 0
|
||||
}
|
||||
|
||||
// IsError reports whether q is an ICMP "Error" packet.
|
||||
func (q *Parsed) IsError() bool {
|
||||
switch q.IPProto {
|
||||
|
||||
@ -7,6 +7,7 @@ package filter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -17,6 +18,7 @@ import (
|
||||
"tailscale.com/tstime/rate"
|
||||
"tailscale.com/types/ipproto"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/util/mak"
|
||||
)
|
||||
|
||||
// Filter is a stateful packet filter.
|
||||
@ -56,8 +58,106 @@ type Filter struct {
|
||||
|
||||
// filterState is a state cache of past seen packets.
|
||||
type filterState struct {
|
||||
mu sync.Mutex
|
||||
mu sync.Mutex // guards following
|
||||
|
||||
// lru is the flow track cached used by UDP & SCTP.
|
||||
lru *flowtrack.Cache // from flowtrack.Tuple -> nil
|
||||
|
||||
// tcpFlows tracks open TCP flows, both inbound and outbound. Regardless of
|
||||
// which direction initiated it, the Tuple's Src is always the remote side
|
||||
// and Dst is the local side.
|
||||
tcpFlows map[flowtrack.Tuple]*TCPFlow
|
||||
}
|
||||
|
||||
// OpenTCPFlows returns the set of open TCP flows in an unsorted order.
|
||||
func (f *Filter) OpenTCPFlows() []*TCPFlow {
|
||||
st := f.state
|
||||
st.mu.Lock()
|
||||
defer st.mu.Unlock()
|
||||
ret := make([]*TCPFlow, 0, len(st.tcpFlows))
|
||||
for _, f := range st.tcpFlows {
|
||||
ret = append(ret, f)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
type TCPFlow struct {
|
||||
flowtrack.Tuple
|
||||
Out bool // was an outbound connection (from local tailscale)
|
||||
Created time.Time
|
||||
// TODO(bradfitz): lastActivity mono.Time, once we can do it fast enough
|
||||
// to update on all packets.
|
||||
|
||||
mu sync.Mutex // guards finIn/finOut; lock order: filterState.mu, then TCPFlow.mu
|
||||
// finOut and finIn record whether we've seen a FIN packet in or out
|
||||
// for this flow.
|
||||
finOut bool
|
||||
finIn bool
|
||||
}
|
||||
|
||||
func (s *filterState) addOutgoingTCPFlow(t flowtrack.Tuple) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
log.Printf("XXX adding out flow %v", t)
|
||||
mak.Set(&s.tcpFlows, t, &TCPFlow{
|
||||
Tuple: t,
|
||||
Out: true,
|
||||
Created: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *filterState) addIncomingTCPFlow(t flowtrack.Tuple) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
log.Printf("XXX adding in flow %v", t)
|
||||
mak.Set(&s.tcpFlows, t, &TCPFlow{
|
||||
Tuple: t,
|
||||
Out: false,
|
||||
Created: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *filterState) removeTCPFlow(t flowtrack.Tuple) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
delete(s.tcpFlows, t)
|
||||
log.Printf("XXX removing flow %v", t)
|
||||
}
|
||||
|
||||
func (s *filterState) setFinOut(t flowtrack.Tuple) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if f, ok := s.tcpFlows[t]; ok {
|
||||
f.mu.Lock()
|
||||
f.finOut = true
|
||||
done := f.finOut && f.finIn
|
||||
f.mu.Unlock()
|
||||
log.Printf("XXX FIN out for flow %v", t)
|
||||
if done {
|
||||
delete(s.tcpFlows, t)
|
||||
log.Printf("XXX due to FINs, removing flow %v", t)
|
||||
}
|
||||
} else {
|
||||
log.Printf("XXX FIN out for unknown flow %v", t)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *filterState) setFinIn(t flowtrack.Tuple) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if f, ok := s.tcpFlows[t]; ok {
|
||||
f.mu.Lock()
|
||||
f.finIn = true
|
||||
done := f.finOut && f.finIn
|
||||
f.mu.Unlock()
|
||||
log.Printf("XXX FIN in for flow %v", t)
|
||||
if done {
|
||||
delete(s.tcpFlows, t)
|
||||
log.Printf("XXX due to FINs, removing flow %v", t)
|
||||
}
|
||||
} else {
|
||||
log.Printf("XXX FIN in for unknown flow %v", t)
|
||||
}
|
||||
}
|
||||
|
||||
// lruMax is the size of the LRU cache in filterState.
|
||||
@ -416,12 +516,25 @@ func (f *Filter) runIn4(q *packet.Parsed) (r Response, why string) {
|
||||
// can't be initiated without first sending a SYN.
|
||||
// It happens to also be much faster.
|
||||
// TODO(apenwarr): Skip the rest of decoding in this path?
|
||||
if !q.IsTCPSyn() {
|
||||
return Accept, "tcp non-syn"
|
||||
if q.IsTCPSyn() {
|
||||
if f.matches4.match(q) {
|
||||
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
|
||||
f.state.addIncomingTCPFlow(t)
|
||||
return Accept, "tcp ok"
|
||||
}
|
||||
return Drop, "no rules matched"
|
||||
}
|
||||
if f.matches4.match(q) {
|
||||
return Accept, "tcp ok"
|
||||
isFin := q.IsTCPFin()
|
||||
isRst := q.IsTCPRst()
|
||||
if isFin || isRst {
|
||||
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
|
||||
if isFin {
|
||||
f.state.setFinIn(t)
|
||||
} else if isRst {
|
||||
f.state.removeTCPFlow(t)
|
||||
}
|
||||
}
|
||||
return Accept, "tcp non-syn"
|
||||
case ipproto.UDP, ipproto.SCTP:
|
||||
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
|
||||
|
||||
@ -476,12 +589,25 @@ func (f *Filter) runIn6(q *packet.Parsed) (r Response, why string) {
|
||||
// can't be initiated without first sending a SYN.
|
||||
// It happens to also be much faster.
|
||||
// TODO(apenwarr): Skip the rest of decoding in this path?
|
||||
if q.IPProto == ipproto.TCP && !q.IsTCPSyn() {
|
||||
return Accept, "tcp non-syn"
|
||||
if q.IsTCPSyn() {
|
||||
if f.matches6.match(q) {
|
||||
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
|
||||
f.state.addIncomingTCPFlow(t)
|
||||
return Accept, "tcp ok"
|
||||
}
|
||||
return Drop, "no rules matched"
|
||||
}
|
||||
if f.matches6.match(q) {
|
||||
return Accept, "tcp ok"
|
||||
isFin := q.IsTCPFin()
|
||||
isRst := q.IsTCPRst()
|
||||
if isFin || isRst {
|
||||
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
|
||||
if isFin {
|
||||
f.state.setFinIn(t)
|
||||
} else if isRst {
|
||||
f.state.removeTCPFlow(t)
|
||||
}
|
||||
}
|
||||
return Accept, "tcp non-syn"
|
||||
case ipproto.UDP, ipproto.SCTP:
|
||||
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
|
||||
|
||||
@ -517,6 +643,23 @@ func (f *Filter) runOut(q *packet.Parsed) (r Response, why string) {
|
||||
f.state.mu.Lock()
|
||||
f.state.lru.Add(tuple, nil)
|
||||
f.state.mu.Unlock()
|
||||
case ipproto.TCP:
|
||||
isSyn := q.IsTCPSyn()
|
||||
isRst := q.IsTCPRst()
|
||||
isFin := q.IsTCPFin()
|
||||
if isSyn || isRst || isFin {
|
||||
tuple := flowtrack.Tuple{
|
||||
Proto: q.IPProto,
|
||||
Src: q.Dst, Dst: q.Src, // src/dst reversed
|
||||
}
|
||||
if isSyn {
|
||||
f.state.addOutgoingTCPFlow(tuple)
|
||||
} else if isRst {
|
||||
f.state.removeTCPFlow(tuple)
|
||||
} else if isFin {
|
||||
f.state.setFinOut(tuple)
|
||||
}
|
||||
}
|
||||
}
|
||||
return Accept, "ok out"
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user