mirror of
https://github.com/tailscale/tailscale.git
synced 2026-05-05 12:16:44 +02:00
refactor out common write code
This commit is contained in:
parent
ba6c48c9e9
commit
934709fd7a
@ -24,7 +24,7 @@ type file struct {
|
||||
// We have two urings so that we don't have to demux completion events.
|
||||
|
||||
// writeRing is the uring for pwritev calls.
|
||||
writeRing *C.go_uring
|
||||
writeRing writeRing
|
||||
// readRing is the uring for preadv calls.
|
||||
readRing *C.go_uring
|
||||
|
||||
@ -42,13 +42,6 @@ type file struct {
|
||||
// We attempt to keep them all queued up for the kernel to fulfill.
|
||||
// The array length is tied to the size of the uring.
|
||||
readReqs [1]*C.goreq // Whoops! The kernel apparently cannot handle more than 1 concurrent preadv calls on a tun device!
|
||||
// writeReqs is an array of re-usable file pwritev requests.
|
||||
// We dispatch them to the kernel as writes are requested.
|
||||
// The array length is tied to the size of the uring.
|
||||
writeReqs [8]*C.goreq
|
||||
// writeReqC is a channel containing indices into writeReqs
|
||||
// that are free to use (that is, not in the kernel).
|
||||
writeReqC chan int
|
||||
|
||||
// refcount counts the number of outstanding read/write requests.
|
||||
// See the length comment for UDPConn.refcount for details.
|
||||
@ -57,10 +50,10 @@ type file struct {
|
||||
|
||||
func newFile(f *os.File) (*file, error) {
|
||||
u := &file{
|
||||
readRing: new(C.go_uring),
|
||||
writeRing: new(C.go_uring),
|
||||
file: f,
|
||||
readRing: new(C.go_uring),
|
||||
file: f,
|
||||
}
|
||||
u.writeRing.ring = new(C.go_uring)
|
||||
|
||||
fd := f.Fd()
|
||||
if ret := C.initialize(u.readRing, C.int(fd)); ret < 0 {
|
||||
@ -71,28 +64,24 @@ func newFile(f *os.File) (*file, error) {
|
||||
C.io_uring_queue_exit(u.readRing)
|
||||
})
|
||||
|
||||
if ret := C.initialize(u.writeRing, C.int(fd)); ret < 0 {
|
||||
if ret := C.initialize(u.writeRing.ring, C.int(fd)); ret < 0 {
|
||||
u.doShutdown()
|
||||
return nil, fmt.Errorf("writeRing initialization failed: %w", syscall.Errno(-ret))
|
||||
}
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
C.io_uring_queue_exit(u.writeRing)
|
||||
C.io_uring_queue_exit(u.writeRing.ring)
|
||||
})
|
||||
|
||||
// Initialize buffers
|
||||
for i := range &u.readReqs {
|
||||
u.readReqs[i] = C.initializeReq(bufferSize, 0) // 0: not used for IP addresses
|
||||
}
|
||||
for i := range &u.writeReqs {
|
||||
u.writeReqs[i] = C.initializeReq(bufferSize, 0) // 0: not used for IP addresses
|
||||
u.readReqs[i] = C.initializeReq(bufferSize, C.size_t(i), 0) // 0: not used for IP addresses
|
||||
}
|
||||
u.writeRing.initReqs(0) // 0: not used for IP addresses
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
for _, r := range u.readReqs {
|
||||
C.freeReq(r)
|
||||
}
|
||||
for _, r := range u.writeReqs {
|
||||
C.freeReq(r)
|
||||
}
|
||||
u.writeRing.freeReqs()
|
||||
})
|
||||
|
||||
// Initialize read half.
|
||||
@ -103,12 +92,6 @@ func newFile(f *os.File) (*file, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize write half.
|
||||
u.writeReqC = make(chan int, len(u.writeReqs))
|
||||
for i := range u.writeReqs {
|
||||
u.writeReqC <- i
|
||||
}
|
||||
|
||||
// Initialization succeeded.
|
||||
// Take ownership of the file.
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
@ -118,7 +101,7 @@ func newFile(f *os.File) (*file, error) {
|
||||
}
|
||||
|
||||
func (u *file) submitReadvRequest(idx int) error {
|
||||
errno := C.submit_readv_request(u.readRing, u.readReqs[idx], C.size_t(idx))
|
||||
errno := C.submit_readv_request(u.readRing, u.readReqs[idx])
|
||||
if errno < 0 {
|
||||
return fmt.Errorf("uring.submitReadvRequest failed: %w", syscall.Errno(-errno))
|
||||
}
|
||||
@ -174,34 +157,17 @@ func (u *file) Write(buf []byte) (int, error) {
|
||||
return 0, os.ErrClosed
|
||||
}
|
||||
|
||||
// If we need a buffer, get a buffer, potentially blocking.
|
||||
var idx int
|
||||
select {
|
||||
case idx = <-u.writeReqC:
|
||||
default:
|
||||
// No request available. Get one from the kernel.
|
||||
n, idx, err := waitCompletion(u.writeRing)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("Write io_uring call failed: %w", err)
|
||||
}
|
||||
if n < 0 {
|
||||
// Past syscall failed.
|
||||
u.writeReqC <- idx // don't leak idx
|
||||
return 0, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n))
|
||||
}
|
||||
// Get a req, blocking as needed.
|
||||
r, err := u.writeRing.getReq()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r := u.writeReqs[idx]
|
||||
// Do the write.
|
||||
rbuf := sliceOf(r.buf, len(buf))
|
||||
copy(rbuf, buf)
|
||||
C.submit_writev_request(u.writeRing, r, C.int(len(buf)), C.size_t(idx))
|
||||
C.submit_writev_request(u.writeRing.ring, r, C.int(len(buf)))
|
||||
// Get an extra buffer, if available.
|
||||
idx, ok := peekCompletion(u.writeRing)
|
||||
if ok {
|
||||
// Put the request buffer back in the usable queue.
|
||||
// Should never block, by construction.
|
||||
u.writeReqC <- idx
|
||||
}
|
||||
u.writeRing.prefetch()
|
||||
return len(buf), nil
|
||||
}
|
||||
|
||||
|
||||
@ -44,11 +44,12 @@ struct req {
|
||||
// It is accessed atomically.
|
||||
int32_t in_kernel;
|
||||
char *buf;
|
||||
size_t idx;
|
||||
};
|
||||
|
||||
typedef struct req goreq;
|
||||
|
||||
static struct req *initializeReq(size_t sz, int ipLen) {
|
||||
static struct req *initializeReq(size_t sz, size_t idx, int ipLen) {
|
||||
struct req *r = malloc(sizeof(struct req));
|
||||
memset(r, 0, sizeof(*r));
|
||||
r->buf = malloc(sz);
|
||||
@ -57,6 +58,7 @@ static struct req *initializeReq(size_t sz, int ipLen) {
|
||||
r->iov.iov_len = sz;
|
||||
r->hdr.msg_iov = &r->iov;
|
||||
r->hdr.msg_iovlen = 1;
|
||||
r->idx = idx;
|
||||
switch(ipLen) {
|
||||
case 4:
|
||||
r->hdr.msg_name = &r->sa;
|
||||
@ -77,34 +79,27 @@ static void freeReq(struct req *r) {
|
||||
|
||||
// submit a recvmsg request via liburing
|
||||
// TODO: When recvfrom support arrives, maybe use that instead?
|
||||
static int submit_recvmsg_request(struct io_uring *ring, struct req *r, size_t idx) {
|
||||
static int submit_recvmsg_request(struct io_uring *ring, struct req *r) {
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_recvmsg(sqe, 0, &r->hdr, 0); // use the 0th file in the list of registered fds
|
||||
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
|
||||
io_uring_sqe_set_data(sqe, (void *)(idx));
|
||||
io_uring_sqe_set_data(sqe, (void *)(r->idx));
|
||||
io_uring_submit(ring);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// submit a sendmsg request via liburing
|
||||
// TODO: When recvfrom support arrives, maybe use that instead?
|
||||
static int submit_sendmsg_request(struct io_uring *ring, struct req *r, int buflen, size_t idx) {
|
||||
static int submit_sendmsg_request(struct io_uring *ring, struct req *r, int buflen) {
|
||||
r->iov.iov_len = buflen;
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_sendmsg(sqe, 0, &r->hdr, 0); // use the 0th file in the list of registered fds
|
||||
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
|
||||
io_uring_sqe_set_data(sqe, (void *)(idx));
|
||||
io_uring_sqe_set_data(sqe, (void *)(r->idx));
|
||||
io_uring_submit(ring);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// submit_nop_request queues a no-op entry on ring, tags it with the
// user-data value -1, and submits it immediately.
// NOTE(review): the pointer returned by io_uring_get_sqe is used without a
// NULL check — confirm the submission queue can never be full at this point.
static void submit_nop_request(struct io_uring *ring) {
    struct io_uring_sqe *nop = io_uring_get_sqe(ring);
    io_uring_prep_nop(nop);
    io_uring_sqe_set_data(nop, (void *)(-1));
    io_uring_submit(ring);
}
|
||||
|
||||
static void submit_cancel_request(struct io_uring *ring, size_t idx) {
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_cancel(sqe, (void *)(idx), 0);
|
||||
@ -112,22 +107,22 @@ static void submit_cancel_request(struct io_uring *ring, size_t idx) {
|
||||
}
|
||||
|
||||
// submit a writev request via liburing
|
||||
static int submit_writev_request(struct io_uring *ring, struct req *r, int buflen, size_t idx) {
|
||||
static int submit_writev_request(struct io_uring *ring, struct req *r, int buflen) {
|
||||
r->iov.iov_len = buflen;
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_writev(sqe, 0, &r->iov, 1, 0); // use the 0th file in the list of registered fds
|
||||
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
|
||||
io_uring_sqe_set_data(sqe, (void *)(idx));
|
||||
io_uring_sqe_set_data(sqe, (void *)(r->idx));
|
||||
int submitted = io_uring_submit(ring);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// submit a readv request via liburing
|
||||
static int submit_readv_request(struct io_uring *ring, struct req *r, size_t idx) {
|
||||
static int submit_readv_request(struct io_uring *ring, struct req *r) {
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_readv(sqe, 0, &r->iov, 1, 0); // use the 0th file in the list of registered fds
|
||||
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
|
||||
io_uring_sqe_set_data(sqe, (void *)(idx));
|
||||
io_uring_sqe_set_data(sqe, (void *)(r->idx));
|
||||
int submitted = io_uring_submit(ring);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -5,11 +5,75 @@ package uring
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// A writeRing is an io_uring usable for sendmsg or pwritev calls.
|
||||
// It manages an array of re-usable buffers.
|
||||
type writeRing struct {
|
||||
ring *C.go_uring
|
||||
// reqs is an array of re-usable write requests.
|
||||
// We dispatch them to the kernel as writes are requested.
|
||||
// The array length is tied to the size of the uring.
|
||||
reqs [8]*C.goreq
|
||||
// reqC is a channel containing indices into reqs
|
||||
// that are free to use (that is, not in the kernel).
|
||||
reqC chan int
|
||||
}
|
||||
|
||||
// initReqs initializes r's reqs so that they can be used for writes/sends.
|
||||
func (r *writeRing) initReqs(ipLen int) {
|
||||
for i := range &r.reqs {
|
||||
r.reqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(ipLen))
|
||||
}
|
||||
r.reqC = make(chan int, len(r.reqs))
|
||||
for i := range r.reqs {
|
||||
r.reqC <- i
|
||||
}
|
||||
}
|
||||
|
||||
// getReq gets a req usable for a write/send.
|
||||
// It blocks until such a req is available.
|
||||
func (r *writeRing) getReq() (req *C.goreq, err error) {
|
||||
var idx int
|
||||
select {
|
||||
case idx = <-r.reqC:
|
||||
default:
|
||||
// No request available. Get one from the kernel.
|
||||
n, idx, err := waitCompletion(r.ring)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Write io_uring call failed: %w", err)
|
||||
}
|
||||
if n < 0 {
|
||||
// Past syscall failed.
|
||||
r.reqC <- idx // don't leak idx
|
||||
return nil, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n))
|
||||
}
|
||||
}
|
||||
return r.reqs[idx], nil
|
||||
}
|
||||
|
||||
// prefetch attempts to fetch a req for use by future writes.
|
||||
// It does not block.
|
||||
func (r *writeRing) prefetch() {
|
||||
idx, ok := peekCompletion(r.ring)
|
||||
if ok {
|
||||
// Put the request buffer back in the usable queue.
|
||||
// Should never block, by construction.
|
||||
r.reqC <- idx
|
||||
}
|
||||
}
|
||||
|
||||
// freeReqs frees the reqs allocated by initReqs.
|
||||
func (r *writeRing) freeReqs() {
|
||||
for _, req := range r.reqs {
|
||||
C.freeReq(req)
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
noBlockForCompletion = 0
|
||||
blockForCompletion = 1
|
||||
|
||||
@ -30,7 +30,7 @@ type UDPConn struct {
|
||||
// recvRing is the uring for recvmsg calls.
|
||||
recvRing *C.go_uring
|
||||
// sendRing is the uring for sendmsg calls.
|
||||
sendRing *C.go_uring
|
||||
sendRing writeRing
|
||||
|
||||
// close ensures that connection closes occur exactly once.
|
||||
close sync.Once
|
||||
@ -106,11 +106,11 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) {
|
||||
|
||||
u := &UDPConn{
|
||||
recvRing: new(C.go_uring),
|
||||
sendRing: new(C.go_uring),
|
||||
file: file,
|
||||
local: local,
|
||||
is4: len(udpAddr.IP) == 4,
|
||||
}
|
||||
u.sendRing.ring = new(C.go_uring)
|
||||
|
||||
fd := file.Fd()
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
@ -125,28 +125,24 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) {
|
||||
C.io_uring_queue_exit(u.recvRing)
|
||||
})
|
||||
|
||||
if ret := C.initialize(u.sendRing, C.int(fd)); ret < 0 {
|
||||
if ret := C.initialize(u.sendRing.ring, C.int(fd)); ret < 0 {
|
||||
u.doShutdown()
|
||||
return nil, fmt.Errorf("sendRing initialization failed: %w", syscall.Errno(-ret))
|
||||
}
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
C.io_uring_queue_exit(u.sendRing)
|
||||
C.io_uring_queue_exit(u.sendRing.ring)
|
||||
})
|
||||
|
||||
// Initialize buffers
|
||||
for i := range u.recvReqs {
|
||||
u.recvReqs[i] = C.initializeReq(bufferSize, C.int(len(udpAddr.IP)))
|
||||
}
|
||||
for i := range u.sendReqs {
|
||||
u.sendReqs[i] = C.initializeReq(bufferSize, C.int(len(udpAddr.IP)))
|
||||
u.recvReqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(len(udpAddr.IP)))
|
||||
}
|
||||
u.sendRing.initReqs(len(udpAddr.IP))
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
for _, r := range u.recvReqs {
|
||||
C.freeReq(r)
|
||||
}
|
||||
for _, r := range u.sendReqs {
|
||||
C.freeReq(r)
|
||||
}
|
||||
u.sendRing.freeReqs()
|
||||
})
|
||||
|
||||
// Initialize recv half.
|
||||
@ -156,16 +152,11 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// Initialize send half.
|
||||
u.sendReqC = make(chan int, len(u.sendReqs))
|
||||
for i := range u.sendReqs {
|
||||
u.sendReqC <- i
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func (u *UDPConn) submitRecvRequest(idx int) error {
|
||||
errno := C.submit_recvmsg_request(u.recvRing, u.recvReqs[idx], C.size_t(idx))
|
||||
errno := C.submit_recvmsg_request(u.recvRing, u.recvReqs[idx])
|
||||
if errno < 0 {
|
||||
return fmt.Errorf("uring.submitRecvRequest failed: %w", syscall.Errno(-errno))
|
||||
}
|
||||
@ -247,24 +238,12 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("cannot WriteTo net.Addr of type %T", addr)
|
||||
}
|
||||
// If we need a buffer, get a buffer, potentially blocking.
|
||||
var idx int
|
||||
select {
|
||||
case idx = <-u.sendReqC:
|
||||
default:
|
||||
// No request available. Get one from the kernel.
|
||||
n, idx, err = waitCompletion(u.sendRing)
|
||||
if err != nil {
|
||||
// io_uring failed to issue the syscall.
|
||||
return 0, fmt.Errorf("WriteTo io_uring call failed: %w", err)
|
||||
}
|
||||
if n < 0 {
|
||||
// Past syscall failed.
|
||||
u.sendReqC <- idx // don't leak idx
|
||||
return 0, fmt.Errorf("previous WriteTo failed: %w", syscall.Errno(-n))
|
||||
}
|
||||
|
||||
// Get a req, blocking as needed.
|
||||
r, err := u.sendRing.getReq()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r := u.sendReqs[idx]
|
||||
// Do the write.
|
||||
rbuf := sliceOf(r.buf, len(p))
|
||||
copy(rbuf, p)
|
||||
@ -282,13 +261,9 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||
r.sa6.sin6_port = C.uint16_t(endian.Hton16(uint16(udpAddr.Port)))
|
||||
r.sa6.sin6_family = C.AF_INET6
|
||||
}
|
||||
C.submit_sendmsg_request(u.sendRing, r, C.int(len(p)), C.size_t(idx))
|
||||
C.submit_sendmsg_request(u.sendRing.ring, r, C.int(len(p)))
|
||||
// Get an extra buffer, if available.
|
||||
if idx, ok := peekCompletion(u.sendRing); ok {
|
||||
// Put the request buffer back in the usable queue.
|
||||
// Should never block, by construction.
|
||||
u.sendReqC <- idx
|
||||
}
|
||||
u.sendRing.prefetch()
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user