diff --git a/Makefile b/Makefile index ebfaff0a4..faa2b2102 100644 --- a/Makefile +++ b/Makefile @@ -676,7 +676,8 @@ OPTIONS_OBJS += src/mux_quic.o src/h3.o src/quic_rx.o src/quic_tx.o \ src/quic_cc_nocc.o src/quic_cc.o src/quic_pacing.o \ src/h3_stats.o src/quic_stats.o src/qpack-enc.o \ src/qpack-tbl.o src/quic_cc_drs.o src/quic_fctl.o \ - src/quic_enc.o src/mux_quic_qstrm.o src/xprt_qstrm.o + src/quic_enc.o src/mux_quic_qstrm.o src/xprt_qstrm.o \ + src/mpring.o endif ifneq ($(USE_QUIC_OPENSSL_COMPAT:0=),) diff --git a/include/haproxy/mpring.h b/include/haproxy/mpring.h new file mode 100644 index 000000000..52ca31d05 --- /dev/null +++ b/include/haproxy/mpring.h @@ -0,0 +1,54 @@ +/* + * MPSC byte ring buffer with variable sized entries. + */ + +#ifndef _MPRING_H +#define _MPRING_H + +#include + +#include + +struct mpring { + size_t capacity; + size_t mask; + uint8_t *buffer; + uint64_t head THREAD_ALIGNED(); + uint64_t tail THREAD_ALIGNED(); +}; + +/* Initialize the ring buffer. The size MUST be a power of 2, and bigger than + * the value of the MPRING_PAYLOAD_ALIGN macro in mpring.c (currently set to 8). + */ +void mpring_init(struct mpring *ring, void *buffer, size_t size); + +/* Reserve bytes in the buffer. Returns NULL in case of failure, and otherwise + * a pointer to the buffer with enough space to write bytes. + */ +void *mpring_write_reserve(struct mpring *ring, size_t len); + +/* Commit data to the buffer after it was written to the pointer given by + * mpring_write_reserve(). The and parameters MUST be identical to + * the ones returned by and passed to mpring_write_reserve(), respectively. + */ +void mpring_write_commit(struct mpring *ring, void *ptr, size_t len); + +/* Convenience shorthand for when we only need to write one contiguous set of + * bytes to the buffer. Returns 0 in case of failure, and a non-zero value + * otherwise. + */ +int mpring_write(struct mpring *ring, const void *data, size_t len); + +/* Get the next entry to be read. Returns NULL if there is no data to be read, + * otherwise returns a pointer to that data and set the size of the entry in the + * pointer. + */ +void *mpring_read_begin(struct mpring *ring, size_t *len); + +/* Indicate that we are done reading an entry, and that the space can be reused + * for new entries. This MUST be called after we are done reading an entry. The + * parameter MUST be equal to the length given by mpring_read_begin(). + */ +void mpring_read_end(struct mpring *ring, size_t len); + +#endif /* _MPRING_H */ diff --git a/src/mpring.c b/src/mpring.c new file mode 100644 index 000000000..cbbee94e9 --- /dev/null +++ b/src/mpring.c @@ -0,0 +1,166 @@ +#include +#include +#include +#include + +#include +#include + +/* 16 bytes would be more wasteful but would allow 128-bit SIMD/NEON memcpy() */ +#define MPRING_PAYLOAD_ALIGN 8 + +#define MPRING_HDR_PADDING (-1) /* Denotes padding space at the end of the buffer */ +#define MPRING_HDR_BUSY 0 /* No data or it is still being written */ + +struct mpring_record { + /* The length or one of the two magic values above */ + int64_t header; +} ALIGNED(MPRING_PAYLOAD_ALIGN); + +/* What we call the stride is the total amount of bytes we need to store an + * entry, including the record header, and the padding bytes necessary to + * maintain proper alignment. + */ +#define MPRING_STRIDE_LEN(len) \ + (sizeof(struct mpring_record) + ((len + MPRING_PAYLOAD_ALIGN - 1) & ~(MPRING_PAYLOAD_ALIGN - 1))) + +void mpring_init(struct mpring *ring, void *buffer, size_t size) +{ + /* The size of the buffer must be a power of 2 */ + BUG_ON((size & (size - 1)) != 0); + + /* And must also be bigger than the payload alignment */ + BUG_ON(size < MPRING_PAYLOAD_ALIGN); + + ring->buffer = buffer; + /* We have to zero the buffer to ensure that all records are marked + * as BUSY even if we have not written there yet. + */ + memset(ring->buffer, 0, size); + + ring->capacity = size; + ring->mask = size - 1; + + ring->head = ring->tail = 0; +} + +void *mpring_write_reserve(struct mpring *ring, size_t len) +{ + struct mpring_record *record; + uint64_t head, tail; + size_t stride, offset, padding, need; + + /* Align writes to the buffer. This is both useful in order to guarantee + * that SIMD/NEON optimized memcpy() implementations can be used, but + * also required to ensure we always have space at the end of the buffer + * for a header to mark padding. + */ + stride = MPRING_STRIDE_LEN(len); + + head = _HA_ATOMIC_LOAD(&ring->head); + do { + offset = head & ring->mask; + + /* Check if we have enough contiguous space */ + padding = 0; + if (offset + stride > ring->capacity) { + padding = ring->capacity - offset; + } + + need = stride + padding; + + tail = HA_ATOMIC_LOAD(&ring->tail); + if (ring->capacity < head - tail + need) { + /* Not enough room */ + return NULL; + } + } while (!_HA_ATOMIC_CAS(&ring->head, &head, head + need)); + + /* Burn the rest of the buffer */ + if (padding > 0) { + record = (struct mpring_record *)(ring->buffer + offset); + HA_ATOMIC_STORE(&record->header, MPRING_HDR_PADDING); + + offset = 0; + } + + record = (struct mpring_record *)(ring->buffer + offset); + _HA_ATOMIC_STORE(&record->header, MPRING_HDR_BUSY); + + return record + 1; +} + +void mpring_write_commit(struct mpring *ring, void *ptr, size_t len) +{ + struct mpring_record *record; + + record = (struct mpring_record *)ptr - 1; + HA_ATOMIC_STORE(&record->header, len); +} + +int mpring_write(struct mpring *ring, const void *data, size_t len) +{ + void *ptr; + + ptr = mpring_write_reserve(ring, len); + if (!ptr) + return 0; + + memcpy(ptr, data, len); + + mpring_write_commit(ring, ptr, len); + return 1; +} + +void *mpring_read_begin(struct mpring *ring, size_t *len) +{ + struct mpring_record *record; + uint64_t tail; + int64_t size; + size_t offset, skip; + + tail = ring->tail; + +again: + offset = tail & ring->mask; + record = (struct mpring_record *)(ring->buffer + offset); + size = HA_ATOMIC_LOAD(&record->header); + + if (size == MPRING_HDR_BUSY) + return NULL; /* No more data to read */ + + if (size == MPRING_HDR_PADDING) { + /* Reset to 0 for next wrap-around */ + _HA_ATOMIC_STORE(&record->header, MPRING_HDR_BUSY); + + /* Skip over the padding */ + skip = ring->capacity - offset; + tail += skip; + _HA_ATOMIC_STORE(&ring->tail, tail); + /* Try again with new tail */ + goto again; + } + + *len = size; + return record + 1; +} + +void mpring_read_end(struct mpring *ring, size_t len) +{ + struct mpring_record *record; + uint64_t tail; + size_t offset, stride; + + tail = _HA_ATOMIC_LOAD(&ring->tail); + offset = tail & ring->mask; + record = (struct mpring_record *)(ring->buffer + offset); + + stride = MPRING_STRIDE_LEN(len); + + /* Reset to 0 so all records are set to mpring_HDR_BUSY when + * producers wrap around and reuse this memory later. + */ + memset(record, 0, stride); + + HA_ATOMIC_STORE(&ring->tail, tail + stride); +}