mirror of
https://github.com/traefik/traefik.git
synced 2025-10-27 14:31:14 +01:00
Avoid allocations in readLoop by using sync.Pool
This commit is contained in:
parent
d28d719276
commit
463ffadb6a
@ -239,8 +239,6 @@ linters:
|
|||||||
text: ' always receives '
|
text: ' always receives '
|
||||||
linters:
|
linters:
|
||||||
- unparam
|
- unparam
|
||||||
- path: pkg/server/service/bufferpool.go
|
|
||||||
text: 'SA6002: argument should be pointer-like to avoid allocations'
|
|
||||||
- path: pkg/server/middleware/middlewares.go
|
- path: pkg/server/middleware/middlewares.go
|
||||||
text: Function 'buildConstructor' has too many statements
|
text: Function 'buildConstructor' has too many statements
|
||||||
linters:
|
linters:
|
||||||
@ -316,8 +314,12 @@ linters:
|
|||||||
text: 'the methods of "wasmMiddlewareBuilder" use pointer receiver and non-pointer receiver.'
|
text: 'the methods of "wasmMiddlewareBuilder" use pointer receiver and non-pointer receiver.'
|
||||||
linters:
|
linters:
|
||||||
- recvcheck
|
- recvcheck
|
||||||
|
- path: pkg/server/service/bufferpool.go
|
||||||
|
text: 'SA6002: argument should be pointer-like to avoid allocations'
|
||||||
- path: pkg/proxy/httputil/bufferpool.go
|
- path: pkg/proxy/httputil/bufferpool.go
|
||||||
text: 'SA6002: argument should be pointer-like to avoid allocations'
|
text: 'SA6002: argument should be pointer-like to avoid allocations'
|
||||||
|
- path: pkg/udp/conn.go
|
||||||
|
text: 'SA6002: argument should be pointer-like to avoid allocations'
|
||||||
- path: integration/integration_test.go
|
- path: integration/integration_test.go
|
||||||
text: 'var (gatewayAPIConformanceRunTest|traefikVersion) is unused'
|
text: 'var (gatewayAPIConformanceRunTest|traefikVersion) is unused'
|
||||||
paths:
|
paths:
|
||||||
|
|||||||
@ -32,6 +32,9 @@ type Listener struct {
|
|||||||
// timeout defines how long to wait on an idle session,
|
// timeout defines how long to wait on an idle session,
|
||||||
// before releasing its related resources.
|
// before releasing its related resources.
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
|
||||||
|
// readBufferPool is a pool of byte slices for UDP packet reading.
|
||||||
|
readBufferPool sync.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenPacketConn creates a new listener from PacketConn.
|
// ListenPacketConn creates a new listener from PacketConn.
|
||||||
@ -51,6 +54,11 @@ func ListenPacketConn(packetConn net.PacketConn, timeout time.Duration) (*Listen
|
|||||||
conns: make(map[string]*Conn),
|
conns: make(map[string]*Conn),
|
||||||
accepting: true,
|
accepting: true,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
|
readBufferPool: sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make([]byte, maxDatagramSize)
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
go l.readLoop()
|
go l.readLoop()
|
||||||
@ -152,21 +160,26 @@ func (l *Listener) readLoop() {
|
|||||||
for {
|
for {
|
||||||
// Allocating a new buffer for every read avoids
|
// Allocating a new buffer for every read avoids
|
||||||
// overwriting data in c.msgs in case the next packet is received
|
// overwriting data in c.msgs in case the next packet is received
|
||||||
// before c.msgs is emptied via Read()
|
// before c.msgs is emptied via Read().
|
||||||
buf := make([]byte, maxDatagramSize)
|
// Reuses buffers via the readBufferPool sync.Pool.
|
||||||
|
buf := l.readBufferPool.Get().([]byte)
|
||||||
|
|
||||||
n, raddr, err := l.pConn.ReadFrom(buf)
|
n, raddr, err := l.pConn.ReadFrom(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
l.readBufferPool.Put(buf)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
conn, err := l.getConn(raddr)
|
conn, err := l.getConn(raddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
l.readBufferPool.Put(buf)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
// Receiver must call releaseReadBuffer() when done reading the data.
|
||||||
case conn.receiveCh <- buf[:n]:
|
case conn.receiveCh <- buf[:n]:
|
||||||
case <-conn.doneCh:
|
case <-conn.doneCh:
|
||||||
|
l.readBufferPool.Put(buf)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -211,15 +224,15 @@ type Conn struct {
|
|||||||
listener *Listener
|
listener *Listener
|
||||||
rAddr net.Addr
|
rAddr net.Addr
|
||||||
|
|
||||||
receiveCh chan []byte // to receive the data from the listener's readLoop
|
receiveCh chan []byte // to receive the data from the listener's readLoop.
|
||||||
readCh chan []byte // to receive the buffer into which we should Read
|
readCh chan []byte // to receive the buffer into which we should Read.
|
||||||
sizeCh chan int // to synchronize with the end of a Read
|
sizeCh chan int // to synchronize with the end of a Read.
|
||||||
msgs [][]byte // to store data from listener, to be consumed by Reads
|
msgs [][]byte // to store data from listener, to be consumed by Reads.
|
||||||
|
|
||||||
muActivity sync.RWMutex
|
muActivity sync.RWMutex
|
||||||
lastActivity time.Time // the last time the session saw either read or write activity
|
lastActivity time.Time // the last time the session saw either read or write activity.
|
||||||
|
|
||||||
timeout time.Duration // for timeouts
|
timeout time.Duration // for timeouts.
|
||||||
doneOnce sync.Once
|
doneOnce sync.Once
|
||||||
doneCh chan struct{}
|
doneCh chan struct{}
|
||||||
}
|
}
|
||||||
@ -254,6 +267,8 @@ func (c *Conn) readLoop() {
|
|||||||
msg := c.msgs[0]
|
msg := c.msgs[0]
|
||||||
c.msgs = c.msgs[1:]
|
c.msgs = c.msgs[1:]
|
||||||
n := copy(cBuf, msg)
|
n := copy(cBuf, msg)
|
||||||
|
// Return buffer to sync.Pool once done reading from it.
|
||||||
|
c.listener.readBufferPool.Put(msg)
|
||||||
c.sizeCh <- n
|
c.sizeCh <- n
|
||||||
case msg := <-c.receiveCh:
|
case msg := <-c.receiveCh:
|
||||||
c.msgs = append(c.msgs, msg)
|
c.msgs = append(c.msgs, msg)
|
||||||
@ -299,6 +314,11 @@ func (c *Conn) Write(p []byte) (n int, err error) {
|
|||||||
|
|
||||||
func (c *Conn) close() {
|
func (c *Conn) close() {
|
||||||
c.doneOnce.Do(func() {
|
c.doneOnce.Do(func() {
|
||||||
|
// Release any buffered data before closing.
|
||||||
|
for _, msg := range c.msgs {
|
||||||
|
c.listener.readBufferPool.Put(msg)
|
||||||
|
}
|
||||||
|
c.msgs = nil
|
||||||
close(c.doneCh)
|
close(c.doneCh)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user