From 1bc7c0ce7616e1f22a3034377dd8e2bd622946fb Mon Sep 17 00:00:00 2001 From: julianknodt Date: Fri, 18 Jun 2021 14:44:46 -0700 Subject: [PATCH] Remove old portmapping code Signed-off-by: julianknodt --- net/portmapper/portmapper.go | 31 ++- net/portmapper/portmapper_test.go | 6 +- net/portmapper/probe.go | 362 ++++++------------------------ syncs/syncs.go | 59 +++++ wgengine/magicsock/magicsock.go | 23 +- 5 files changed, 179 insertions(+), 302 deletions(-) diff --git a/net/portmapper/portmapper.go b/net/portmapper/portmapper.go index d2deee536..938cd39d5 100644 --- a/net/portmapper/portmapper.go +++ b/net/portmapper/portmapper.go @@ -63,7 +63,7 @@ type Client struct { localPort uint16 pmpMapping *pmpMapping // non-nil if we have a PMP mapping - *Prober // non-nil once the prober has started + prober *Prober // non-nil once probe has been called. } // HaveMapping reports whether we have a current valid mapping. @@ -131,6 +131,9 @@ func (c *Client) Close() error { if c.closed { return nil } + if c.prober != nil { + c.prober.Close() + } c.closed = true c.invalidateMappingsLocked(true) // TODO: close some future ever-listening UDP socket(s), @@ -181,6 +184,24 @@ func (c *Client) invalidateMappingsLocked(releaseOld bool) { c.uPnPSawTime = time.Time{} } +// Probe will assess the network for the presence of portmapping services. +func (c *Client) Probe(ctx context.Context) (ProbeResult, error) { + c.mu.Lock() + if c.prober == nil { + c.initProberLocked(ctx) + } + c.mu.Unlock() + return c.prober.Complete() +} + +func (c *Client) StartProbing() { + c.mu.Lock() + if c.prober == nil { + c.initProberLocked(context.Background()) + } + c.mu.Unlock() +} + func (c *Client) sawPMPRecently() bool { c.mu.Lock() defer c.mu.Unlock() @@ -435,19 +456,25 @@ func parsePMPResponse(pkt []byte) (res pmpResponse, ok bool) { return res, true } +// ProbeResults indicates which services are present after a probe. +// The presense of services may change over time, so it represents the presense +// of these items at a given time. type ProbeResult struct { PCP bool PMP bool UPnP bool } +// oldProbe is the old API for probing, retained in order to ensure back-compatibility. +// It's currently used in TestProberEquivalent. +// // Probe returns a summary of which port mapping services are // available on the network. // // If a probe has run recently and there haven't been any network changes since, // the returned result might be server from the Client's cache, without // sending any network traffic. -func (c *Client) Probe(ctx context.Context) (res ProbeResult, err error) { +func (c *Client) oldProbe(ctx context.Context) (res ProbeResult, err error) { gw, myIP, ok := c.gatewayAndSelfIP() if !ok { return res, ErrGatewayNotFound diff --git a/net/portmapper/portmapper_test.go b/net/portmapper/portmapper_test.go index 995347fa9..7c5da438d 100644 --- a/net/portmapper/portmapper_test.go +++ b/net/portmapper/portmapper_test.go @@ -59,12 +59,12 @@ func TestProberEquivalent(t *testing.T) { } c := NewClient(t.Logf) c.SetLocalPort(1234) - res, err := c.Probe(context.Background()) + res, err := c.oldProbe(context.Background()) if err != nil { return } - proberRes, proberErr := c.NewProber(context.Background()).StatusBlock() - if err == nil && proberErr != nil { + proberRes, proberErr := c.Probe(context.Background()) + if proberErr != nil { t.Errorf("prober returned err while probe did not: %v, %v", err, proberErr) } if res.PCP && !proberRes.PCP { diff --git a/net/portmapper/probe.go b/net/portmapper/probe.go index dca7647b8..83123884d 100644 --- a/net/portmapper/probe.go +++ b/net/portmapper/probe.go @@ -1,332 +1,114 @@ // Copyright (c) 2021 Tailscale Inc & AUTHORS All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. + package portmapper import ( "context" - "net" - "sync" "time" - "go4.org/mem" - "inet.af/netaddr" - "tailscale.com/net/netns" + "tailscale.com/syncs" ) +// Prober periodically pings the network and checks for port-mapping services. type Prober struct { - // pause signals the probe to either pause temporarily (true), or stop entirely (false) - // to restart the probe, send another pause to it. - pause chan<- bool + // stop will stop the prober + stop func() - PMP *ProbeSubResult - PCP *ProbeSubResult - UPnP *ProbeSubResult + // Each of the SubResults below is intended to expose whether a specific service is available + // for use on a client, and the most recent seen time. Should not be modified externally, and + // will be periodically updated. + + // PMP stores the result of probing pmp services and is populated by the prober. + PMP syncs.WaitableResult + // PCP stores the result of probing pcp services and is populated by the prober. + PCP syncs.WaitableResult + // UPnP stores the result of probing pcp services and is populated by the prober. + UPnP syncs.WaitableResult } -// NewProber creates a new prober for a given client. Should not be called concurrently. +// initProberLocked will start a prober if it does not exist on the given portmapping client. +// The prober will run until the context terminates or stop is called, probing whether services +// are available periodically. c.mu must be held. +func (c *Client) initProberLocked(ctx context.Context) { + stop := make(chan struct{}) + p := &Prober{ + PMP: syncs.NewWaitableResult(), + PCP: syncs.NewWaitableResult(), + UPnP: syncs.NewWaitableResult(), + stop: func() { close(stop) }, + } + c.prober = p + go func() { + for { + res, err := c.oldProbe(ctx) + p.PMP.Set(res.PMP, err) + p.PCP.Set(res.PCP, err) + p.UPnP.Set(res.UPnP, err) + + select { + case <-time.After(trustServiceStillAvailableDuration * 3 / 4): + case <-ctx.Done(): + return + case <-stop: + return + } + } + }() +} + +// Close gracefully turns the Prober off, completing the current probes before exiting. // -// It is not currently the only method to probe the network, so that it can be tested for -// compatibility with the prior method. -func (c *Client) NewProber(ctx context.Context) (p *Prober) { - if c.Prober != nil { - return c.Prober - } - pause := make(chan bool) - p = &Prober{ - pause: pause, +// Calling stop Close multiple times will have no additional effects. +func (p *Prober) Close() { p.stop() } - PMP: NewProbeSubResult(), - PCP: NewProbeSubResult(), - UPnP: NewProbeSubResult(), - } - c.Prober = p - - go func() { - defer p.PMP.Set(false, nil) - defer p.PCP.Set(false, nil) - for { - pmp_ctx, cancel := context.WithTimeout(ctx, portMapServiceTimeout) - hasPCP, hasPMP, err := c.probePMPAndPCP(pmp_ctx) - if err != nil { - if ctx.Err() != nil { - err = nil - // the global context has passed, exit cleanly - cancel() - return - } - if pmp_ctx.Err() == context.DeadlineExceeded { - err = nil - } - } - cancel() - p.PMP.Set(hasPMP, err) - p.PCP.Set(hasPCP, err) - - t := time.NewTimer(trustServiceStillAvailableDuration * 3 / 4) - - select { - case should_pause := <-pause: - if !should_pause { - t.Stop() - return - } - restart := <-pause - if !restart { - t.Stop() - return - } - case <-t.C: // break through and retry the connection - } - } - }() - - go func() { - defer p.UPnP.Set(false, nil) - for { - upnp_ctx, cancel := context.WithTimeout(ctx, portMapServiceTimeout) - hasUPnP, err := c.probeUPnP(upnp_ctx) - if err != nil { - if ctx.Err() != nil { - // the global context has passed, exit cleanly - cancel() - return - } - if upnp_ctx.Err() == context.DeadlineExceeded { - err = nil - } - } - cancel() - t := time.NewTimer(trustServiceStillAvailableDuration * 3 / 4) - p.UPnP.Set(hasUPnP, err) - - select { - case should_pause := <-pause: - if !should_pause { - t.Stop() - return - } - restart := <-pause - if !restart { - t.Stop() - return - } - case <-t.C: // break through and retry the connection - } - } - }() - - return -} - -// Stop gracefully turns the Prober off, completing the current probes before exiting. -func (p *Prober) Stop() { close(p.pause) } - -// Pauses the prober if currently running, or starts if it was previously paused. -func (p *Prober) Toggle() { p.pause <- true } - -// CurrentStatus returns the current results of the prober, regardless of whether they have -// completed or not. -func (p *Prober) CurrentStatus() (res ProbeResult, err error) { - hasPMP, errPMP := p.PMP.PresentCurrent() +// Current returns the current results of the prober, regardless of whether they have completed +// or not. The returned probe result returns whether any of the services have been known to be +// detected and if a value is true it will be available. If any of the services recently +// returned an error due to inability to reach it, some failure of protocol, it will also be +// returned, but if one of the probe results returned true it can still be used. Notably, it is +// not an error to not yet have completed, or for a limited number of services to be available. +func (p *Prober) Current() (ProbeResult, error) { + var res ProbeResult + _, hasPMP, errPMP := p.PMP.Peek() res.PMP = hasPMP - err = errPMP + err := errPMP - hasUPnP, errUPnP := p.UPnP.PresentCurrent() + _, hasUPnP, errUPnP := p.UPnP.Peek() res.UPnP = hasUPnP if err == nil { err = errUPnP } - hasPCP, errPCP := p.PCP.PresentCurrent() + _, hasPCP, errPCP := p.PCP.Peek() res.PCP = hasPCP if err == nil { err = errPCP } - return + return res, err } -func (p *Prober) StatusBlock() (res ProbeResult, err error) { - hasPMP, errPMP := p.PMP.PresentBlock() +// Complete blocks the caller until probing all services has completed, regardless of success +// or failure. It returns the result of probing each of UPnP, PMP, and PCP, and if there is an +// error on any service, it will be returned. If any result is true, that service completed without +// error and can be used. +func (p *Prober) Complete() (ProbeResult, error) { + var res ProbeResult + hasPMP, errPMP := p.PMP.Get() res.PMP = hasPMP - err = errPMP + err := errPMP - hasUPnP, errUPnP := p.UPnP.PresentBlock() + hasUPnP, errUPnP := p.UPnP.Get() res.UPnP = hasUPnP if err == nil { err = errUPnP } - hasPCP, errPCP := p.PCP.PresentBlock() + hasPCP, errPCP := p.PCP.Get() res.PCP = hasPCP if err == nil { err = errPCP } - return -} - -type ProbeSubResult struct { - cond *sync.Cond - // If this probe has finished, regardless of success or failure - completed bool - - // whether or not this feature is present - present bool - // most recent error - err error - - // time we last saw it to be available. - sawTime time.Time -} - -func NewProbeSubResult() *ProbeSubResult { - return &ProbeSubResult{ - cond: &sync.Cond{ - L: &sync.Mutex{}, - }, - } -} - -// PresentBlock blocks until the probe completes, then returns the result. -func (psr *ProbeSubResult) PresentBlock() (bool, error) { - psr.cond.L.Lock() - defer psr.cond.L.Unlock() - for !psr.completed { - psr.cond.Wait() - } - return psr.present, psr.err -} - -// PresentCurrent returns the current state, regardless whether or not the probe has completed. -func (psr *ProbeSubResult) PresentCurrent() (bool, error) { - psr.cond.L.Lock() - defer psr.cond.L.Unlock() - present := psr.present && psr.sawTime.After(time.Now().Add(-trustServiceStillAvailableDuration)) - return present, psr.err -} - -func (psr *ProbeSubResult) Set(present bool, err error) { - saw := time.Now() - psr.cond.L.Lock() - psr.sawTime = saw - psr.completed = true - psr.err = err - psr.present = present - psr.cond.L.Unlock() - - psr.cond.Broadcast() -} - -func (c *Client) probePMPAndPCP(ctx context.Context) (pcp bool, pmp bool, err error) { - gw, myIP, ok := c.gatewayAndSelfIP() - if !ok { - return false, false, ErrGatewayNotFound - } - - uc, err := netns.Listener().ListenPacket(ctx, "udp4", ":0") - if err != nil { - c.logf("ProbePCP/PMP: %v", err) - return false, false, err - } - defer uc.Close() - defer closeCloserOnContextDone(ctx, uc)() - - pcpAddr := netaddr.IPPortFrom(gw, pcpPort).UDPAddr() - pmpAddr := netaddr.IPPortFrom(gw, pmpPort).UDPAddr() - - // Don't send probes to services that we recently learned (for - // the same gw/myIP) are available. See - // https://github.com/tailscale/tailscale/issues/1001 - if c.sawPMPRecently() { - pmp = true - } else { - uc.WriteTo(pmpReqExternalAddrPacket, pmpAddr) - } - if c.sawPCPRecently() { - pcp = true - } else { - uc.WriteTo(pcpAnnounceRequest(myIP), pcpAddr) - } - - buf := make([]byte, 1500) - pcpHeard := false // true when we get any PCP response - for { - if pcpHeard && pmp { - // Nothing more to discover. - return - } - n, _, err := uc.ReadFrom(buf) - if err != nil { - if ctx.Err() == context.DeadlineExceeded { - err = nil - } - return pcp, pmp, err - } - if pres, ok := parsePCPResponse(buf[:n]); ok { - if pres.OpCode == pcpOpReply|pcpOpAnnounce { - pcpHeard = true - //c.mu.Lock() - //c.pcpSawTime = time.Now() - //c.mu.Unlock() - switch pres.ResultCode { - case pcpCodeOK: - c.logf("Got PCP response: epoch: %v", pres.Epoch) - pcp = true - continue - case pcpCodeNotAuthorized: - // A PCP service is running, but refuses to - // provide port mapping services. - pcp = false - continue - default: - // Fall through to unexpected log line. - } - } - c.logf("unexpected PCP probe response: %+v", pres) - } - if pres, ok := parsePMPResponse(buf[:n]); ok { - if pres.OpCode == pmpOpReply|pmpOpMapPublicAddr && pres.ResultCode == pmpCodeOK { - c.logf("Got PMP response; IP: %v, epoch: %v", pres.PublicAddr, pres.SecondsSinceEpoch) - pmp = true - c.mu.Lock() - c.pmpPubIP = pres.PublicAddr - c.pmpPubIPTime = time.Now() - c.pmpLastEpoch = pres.SecondsSinceEpoch - c.mu.Unlock() - continue - } - c.logf("unexpected PMP probe response: %+v", pres) - } - } -} - -func (c *Client) probeUPnP(ctx context.Context) (upnp bool, err error) { - gw, _, ok := c.gatewayAndSelfIP() - if !ok { - return false, ErrGatewayNotFound - } - if c.sawUPnPRecently() { - return true, nil - } - upnpAddr := netaddr.IPPortFrom(gw, upnpPort).UDPAddr() - uc, err := netns.Listener().ListenPacket(ctx, "udp4", ":0") - if deadline, ok := ctx.Deadline(); ok { - uc.SetDeadline(deadline) - } - if err != nil { - c.logf("ProbeUPnP: %v", err) - return false, err - } - defer uc.Close() - uc.WriteTo(uPnPPacket, upnpAddr) - buf := make([]byte, 1500) - n, _, err := uc.ReadFrom(buf) - if err != nil { - if err.(net.Error).Timeout() || err.(net.Error).Temporary() { - err = nil - } - return false, err - } - upnp = mem.Contains(mem.B(buf[:n]), mem.S(":InternetGatewayDevice:")) - return upnp, err + return res, err } diff --git a/syncs/syncs.go b/syncs/syncs.go index 46861af63..c305ba550 100644 --- a/syncs/syncs.go +++ b/syncs/syncs.go @@ -7,7 +7,9 @@ package syncs import ( "context" + "sync" "sync/atomic" + "time" ) // ClosedChan returns a channel that's already closed. @@ -135,3 +137,60 @@ func (s Semaphore) TryAcquire() bool { func (s Semaphore) Release() { <-s.c } + +// WaitableResult allows for blocking on a repeated, fallible operation until it completes, +// and getting the result. +type WaitableResult struct { + // sync.Cond.L guards all the fields below, and is used to wait until completed is true. + cond *sync.Cond + // Completed is set after the first operation has completed, and should be used in conjunction + // with `cond` above in order to block. + completed bool + + result bool // result is whether or not the most recent operation succeeded or not. + err error // err indicates the most recent error during the operation. + + // sawTime is the last time this result was updated. + sawTime time.Time +} + +func NewWaitableResult() WaitableResult { + return WaitableResult{ + cond: &sync.Cond{ + L: &sync.Mutex{}, + }, + } +} + +// Get blocks until an operation completes, then returns true if it was a success. +// Otherwise, it returns returns false, with a possible error. +func (wr *WaitableResult) Get() (bool, error) { + wr.cond.L.Lock() + defer wr.cond.L.Unlock() + for !wr.completed { + wr.cond.Wait() + } + return wr.result, wr.err +} + +// Current returns the current state of the result without blocking, regardless of whether or +// not it has completed, as well as the completion time of the operation. +func (wr *WaitableResult) Peek() (time.Time, bool, error) { + wr.cond.L.Lock() + defer wr.cond.L.Unlock() + return wr.sawTime, wr.result, wr.err +} + +// Set should be called when an operation has completed. It will unblock any items waiting for +// the completed operation, and overwrite previous the results of previous operations. +func (wr *WaitableResult) Set(result bool, err error) { + saw := time.Now() + wr.cond.L.Lock() + wr.sawTime = saw + wr.completed = true + wr.err = err + wr.result = result + wr.cond.L.Unlock() + + wr.cond.Broadcast() +} diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index a2c200be2..1422b0e95 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -487,6 +487,7 @@ func NewConn(opts Options) (*Conn, error) { c.simulatedNetwork = opts.SimulatedNetwork c.disableLegacy = opts.DisableLegacyNetworking c.portMapper = portmapper.NewClient(logger.WithPrefix(c.logf, "portmapper: ")) + c.portMapper.StartProbing() if opts.LinkMonitor != nil { c.portMapper.SetGatewayLookupFunc(opts.LinkMonitor.GatewayAndSelfIP) } @@ -985,6 +986,15 @@ func (c *Conn) determineEndpoints(ctx context.Context) ([]tailcfg.Endpoint, erro return nil, err } + portmap := make(chan netaddr.IPPort, 1) + go func() { + if ext, err := c.portMapper.CreateOrGetMapping(ctx); err == nil { + portmap <- ext + } else if !portmapper.IsNoMappingError(err) { + c.logf("portmapper: %v", err) + } + }() + already := make(map[netaddr.IPPort]tailcfg.EndpointType) // endpoint -> how it was found var eps []tailcfg.Endpoint // unique endpoints @@ -1002,13 +1012,6 @@ func (c *Conn) determineEndpoints(ctx context.Context) ([]tailcfg.Endpoint, erro } } - if ext, err := c.portMapper.CreateOrGetMapping(ctx); err == nil { - addAddr(ext, tailcfg.EndpointPortmapped) - c.setNetInfoHavePortMap() - } else if !portmapper.IsNoMappingError(err) { - c.logf("portmapper: %v", err) - } - if nr.GlobalV4 != "" { addAddr(ipp(nr.GlobalV4), tailcfg.EndpointSTUN) @@ -1050,6 +1053,12 @@ func (c *Conn) determineEndpoints(ctx context.Context) ([]tailcfg.Endpoint, erro // Do not offer addresses on other local interfaces. addAddr(ipp(localAddr.String()), tailcfg.EndpointLocal) } + select { + case ext := <-portmap: + addAddr(ext, tailcfg.EndpointPortmapped) + c.setNetInfoHavePortMap() + case <-time.After(200 * time.Millisecond): + } // Note: the endpoints are intentionally returned in priority order, // from "farthest but most reliable" to "closest but least