diff --git a/net/uring/file_linux.go b/net/uring/file_linux.go index 7689cfc34..5e56dee50 100644 --- a/net/uring/file_linux.go +++ b/net/uring/file_linux.go @@ -24,7 +24,7 @@ type file struct { // We have two urings so that we don't have to demux completion events. // writeRing is the uring for pwritev calls. - writeRing *C.go_uring + writeRing writeRing // readRing is the uring for preadv calls. readRing *C.go_uring @@ -42,13 +42,6 @@ type file struct { // We attempt to keep them all queued up for the kernel to fulfill. // The array length is tied to the size of the uring. readReqs [1]*C.goreq // Whoops! The kernel apparently cannot handle more than 1 concurrent preadv calls on a tun device! - // writeReqs is an array of re-usable file pwritev requests. - // We dispatch them to the kernel as writes are requested. - // The array length is tied to the size of the uring. - writeReqs [8]*C.goreq - // writeReqC is a channel containing indices into writeReqs - // that are free to use (that is, not in the kernel). - writeReqC chan int // refcount counts the number of outstanding read/write requests. // See the length comment for UDPConn.refcount for details. @@ -57,10 +50,10 @@ type file struct { func newFile(f *os.File) (*file, error) { u := &file{ - readRing: new(C.go_uring), - writeRing: new(C.go_uring), - file: f, + readRing: new(C.go_uring), + file: f, } + u.writeRing.ring = new(C.go_uring) fd := f.Fd() if ret := C.initialize(u.readRing, C.int(fd)); ret < 0 { @@ -71,28 +64,24 @@ func newFile(f *os.File) (*file, error) { C.io_uring_queue_exit(u.readRing) }) - if ret := C.initialize(u.writeRing, C.int(fd)); ret < 0 { + if ret := C.initialize(u.writeRing.ring, C.int(fd)); ret < 0 { u.doShutdown() return nil, fmt.Errorf("writeRing initialization failed: %w", syscall.Errno(-ret)) } u.shutdown = append(u.shutdown, func() { - C.io_uring_queue_exit(u.writeRing) + C.io_uring_queue_exit(u.writeRing.ring) }) // Initialize buffers for i := range &u.readReqs { - u.readReqs[i] = C.initializeReq(bufferSize, 0) // 0: not used for IP addresses - } - for i := range &u.writeReqs { - u.writeReqs[i] = C.initializeReq(bufferSize, 0) // 0: not used for IP addresses + u.readReqs[i] = C.initializeReq(bufferSize, C.size_t(i), 0) // 0: not used for IP addresses } + u.writeRing.initReqs(0) // 0: not used for IP addresses u.shutdown = append(u.shutdown, func() { for _, r := range u.readReqs { C.freeReq(r) } - for _, r := range u.writeReqs { - C.freeReq(r) - } + u.writeRing.freeReqs() }) // Initialize read half. @@ -103,12 +92,6 @@ func newFile(f *os.File) (*file, error) { } } - // Initialize write half. - u.writeReqC = make(chan int, len(u.writeReqs)) - for i := range u.writeReqs { - u.writeReqC <- i - } - // Initialization succeeded. // Take ownership of the file. u.shutdown = append(u.shutdown, func() { @@ -118,7 +101,7 @@ func newFile(f *os.File) (*file, error) { } func (u *file) submitReadvRequest(idx int) error { - errno := C.submit_readv_request(u.readRing, u.readReqs[idx], C.size_t(idx)) + errno := C.submit_readv_request(u.readRing, u.readReqs[idx]) if errno < 0 { return fmt.Errorf("uring.submitReadvRequest failed: %w", syscall.Errno(-errno)) } @@ -174,34 +157,17 @@ func (u *file) Write(buf []byte) (int, error) { return 0, os.ErrClosed } - // If we need a buffer, get a buffer, potentially blocking. - var idx int - select { - case idx = <-u.writeReqC: - default: - // No request available. Get one from the kernel. - n, idx, err := waitCompletion(u.writeRing) - if err != nil { - return 0, fmt.Errorf("Write io_uring call failed: %w", err) - } - if n < 0 { - // Past syscall failed. - u.writeReqC <- idx // don't leak idx - return 0, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n)) - } + // Get a req, blocking as needed. + r, err := u.writeRing.getReq() + if err != nil { + return 0, err } - r := u.writeReqs[idx] // Do the write. rbuf := sliceOf(r.buf, len(buf)) copy(rbuf, buf) - C.submit_writev_request(u.writeRing, r, C.int(len(buf)), C.size_t(idx)) + C.submit_writev_request(u.writeRing.ring, r, C.int(len(buf))) // Get an extra buffer, if available. - idx, ok := peekCompletion(u.writeRing) - if ok { - // Put the request buffer back in the usable queue. - // Should never block, by construction. - u.writeReqC <- idx - } + u.writeRing.prefetch() return len(buf), nil } diff --git a/net/uring/io_uring_linux.c b/net/uring/io_uring_linux.c index cb0622363..71d82d506 100644 --- a/net/uring/io_uring_linux.c +++ b/net/uring/io_uring_linux.c @@ -44,11 +44,12 @@ struct req { // It is accessed atomically. int32_t in_kernel; char *buf; + size_t idx; }; typedef struct req goreq; -static struct req *initializeReq(size_t sz, int ipLen) { +static struct req *initializeReq(size_t sz, size_t idx, int ipLen) { struct req *r = malloc(sizeof(struct req)); memset(r, 0, sizeof(*r)); r->buf = malloc(sz); @@ -57,6 +58,7 @@ static struct req *initializeReq(size_t sz, int ipLen) { r->iov.iov_len = sz; r->hdr.msg_iov = &r->iov; r->hdr.msg_iovlen = 1; + r->idx = idx; switch(ipLen) { case 4: r->hdr.msg_name = &r->sa; @@ -77,34 +79,27 @@ static void freeReq(struct req *r) { // submit a recvmsg request via liburing // TODO: What recvfrom support arrives, maybe use that instead? -static int submit_recvmsg_request(struct io_uring *ring, struct req *r, size_t idx) { +static int submit_recvmsg_request(struct io_uring *ring, struct req *r) { struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_recvmsg(sqe, 0, &r->hdr, 0); // use the 0th file in the list of registered fds io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - io_uring_sqe_set_data(sqe, (void *)(idx)); + io_uring_sqe_set_data(sqe, (void *)(r->idx)); io_uring_submit(ring); return 0; } // submit a recvmsg request via liburing // TODO: What recvfrom support arrives, maybe use that instead? -static int submit_sendmsg_request(struct io_uring *ring, struct req *r, int buflen, size_t idx) { +static int submit_sendmsg_request(struct io_uring *ring, struct req *r, int buflen) { r->iov.iov_len = buflen; struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_sendmsg(sqe, 0, &r->hdr, 0); // use the 0th file in the list of registered fds io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - io_uring_sqe_set_data(sqe, (void *)(idx)); + io_uring_sqe_set_data(sqe, (void *)(r->idx)); io_uring_submit(ring); return 0; } -static void submit_nop_request(struct io_uring *ring) { - struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - io_uring_prep_nop(sqe); - io_uring_sqe_set_data(sqe, (void *)(-1)); - io_uring_submit(ring); -} - static void submit_cancel_request(struct io_uring *ring, size_t idx) { struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_cancel(sqe, (void *)(idx), 0); @@ -112,22 +107,22 @@ static void submit_cancel_request(struct io_uring *ring, size_t idx) { } // submit a writev request via liburing -static int submit_writev_request(struct io_uring *ring, struct req *r, int buflen, size_t idx) { +static int submit_writev_request(struct io_uring *ring, struct req *r, int buflen) { r->iov.iov_len = buflen; struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_writev(sqe, 0, &r->iov, 1, 0); // use the 0th file in the list of registered fds io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - io_uring_sqe_set_data(sqe, (void *)(idx)); + io_uring_sqe_set_data(sqe, (void *)(r->idx)); int submitted = io_uring_submit(ring); return 0; } // submit a readv request via liburing -static int submit_readv_request(struct io_uring *ring, struct req *r, size_t idx) { +static int submit_readv_request(struct io_uring *ring, struct req *r) { struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_readv(sqe, 0, &r->iov, 1, 0); // use the 0th file in the list of registered fds io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); - io_uring_sqe_set_data(sqe, (void *)(idx)); + io_uring_sqe_set_data(sqe, (void *)(r->idx)); int submitted = io_uring_submit(ring); return 0; } diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index df8dcbb55..b3324bb0a 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -5,11 +5,75 @@ package uring import "C" import ( + "fmt" "reflect" "syscall" "unsafe" ) +// A writeRing is an io_uring usable for sendmsg or pwritev calls. +// It manages an array of re-usable buffers. +type writeRing struct { + ring *C.go_uring + // reqs is an array of re-usable write requests. + // We dispatch them to the kernel as writes are requested. + // The array length is tied to the size of the uring. + reqs [8]*C.goreq + // reqC is a channel containing indices into reqs + // that are free to use (that is, not in the kernel). + reqC chan int +} + +// initReqs initializes r's reqs so that they can be used for writes/sends. +func (r *writeRing) initReqs(ipLen int) { + for i := range &r.reqs { + r.reqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(ipLen)) + } + r.reqC = make(chan int, len(r.reqs)) + for i := range r.reqs { + r.reqC <- i + } +} + +// getReq gets a req usable for a write/send. +// It blocks until such a req is available. +func (r *writeRing) getReq() (req *C.goreq, err error) { + var idx int + select { + case idx = <-r.reqC: + default: + // No request available. Get one from the kernel. + n, idx, err := waitCompletion(r.ring) + if err != nil { + return nil, fmt.Errorf("Write io_uring call failed: %w", err) + } + if n < 0 { + // Past syscall failed. + r.reqC <- idx // don't leak idx + return nil, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n)) + } + } + return r.reqs[idx], nil +} + +// prefetch attempts to fetch a req for use by future writes. +// It does not block. +func (r *writeRing) prefetch() { + idx, ok := peekCompletion(r.ring) + if ok { + // Put the request buffer back in the usable queue. + // Should never block, by construction. + r.reqC <- idx + } +} + +// freeReqs frees the reqs allocated by initReqs. +func (r *writeRing) freeReqs() { + for _, req := range r.reqs { + C.freeReq(req) + } +} + const ( noBlockForCompletion = 0 blockForCompletion = 1 diff --git a/net/uring/udp_linux.go b/net/uring/udp_linux.go index dd8df22c9..1efb28507 100644 --- a/net/uring/udp_linux.go +++ b/net/uring/udp_linux.go @@ -30,7 +30,7 @@ type UDPConn struct { // recvRing is the uring for recvmsg calls. recvRing *C.go_uring // sendRing is the uring for sendmsg calls. - sendRing *C.go_uring + sendRing writeRing // close ensures that connection closes occur exactly once. close sync.Once @@ -106,11 +106,11 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) { u := &UDPConn{ recvRing: new(C.go_uring), - sendRing: new(C.go_uring), file: file, local: local, is4: len(udpAddr.IP) == 4, } + u.sendRing.ring = new(C.go_uring) fd := file.Fd() u.shutdown = append(u.shutdown, func() { @@ -125,28 +125,24 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) { C.io_uring_queue_exit(u.recvRing) }) - if ret := C.initialize(u.sendRing, C.int(fd)); ret < 0 { + if ret := C.initialize(u.sendRing.ring, C.int(fd)); ret < 0 { u.doShutdown() return nil, fmt.Errorf("sendRing initialization failed: %w", syscall.Errno(-ret)) } u.shutdown = append(u.shutdown, func() { - C.io_uring_queue_exit(u.sendRing) + C.io_uring_queue_exit(u.sendRing.ring) }) // Initialize buffers for i := range u.recvReqs { - u.recvReqs[i] = C.initializeReq(bufferSize, C.int(len(udpAddr.IP))) - } - for i := range u.sendReqs { - u.sendReqs[i] = C.initializeReq(bufferSize, C.int(len(udpAddr.IP))) + u.recvReqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(len(udpAddr.IP))) } + u.sendRing.initReqs(len(udpAddr.IP)) u.shutdown = append(u.shutdown, func() { for _, r := range u.recvReqs { C.freeReq(r) } - for _, r := range u.sendReqs { - C.freeReq(r) - } + u.sendRing.freeReqs() }) // Initialize recv half. @@ -156,16 +152,11 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) { return nil, err } } - // Initialize send half. - u.sendReqC = make(chan int, len(u.sendReqs)) - for i := range u.sendReqs { - u.sendReqC <- i - } return u, nil } func (u *UDPConn) submitRecvRequest(idx int) error { - errno := C.submit_recvmsg_request(u.recvRing, u.recvReqs[idx], C.size_t(idx)) + errno := C.submit_recvmsg_request(u.recvRing, u.recvReqs[idx]) if errno < 0 { return fmt.Errorf("uring.submitRecvRequest failed: %w", syscall.Errno(-errno)) } @@ -247,24 +238,12 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { if !ok { return 0, fmt.Errorf("cannot WriteTo net.Addr of type %T", addr) } - // If we need a buffer, get a buffer, potentially blocking. - var idx int - select { - case idx = <-u.sendReqC: - default: - // No request available. Get one from the kernel. - n, idx, err = waitCompletion(u.sendRing) - if err != nil { - // io_uring failed to issue the syscall. - return 0, fmt.Errorf("WriteTo io_uring call failed: %w", err) - } - if n < 0 { - // Past syscall failed. - u.sendReqC <- idx // don't leak idx - return 0, fmt.Errorf("previous WriteTo failed: %w", syscall.Errno(-n)) - } + + // Get a req, blocking as needed. + r, err := u.sendRing.getReq() + if err != nil { + return 0, err } - r := u.sendReqs[idx] // Do the write. rbuf := sliceOf(r.buf, len(p)) copy(rbuf, p) @@ -282,13 +261,9 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { r.sa6.sin6_port = C.uint16_t(endian.Hton16(uint16(udpAddr.Port))) r.sa6.sin6_family = C.AF_INET6 } - C.submit_sendmsg_request(u.sendRing, r, C.int(len(p)), C.size_t(idx)) + C.submit_sendmsg_request(u.sendRing.ring, r, C.int(len(p))) // Get an extra buffer, if available. - if idx, ok := peekCompletion(u.sendRing); ok { - // Put the request buffer back in the usable queue. - // Should never block, by construction. - u.sendReqC <- idx - } + u.sendRing.prefetch() return len(p), nil }