use io_uring for sendmsg

Also clean up some dead code and unify the completion helpers (wait_completion, peek_completion).
This commit is contained in:
Josh Bleecher Snyder 2021-06-03 12:02:24 -07:00
parent f254f779b5
commit a8a7208dbd
2 changed files with 132 additions and 93 deletions

View File

@ -48,33 +48,6 @@ static uint64_t packNIdx(int n, size_t idx) {
return (n64 << 32) | idx64;
}
// Wait for a completion to be available, fetch the data.
//
// Blocks in io_uring_wait_cqe (retrying on -EINTR) until a completion arrives.
// On success, returns packNIdx(n, idx), where n is the completion's result
// (cqe->res) and idx is the user data attached to the request at submit time.
// On failure, returns the negative error code converted to uint64_t.
static uint64_t receive_into_udp(struct io_uring *ring) {
    struct io_uring_cqe *cqe;
    int ret;
    do {
        ret = io_uring_wait_cqe(ring, &cqe);
    } while (ret == -EINTR);
    // TODO: Delete perror, fprintf, etc.
    // Encode in return value or similar.
    if (ret < 0) {
        perror("io_uring_wait_cqe");
        return ret;
    }
    int n = cqe->res;
    if (n < 0) {
        // Mark the failed completion as seen before bailing out. Without this
        // the CQE stays at the head of the completion queue and every later
        // wait hands back the same failure.
        // TODO: the request buffer identified by this CQE's user data is still
        // not resubmitted, so it is leaked.
        fprintf(stderr, "recvmsg failed: %d.\n", n);
        io_uring_cqe_seen(ring, cqe);
        return n;
    }
    size_t idx = (size_t)io_uring_cqe_get_data(cqe);
    uint64_t nidx = packNIdx(n, idx);
    io_uring_cqe_seen(ring, cqe);
    return nidx;
}
// ip reads the IPv4 address stored in sa and returns it in host byte order.
static uint32_t ip(struct sockaddr_in *sa) {
    uint32_t network_order = sa->sin_addr.s_addr;
    return ntohl(network_order);
}
@ -83,6 +56,14 @@ static uint16_t port(struct sockaddr_in *sa) {
return ntohs(sa->sin_port);
}
// setIP stores ip (given in host byte order) into sa's IPv4 address field,
// converting it to network byte order.
//
// Returns ip unchanged. The function is declared to return uint32_t, so it
// must return a value on every path; falling off the end left the result
// undefined for any caller that reads it.
static uint32_t setIP(struct sockaddr_in *sa, uint32_t ip) {
    sa->sin_addr.s_addr = htonl(ip);
    return ip;
}
// setPort stores port (given in host byte order) into sa's port field,
// converting it to network byte order.
//
// Returns port unchanged. The function is declared to return uint16_t, so it
// must return a value on every path; falling off the end left the result
// undefined for any caller that reads it.
static uint16_t setPort(struct sockaddr_in *sa, uint16_t port) {
    sa->sin_port = htons(port);
    return port;
}
// submit a recvmsg request via liburing
// TODO: When recvfrom support arrives, maybe use that instead?
static int submit_recvmsg_request(struct io_uring *ring, struct msghdr *mhdr, struct iovec *iov, struct sockaddr_in *sender, char *buf, int buflen, size_t idx) {
@ -103,6 +84,26 @@ static int submit_recvmsg_request(struct io_uring *ring, struct msghdr *mhdr, st
return 0;
}
// submit a sendmsg request via liburing
//
// Points iov at buf[0:buflen], fills mhdr with that single iovec and with
// sender as the message name (for sendmsg the name is the destination address,
// despite the parameter's name), then queues a sendmsg SQE against the 0th
// registered file and submits the ring. idx is attached to the SQE as user
// data so the matching completion can be mapped back to its request.
//
// Returns 0 on success, -EAGAIN if no submission queue entry is available,
// or the negative error code reported by io_uring_submit.
static int submit_sendmsg_request(struct io_uring *ring, struct msghdr *mhdr, struct iovec *iov, struct sockaddr_in *sender, char *buf, int buflen, size_t idx) {
    iov->iov_base = buf;
    iov->iov_len = buflen;

    mhdr->msg_iov = iov;
    mhdr->msg_iovlen = 1;

    mhdr->msg_name = sender;
    mhdr->msg_namelen = sizeof(struct sockaddr_in);

    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        // Submission queue is full; preparing a NULL SQE would crash.
        return -EAGAIN;
    }
    io_uring_prep_sendmsg(sqe, 0, mhdr, 0); // use the 0th file in the list of registered fds
    io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
    io_uring_sqe_set_data(sqe, (void *)(idx));

    // io_uring_submit reports failure as a negative error code; pass it on
    // instead of claiming success.
    int submitted = io_uring_submit(ring);
    if (submitted < 0) {
        return submitted;
    }
    return 0;
}
static void submit_nop_request(struct io_uring *ring) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_nop(sqe);
@ -111,8 +112,7 @@ static void submit_nop_request(struct io_uring *ring) {
}
// Wait for a completion to be available, fetch the data
// TODO: unify with receive_into_udp
static uint64_t get_file_completion(struct io_uring *ring) {
static uint64_t wait_completion(struct io_uring *ring) {
struct io_uring_cqe *cqe;
again:;
@ -123,13 +123,13 @@ again:;
// TODO: Delete perror, fprintf, etc.
// Encode in return value or similar.
if (ret < 0) {
perror("get_file_completion io_uring_wait_cqe");
perror("wait_completion io_uring_wait_cqe");
return ret;
}
int n = cqe->res;
if (n < 0) {
// TODO: This leaks a buffer!!!
fprintf(stderr, "get_file_completion write failed: %d.\n", n);
fprintf(stderr, "wait_completion failed: %d.\n", n);
return n;
}
size_t idx = (size_t)io_uring_cqe_get_data(cqe);
@ -140,12 +140,6 @@ again:;
// submit a write request via liburing
static int submit_write_request(struct io_uring *ring, char *buf, int buflen, size_t idx, struct iovec *iov) {
// fprintf(stderr, "submit_write_request to fd %d buf %p %s buflen %d idx %lu\n", fd, buf, buf, buflen, idx);
// errno= 0;
// perror("before bonus write");
// int x = write(fd, buf, buflen);
// fprintf(stderr, "plain write returned %d\n", x);
// perror("submit_write_request bonus write");
iov->iov_base = buf;
iov->iov_len = buflen;
@ -154,15 +148,12 @@ static int submit_write_request(struct io_uring *ring, char *buf, int buflen, si
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
io_uring_sqe_set_data(sqe, (void *)(idx));
int submitted = io_uring_submit(ring);
// fprintf(stderr, "submitted %d sqes\n", submitted);
return 0;
}
// TODO: unify with get_file_completion
static uint64_t peek_file_completion(struct io_uring *ring) {
static uint64_t peek_completion(struct io_uring *ring) {
struct io_uring_cqe *cqe;
int ret = io_uring_peek_cqe(ring, &cqe);
// perror("on entry, peek_file_completion io_uring_wait_cqe");
if ((-ret == EAGAIN) || (-ret == EINTR)) {
return ret;
}

View File

@ -30,14 +30,18 @@ import (
// That's OK for now, but later it could be a performance issue.
// For now, keep it simple and enqueue/dequeue in a single step.
// TODO: IPv6
// TODO: Maybe combine the urings into a single uring with dispatch.
type UDPConn struct {
ptr *C.go_uring
close sync.Once
conn *net.UDPConn
file *os.File // must keep file from being GC'd
fd C.int
local net.Addr
reqs [8]udpReq
recvRing *C.go_uring
sendRing *C.go_uring
close sync.Once
conn *net.UDPConn
file *os.File // must keep file from being GC'd
fd C.int
local net.Addr
recvReqs [8]udpReq
sendReqs [8]udpReq
sendReqC chan int // indices into sendReqs
}
func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) {
@ -55,26 +59,37 @@ func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) {
if err != nil {
return nil, err
}
r := new(C.go_uring)
recvRing := new(C.go_uring)
sendRing := new(C.go_uring)
fd := C.int(file.Fd())
ret := C.initialize(r, fd)
if ret < 0 {
return nil, fmt.Errorf("uring initialization failed: %d", ret)
for _, r := range []*C.go_uring{recvRing, sendRing} {
ret := C.initialize(r, fd)
if ret < 0 {
// TODO: free recvRing if sendRing initialize failed
return nil, fmt.Errorf("uring initialization failed: %d", ret)
}
}
u := &UDPConn{
ptr: r,
conn: conn,
file: file,
fd: fd,
local: conn.LocalAddr(),
recvRing: recvRing,
sendRing: sendRing,
conn: conn,
file: file,
fd: fd,
local: conn.LocalAddr(),
}
for i := range u.reqs {
if err := u.submitRequest(i); err != nil {
// Initialize recv half.
for i := range u.recvReqs {
if err := u.submitRecvRequest(i); err != nil {
u.Close() // TODO: will this crash?
return nil, err
}
}
// Initialize send half.
u.sendReqC = make(chan int, len(u.sendReqs))
for i := range u.sendReqs {
u.sendReqC <- i
}
return u, nil
}
@ -85,12 +100,12 @@ type udpReq struct {
buf [device.MaxSegmentSize]byte
}
func (u *UDPConn) submitRequest(idx int) error {
r := &u.reqs[idx]
func (u *UDPConn) submitRecvRequest(idx int) error {
r := &u.recvReqs[idx]
// TODO: make a C struct instead of a Go struct, and pass that in, to simplify call sites.
errno := C.submit_recvmsg_request(u.ptr, &r.mhdr, &r.iov, &r.sa, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(r.buf)), C.size_t(idx))
errno := C.submit_recvmsg_request(u.recvRing, &r.mhdr, &r.iov, &r.sa, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(r.buf)), C.size_t(idx))
if errno < 0 {
return fmt.Errorf("uring.submitRequest failed: %v", errno) // TODO: Improve
return fmt.Errorf("uring.submitRecvRequest failed: %v", errno) // TODO: Improve
}
return nil
}
@ -99,12 +114,12 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
if u.fd == 0 {
return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn")
}
nidx := C.receive_into_udp(u.ptr)
nidx := C.wait_completion(u.recvRing)
n, idx, err := unpackNIdx(nidx)
if err != nil {
return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr: %v", err)
}
r := &u.reqs[idx]
r := &u.recvReqs[idx]
ip := C.ip(&r.sa)
var ip4 [4]byte
binary.BigEndian.PutUint32(ip4[:], uint32(ip))
@ -112,7 +127,7 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
ipp := netaddr.IPPortFrom(netaddr.IPFrom4(ip4), uint16(port))
copy(buf, r.buf[:n])
// Queue up a new request.
err = u.submitRequest(int(idx))
err = u.submitRecvRequest(int(idx))
if err != nil {
panic("how should we handle this?")
}
@ -120,8 +135,9 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
}
func (u *UDPConn) Close() error {
// fmt.Println("CLOSE URING", u)
u.close.Do(func() {
u.conn.Close()
u.conn = nil
// Send a nop to unblock any outstanding readers.
// Hope that we manage to close before any new readers appear.
// Not sure exactly how this is supposed to work reliably...
@ -131,12 +147,10 @@ func (u *UDPConn) Close() error {
//
// Update: this causes crashes, because of entirely predictable and predicted races.
// The mystery about how to safely unblock all outstanding io_uring_wait_cqe calls remains...
// fmt.Println("io_uring_queue_exit", u.ptr)
C.io_uring_queue_exit(u.ptr)
// fmt.Println("DONE io_uring_queue_exit", u.ptr)
u.ptr = nil
u.conn.Close()
u.conn = nil
C.io_uring_queue_exit(u.recvRing)
C.io_uring_queue_exit(u.sendRing)
u.recvRing = nil
u.sendRing = nil
u.file.Close()
u.file = nil
u.fd = 0
@ -163,8 +177,58 @@ func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
return n, udpAddr{ipp: ipp}, err
}
func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
return c.conn.WriteTo(p, addr)
// WriteTo queues p as a single UDP datagram to addr on the send ring.
//
// addr must be a *net.UDPAddr holding an IPv4 address, and p must fit in one
// request buffer. The payload is copied into a request buffer and the send is
// only submitted: a nil error means the request was queued, not that the
// kernel has finished sending it. A failure of an earlier send can surface as
// the error of a later call, when that send's completion is collected.
func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
	if u.fd == 0 {
		return 0, errors.New("invalid uring.UDPConn")
	}
	udpAddr, ok := addr.(*net.UDPAddr)
	if !ok {
		return 0, fmt.Errorf("cannot WriteTo net.Addr of type %T", addr)
	}
	// A net.IP holding an IPv4 address is often 16 bytes long (IPv4-mapped
	// form). Reading its first four bytes would yield zero, so normalize to
	// the 4-byte form before encoding it.
	ip4 := udpAddr.IP.To4()
	if ip4 == nil {
		return 0, fmt.Errorf("cannot WriteTo non-IPv4 address %v", udpAddr.IP)
	}
	// Reject oversized payloads before taking a request slot: the length
	// handed to the kernel must never exceed the request buffer.
	if len(p) > len(u.sendReqs[0].buf) {
		return 0, fmt.Errorf("WriteTo: packet of %d bytes exceeds buffer size %d", len(p), len(u.sendReqs[0].buf))
	}
	// If we need a buffer, get a buffer, potentially blocking.
	var idx int
	select {
	case idx = <-u.sendReqC:
	default:
		// No request available. Get one from the kernel.
		nidx := C.wait_completion(u.sendRing)
		var werr error
		_, idx, werr = unpackNIdx(nidx)
		if werr != nil {
			return 0, fmt.Errorf("some WriteTo failed, maybe long ago: %v", werr)
		}
	}
	r := &u.sendReqs[idx]
	// Do the write.
	copy(r.buf[:], p)
	C.setIP(&r.sa, C.uint32_t(binary.BigEndian.Uint32(ip4)))
	C.setPort(&r.sa, C.uint16_t(udpAddr.Port))
	ret := C.submit_sendmsg_request(
		u.sendRing, // ring
		&r.mhdr,    // msghdr
		&r.iov,     // iov -- TODO: populate and don't pass it
		&r.sa,      // sockaddr_in, ditto
		(*C.char)(unsafe.Pointer(&r.buf[0])), // buffer ptr, ditto
		C.int(len(p)),                        // buffer len, ditto
		C.size_t(idx),                        // user data
	)
	if ret < 0 {
		// Nothing was queued, so no completion will ever return this slot.
		// Hand it back ourselves; this cannot block because we hold the slot.
		u.sendReqC <- idx
		return 0, fmt.Errorf("uring.WriteTo: submit failed: %v", ret)
	}
	// Get an extra buffer, if available.
	nidx := C.peek_completion(u.sendRing)
	if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR {
		// Nothing waiting for us.
	} else {
		_, freeIdx, perr := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?)
		if perr == nil {
			// Put the request buffer back in the usable queue.
			// Should never block, by construction.
			u.sendReqC <- freeIdx
		}
	}
	return len(p), nil
}
// LocalAddr returns the local network address.
@ -198,16 +262,6 @@ type File struct {
func NewFile(file *os.File) (*File, error) {
r := new(C.go_uring)
// d, err := syscall.Dup(int(file.Fd()))
// if err != nil {
// return nil, err
// }
// err = syscall.SetNonblock(d, false)
// if err != nil {
// return nil, err
// }
// fd := C.int(d)
// fmt.Println("INIT NEW FILE WITH FD", int(file.Fd()), "DUP'd to", d)
fd := C.int(file.Fd())
ret := C.initialize(r, fd)
if ret < 0 {
@ -238,7 +292,6 @@ type fileReq struct {
}
func (u *File) Write(buf []byte) (int, error) {
// fmt.Println("WRITE ", len(buf), "BYTES")
if u.fd == 0 {
return 0, errors.New("invalid uring.FileConn")
}
@ -246,11 +299,9 @@ func (u *File) Write(buf []byte) (int, error) {
var idx int
select {
case idx = <-u.reqC:
// fmt.Println("REQ AVAIL")
default:
// fmt.Println("NO REQ AVAIL??? wait for one...")
// No request available. Get one from the kernel.
nidx := C.get_file_completion(u.ptr)
nidx := C.wait_completion(u.ptr)
var err error
_, idx, err = unpackNIdx(nidx)
if err != nil {
@ -260,16 +311,13 @@ func (u *File) Write(buf []byte) (int, error) {
r := &u.reqs[idx]
// Do the write.
copy(r.buf[:], buf)
// fmt.Println("SUBMIT WRITE REQUEST")
C.submit_write_request(u.ptr, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(buf)), C.size_t(idx), &r.iov)
// Get an extra buffer, if available.
nidx := C.peek_file_completion(u.ptr)
nidx := C.peek_completion(u.ptr)
if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR {
// Nothing waiting for us.
// fmt.Println("PEEK: ignore EAGAIN/EINTR")
} else {
_, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?)
// fmt.Println("PEEK RESULT:", n, idx, err)
if err == nil {
// Put the request buffer back in the usable queue.
// Should never block, by construction.