diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index dad042631..2b4f1ee04 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -246,50 +246,6 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { return n, ipp, nil } -func (u *UDPConn) Close() error { - u.close.Do(func() { - // Announce to readers and writers that we are closing down. - atomic.StoreUint32(&u.closed, 1) - // It is now not possible for u.reads to reach zero without - // all reads being unblocked. - - // Busy loop until all reads and writes are unblocked. - // See the docs for u.refcount. - for { - for idx := range u.recvReqs { - if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 { - C.submit_cancel_request(u.recvRing, C.size_t(idx)) - } - } - if atomic.LoadInt32(&u.refcount) == 0 { - break - } - time.Sleep(time.Millisecond) - } - // TODO: block until no one else uses our rings. - // (Or is that unnecessary now?) - u.doShutdown() - }) - return nil -} - -func (u *UDPConn) doShutdown() { - for _, fn := range u.shutdown { - fn() - } -} - -// Implement net.PacketConn, for convenience integrating with magicsock. - -var _ net.PacketConn = (*UDPConn)(nil) - -type udpAddr struct { - ipp netaddr.IPPort -} - -func (u udpAddr) Network() string { return "udp4" } // TODO: ipv6 -func (u udpAddr) String() string { return u.ipp.String() } - func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { n, ipp, err := c.ReadFromNetaddr(p) if err != nil { @@ -344,12 +300,7 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { r.sa6.sin6_port = C.uint16_t(endian.Hton16(uint16(udpAddr.Port))) r.sa6.sin6_family = C.AF_INET6 } - C.submit_sendmsg_request( - u.sendRing, // ring - r, - C.int(len(p)), // buffer len, ditto - C.size_t(idx), // user data - ) + C.submit_sendmsg_request(u.sendRing, r, C.int(len(p)), C.size_t(idx)) // Get an extra buffer, if available. if idx, ok := peekCompletion(u.sendRing); ok { // Put the request buffer back in the usable queue. @@ -359,6 +310,50 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { return len(p), nil } +func (u *UDPConn) Close() error { + u.close.Do(func() { + // Announce to readers and writers that we are closing down. + atomic.StoreUint32(&u.closed, 1) + // It is now not possible for u.reads to reach zero without + // all reads being unblocked. + + // Busy loop until all reads and writes are unblocked. + // See the docs for u.refcount. + for { + for idx := range u.recvReqs { + if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 { + C.submit_cancel_request(u.recvRing, C.size_t(idx)) + } + } + if atomic.LoadInt32(&u.refcount) == 0 { + break + } + time.Sleep(time.Millisecond) + } + // TODO: block until no one else uses our rings. + // (Or is that unnecessary now?) + u.doShutdown() + }) + return nil +} + +func (u *UDPConn) doShutdown() { + for _, fn := range u.shutdown { + fn() + } +} + +// Implement net.PacketConn, for convenience integrating with magicsock. + +var _ net.PacketConn = (*UDPConn)(nil) + +type udpAddr struct { + ipp netaddr.IPPort +} + +func (u udpAddr) Network() string { return "udp4" } // TODO: ipv6 +func (u udpAddr) String() string { return u.ipp.String() } + // LocalAddr returns the local network address. func (c *UDPConn) LocalAddr() net.Addr { return c.local }