From 75419826353c7ab710200f0df9bbdf87d2c75f17 Mon Sep 17 00:00:00 2001 From: Dmytro Shynkevych Date: Wed, 26 Aug 2020 19:17:20 -0400 Subject: [PATCH] tsdns: remove forwarding queue. Two levels of queueing are unnecessary. The resulting implementation performs as follows under request bursts (`count` packets sent concurrently): lost count avg latency 0 / 256 (00.00%) - 28ms 0 / 512 (00.00%) - 146ms 0 / 768 (00.00%) - 166ms 0 / 1024 (00.00%) - 416ms 11 / 1280 (00.86%) - 430ms 145 / 1536 (09.44%) - 715ms 364 / 2048 (17.77%) - 836ms Signed-off-by: Dmytro Shynkevych --- wgengine/tsdns/forwarder.go | 49 +++++++------------------------------ wgengine/tsdns/tsdns.go | 6 ++--- 2 files changed, 12 insertions(+), 43 deletions(-) diff --git a/wgengine/tsdns/forwarder.go b/wgengine/tsdns/forwarder.go index c43873a2f..48312d1bd 100644 --- a/wgengine/tsdns/forwarder.go +++ b/wgengine/tsdns/forwarder.go @@ -22,11 +22,6 @@ import ( // headerBytes is the number of bytes in a DNS message header. const headerBytes = 12 -// forwardQueueSize is the maximal number of requests that can be pending delegation. -// Note that this is distinct from the number of requests that are pending a response, -// which is not limited (except by txid collisions). -const forwardQueueSize = 64 - // connCount is the number of UDP connections to use for forwarding. const connCount = 32 @@ -138,7 +133,6 @@ func newForwarder(logf logger.Logf, responses chan Packet) *forwarder { return &forwarder{ logf: logger.WithPrefix(logf, "forward: "), responses: responses, - queue: make(chan forwardedPacket, forwardQueueSize), closed: make(chan struct{}), conns: make([]*net.UDPConn, connCount), txMap: make(map[txid]forwardingRecord), @@ -155,11 +149,10 @@ func (f *forwarder) Start() error { } } - f.wg.Add(connCount + 2) + f.wg.Add(connCount + 1) for idx, conn := range f.conns { go f.recv(uint16(idx), conn) } - go f.send() go f.cleanMap() return nil @@ -191,28 +184,13 @@ func (f *forwarder) setUpstreams(upstreams []net.Addr) { f.mu.Unlock() } -func (f *forwarder) send() { - defer f.wg.Done() - - var packet forwardedPacket - for { - select { - case <-f.closed: - return - case packet = <-f.queue: - // continue - } - - connIdx := rand.Intn(connCount) - conn := f.conns[connIdx] - _, err := conn.WriteTo(packet.payload, packet.dst) - if err != nil { - // Do not log errors due to expired deadline. - if !errors.Is(err, os.ErrDeadlineExceeded) { - f.logf("send: %v", err) - } - return - } +func (f *forwarder) send(packet []byte, dst net.Addr) { + connIdx := rand.Intn(connCount) + conn := f.conns[connIdx] + _, err := conn.WriteTo(packet, dst) + // Do not log errors due to expired deadline. + if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) { + f.logf("send: %v", err) } } @@ -308,17 +286,8 @@ func (f *forwarder) forward(query Packet) error { f.mu.Unlock() - packet := forwardedPacket{ - payload: query.Payload, - } for _, upstream := range upstreams { - packet.dst = upstream - select { - case <-f.closed: - return ErrClosed - case f.queue <- packet: - // continue - } + f.send(query.Payload, upstream) } return nil diff --git a/wgengine/tsdns/tsdns.go b/wgengine/tsdns/tsdns.go index 06c010a37..2d663f5fc 100644 --- a/wgengine/tsdns/tsdns.go +++ b/wgengine/tsdns/tsdns.go @@ -22,10 +22,10 @@ import ( // maxResponseBytes is the maximum size of a response from a Resolver. const maxResponseBytes = 512 -// pendingQueueSize is the maximal number of DNS requests that can await polling. +// queueSize is the maximal number of DNS requests that can await polling. // If EnqueueRequest is called when this many requests are already pending, // the request will be dropped to avoid blocking the caller. -const pendingQueueSize = 64 +const queueSize = 64 // defaultTTL is the TTL of all responses from Resolver. const defaultTTL = 600 * time.Second @@ -94,7 +94,7 @@ type ResolverConfig struct { func NewResolver(config ResolverConfig) *Resolver { r := &Resolver{ logf: logger.WithPrefix(config.Logf, "tsdns: "), - queue: make(chan Packet, pendingQueueSize), + queue: make(chan Packet, queueSize), responses: make(chan Packet), errors: make(chan error), closed: make(chan struct{}),