move code around

This commit is contained in:
Josh Bleecher Snyder 2021-07-08 13:49:08 -07:00
parent 8478d34cca
commit 8a7b42a557

View File

@ -246,50 +246,6 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
return n, ipp, nil
}
func (u *UDPConn) Close() error {
u.close.Do(func() {
// Announce to readers and writers that we are closing down.
atomic.StoreUint32(&u.closed, 1)
// It is now not possible for u.reads to reach zero without
// all reads being unblocked.
// Busy loop until all reads and writes are unblocked.
// See the docs for u.refcount.
for {
for idx := range u.recvReqs {
if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 {
C.submit_cancel_request(u.recvRing, C.size_t(idx))
}
}
if atomic.LoadInt32(&u.refcount) == 0 {
break
}
time.Sleep(time.Millisecond)
}
// TODO: block until no one else uses our rings.
// (Or is that unnecessary now?)
u.doShutdown()
})
return nil
}
func (u *UDPConn) doShutdown() {
for _, fn := range u.shutdown {
fn()
}
}
// Implement net.PacketConn, for convenience integrating with magicsock.
var _ net.PacketConn = (*UDPConn)(nil)
type udpAddr struct {
ipp netaddr.IPPort
}
func (u udpAddr) Network() string { return "udp4" } // TODO: ipv6
func (u udpAddr) String() string { return u.ipp.String() }
func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
n, ipp, err := c.ReadFromNetaddr(p)
if err != nil {
@ -344,12 +300,7 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
r.sa6.sin6_port = C.uint16_t(endian.Hton16(uint16(udpAddr.Port)))
r.sa6.sin6_family = C.AF_INET6
}
C.submit_sendmsg_request(
u.sendRing, // ring
r,
C.int(len(p)), // buffer len, ditto
C.size_t(idx), // user data
)
C.submit_sendmsg_request(u.sendRing, r, C.int(len(p)), C.size_t(idx))
// Get an extra buffer, if available.
if idx, ok := peekCompletion(u.sendRing); ok {
// Put the request buffer back in the usable queue.
@ -359,6 +310,50 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
return len(p), nil
}
func (u *UDPConn) Close() error {
u.close.Do(func() {
// Announce to readers and writers that we are closing down.
atomic.StoreUint32(&u.closed, 1)
// It is now not possible for u.reads to reach zero without
// all reads being unblocked.
// Busy loop until all reads and writes are unblocked.
// See the docs for u.refcount.
for {
for idx := range u.recvReqs {
if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 {
C.submit_cancel_request(u.recvRing, C.size_t(idx))
}
}
if atomic.LoadInt32(&u.refcount) == 0 {
break
}
time.Sleep(time.Millisecond)
}
// TODO: block until no one else uses our rings.
// (Or is that unnecessary now?)
u.doShutdown()
})
return nil
}
func (u *UDPConn) doShutdown() {
for _, fn := range u.shutdown {
fn()
}
}
// Implement net.PacketConn, for convenience integrating with magicsock.
var _ net.PacketConn = (*UDPConn)(nil)
type udpAddr struct {
ipp netaddr.IPPort
}
func (u udpAddr) Network() string { return "udp4" } // TODO: ipv6
func (u udpAddr) String() string { return u.ipp.String() }
// LocalAddr returns the local network address.
func (c *UDPConn) LocalAddr() net.Addr { return c.local }