diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ad4eea7f3d..26c6add180 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -33,6 +33,7 @@ jobs:
- uses: prometheus/promci@443c7fc2397e946bc9f5029e313a9c3441b9b86d # v0.4.7
- uses: ./.github/promci/actions/setup_environment
- run: go test --tags=dedupelabels ./...
+ - run: go test --tags=forcedirectio,stringlabels -race ./tsdb/
- run: GOARCH=386 go test ./...
- uses: ./.github/promci/actions/check_proto
with:
diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index efcf5eb241..6bf9778f0c 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -293,6 +293,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "type-and-unit-labels":
c.scrape.EnableTypeAndUnitLabels = true
logger.Info("Experimental type and unit labels enabled")
+ case "use-uncached-io":
+ c.tsdb.UseUncachedIO = true
+ logger.Info("Experimental Uncached IO is enabled.")
default:
logger.Warn("Unknown option for --enable-feature", "option", o)
}
@@ -557,7 +560,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
- a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
+ a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode)
@@ -1844,6 +1847,7 @@ type tsdbOptions struct {
EnableDelayedCompaction bool
CompactionDelayMaxPercent int
EnableOverlappingCompaction bool
+ UseUncachedIO bool
}
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@@ -1867,6 +1871,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
EnableDelayedCompaction: opts.EnableDelayedCompaction,
CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
+ UseUncachedIO: opts.UseUncachedIO,
}
}
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index ebd6007f7b..18a979367b 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -61,7 +61,7 @@ The Prometheus monitoring server
| --query.timeout
| Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| --query.max-concurrency
| Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| --query.max-samples
| Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
-| --enable-feature
... | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
+| --enable-feature
... | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| --agent
| Run Prometheus in 'Agent mode'. | |
| --log.level
| Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| --log.format
| Output format of log messages. One of: [logfmt, json] | `logfmt` |
diff --git a/docs/feature_flags.md b/docs/feature_flags.md
index 0e6bff3861..5a7ad75f43 100644
--- a/docs/feature_flags.md
+++ b/docs/feature_flags.md
@@ -272,3 +272,17 @@ It's especially useful for users who:
In future more [work is planned](https://github.com/prometheus/prometheus/issues/16610) that will depend on this e.g. rich PromQL UX that helps
when wrong types are used on wrong functions, automatic renames, delta types and more.
+
+## Use Uncached IO
+
+`--enable-feature=use-uncached-io`
+
+Experimental and only available on Linux.
+
+When enabled, chunk writes bypass the page cache. The primary
+goal is to reduce confusion around page-cache behavior and to prevent over-allocation of
+memory in response to misleading cache growth.
+
+This is currently implemented using direct I/O.
+
+For more details, see the [proposal](https://github.com/prometheus/proposals/pull/45).
diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go
index f505d762bb..d89efe31b7 100644
--- a/tsdb/chunks/chunks.go
+++ b/tsdb/chunks/chunks.go
@@ -14,7 +14,6 @@
package chunks
import (
- "bufio"
"encoding/binary"
"errors"
"fmt"
@@ -281,12 +280,13 @@ func checkCRC32(data, sum []byte) error {
type Writer struct {
dirFile *os.File
files []*os.File
- wbuf *bufio.Writer
+ wbuf fileutil.BufWriter // Buffered writer over the most recently cut segment file; created lazily in cut().
n int64
crc32 hash.Hash
buf [binary.MaxVarintLen32]byte
- segmentSize int64
+ segmentSize int64 // Set from the writer options in NewWriter.
+ useUncachedIO bool // When true, cut() creates wbuf with fileutil.NewDirectIOWriter.
}
const (
@@ -294,21 +294,34 @@ const (
DefaultChunkSegmentSize = 512 * 1024 * 1024
)
-// NewWriterWithSegSize returns a new writer against the given directory
-// and allows setting a custom size for the segments.
-func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
- return newWriter(dir, segmentSize)
+// writerOptions collects the optional settings of a Writer.
+// It is filled in by the WriterOption functions passed to NewWriter.
+type writerOptions struct {
+	// segmentSize is copied into Writer.segmentSize.
+	segmentSize int64
+	// useUncachedIO is copied into Writer.useUncachedIO.
+	useUncachedIO bool
}
-// NewWriter returns a new writer against the given directory
-// using the default segment size.
-func NewWriter(dir string) (*Writer, error) {
- return newWriter(dir, DefaultChunkSegmentSize)
+// WriterOption configures a Writer created by NewWriter.
+type WriterOption func(*writerOptions)
+
+// WithUncachedIO returns a WriterOption that sets whether the Writer
+// should write segment files using uncached IO.
+func WithUncachedIO(enabled bool) WriterOption {
+	return func(o *writerOptions) {
+		o.useUncachedIO = enabled
+	}
}
-func newWriter(dir string, segmentSize int64) (*Writer, error) {
- if segmentSize <= 0 {
- segmentSize = DefaultChunkSegmentSize
+// WithSegmentSize returns a WriterOption that sets the segment size of the Writer.
+// A size that is zero or negative is replaced by DefaultChunkSegmentSize.
+func WithSegmentSize(segmentSize int64) WriterOption {
+	size := segmentSize
+	if size <= 0 {
+		size = DefaultChunkSegmentSize
+	}
+	return func(o *writerOptions) {
+		o.segmentSize = size
+	}
+}
+
+// NewWriter returns a new writer against the given directory.
+// The segment size is DefaultChunkSegmentSize unless an option overrides it.
+func NewWriter(dir string, opts ...WriterOption) (*Writer, error) {
+	// Start from the default segment size: a caller passing no WithSegmentSize
+	// option must not end up with a zero segment size.
+	options := &writerOptions{segmentSize: DefaultChunkSegmentSize}
+
+	for _, opt := range opts {
+		opt(options)
}
if err := os.MkdirAll(dir, 0o777); err != nil {
@@ -319,10 +332,11 @@ func newWriter(dir string, segmentSize int64) (*Writer, error) {
return nil, err
}
return &Writer{
- dirFile: dirFile,
- n: 0,
- crc32: newCRC32(),
- segmentSize: segmentSize,
+ dirFile: dirFile,
+ n: 0,
+ crc32: newCRC32(),
+ segmentSize: options.segmentSize,
+ useUncachedIO: options.useUncachedIO,
}, nil
}
@@ -333,7 +347,7 @@ func (w *Writer) tail() *os.File {
return w.files[len(w.files)-1]
}
-// finalizeTail writes all pending data to the current tail file,
+// finalizeTail writes all pending data to the current tail file if any,
// truncates its size, and closes it.
func (w *Writer) finalizeTail() error {
tf := w.tail()
@@ -341,8 +355,10 @@ func (w *Writer) finalizeTail() error {
return nil
}
- if err := w.wbuf.Flush(); err != nil {
- return err
+ if w.wbuf != nil {
+ if err := w.wbuf.Flush(); err != nil {
+ return err
+ }
}
if err := tf.Sync(); err != nil {
return err
@@ -373,9 +389,25 @@ func (w *Writer) cut() error {
w.files = append(w.files, f)
if w.wbuf != nil {
- w.wbuf.Reset(f)
+ if err := w.wbuf.Reset(f); err != nil {
+ return err
+ }
} else {
- w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
+ var (
+ wbuf fileutil.BufWriter
+ err error
+ )
+ size := 8 * 1024 * 1024
+ if w.useUncachedIO {
+ // Uncached IO is implemented using direct I/O for now.
+ wbuf, err = fileutil.NewDirectIOWriter(f, size)
+ } else {
+ wbuf, err = fileutil.NewBufioWriterWithSeek(f, size)
+ }
+ if err != nil {
+ return err
+ }
+ w.wbuf = wbuf
}
return nil
@@ -434,8 +466,9 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all
return 0, nil, 0, fmt.Errorf("open final file: %w", err)
}
// Skip header for further writes.
- if _, err := f.Seek(int64(n), 0); err != nil {
- return 0, nil, 0, fmt.Errorf("seek in final file: %w", err)
+ offset := int64(n)
+ if _, err := f.Seek(offset, 0); err != nil {
+ return 0, nil, 0, fmt.Errorf("seek to %d in final file: %w", offset, err)
}
return n, f, seq, nil
}
diff --git a/tsdb/compact.go b/tsdb/compact.go
index b66f7eed8f..602958eaeb 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -85,6 +85,7 @@ type LeveledCompactor struct {
chunkPool chunkenc.Pool
ctx context.Context
maxBlockChunkSegmentSize int64
+ useUncachedIO bool
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
postingsDecoderFactory PostingsDecoderFactory
@@ -171,6 +172,8 @@ type LeveledCompactorOptions struct {
EnableOverlappingCompaction bool
// Metrics is set of metrics for Compactor. By default, NewCompactorMetrics would be called to initialize metrics unless it is provided.
Metrics *CompactorMetrics
+ // UseUncachedIO allows bypassing the page cache when appropriate.
+ UseUncachedIO bool
}
type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder
@@ -226,6 +229,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
metrics: opts.Metrics,
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
+ useUncachedIO: opts.UseUncachedIO,
mergeFunc: mergeFunc,
postingsEncoder: pe,
postingsDecoderFactory: opts.PD,
@@ -657,7 +661,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
// data of all blocks.
var chunkw ChunkWriter
- chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
+ chunkw, err = chunks.NewWriter(chunkDir(tmp), chunks.WithSegmentSize(c.maxBlockChunkSegmentSize), chunks.WithUncachedIO(c.useUncachedIO))
if err != nil {
return fmt.Errorf("open chunk writer: %w", err)
}
diff --git a/tsdb/db.go b/tsdb/db.go
index 2d0af5c940..3eab096505 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -219,6 +219,9 @@ type Options struct {
// PostingsDecoderFactory allows users to customize postings decoders based on BlockMeta.
// By default, DefaultPostingsDecoderFactory will be used to create raw posting decoder.
PostingsDecoderFactory PostingsDecoderFactory
+
+ // UseUncachedIO allows bypassing the page cache when appropriate.
+ UseUncachedIO bool
}
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
@@ -903,6 +906,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
PD: opts.PostingsDecoderFactory,
+ UseUncachedIO: opts.UseUncachedIO,
})
}
if err != nil {
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index b5ce5f20fd..b118d3deb3 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -2923,7 +2923,7 @@ func TestChunkWriter_ReadAfterWrite(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
tempDir := t.TempDir()
- chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize))
+ chunkw, err := chunks.NewWriter(tempDir, chunks.WithSegmentSize(chunks.SegmentHeaderSize+int64(test.segmentSize)))
require.NoError(t, err)
for _, chks := range test.chks {
diff --git a/tsdb/fileutil/direct_io.go b/tsdb/fileutil/direct_io.go
new file mode 100644
index 0000000000..ad306776ca
--- /dev/null
+++ b/tsdb/fileutil/direct_io.go
@@ -0,0 +1,39 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileutil
+
+import (
+ "bufio"
+ "errors"
+ "os"
+)
+
+// errDirectIOUnsupported reports that Direct IO cannot be used.
+var errDirectIOUnsupported = errors.New("direct IO is unsupported")
+
+// BufWriter is the set of buffered-writer operations needed to write to a file:
+// writing bytes, flushing what is buffered, and retargeting the writer to another file.
+type BufWriter interface {
+	// Write writes the given bytes through the buffer.
+	Write([]byte) (int, error)
+	// Flush writes any buffered data to the underlying file.
+	Flush() error
+	// Reset makes the writer target f.
+	Reset(f *os.File) error
+}
+
+// writer is a thin wrapper around bufio.Writer that satisfies BufWriter.
+// It is used when Direct IO isn't enabled, as using directIOWriter in such cases is impractical.
+type writer struct {
+	*bufio.Writer
+}
+
+// Reset retargets the embedded bufio.Writer to f.
+// It always returns nil; the error result only exists to satisfy BufWriter.
+func (w *writer) Reset(f *os.File) error {
+	w.Writer.Reset(f)
+	return nil
+}
diff --git a/tsdb/fileutil/direct_io_force.go b/tsdb/fileutil/direct_io_force.go
new file mode 100644
index 0000000000..e2f811b9f2
--- /dev/null
+++ b/tsdb/fileutil/direct_io_force.go
@@ -0,0 +1,28 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This allows seamless testing of the Direct I/O writer across all tsdb tests.
+
+//go:build linux && forcedirectio
+
+package fileutil
+
+import "os"
+
+// NewDirectIOWriter returns a BufWriter that writes to f using Direct IO,
+// buffering up to size bytes.
+func NewDirectIOWriter(f *os.File, size int) (BufWriter, error) {
+	w, err := newDirectIOWriter(f, size)
+	if err != nil {
+		// Return an untyped nil: forwarding the nil *directIOWriter directly
+		// would yield a non-nil BufWriter interface value.
+		return nil, err
+	}
+	return w, nil
+}
+
+// NewBufioWriterWithSeek returns a Direct IO writer in this build (linux && forcedirectio),
+// so that callers asking for a regular buffered writer exercise the Direct IO writer instead.
+func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) {
+	return NewDirectIOWriter(f, size)
+}
diff --git a/tsdb/fileutil/direct_io_linux.go b/tsdb/fileutil/direct_io_linux.go
new file mode 100644
index 0000000000..7406cc1594
--- /dev/null
+++ b/tsdb/fileutil/direct_io_linux.go
@@ -0,0 +1,29 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build linux && !forcedirectio
+
+package fileutil
+
+import (
+ "bufio"
+ "os"
+)
+
+// NewBufioWriterWithSeek returns a BufWriter backed by a bufio.Writer of the given size over f.
+func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) {
+	buffered := bufio.NewWriterSize(f, size)
+	return &writer{Writer: buffered}, nil
+}
+
+// NewDirectIOWriter returns a BufWriter that writes to f using Direct IO,
+// buffering up to size bytes.
+func NewDirectIOWriter(f *os.File, size int) (BufWriter, error) {
+	w, err := newDirectIOWriter(f, size)
+	if err != nil {
+		// Return an untyped nil: forwarding the nil *directIOWriter directly
+		// would yield a non-nil BufWriter interface value.
+		return nil, err
+	}
+	return w, nil
+}
diff --git a/tsdb/fileutil/direct_io_unsupported.go b/tsdb/fileutil/direct_io_unsupported.go
new file mode 100644
index 0000000000..fb0b28fcc3
--- /dev/null
+++ b/tsdb/fileutil/direct_io_unsupported.go
@@ -0,0 +1,29 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !linux
+
+package fileutil
+
+import (
+ "bufio"
+ "os"
+)
+
+// NewBufioWriterWithSeek returns a BufWriter backed by a bufio.Writer of the given size over f.
+func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) {
+	buffered := bufio.NewWriterSize(f, size)
+	return &writer{Writer: buffered}, nil
+}
+
+// NewDirectIOWriter always fails with errDirectIOUnsupported in this build (!linux).
+func NewDirectIOWriter(_ *os.File, _ int) (BufWriter, error) {
+	return nil, errDirectIOUnsupported
+}
diff --git a/tsdb/fileutil/direct_io_writer.go b/tsdb/fileutil/direct_io_writer.go
new file mode 100644
index 0000000000..cf20504006
--- /dev/null
+++ b/tsdb/fileutil/direct_io_writer.go
@@ -0,0 +1,407 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build linux
+
+package fileutil
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "unsafe"
+
+ "golang.org/x/sys/unix"
+)
+
+const (
+	// Fallback alignment and buffer size, deliberately set high to cover most setups.
+	// The alignment is used when the exact restrictions cannot be fetched with
+	// statx(2) https://man7.org/linux/man-pages/man2/statx.2.html (see fileDirectIORqmts).
+	// NOTE(review): this comment previously cited Linux >= 6.14 while
+	// errStatxNotSupported cites 6.1; confirm the intended kernel version.
+	defaultAlignment = 4096
+	defaultBufSize   = 4096
+)
+
+var (
+	// errWriterInvalid is returned by Write and Flush once the writer has been marked invalid.
+	errWriterInvalid = errors.New("the last flush resulted in an unaligned offset, the writer can no longer ensure contiguous writes")
+	// errStatxNotSupported is returned by fetchDirectIORqmts when statx fails with ENOSYS
+	// or does not report STATX_DIOALIGN in its result mask.
+	errStatxNotSupported = errors.New("the statx syscall with STATX_DIOALIGN is not supported. At least Linux kernel 6.1 is needed")
+)
+
+// directIOWriter is a specialized bufio.Writer that supports Direct IO to a file
+// by ensuring all alignment restrictions are satisfied.
+// The writer can handle files whose initial offsets are not aligned.
+// Once Direct IO is in use, if an explicit call to Flush() results in an unaligned offset, the writer
+// should no longer be used, as it can no longer support contiguous writes.
+type directIOWriter struct {
+	// buf is the write buffer, allocated with alignedBlock.
+	buf []byte
+	// n is the number of bytes currently held in buf.
+	n int
+
+	// f is the file being written to.
+	f *os.File
+	// offsetAlignmentGap represents the number of bytes needed to reach the nearest
+	// offset alignment on the file, making Direct IO possible.
+	offsetAlignmentGap int
+	// alignmentRqmts holds the Direct IO alignment requirements resolved for f.
+	alignmentRqmts *directIORqmts
+
+	// err accumulates write errors; while it is set, Write and flush return it.
+	err error
+	// invalid is set by directIOWrite when the payload written is not a
+	// multiple of offsetAlign; Write and flush then return errWriterInvalid.
+	invalid bool
+}
+
+// newDirectIOWriter builds a directIOWriter over f with a buffer of size bytes.
+// A non-positive size selects defaultBufSize. The size must be a multiple of the
+// offset alignment required for f, otherwise an error is returned.
+func newDirectIOWriter(f *os.File, size int) (*directIOWriter, error) {
+	rqmts, err := fileDirectIORqmts(f)
+	if err != nil {
+		return nil, err
+	}
+
+	bufSize := size
+	if bufSize <= 0 {
+		bufSize = defaultBufSize
+	}
+	if rem := bufSize % rqmts.offsetAlign; rem != 0 {
+		return nil, fmt.Errorf("size %d should be a multiple of %d", bufSize, rqmts.offsetAlign)
+	}
+
+	// This also enables Direct IO right away when the offset is already aligned.
+	initialGap, err := checkInitialUnalignedOffset(f, rqmts)
+	if err != nil {
+		return nil, err
+	}
+
+	w := &directIOWriter{
+		buf:                alignedBlock(bufSize, rqmts),
+		f:                  f,
+		offsetAlignmentGap: initialGap,
+		alignmentRqmts:     rqmts,
+	}
+	return w, nil
+}
+
+// Available returns how many bytes are unused in the buffer.
+func (b *directIOWriter) Available() int { return len(b.buf) - b.n }
+
+// Buffered returns the number of bytes currently held in the buffer.
+func (b *directIOWriter) Buffered() int { return b.n }
+
+// fillInitialOffsetGap writes the necessary bytes from the buffer without Direct IO
+// to fill offsetAlignmentGap and align the file offset, enabling Direct IO usage.
+// Once alignment is achieved, Direct IO is enabled.
+// It does nothing when the buffer is empty or the gap is already zero.
+// Failures are accumulated into b.err rather than returned.
+func (b *directIOWriter) fillInitialOffsetGap() {
+	if b.n == 0 || b.offsetAlignmentGap == 0 {
+		return
+	}
+
+	// Write at most the gap; fewer bytes when the buffer holds less than that.
+	bytesToAlign := min(b.n, b.offsetAlignmentGap)
+	n, err := b.f.Write(b.buf[:bytesToAlign])
+	if n < bytesToAlign && err == nil {
+		err = io.ErrShortWrite
+	}
+	if n > 0 {
+		// Move the bytes that were not written to the start of the buffer.
+		copy(b.buf[0:b.n-n], b.buf[n:b.n])
+		b.n -= n
+	}
+	// If the file offset was aligned, enable Direct IO.
+	b.offsetAlignmentGap -= n
+	if b.offsetAlignmentGap == 0 {
+		err = errors.Join(err, enableDirectIO(b.f.Fd()))
+	}
+	b.err = errors.Join(b.err, err)
+}
+
+// directIOWrite writes p to the file. The last padding bytes of p are alignment
+// padding rather than payload.
+// It returns the number of payload bytes written. When padding bytes were written,
+// the file offset is moved back so that it points right after the payload.
+// The writer is marked invalid when the payload written is not a multiple of offsetAlign.
+func (b *directIOWriter) directIOWrite(p []byte, padding int) (int, error) {
+	relevant := len(p) - padding
+
+	n, err := b.f.Write(p)
+	switch {
+	case n < relevant:
+		relevant = n
+		if err == nil {
+			err = io.ErrShortWrite
+		}
+	case n > relevant:
+		// Adjust the offset to discard the padding that was written.
+		writtenPadding := int64(n - relevant)
+		if _, seekErr := b.f.Seek(-writtenPadding, io.SeekCurrent); seekErr != nil {
+			// Return the failure instead of only recording it in b.err: a padded write
+			// also invalidates the writer, and Write/flush check the invalid flag before
+			// b.err, so a recorded-only error would never reach the caller.
+			// Callers store the returned error in b.err themselves.
+			err = errors.Join(err, fmt.Errorf("seek to discard written padding %d: %w", writtenPadding, seekErr))
+		}
+	}
+
+	if relevant%b.alignmentRqmts.offsetAlign != 0 {
+		b.invalid = true
+	}
+	return relevant, err
+}
+
+// canDirectIOWrite reports whether p can be written straight to the file with
+// Direct IO: p must satisfy the alignment requirements and the file offset
+// must have no remaining alignment gap.
+func (b *directIOWriter) canDirectIOWrite(p []byte) bool {
+	if !isAligned(p, b.alignmentRqmts) {
+		return false
+	}
+	return b.offsetAlignmentGap == 0
+}
+
+// Write copies p into the buffer, writing to the file whenever the buffer fills up.
+// It returns errWriterInvalid without consuming anything when the writer is invalid.
+// Otherwise it returns the number of bytes of p consumed and the accumulated error, if any.
+func (b *directIOWriter) Write(p []byte) (nn int, err error) {
+	if b.invalid {
+		return 0, errWriterInvalid
+	}
+
+	// Loop while p does not fit in the free space of the buffer.
+	for len(p) > b.Available() && b.err == nil {
+		var n1, n2 int
+		if b.Buffered() == 0 && b.canDirectIOWrite(p) {
+			// Large write, empty buffer.
+			// To avoid copy, write from p via Direct IO as the block and the file
+			// offset are aligned.
+			n1, b.err = b.directIOWrite(p, 0)
+		} else {
+			n1 = copy(b.buf[b.n:], p)
+			b.n += n1
+			if b.offsetAlignmentGap != 0 {
+				b.fillInitialOffsetGap()
+				// Refill the buffer.
+				n2 = copy(b.buf[b.n:], p[n1:])
+				b.n += n2
+			}
+			if b.Available() == 0 {
+				// Avoid flushing in case the second refill wasn't complete.
+				b.err = errors.Join(b.err, b.flush())
+			}
+		}
+		nn += n1 + n2
+		p = p[n1+n2:]
+	}
+
+	if b.err != nil {
+		return nn, b.err
+	}
+
+	// The remainder of p fits in the buffer.
+	n := copy(b.buf[b.n:], p)
+	b.n += n
+	nn += n
+	return nn, nil
+}
+
+// flush writes the buffered bytes to the file through directIOWrite.
+// It returns errWriterInvalid if the writer is invalid, the accumulated error if
+// one is set, and nil when there is nothing buffered.
+// When the buffered length is not a multiple of offsetAlign, the write is zero padded
+// up to the next multiple.
+func (b *directIOWriter) flush() error {
+	if b.invalid {
+		return errWriterInvalid
+	}
+	if b.err != nil {
+		return b.err
+	}
+	if b.n == 0 {
+		return nil
+	}
+
+	// Ensure the segment length alignment restriction is met.
+	// If the buffer length isn't a multiple of offsetAlign, round
+	// it to the nearest upper multiple and add zero padding.
+	uOffset := b.n
+	if uOffset%b.alignmentRqmts.offsetAlign != 0 {
+		uOffset = ((uOffset / b.alignmentRqmts.offsetAlign) + 1) * b.alignmentRqmts.offsetAlign
+		for i := b.n; i < uOffset; i++ {
+			b.buf[i] = 0
+		}
+	}
+	n, err := b.directIOWrite(b.buf[:uOffset], uOffset-b.n)
+	if err != nil {
+		// Keep the bytes that were not written at the start of the buffer.
+		if n > 0 && n < b.n {
+			copy(b.buf[0:b.n-n], b.buf[n:b.n])
+		}
+		b.n -= n
+		b.err = errors.Join(b.err, err)
+		return err
+	}
+
+	b.n = 0
+	return nil
+}
+
+// Flush writes the buffered data to the file.
+// When the file offset is not aligned yet, the alignment gap is filled first
+// and any error accumulated while doing so is returned.
+func (b *directIOWriter) Flush() error {
+	if b.offsetAlignmentGap == 0 {
+		return b.flush()
+	}
+
+	b.fillInitialOffsetGap()
+	if b.err != nil {
+		return b.err
+	}
+	return b.flush()
+}
+
+// Reset drops the buffered bytes, the accumulated error and the invalid flag,
+// and makes the writer target f. The alignment requirements and the offset gap
+// are recomputed for f.
+// If computing the offset gap fails, alignmentRqmts (and buf, when it was nil)
+// have already been updated while the rest of the state is left untouched.
+func (b *directIOWriter) Reset(f *os.File) error {
+	alignmentRqmts, err := fileDirectIORqmts(f)
+	if err != nil {
+		return err
+	}
+	b.alignmentRqmts = alignmentRqmts
+
+	// NOTE(review): an existing buffer is reused as is; it is not rechecked
+	// against the requirements of the new file.
+	if b.buf == nil {
+		b.buf = alignedBlock(defaultBufSize, b.alignmentRqmts)
+	}
+	gap, err := checkInitialUnalignedOffset(f, b.alignmentRqmts)
+	if err != nil {
+		return err
+	}
+	b.offsetAlignmentGap = gap
+	b.err = nil
+	b.invalid = false
+	b.n = 0
+	b.f = f
+	return nil
+}
+
+// fileDirectIORqmts resolves the Direct IO alignment requirements for f.
+// It falls back to defaultDirectIORqmts when fetchDirectIORqmts reports
+// errStatxNotSupported, and rejects requirements with a zero alignment.
+func fileDirectIORqmts(f *os.File) (*directIORqmts, error) {
+	rqmts, err := fetchDirectIORqmts(f.Fd())
+	if err != nil {
+		if !errors.Is(err, errStatxNotSupported) {
+			return nil, err
+		}
+		rqmts = defaultDirectIORqmts()
+	}
+
+	if rqmts.memoryAlign == 0 || rqmts.offsetAlign == 0 {
+		// This may require some extra testing.
+		return nil, fmt.Errorf("zero alignment requirement is not supported %+v", rqmts)
+	}
+	return rqmts, nil
+}
+
+// alignmentOffset forwards to computeAlignmentOffset.
+func alignmentOffset(block []byte, requiredAlignment int) int {
+	return computeAlignmentOffset(block, requiredAlignment)
+}
+
+// computeAlignmentOffset returns the address of the first byte of block masked with
+// alignment-1, which is the address modulo alignment when alignment is a power of two.
+// It returns 0 for a zero alignment and otherwise panics on an empty block.
+func computeAlignmentOffset(block []byte, alignment int) int {
+	if alignment == 0 {
+		return 0
+	}
+	if len(block) == 0 {
+		panic("empty block not supported")
+	}
+	return int(uintptr(unsafe.Pointer(&block[0])) & uintptr(alignment-1))
+}
+
+// isAligned reports whether block can be used for Direct IO: its address must be
+// aligned with memoryAlign and its length must be a multiple of offsetAlign.
+func isAligned(block []byte, alignmentRqmts *directIORqmts) bool {
+	if alignmentOffset(block, alignmentRqmts.memoryAlign) != 0 {
+		return false
+	}
+	return len(block)%alignmentRqmts.offsetAlign == 0
+}
+
+// alignedBlock returns a block of size bytes whose address is aligned with memoryAlign.
+// The size should be a multiple of offsetAlign.
+// It panics when size is zero or not a multiple of offsetAlign, or when no
+// aligned block could be produced.
+func alignedBlock(size int, alignmentRqmts *directIORqmts) []byte {
+	if size == 0 || size%alignmentRqmts.offsetAlign != 0 {
+		panic(fmt.Errorf("size %d should be > 0 and a multiple of offsetAlign=%d", size, alignmentRqmts.offsetAlign))
+	}
+	if alignmentRqmts.memoryAlign == 0 {
+		return make([]byte, size)
+	}
+
+	// Over-allocate by memoryAlign bytes so that the start of the returned
+	// block can be shifted forward to an aligned address.
+	block := make([]byte, size+alignmentRqmts.memoryAlign)
+	a := alignmentOffset(block, alignmentRqmts.memoryAlign)
+	if a == 0 {
+		return block[:size]
+	}
+
+	offset := alignmentRqmts.memoryAlign - a
+	block = block[offset : offset+size]
+	if !isAligned(block, alignmentRqmts) {
+		// Assuming this to be rare, if not impossible.
+		panic("cannot create an aligned block")
+	}
+	return block
+}
+
+// currentFileOffset returns the current offset of f, obtained by seeking
+// zero bytes from the current position.
+func currentFileOffset(f *os.File) (int, error) {
+	pos, seekErr := f.Seek(0, io.SeekCurrent)
+	if seekErr != nil {
+		return 0, fmt.Errorf("cannot get the current offset: %w", seekErr)
+	}
+	return int(pos), nil
+}
+
+// fileStatusFlags returns the file status flags of fd, read with fcntl(F_GETFL).
+func fileStatusFlags(fd uintptr) (int, error) {
+	flags, err := unix.FcntlInt(fd, unix.F_GETFL, 0)
+	if err != nil {
+		return 0, fmt.Errorf("cannot get file status flags: %w", err)
+	}
+	return flags, nil
+}
+
+// enableDirectIO sets O_DIRECT on fd with fcntl(F_SETFL).
+// It returns early when the flag is already set.
+func enableDirectIO(fd uintptr) error {
+	flags, err := fileStatusFlags(fd)
+	if err != nil {
+		return err
+	}
+
+	alreadyDirect := flags&unix.O_DIRECT == unix.O_DIRECT
+	if alreadyDirect {
+		return nil
+	}
+
+	if _, err := unix.FcntlInt(fd, unix.F_SETFL, flags|unix.O_DIRECT); err != nil {
+		return fmt.Errorf("cannot enable Direct IO: %w", err)
+	}
+	return nil
+}
+
+// checkInitialUnalignedOffset returns how many bytes separate the current offset
+// of the file from the next offset that is a multiple of offsetAlign.
+// When the current offset is already aligned, Direct IO is enabled on the file
+// and 0 is returned.
+func checkInitialUnalignedOffset(f *os.File, alignmentRqmts *directIORqmts) (int, error) {
+	offset, err := currentFileOffset(f)
+	if err != nil {
+		return 0, err
+	}
+
+	align := alignmentRqmts.offsetAlign
+	if gap := (align - offset%align) % align; gap != 0 {
+		return gap, nil
+	}
+
+	if err := enableDirectIO(f.Fd()); err != nil {
+		return 0, err
+	}
+	return 0, nil
+}
+
+// directIORqmts holds the alignment requirements for direct I/O.
+// All fields are in bytes.
+type directIORqmts struct {
+	// The required alignment for memory buffer addresses.
+	memoryAlign int
+	// The required alignment for I/O segment lengths and file offsets.
+	offsetAlign int
+}
+
+// defaultDirectIORqmts returns requirements where both the memory and the
+// offset alignment are defaultAlignment.
+func defaultDirectIORqmts() *directIORqmts {
+	rqmts := directIORqmts{
+		memoryAlign: defaultAlignment,
+		offsetAlign: defaultAlignment,
+	}
+	return &rqmts
+}
+
+// fetchDirectIORqmts retrieves direct I/O alignment requirements for a file descriptor using statx
+// when possible.
+// It returns errStatxNotSupported when statx fails with ENOSYS or does not report
+// STATX_DIOALIGN in its result mask, and an error wrapping errDirectIOUnsupported
+// when either reported alignment is zero.
+func fetchDirectIORqmts(fd uintptr) (*directIORqmts, error) {
+	var stat unix.Statx_t
+	flags := unix.AT_SYMLINK_NOFOLLOW | unix.AT_EMPTY_PATH | unix.AT_STATX_DONT_SYNC
+	mask := unix.STATX_DIOALIGN
+
+	// Request and check the same mask, so the two cannot drift apart.
+	if err := unix.Statx(int(fd), "", flags, mask, &stat); err != nil {
+		if errors.Is(err, unix.ENOSYS) {
+			return nil, errStatxNotSupported
+		}
+		return nil, fmt.Errorf("statx failed on fd %d: %w", fd, err)
+	}
+
+	if stat.Mask&uint32(mask) == 0 {
+		return nil, errStatxNotSupported
+	}
+
+	if stat.Dio_mem_align == 0 || stat.Dio_offset_align == 0 {
+		return nil, fmt.Errorf("%w: kernel may be old or the file may be on an unsupported FS", errDirectIOUnsupported)
+	}
+
+	return &directIORqmts{
+		memoryAlign: int(stat.Dio_mem_align),
+		offsetAlign: int(stat.Dio_offset_align),
+	}, nil
+}
diff --git a/tsdb/fileutil/direct_io_writer_test.go b/tsdb/fileutil/direct_io_writer_test.go
new file mode 100644
index 0000000000..31ea6fda6e
--- /dev/null
+++ b/tsdb/fileutil/direct_io_writer_test.go
@@ -0,0 +1,197 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build linux
+
+package fileutil
+
+import (
+ "io"
+ "os"
+ "path"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+// directIORqmtsForTest returns the Direct IO alignment requirements of a file
+// created in a temporary directory of the test.
+func directIORqmtsForTest(tb testing.TB) *directIORqmts {
+	tb.Helper()
+
+	f, err := os.OpenFile(path.Join(tb.TempDir(), "foo"), os.O_CREATE|os.O_WRONLY, 0o666)
+	require.NoError(tb, err)
+	// The file is only needed to query the requirements; do not leak its descriptor.
+	defer func() { require.NoError(tb, f.Close()) }()
+
+	// Resolve the requirements the same way newDirectIOWriter does, so that the
+	// expectations built from them match the writer under test, including the
+	// fallback to the defaults.
+	alignmentRqmts, err := fileDirectIORqmts(f)
+	require.NoError(tb, err)
+	return alignmentRqmts
+}
+
+// TestDirectIOFile checks that Direct IO can be enabled on a freshly created file.
+func TestDirectIOFile(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	f, err := os.OpenFile(path.Join(tmpDir, "test"), os.O_CREATE|os.O_WRONLY, 0o666)
+	require.NoError(t, err)
+	// Do not leak the descriptor past the test.
+	t.Cleanup(func() { require.NoError(t, f.Close()) })
+
+	require.NoError(t, enableDirectIO(f.Fd()))
+}
+
+// TestAlignedBlockEarlyPanic checks that alignedBlock panics for a zero size
+// and for a size that is not a multiple of the offset alignment.
+func TestAlignedBlockEarlyPanic(t *testing.T) {
+	rqmts := directIORqmtsForTest(t)
+
+	for _, tc := range []struct {
+		desc string
+		size int
+	}{
+		{desc: "Zero size", size: 0},
+		{desc: "Size not multiple of offset alignment", size: 9973},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			allocate := func() { alignedBlock(tc.size, rqmts) }
+			require.Panics(t, allocate)
+		})
+	}
+}
+
+// TestAlignedBloc checks that alignedBlock returns an aligned block of the
+// requested length.
+func TestAlignedBloc(t *testing.T) {
+	rqmts := directIORqmtsForTest(t)
+	size := 5 * rqmts.offsetAlign
+
+	block := alignedBlock(size, rqmts)
+	require.True(t, isAligned(block, rqmts))
+	require.Len(t, block, size)
+	// The same block shifted by one byte must no longer be reported as aligned.
+	require.False(t, isAligned(block[1:], rqmts))
+}
+
+// TestDirectIOWriter writes data of various sizes through a directIOWriter, starting
+// at aligned and unaligned file offsets, and checks the final file offset, the file
+// content (including zero padding) and whether the writer ends up invalidated.
+func TestDirectIOWriter(t *testing.T) {
+	alignRqmts := directIORqmtsForTest(t)
+	cases := []struct {
+		name          string
+		initialOffset int
+		bufferSize    int
+		dataSize      int
+		// writtenBytes should also consider needed zero padding.
+		writtenBytes     int
+		shouldInvalidate bool
+	}{
+		{
+			name:         "data equal to buffer",
+			bufferSize:   8 * alignRqmts.offsetAlign,
+			dataSize:     8 * alignRqmts.offsetAlign,
+			writtenBytes: 8 * alignRqmts.offsetAlign,
+		},
+		{
+			name:         "data exceeds buffer",
+			bufferSize:   4 * alignRqmts.offsetAlign,
+			dataSize:     64 * alignRqmts.offsetAlign,
+			writtenBytes: 64 * alignRqmts.offsetAlign,
+		},
+		{
+			name:             "data exceeds buffer + final offset unaligned",
+			bufferSize:       2 * alignRqmts.offsetAlign,
+			dataSize:         4*alignRqmts.offsetAlign + 33,
+			writtenBytes:     4*alignRqmts.offsetAlign + alignRqmts.offsetAlign,
+			shouldInvalidate: true,
+		},
+		{
+			name:         "data smaller than buffer",
+			bufferSize:   8 * alignRqmts.offsetAlign,
+			dataSize:     3 * alignRqmts.offsetAlign,
+			writtenBytes: 3 * alignRqmts.offsetAlign,
+		},
+		{
+			name:             "data smaller than buffer + final offset unaligned",
+			bufferSize:       4 * alignRqmts.offsetAlign,
+			dataSize:         alignRqmts.offsetAlign + 70,
+			writtenBytes:     alignRqmts.offsetAlign + alignRqmts.offsetAlign,
+			shouldInvalidate: true,
+		},
+		{
+			name:          "offset aligned",
+			initialOffset: alignRqmts.offsetAlign,
+			bufferSize:    8 * alignRqmts.offsetAlign,
+			dataSize:      alignRqmts.offsetAlign,
+			writtenBytes:  alignRqmts.offsetAlign,
+		},
+		{
+			name:             "initial offset unaligned + final offset unaligned",
+			initialOffset:    8,
+			bufferSize:       8 * alignRqmts.offsetAlign,
+			dataSize:         64 * alignRqmts.offsetAlign,
+			writtenBytes:     64*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8),
+			shouldInvalidate: true,
+		},
+		{
+			name:          "offset unaligned + final offset aligned",
+			initialOffset: 8,
+			bufferSize:    4 * alignRqmts.offsetAlign,
+			dataSize:      4*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8),
+			writtenBytes:  4*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8),
+		},
+		{
+			name:       "empty data",
+			bufferSize: 4 * alignRqmts.offsetAlign,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			fileName := path.Join(t.TempDir(), "test")
+
+			data := make([]byte, tc.dataSize)
+			for i := 0; i < len(data); i++ {
+				// Do not use 256 as it may be a divisor of requiredAlignment. To avoid patterns.
+				data[i] = byte(i % 251)
+			}
+
+			f, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, 0o666)
+			require.NoError(t, err)
+			// Do not leak one descriptor per subtest.
+			t.Cleanup(func() { require.NoError(t, f.Close()) })
+
+			if tc.initialOffset != 0 {
+				_, err = f.Seek(int64(tc.initialOffset), io.SeekStart)
+				require.NoError(t, err)
+			}
+
+			w, err := newDirectIOWriter(f, tc.bufferSize)
+			require.NoError(t, err)
+
+			n, err := w.Write(data)
+			require.NoError(t, err)
+			require.Equal(t, tc.dataSize, n)
+			require.NoError(t, w.Flush())
+
+			// Check the file's final offset.
+			currOffset, err := currentFileOffset(f)
+			require.NoError(t, err)
+			require.Equal(t, tc.dataSize+tc.initialOffset, currOffset)
+
+			// Check the written data.
+			fileBytes, err := os.ReadFile(fileName)
+			require.NoError(t, err)
+			if tc.dataSize > 0 {
+				require.Len(t, fileBytes, tc.writtenBytes+tc.initialOffset)
+				require.Equal(t, data, fileBytes[tc.initialOffset:tc.dataSize+tc.initialOffset])
+			} else {
+				require.Empty(t, fileBytes)
+			}
+
+			// Check the writer state.
+			if tc.shouldInvalidate {
+				require.True(t, w.invalid)
+				require.Error(t, w.Flush())
+				_, err = w.Write([]byte{})
+				require.Error(t, err)
+			} else {
+				require.False(t, w.invalid)
+				require.NoError(t, w.Flush())
+				_, err = w.Write([]byte{})
+				require.NoError(t, err)
+			}
+		})
+	}
+}