diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index 5abdd64c0..ee65f4aaa 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -48,33 +48,6 @@ static uint64_t packNIdx(int n, size_t idx) { return (n64 << 32) | idx64; } -// Wait for a completion to be available, fetch the data -static uint64_t receive_into_udp(struct io_uring *ring) { - struct io_uring_cqe *cqe; -again:; - - int ret = io_uring_wait_cqe(ring, &cqe); - if (ret == -EINTR) { - goto again; - } - // TODO: Delete perror, fprintf, etc. - // Encode in return value or similar. - if (ret < 0) { - perror("io_uring_wait_cqe"); - return ret; - } - int n = cqe->res; - if (n < 0) { - // TODO: this leaks a buffer!!!! - fprintf(stderr, "recvmsg failed: %d.\n", n); - return n; - } - size_t idx = (size_t)io_uring_cqe_get_data(cqe); - uint64_t nidx = packNIdx(n, idx); - io_uring_cqe_seen(ring, cqe); - return nidx; -} - static uint32_t ip(struct sockaddr_in *sa) { return ntohl(sa->sin_addr.s_addr); } @@ -83,6 +56,14 @@ static uint16_t port(struct sockaddr_in *sa) { return ntohs(sa->sin_port); } +static uint32_t setIP(struct sockaddr_in *sa, uint32_t ip) { + sa->sin_addr.s_addr = htonl(ip); +} + +static uint16_t setPort(struct sockaddr_in *sa, uint16_t port) { + sa->sin_port = htons(port); +} + // submit a recvmsg request via liburing // TODO: What recvfrom support arrives, maybe use that instead? static int submit_recvmsg_request(struct io_uring *ring, struct msghdr *mhdr, struct iovec *iov, struct sockaddr_in *sender, char *buf, int buflen, size_t idx) { @@ -103,6 +84,26 @@ static int submit_recvmsg_request(struct io_uring *ring, struct msghdr *mhdr, st return 0; } +// submit a recvmsg request via liburing +// TODO: What recvfrom support arrives, maybe use that instead? +static int submit_sendmsg_request(struct io_uring *ring, struct msghdr *mhdr, struct iovec *iov, struct sockaddr_in *sender, char *buf, int buflen, size_t idx) { + iov->iov_base = buf; + iov->iov_len = buflen; + + mhdr->msg_iov = iov; + mhdr->msg_iovlen = 1; + + mhdr->msg_name = sender; + mhdr->msg_namelen = sizeof(struct sockaddr_in); + + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + io_uring_prep_sendmsg(sqe, 0, mhdr, 0); // use the 0th file in the list of registered fds + io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); + io_uring_sqe_set_data(sqe, (void *)(idx)); + io_uring_submit(ring); + return 0; +} + static void submit_nop_request(struct io_uring *ring) { struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_nop(sqe); @@ -111,8 +112,7 @@ static void submit_nop_request(struct io_uring *ring) { } // Wait for a completion to be available, fetch the data -// TODO: unify with receive_into_udp -static uint64_t get_file_completion(struct io_uring *ring) { +static uint64_t wait_completion(struct io_uring *ring) { struct io_uring_cqe *cqe; again:; @@ -123,13 +123,13 @@ again:; // TODO: Delete perror, fprintf, etc. // Encode in return value or similar. if (ret < 0) { - perror("get_file_completion io_uring_wait_cqe"); + perror("wait_completion io_uring_wait_cqe"); return ret; } int n = cqe->res; if (n < 0) { // TODO: This leaks a buffer!!! - fprintf(stderr, "get_file_completion write failed: %d.\n", n); + fprintf(stderr, "wait_completion failed: %d.\n", n); return n; } size_t idx = (size_t)io_uring_cqe_get_data(cqe); @@ -140,12 +140,6 @@ again:; // submit a write request via liburing static int submit_write_request(struct io_uring *ring, char *buf, int buflen, size_t idx, struct iovec *iov) { - // fprintf(stderr, "submit_write_request to fd %d buf %p %s buflen %d idx %lu\n", fd, buf, buf, buflen, idx); - // errno= 0; - // perror("before bonus write"); - // int x = write(fd, buf, buflen); - // fprintf(stderr, "plain write returned %d\n", x); - // perror("submit_write_request bonus write"); iov->iov_base = buf; iov->iov_len = buflen; @@ -154,15 +148,12 @@ static int submit_write_request(struct io_uring *ring, char *buf, int buflen, si io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); io_uring_sqe_set_data(sqe, (void *)(idx)); int submitted = io_uring_submit(ring); - // fprintf(stderr, "submitted %d sqes\n", submitted); return 0; } -// TODO: unify with get_file_completion -static uint64_t peek_file_completion(struct io_uring *ring) { +static uint64_t peek_completion(struct io_uring *ring) { struct io_uring_cqe *cqe; int ret = io_uring_peek_cqe(ring, &cqe); - // perror("on entry, peek_file_completion io_uring_wait_cqe"); if ((-ret == EAGAIN) || (-ret == EINTR)) { return ret; } diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index 0ac10da65..8210c18c0 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -30,14 +30,18 @@ import ( // That's OK for now, but later it could be a performance issue. // For now, keep it simple and enqueue/dequeue in a single step. // TODO: IPv6 +// TODO: Maybe combine the urings into a single uring with dispatch. type UDPConn struct { - ptr *C.go_uring - close sync.Once - conn *net.UDPConn - file *os.File // must keep file from being GC'd - fd C.int - local net.Addr - reqs [8]udpReq + recvRing *C.go_uring + sendRing *C.go_uring + close sync.Once + conn *net.UDPConn + file *os.File // must keep file from being GC'd + fd C.int + local net.Addr + recvReqs [8]udpReq + sendReqs [8]udpReq + sendReqC chan int // indices into sendReqs } func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) { @@ -55,26 +59,37 @@ func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) { if err != nil { return nil, err } - r := new(C.go_uring) + recvRing := new(C.go_uring) + sendRing := new(C.go_uring) fd := C.int(file.Fd()) - ret := C.initialize(r, fd) - if ret < 0 { - return nil, fmt.Errorf("uring initialization failed: %d", ret) + for _, r := range []*C.go_uring{recvRing, sendRing} { + ret := C.initialize(r, fd) + if ret < 0 { + // TODO: free recvRing if sendRing initialize failed + return nil, fmt.Errorf("uring initialization failed: %d", ret) + } } u := &UDPConn{ - ptr: r, - conn: conn, - file: file, - fd: fd, - local: conn.LocalAddr(), + recvRing: recvRing, + sendRing: sendRing, + conn: conn, + file: file, + fd: fd, + local: conn.LocalAddr(), } - for i := range u.reqs { - if err := u.submitRequest(i); err != nil { + // Initialize recv half. + for i := range u.recvReqs { + if err := u.submitRecvRequest(i); err != nil { u.Close() // TODO: will this crash? return nil, err } } + // Initialize send half. + u.sendReqC = make(chan int, len(u.sendReqs)) + for i := range u.sendReqs { + u.sendReqC <- i + } return u, nil } @@ -85,12 +100,12 @@ type udpReq struct { buf [device.MaxSegmentSize]byte } -func (u *UDPConn) submitRequest(idx int) error { - r := &u.reqs[idx] +func (u *UDPConn) submitRecvRequest(idx int) error { + r := &u.recvReqs[idx] // TODO: make a C struct instead of a Go struct, and pass that in, to simplify call sites. - errno := C.submit_recvmsg_request(u.ptr, &r.mhdr, &r.iov, &r.sa, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(r.buf)), C.size_t(idx)) + errno := C.submit_recvmsg_request(u.recvRing, &r.mhdr, &r.iov, &r.sa, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(r.buf)), C.size_t(idx)) if errno < 0 { - return fmt.Errorf("uring.submitRequest failed: %v", errno) // TODO: Improve + return fmt.Errorf("uring.submitRecvRequest failed: %v", errno) // TODO: Improve } return nil } @@ -99,12 +114,12 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { if u.fd == 0 { return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn") } - nidx := C.receive_into_udp(u.ptr) + nidx := C.wait_completion(u.recvRing) n, idx, err := unpackNIdx(nidx) if err != nil { return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr: %v", err) } - r := &u.reqs[idx] + r := &u.recvReqs[idx] ip := C.ip(&r.sa) var ip4 [4]byte binary.BigEndian.PutUint32(ip4[:], uint32(ip)) @@ -112,7 +127,7 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { ipp := netaddr.IPPortFrom(netaddr.IPFrom4(ip4), uint16(port)) copy(buf, r.buf[:n]) // Queue up a new request. - err = u.submitRequest(int(idx)) + err = u.submitRecvRequest(int(idx)) if err != nil { panic("how should we handle this?") } @@ -120,8 +135,9 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { } func (u *UDPConn) Close() error { - // fmt.Println("CLOSE URING", u) u.close.Do(func() { + u.conn.Close() + u.conn = nil // Send a nop to unblock any outstanding readers. // Hope that we manage to close before any new readers appear. // Not sure exactly how this is supposed to work reliably... @@ -131,12 +147,10 @@ func (u *UDPConn) Close() error { // // Update: this causes crashes, because of entirely predictable and predicted races. // The mystery about how to safely unblock all outstanding io_uring_wait_cqe calls remains... - // fmt.Println("io_uring_queue_exit", u.ptr) - C.io_uring_queue_exit(u.ptr) - // fmt.Println("DONE io_uring_queue_exit", u.ptr) - u.ptr = nil - u.conn.Close() - u.conn = nil + C.io_uring_queue_exit(u.recvRing) + C.io_uring_queue_exit(u.sendRing) + u.recvRing = nil + u.sendRing = nil u.file.Close() u.file = nil u.fd = 0 @@ -163,8 +177,58 @@ func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { return n, udpAddr{ipp: ipp}, err } -func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { - return c.conn.WriteTo(p, addr) +func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + if u.fd == 0 { + return 0, errors.New("invalid uring.UDPConn") + } + udpAddr, ok := addr.(*net.UDPAddr) + if !ok { + return 0, fmt.Errorf("cannot WriteTo net.Addr of type %T", addr) + } + // If we need a buffer, get a buffer, potentially blocking. + var idx int + select { + case idx = <-u.sendReqC: + default: + // No request available. Get one from the kernel. + nidx := C.wait_completion(u.sendRing) + var err error + _, idx, err = unpackNIdx(nidx) + if err != nil { + return 0, fmt.Errorf("some WriteTo failed, maybe long ago: %v", err) + } + } + r := &u.sendReqs[idx] + // Do the write. + copy(r.buf[:], p) + + ip := binary.BigEndian.Uint32(udpAddr.IP) + C.setIP(&r.sa, C.uint32_t(ip)) + C.setPort(&r.sa, C.uint16_t(udpAddr.Port)) + + // TODO: populate r.sa with ip/port + C.submit_sendmsg_request( + u.sendRing, // ring + &r.mhdr, // msghdr + &r.iov, // iov -- TODO: populate and don't pass it + &r.sa, // sockaddr_in, ditto + (*C.char)(unsafe.Pointer(&r.buf[0])), // buffer ptr, ditto + C.int(len(p)), // buffer len, ditto + C.size_t(idx), // user data + ) + // Get an extra buffer, if available. + nidx := C.peek_completion(u.sendRing) + if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR { + // Nothing waiting for us. + } else { + _, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?) + if err == nil { + // Put the request buffer back in the usable queue. + // Should never block, by construction. + u.sendReqC <- idx + } + } + return len(p), nil } // LocalAddr returns the local network address. @@ -198,16 +262,6 @@ type File struct { func NewFile(file *os.File) (*File, error) { r := new(C.go_uring) - // d, err := syscall.Dup(int(file.Fd())) - // if err != nil { - // return nil, err - // } - // err = syscall.SetNonblock(d, false) - // if err != nil { - // return nil, err - // } - // fd := C.int(d) - // fmt.Println("INIT NEW FILE WITH FD", int(file.Fd()), "DUP'd to", d) fd := C.int(file.Fd()) ret := C.initialize(r, fd) if ret < 0 { @@ -238,7 +292,6 @@ type fileReq struct { } func (u *File) Write(buf []byte) (int, error) { - // fmt.Println("WRITE ", len(buf), "BYTES") if u.fd == 0 { return 0, errors.New("invalid uring.FileConn") } @@ -246,11 +299,9 @@ func (u *File) Write(buf []byte) (int, error) { var idx int select { case idx = <-u.reqC: - // fmt.Println("REQ AVAIL") default: - // fmt.Println("NO REQ AVAIL??? wait for one...") // No request available. Get one from the kernel. - nidx := C.get_file_completion(u.ptr) + nidx := C.wait_completion(u.ptr) var err error _, idx, err = unpackNIdx(nidx) if err != nil { @@ -260,16 +311,13 @@ func (u *File) Write(buf []byte) (int, error) { r := &u.reqs[idx] // Do the write. copy(r.buf[:], buf) - // fmt.Println("SUBMIT WRITE REQUEST") C.submit_write_request(u.ptr, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(buf)), C.size_t(idx), &r.iov) // Get an extra buffer, if available. - nidx := C.peek_file_completion(u.ptr) + nidx := C.peek_completion(u.ptr) if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR { // Nothing waiting for us. - // fmt.Println("PEEK: ignore EAGAIN/EINTR") } else { _, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?) - // fmt.Println("PEEK RESULT:", n, idx, err) if err == nil { // Put the request buffer back in the usable queue. // Should never block, by construction.