mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-24 22:11:02 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			1114 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1114 lines
		
	
	
		
			35 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package chunks
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"bytes"
 | |
| 	"encoding/binary"
 | |
| 	"hash"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/dennwc/varint"
 | |
| 	"github.com/pkg/errors"
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"go.uber.org/atomic"
 | |
| 	"golang.org/x/exp/slices"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | |
| 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 | |
| 	"github.com/prometheus/prometheus/tsdb/fileutil"
 | |
| )
 | |
| 
 | |
| // Head chunk file header fields constants.
 | |
| const (
 | |
| 	// MagicHeadChunks is 4 bytes at the beginning of a head chunk file.
 | |
| 	MagicHeadChunks = 0x0130BC91
 | |
| 
 | |
| 	headChunksFormatV1 = 1
 | |
| )
 | |
| 
 | |
| // ErrChunkDiskMapperClosed returned by any method indicates
 | |
| // that the ChunkDiskMapper was closed.
 | |
| var ErrChunkDiskMapperClosed = errors.New("ChunkDiskMapper closed")
 | |
| 
 | |
| const (
 | |
| 	// MintMaxtSize is the size of the mint/maxt for head chunk file and chunks.
 | |
| 	MintMaxtSize = 8
 | |
| 	// SeriesRefSize is the size of series reference on disk.
 | |
| 	SeriesRefSize = 8
 | |
| 	// HeadChunkFileHeaderSize is the total size of the header for a head chunk file.
 | |
| 	HeadChunkFileHeaderSize = SegmentHeaderSize
 | |
| 	// MaxHeadChunkFileSize is the max size of a head chunk file.
 | |
| 	MaxHeadChunkFileSize = 128 * 1024 * 1024 // 128 MiB.
 | |
| 	// CRCSize is the size of crc32 sum on disk.
 | |
| 	CRCSize = 4
 | |
| 	// MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data.
 | |
| 	// Max because the uvarint size can be smaller.
 | |
| 	MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + MaxChunkLengthFieldSize + CRCSize
 | |
| 	// MinWriteBufferSize is the minimum write buffer size allowed.
 | |
| 	MinWriteBufferSize = 64 * 1024 // 64KB.
 | |
| 	// MaxWriteBufferSize is the maximum write buffer size allowed.
 | |
| 	MaxWriteBufferSize = 8 * 1024 * 1024 // 8 MiB.
 | |
| 	// DefaultWriteBufferSize is the default write buffer size.
 | |
| 	DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB.
 | |
| 	// DefaultWriteQueueSize is the default size of the in-memory queue used before flushing chunks to the disk.
 | |
| 	// A value of 0 completely disables this feature.
 | |
| 	DefaultWriteQueueSize = 0
 | |
| )
 | |
| 
 | |
| // ChunkDiskMapperRef represents the location of a head chunk on disk.
 | |
| // The upper 4 bytes hold the index of the head chunk file and
 | |
| // the lower 4 bytes hold the byte offset in the head chunk file where the chunk starts.
 | |
| type ChunkDiskMapperRef uint64
 | |
| 
 | |
| func newChunkDiskMapperRef(seq, offset uint64) ChunkDiskMapperRef {
 | |
| 	return ChunkDiskMapperRef((seq << 32) | offset)
 | |
| }
 | |
| 
 | |
| func (ref ChunkDiskMapperRef) Unpack() (seq, offset int) {
 | |
| 	seq = int(ref >> 32)
 | |
| 	offset = int((ref << 32) >> 32)
 | |
| 	return seq, offset
 | |
| }
 | |
| 
 | |
| func (ref ChunkDiskMapperRef) GreaterThanOrEqualTo(r ChunkDiskMapperRef) bool {
 | |
| 	s1, o1 := ref.Unpack()
 | |
| 	s2, o2 := r.Unpack()
 | |
| 	return s1 > s2 || (s1 == s2 && o1 >= o2)
 | |
| }
 | |
| 
 | |
| func (ref ChunkDiskMapperRef) GreaterThan(r ChunkDiskMapperRef) bool {
 | |
| 	s1, o1 := ref.Unpack()
 | |
| 	s2, o2 := r.Unpack()
 | |
| 	return s1 > s2 || (s1 == s2 && o1 > o2)
 | |
| }
 | |
| 
 | |
| // CorruptionErr is an error that's returned when corruption is encountered.
 | |
| type CorruptionErr struct {
 | |
| 	Dir       string
 | |
| 	FileIndex int
 | |
| 	Err       error
 | |
| }
 | |
| 
 | |
| func (e *CorruptionErr) Error() string {
 | |
| 	return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error()
 | |
| }
 | |
| 
 | |
| // chunkPos keeps track of the position in the head chunk files.
 | |
| // chunkPos is not thread-safe, a lock must be used to protect it.
 | |
| type chunkPos struct {
 | |
| 	seq     uint64 // Index of chunk file.
 | |
| 	offset  uint64 // Offset within chunk file.
 | |
| 	cutFile bool   // When true then the next chunk will be written to a new file.
 | |
| }
 | |
| 
 | |
| // getNextChunkRef takes a chunk and returns the chunk reference which will refer to it once it has been written.
 | |
| // getNextChunkRef also decides whether a new file should be cut before writing this chunk, and it returns the decision via the second return value.
 | |
| // The order of calling getNextChunkRef must be the order in which chunks are written to the disk.
 | |
| func (f *chunkPos) getNextChunkRef(chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef, cutFile bool) {
 | |
| 	chkLen := uint64(len(chk.Bytes()))
 | |
| 	bytesToWrite := f.bytesToWriteForChunk(chkLen)
 | |
| 
 | |
| 	if f.shouldCutNewFile(bytesToWrite) {
 | |
| 		f.toNewFile()
 | |
| 		f.cutFile = false
 | |
| 		cutFile = true
 | |
| 	}
 | |
| 
 | |
| 	chkOffset := f.offset
 | |
| 	f.offset += bytesToWrite
 | |
| 
 | |
| 	return newChunkDiskMapperRef(f.seq, chkOffset), cutFile
 | |
| }
 | |
| 
 | |
| // toNewFile updates the seq/offset position to point to the beginning of a new chunk file.
 | |
| func (f *chunkPos) toNewFile() {
 | |
| 	f.seq++
 | |
| 	f.offset = SegmentHeaderSize
 | |
| }
 | |
| 
 | |
| // cutFileOnNextChunk triggers that the next chunk will be written in to a new file.
 | |
| // Not thread safe, a lock must be held when calling this.
 | |
| func (f *chunkPos) cutFileOnNextChunk() {
 | |
| 	f.cutFile = true
 | |
| }
 | |
| 
 | |
| // setSeq sets the sequence number of the head chunk file.
 | |
| func (f *chunkPos) setSeq(seq uint64) {
 | |
| 	f.seq = seq
 | |
| }
 | |
| 
 | |
| // shouldCutNewFile returns whether a new file should be cut based on the file size.
 | |
| // Not thread safe, a lock must be held when calling this.
 | |
| func (f *chunkPos) shouldCutNewFile(bytesToWrite uint64) bool {
 | |
| 	if f.cutFile {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return f.offset == 0 || // First head chunk file.
 | |
| 		f.offset+bytesToWrite > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
 | |
| }
 | |
| 
 | |
| // bytesToWriteForChunk returns the number of bytes that will need to be written for the given chunk size,
 | |
| // including all meta data before and after the chunk data.
 | |
| // Head chunk format: https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/head_chunks.md#chunk
 | |
| func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 {
 | |
| 	// Headers.
 | |
| 	bytes := uint64(SeriesRefSize) + 2*MintMaxtSize + ChunkEncodingSize
 | |
| 
 | |
| 	// Size of chunk length encoded as uvarint.
 | |
| 	bytes += uint64(varint.UvarintSize(chkLen))
 | |
| 
 | |
| 	// Chunk length.
 | |
| 	bytes += chkLen
 | |
| 
 | |
| 	// crc32.
 | |
| 	bytes += CRCSize
 | |
| 
 | |
| 	return bytes
 | |
| }
 | |
| 
 | |
| // ChunkDiskMapper is for writing the Head block chunks to the disk
 | |
| // and access chunks via mmapped file.
 | |
| type ChunkDiskMapper struct {
 | |
| 	/// Writer.
 | |
| 	dir             *os.File
 | |
| 	writeBufferSize int
 | |
| 
 | |
| 	curFile         *os.File      // File being written to.
 | |
| 	curFileSequence int           // Index of current open file being appended to. 0 if no file is active.
 | |
| 	curFileOffset   atomic.Uint64 // Bytes written in current open file.
 | |
| 	curFileMaxt     int64         // Used for the size retention.
 | |
| 
 | |
| 	// The values in evtlPos represent the file position which will eventually be
 | |
| 	// reached once the content of the write queue has been fully processed.
 | |
| 	evtlPosMtx sync.Mutex
 | |
| 	evtlPos    chunkPos
 | |
| 
 | |
| 	byteBuf      [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
 | |
| 	chkWriter    *bufio.Writer              // Writer for the current open file.
 | |
| 	crc32        hash.Hash
 | |
| 	writePathMtx sync.Mutex
 | |
| 
 | |
| 	/// Reader.
 | |
| 	// The int key in the map is the file number on the disk.
 | |
| 	mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
 | |
| 	closers           map[int]io.Closer         // Closers for resources behind the byte slices.
 | |
| 	readPathMtx       sync.RWMutex              // Mutex used to protect the above 2 maps.
 | |
| 	pool              chunkenc.Pool             // This is used when fetching a chunk from the disk to allocate a chunk.
 | |
| 
 | |
| 	// Writer and Reader.
 | |
| 	// We flush chunks to disk in batches. Hence, we store them in this buffer
 | |
| 	// from which chunks are served till they are flushed and are ready for m-mapping.
 | |
| 	chunkBuffer *chunkBuffer
 | |
| 
 | |
| 	// Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map.
 | |
| 	// This is done after iterating through all the chunks in those files using the IterateAllChunks method.
 | |
| 	fileMaxtSet bool
 | |
| 
 | |
| 	writeQueue *chunkWriteQueue
 | |
| 
 | |
| 	closed bool
 | |
| }
 | |
| 
 | |
| // mmappedChunkFile provides mmapp access to an entire head chunks file that holds many chunks.
 | |
| type mmappedChunkFile struct {
 | |
| 	byteSlice ByteSlice
 | |
| 	maxt      int64 // Max timestamp among all of this file's chunks.
 | |
| }
 | |
| 
 | |
| // NewChunkDiskMapper returns a new ChunkDiskMapper against the given directory
 | |
| // using the default head chunk file duration.
 | |
| // NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
 | |
| // to set the maxt of all the file.
 | |
| func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) {
 | |
| 	// Validate write buffer size.
 | |
| 	if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
 | |
| 		return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
 | |
| 	}
 | |
| 	if writeBufferSize%1024 != 0 {
 | |
| 		return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
 | |
| 	}
 | |
| 
 | |
| 	if err := os.MkdirAll(dir, 0o777); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	dirFile, err := fileutil.OpenDir(dir)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	m := &ChunkDiskMapper{
 | |
| 		dir:             dirFile,
 | |
| 		pool:            pool,
 | |
| 		writeBufferSize: writeBufferSize,
 | |
| 		crc32:           newCRC32(),
 | |
| 		chunkBuffer:     newChunkBuffer(),
 | |
| 	}
 | |
| 
 | |
| 	if writeQueueSize > 0 {
 | |
| 		m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk)
 | |
| 	}
 | |
| 
 | |
| 	if m.pool == nil {
 | |
| 		m.pool = chunkenc.NewPool()
 | |
| 	}
 | |
| 
 | |
| 	return m, m.openMMapFiles()
 | |
| }
 | |
| 
 | |
| // Chunk encodings for out-of-order chunks.
 | |
| // These encodings must be only used by the Head block for its internal bookkeeping.
 | |
| const (
 | |
| 	OutOfOrderMask = uint8(0b10000000)
 | |
| )
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) ApplyOutOfOrderMask(sourceEncoding chunkenc.Encoding) chunkenc.Encoding {
 | |
| 	enc := uint8(sourceEncoding) | OutOfOrderMask
 | |
| 	return chunkenc.Encoding(enc)
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) IsOutOfOrderChunk(e chunkenc.Encoding) bool {
 | |
| 	return (uint8(e) & OutOfOrderMask) != 0
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) RemoveMasks(sourceEncoding chunkenc.Encoding) chunkenc.Encoding {
 | |
| 	restored := uint8(sourceEncoding) & (^OutOfOrderMask)
 | |
| 	return chunkenc.Encoding(restored)
 | |
| }
 | |
| 
 | |
| // openMMapFiles opens all files within dir for mmapping.
 | |
| func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
 | |
| 	cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
 | |
| 	cdm.closers = map[int]io.Closer{}
 | |
| 	defer func() {
 | |
| 		if returnErr != nil {
 | |
| 			returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err()
 | |
| 
 | |
| 			cdm.mmappedChunkFiles = nil
 | |
| 			cdm.closers = nil
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	files, err := listChunkFiles(cdm.dir.Name())
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	files, err = repairLastChunkFile(files)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	chkFileIndices := make([]int, 0, len(files))
 | |
| 	for seq, fn := range files {
 | |
| 		f, err := fileutil.OpenMmapFile(fn)
 | |
| 		if err != nil {
 | |
| 			return errors.Wrapf(err, "mmap files, file: %s", fn)
 | |
| 		}
 | |
| 		cdm.closers[seq] = f
 | |
| 		cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
 | |
| 		chkFileIndices = append(chkFileIndices, seq)
 | |
| 	}
 | |
| 
 | |
| 	// Check for gaps in the files.
 | |
| 	slices.Sort(chkFileIndices)
 | |
| 	if len(chkFileIndices) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	lastSeq := chkFileIndices[0]
 | |
| 	for _, seq := range chkFileIndices[1:] {
 | |
| 		if seq != lastSeq+1 {
 | |
| 			return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
 | |
| 		}
 | |
| 		lastSeq = seq
 | |
| 	}
 | |
| 
 | |
| 	for i, b := range cdm.mmappedChunkFiles {
 | |
| 		if b.byteSlice.Len() < HeadChunkFileHeaderSize {
 | |
| 			return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i])
 | |
| 		}
 | |
| 		// Verify magic number.
 | |
| 		if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
 | |
| 			return errors.Errorf("%s: invalid magic number %x", files[i], m)
 | |
| 		}
 | |
| 
 | |
| 		// Verify chunk format version.
 | |
| 		if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
 | |
| 			return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	cdm.evtlPos.setSeq(uint64(lastSeq))
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func listChunkFiles(dir string) (map[int]string, error) {
 | |
| 	files, err := os.ReadDir(dir)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	res := map[int]string{}
 | |
| 	for _, fi := range files {
 | |
| 		seq, err := strconv.ParseUint(fi.Name(), 10, 64)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		res[int(seq)] = filepath.Join(dir, fi.Name())
 | |
| 	}
 | |
| 
 | |
| 	return res, nil
 | |
| }
 | |
| 
 | |
| // repairLastChunkFile deletes the last file if it's empty.
 | |
| // Because we don't fsync when creating these files, we could end
 | |
| // up with an empty file at the end during an abrupt shutdown.
 | |
| func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr error) {
 | |
| 	lastFile := -1
 | |
| 	for seq := range files {
 | |
| 		if seq > lastFile {
 | |
| 			lastFile = seq
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if lastFile <= 0 {
 | |
| 		return files, nil
 | |
| 	}
 | |
| 
 | |
| 	f, err := os.Open(files[lastFile])
 | |
| 	if err != nil {
 | |
| 		return files, errors.Wrap(err, "open file during last head chunk file repair")
 | |
| 	}
 | |
| 
 | |
| 	buf := make([]byte, MagicChunksSize)
 | |
| 	size, err := f.Read(buf)
 | |
| 	if err != nil && err != io.EOF {
 | |
| 		return files, errors.Wrap(err, "failed to read magic number during last head chunk file repair")
 | |
| 	}
 | |
| 	if err := f.Close(); err != nil {
 | |
| 		return files, errors.Wrap(err, "close file during last head chunk file repair")
 | |
| 	}
 | |
| 
 | |
| 	// We either don't have enough bytes for the magic number or the magic number is 0.
 | |
| 	// NOTE: we should not check for wrong magic number here because that error
 | |
| 	// needs to be sent up the function called (already done elsewhere)
 | |
| 	// for proper repair mechanism to happen in the Head.
 | |
| 	if size < MagicChunksSize || binary.BigEndian.Uint32(buf) == 0 {
 | |
| 		// Corrupt file, hence remove it.
 | |
| 		if err := os.RemoveAll(files[lastFile]); err != nil {
 | |
| 			return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair")
 | |
| 		}
 | |
| 		delete(files, lastFile)
 | |
| 	}
 | |
| 
 | |
| 	return files, nil
 | |
| }
 | |
| 
 | |
| // WriteChunk writes the chunk to the disk.
 | |
| // The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
 | |
| func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool, callback func(err error)) (chkRef ChunkDiskMapperRef) {
 | |
| 	// cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue).
 | |
| 	cdm.evtlPosMtx.Lock()
 | |
| 	defer cdm.evtlPosMtx.Unlock()
 | |
| 	ref, cutFile := cdm.evtlPos.getNextChunkRef(chk)
 | |
| 
 | |
| 	if cdm.writeQueue != nil {
 | |
| 		return cdm.writeChunkViaQueue(ref, isOOO, cutFile, seriesRef, mint, maxt, chk, callback)
 | |
| 	}
 | |
| 
 | |
| 	err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile)
 | |
| 	if callback != nil {
 | |
| 		callback(err)
 | |
| 	}
 | |
| 
 | |
| 	return ref
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, isOOO, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
 | |
| 	var err error
 | |
| 	if callback != nil {
 | |
| 		defer func() {
 | |
| 			if err != nil {
 | |
| 				callback(err)
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	err = cdm.writeQueue.addJob(chunkWriteJob{
 | |
| 		cutFile:   cutFile,
 | |
| 		seriesRef: seriesRef,
 | |
| 		mint:      mint,
 | |
| 		maxt:      maxt,
 | |
| 		chk:       chk,
 | |
| 		ref:       ref,
 | |
| 		isOOO:     isOOO,
 | |
| 		callback:  callback,
 | |
| 	})
 | |
| 
 | |
| 	return ref
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) (err error) {
 | |
| 	cdm.writePathMtx.Lock()
 | |
| 	defer cdm.writePathMtx.Unlock()
 | |
| 
 | |
| 	if cdm.closed {
 | |
| 		return ErrChunkDiskMapperClosed
 | |
| 	}
 | |
| 
 | |
| 	if cutFile {
 | |
| 		err := cdm.cutAndExpectRef(ref)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
 | |
| 	// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
 | |
| 	if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
 | |
| 		if err := cdm.flushBuffer(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	cdm.crc32.Reset()
 | |
| 	bytesWritten := 0
 | |
| 
 | |
| 	binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef))
 | |
| 	bytesWritten += SeriesRefSize
 | |
| 	binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint))
 | |
| 	bytesWritten += MintMaxtSize
 | |
| 	binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt))
 | |
| 	bytesWritten += MintMaxtSize
 | |
| 	enc := chk.Encoding()
 | |
| 	if isOOO {
 | |
| 		enc = cdm.ApplyOutOfOrderMask(enc)
 | |
| 	}
 | |
| 	cdm.byteBuf[bytesWritten] = byte(enc)
 | |
| 	bytesWritten += ChunkEncodingSize
 | |
| 	n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes())))
 | |
| 	bytesWritten += n
 | |
| 
 | |
| 	if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := cdm.writeCRC32(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if maxt > cdm.curFileMaxt {
 | |
| 		cdm.curFileMaxt = maxt
 | |
| 	}
 | |
| 
 | |
| 	cdm.chunkBuffer.put(ref, chk)
 | |
| 
 | |
| 	if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize {
 | |
| 		// The chunk was bigger than the buffer itself.
 | |
| 		// Flushing to not keep partial chunks in buffer.
 | |
| 		if err := cdm.flushBuffer(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // CutNewFile makes that a new file will be created the next time a chunk is written.
 | |
| func (cdm *ChunkDiskMapper) CutNewFile() {
 | |
| 	cdm.evtlPosMtx.Lock()
 | |
| 	defer cdm.evtlPosMtx.Unlock()
 | |
| 
 | |
| 	cdm.evtlPos.cutFileOnNextChunk()
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) IsQueueEmpty() bool {
 | |
| 	if cdm.writeQueue == nil {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return cdm.writeQueue.queueIsEmpty()
 | |
| }
 | |
| 
 | |
| // cutAndExpectRef creates a new m-mapped file.
 | |
| // The write lock should be held before calling this.
 | |
| // It ensures that the position in the new file matches the given chunk reference, if not then it errors.
 | |
| func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err error) {
 | |
| 	seq, offset, err := cdm.cut()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if expSeq, expOffset := chkRef.Unpack(); seq != expSeq || offset != expOffset {
 | |
| 		return errors.Errorf("expected newly cut file to have sequence:offset %d:%d, got %d:%d", expSeq, expOffset, seq, offset)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // cut creates a new m-mapped file. The write lock should be held before calling this.
 | |
| // It returns the file sequence and the offset in that file to start writing chunks.
 | |
| func (cdm *ChunkDiskMapper) cut() (seq, offset int, returnErr error) {
 | |
| 	// Sync current tail to disk and close.
 | |
| 	if err := cdm.finalizeCurFile(); err != nil {
 | |
| 		return 0, 0, err
 | |
| 	}
 | |
| 
 | |
| 	offset, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize)
 | |
| 	if err != nil {
 | |
| 		return 0, 0, err
 | |
| 	}
 | |
| 
 | |
| 	defer func() {
 | |
| 		// The file should not be closed if there is no error,
 | |
| 		// its kept open in the ChunkDiskMapper.
 | |
| 		if returnErr != nil {
 | |
| 			returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	cdm.curFileOffset.Store(uint64(offset))
 | |
| 
 | |
| 	if cdm.curFile != nil {
 | |
| 		cdm.readPathMtx.Lock()
 | |
| 		cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt
 | |
| 		cdm.readPathMtx.Unlock()
 | |
| 	}
 | |
| 
 | |
| 	mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize)
 | |
| 	if err != nil {
 | |
| 		return 0, 0, err
 | |
| 	}
 | |
| 
 | |
| 	cdm.readPathMtx.Lock()
 | |
| 	cdm.curFileSequence = seq
 | |
| 	cdm.curFile = newFile
 | |
| 	if cdm.chkWriter != nil {
 | |
| 		cdm.chkWriter.Reset(newFile)
 | |
| 	} else {
 | |
| 		cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize)
 | |
| 	}
 | |
| 
 | |
| 	cdm.closers[cdm.curFileSequence] = mmapFile
 | |
| 	cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
 | |
| 	cdm.readPathMtx.Unlock()
 | |
| 
 | |
| 	cdm.curFileMaxt = 0
 | |
| 
 | |
| 	return seq, offset, nil
 | |
| }
 | |
| 
 | |
| // finalizeCurFile writes all pending data to the current tail file,
 | |
| // truncates its size, and closes it.
 | |
| func (cdm *ChunkDiskMapper) finalizeCurFile() error {
 | |
| 	if cdm.curFile == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if err := cdm.flushBuffer(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if err := cdm.curFile.Sync(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return cdm.curFile.Close()
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) write(b []byte) error {
 | |
| 	n, err := cdm.chkWriter.Write(b)
 | |
| 	cdm.curFileOffset.Add(uint64(n))
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) writeAndAppendToCRC32(b []byte) error {
 | |
| 	if err := cdm.write(b); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	_, err := cdm.crc32.Write(b)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) writeCRC32() error {
 | |
| 	return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0]))
 | |
| }
 | |
| 
 | |
| // flushBuffer flushes the current in-memory chunks.
 | |
| // Assumes that writePathMtx is _write_ locked before calling this method.
 | |
| func (cdm *ChunkDiskMapper) flushBuffer() error {
 | |
| 	if err := cdm.chkWriter.Flush(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	cdm.chunkBuffer.clear()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Chunk returns a chunk from a given reference.
 | |
| func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) {
 | |
| 	cdm.readPathMtx.RLock()
 | |
| 	// We hold this read lock for the entire duration because if Close()
 | |
| 	// is called, the data in the byte slice will get corrupted as the mmapped
 | |
| 	// file will be closed.
 | |
| 	defer cdm.readPathMtx.RUnlock()
 | |
| 
 | |
| 	if cdm.closed {
 | |
| 		return nil, ErrChunkDiskMapperClosed
 | |
| 	}
 | |
| 
 | |
| 	if cdm.writeQueue != nil {
 | |
| 		chunk := cdm.writeQueue.get(ref)
 | |
| 		if chunk != nil {
 | |
| 			return chunk, nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	sgmIndex, chkStart := ref.Unpack()
 | |
| 	// We skip the series ref and the mint/maxt beforehand.
 | |
| 	chkStart += SeriesRefSize + (2 * MintMaxtSize)
 | |
| 	chkCRC32 := newCRC32()
 | |
| 
 | |
| 	// If it is the current open file, then the chunks can be in the buffer too.
 | |
| 	if sgmIndex == cdm.curFileSequence {
 | |
| 		chunk := cdm.chunkBuffer.get(ref)
 | |
| 		if chunk != nil {
 | |
| 			return chunk, nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex]
 | |
| 	if !ok {
 | |
| 		if sgmIndex > cdm.curFileSequence {
 | |
| 			return nil, &CorruptionErr{
 | |
| 				Dir:       cdm.dir.Name(),
 | |
| 				FileIndex: -1,
 | |
| 				Err:       errors.Errorf("head chunk file index %d more than current open file", sgmIndex),
 | |
| 			}
 | |
| 		}
 | |
| 		return nil, &CorruptionErr{
 | |
| 			Dir:       cdm.dir.Name(),
 | |
| 			FileIndex: sgmIndex,
 | |
| 			Err:       errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() {
 | |
| 		return nil, &CorruptionErr{
 | |
| 			Dir:       cdm.dir.Name(),
 | |
| 			FileIndex: sgmIndex,
 | |
| 			Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Encoding.
 | |
| 	chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0]
 | |
| 	sourceChkEnc := chunkenc.Encoding(chkEnc)
 | |
| 	// Extract the encoding from the byte. ChunkDiskMapper uses only the last 7 bits for the encoding.
 | |
| 	chkEnc = byte(cdm.RemoveMasks(sourceChkEnc))
 | |
| 	// Data length.
 | |
| 	// With the minimum chunk length this should never cause us reading
 | |
| 	// over the end of the slice.
 | |
| 	chkDataLenStart := chkStart + ChunkEncodingSize
 | |
| 	c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize)
 | |
| 	chkDataLen, n := binary.Uvarint(c)
 | |
| 	if n <= 0 {
 | |
| 		return nil, &CorruptionErr{
 | |
| 			Dir:       cdm.dir.Name(),
 | |
| 			FileIndex: sgmIndex,
 | |
| 			Err:       errors.Errorf("reading chunk length failed with %d", n),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Verify the chunk data end.
 | |
| 	chkDataEnd := chkDataLenStart + n + int(chkDataLen)
 | |
| 	if chkDataEnd > mmapFile.byteSlice.Len() {
 | |
| 		return nil, &CorruptionErr{
 | |
| 			Dir:       cdm.dir.Name(),
 | |
| 			FileIndex: sgmIndex,
 | |
| 			Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Check the CRC.
 | |
| 	sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
 | |
| 	if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
 | |
| 		return nil, &CorruptionErr{
 | |
| 			Dir:       cdm.dir.Name(),
 | |
| 			FileIndex: sgmIndex,
 | |
| 			Err:       err,
 | |
| 		}
 | |
| 	}
 | |
| 	if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
 | |
| 		return nil, &CorruptionErr{
 | |
| 			Dir:       cdm.dir.Name(),
 | |
| 			FileIndex: sgmIndex,
 | |
| 			Err:       errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// The chunk data itself.
 | |
| 	chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
 | |
| 
 | |
| 	// Make a copy of the chunk data to prevent a panic occurring because the returned
 | |
| 	// chunk data slice references an mmap-ed file which could be closed after the
 | |
| 	// function returns but while the chunk is still in use.
 | |
| 	chkDataCopy := make([]byte, len(chkData))
 | |
| 	copy(chkDataCopy, chkData)
 | |
| 
 | |
| 	chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy)
 | |
| 	if err != nil {
 | |
| 		return nil, &CorruptionErr{
 | |
| 			Dir:       cdm.dir.Name(),
 | |
| 			FileIndex: sgmIndex,
 | |
| 			Err:       err,
 | |
| 		}
 | |
| 	}
 | |
| 	return chk, nil
 | |
| }
 | |
| 
 | |
| // IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
 | |
| // and runs the provided function with information about each chunk. It returns on the first error encountered.
 | |
| // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
 | |
| // to set the maxt of all the file.
 | |
| func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error) (err error) {
 | |
| 	cdm.writePathMtx.Lock()
 | |
| 	defer cdm.writePathMtx.Unlock()
 | |
| 
 | |
| 	defer func() {
 | |
| 		cdm.fileMaxtSet = true
 | |
| 	}()
 | |
| 
 | |
| 	chkCRC32 := newCRC32()
 | |
| 
 | |
| 	// Iterate files in ascending order.
 | |
| 	segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
 | |
| 	for seg := range cdm.mmappedChunkFiles {
 | |
| 		segIDs = append(segIDs, seg)
 | |
| 	}
 | |
| 	slices.Sort(segIDs)
 | |
| 	for _, segID := range segIDs {
 | |
| 		mmapFile := cdm.mmappedChunkFiles[segID]
 | |
| 		fileEnd := mmapFile.byteSlice.Len()
 | |
| 		if segID == cdm.curFileSequence {
 | |
| 			fileEnd = int(cdm.curFileSize())
 | |
| 		}
 | |
| 		idx := HeadChunkFileHeaderSize
 | |
| 		for idx < fileEnd {
 | |
| 			if fileEnd-idx < MaxHeadChunkMetaSize {
 | |
| 				// Check for all 0s which marks the end of the file.
 | |
| 				allZeros := true
 | |
| 				for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) {
 | |
| 					if b != byte(0) {
 | |
| 						allZeros = false
 | |
| 						break
 | |
| 					}
 | |
| 				}
 | |
| 				if allZeros {
 | |
| 					// End of segment chunk file content.
 | |
| 					break
 | |
| 				}
 | |
| 				return &CorruptionErr{
 | |
| 					Dir:       cdm.dir.Name(),
 | |
| 					FileIndex: segID,
 | |
| 					Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+
 | |
| 						" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
 | |
| 				}
 | |
| 			}
 | |
| 			chkCRC32.Reset()
 | |
| 			chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))
 | |
| 
 | |
| 			startIdx := idx
 | |
| 			seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize)))
 | |
| 			idx += SeriesRefSize
 | |
| 			mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
 | |
| 			idx += MintMaxtSize
 | |
| 			maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
 | |
| 			idx += MintMaxtSize
 | |
| 
 | |
| 			// We preallocate file to help with m-mapping (especially windows systems).
 | |
| 			// As series ref always starts from 1, we assume it being 0 to be the end of the actual file data.
 | |
| 			// We are not considering possible file corruption that can cause it to be 0.
 | |
| 			// Additionally we are checking mint and maxt just to be sure.
 | |
| 			if seriesRef == 0 && mint == 0 && maxt == 0 {
 | |
| 				break
 | |
| 			}
 | |
| 
 | |
| 			chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
 | |
| 			idx += ChunkEncodingSize
 | |
| 			dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
 | |
| 			idx += n
 | |
| 
 | |
| 			numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2))
 | |
| 			idx += int(dataLen) // Skip the data.
 | |
| 
 | |
| 			// In the beginning we only checked for the chunk meta size.
 | |
| 			// Now that we have added the chunk data length, we check for sufficient bytes again.
 | |
| 			if idx+CRCSize > fileEnd {
 | |
| 				return &CorruptionErr{
 | |
| 					Dir:       cdm.dir.Name(),
 | |
| 					FileIndex: segID,
 | |
| 					Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			// Check CRC.
 | |
| 			sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
 | |
| 			if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
 | |
| 				return &CorruptionErr{
 | |
| 					Dir:       cdm.dir.Name(),
 | |
| 					FileIndex: segID,
 | |
| 					Err:       errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
 | |
| 				}
 | |
| 			}
 | |
| 			idx += CRCSize
 | |
| 
 | |
| 			if maxt > mmapFile.maxt {
 | |
| 				mmapFile.maxt = maxt
 | |
| 			}
 | |
| 			isOOO := cdm.IsOutOfOrderChunk(chkEnc)
 | |
| 			// Extract the encoding from the byte. ChunkDiskMapper uses only the last 7 bits for the encoding.
 | |
| 			chkEnc = cdm.RemoveMasks(chkEnc)
 | |
| 			if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc, isOOO); err != nil {
 | |
| 				if cerr, ok := err.(*CorruptionErr); ok {
 | |
| 					cerr.Dir = cdm.dir.Name()
 | |
| 					cerr.FileIndex = segID
 | |
| 					return cerr
 | |
| 				}
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if idx > fileEnd {
 | |
| 			// It should be equal to the slice length.
 | |
| 			return &CorruptionErr{
 | |
| 				Dir:       cdm.dir.Name(),
 | |
| 				FileIndex: segID,
 | |
| 				Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Truncate deletes the head chunk files whose file number is less than given fileNo.
 | |
| func (cdm *ChunkDiskMapper) Truncate(fileNo uint32) error {
 | |
| 	cdm.readPathMtx.RLock()
 | |
| 
 | |
| 	// Sort the file indices, else if files deletion fails in between,
 | |
| 	// it can lead to unsequential files as the map is not sorted.
 | |
| 	chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles))
 | |
| 	for seq := range cdm.mmappedChunkFiles {
 | |
| 		chkFileIndices = append(chkFileIndices, seq)
 | |
| 	}
 | |
| 	slices.Sort(chkFileIndices)
 | |
| 
 | |
| 	var removedFiles []int
 | |
| 	for _, seq := range chkFileIndices {
 | |
| 		if seq == cdm.curFileSequence || uint32(seq) >= fileNo {
 | |
| 			break
 | |
| 		}
 | |
| 		removedFiles = append(removedFiles, seq)
 | |
| 	}
 | |
| 	cdm.readPathMtx.RUnlock()
 | |
| 
 | |
| 	errs := tsdb_errors.NewMulti()
 | |
| 	// Cut a new file only if the current file has some chunks.
 | |
| 	if cdm.curFileSize() > HeadChunkFileHeaderSize {
 | |
| 		// There is a known race condition here because between the check of curFileSize() and the call to CutNewFile()
 | |
| 		// a new file could already be cut, this is acceptable because it will simply result in an empty file which
 | |
| 		// won't do any harm.
 | |
| 		cdm.CutNewFile()
 | |
| 	}
 | |
| 	pendingDeletes, err := cdm.deleteFiles(removedFiles)
 | |
| 	errs.Add(err)
 | |
| 
 | |
| 	if len(chkFileIndices) == len(removedFiles) {
 | |
| 		// All files were deleted. Reset the current sequence.
 | |
| 		cdm.evtlPosMtx.Lock()
 | |
| 		if err == nil {
 | |
| 			cdm.evtlPos.setSeq(0)
 | |
| 		} else {
 | |
| 			// In case of error, set it to the last file number on the disk that was not deleted.
 | |
| 			cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
 | |
| 		}
 | |
| 		cdm.evtlPosMtx.Unlock()
 | |
| 	}
 | |
| 
 | |
| 	return errs.Err()
 | |
| }
 | |
| 
 | |
| // deleteFiles deletes the given file sequences in order of the sequence.
 | |
| // In case of an error, it returns the sorted file sequences that were not deleted from the _disk_.
 | |
| func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) ([]int, error) {
 | |
| 	slices.Sort(removedFiles) // To delete them in order.
 | |
| 	cdm.readPathMtx.Lock()
 | |
| 	for _, seq := range removedFiles {
 | |
| 		if err := cdm.closers[seq].Close(); err != nil {
 | |
| 			cdm.readPathMtx.Unlock()
 | |
| 			return removedFiles, err
 | |
| 		}
 | |
| 		delete(cdm.mmappedChunkFiles, seq)
 | |
| 		delete(cdm.closers, seq)
 | |
| 	}
 | |
| 	cdm.readPathMtx.Unlock()
 | |
| 
 | |
| 	// We actually delete the files separately to not block the readPathMtx for long.
 | |
| 	for i, seq := range removedFiles {
 | |
| 		if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil {
 | |
| 			return removedFiles[i:], err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil, nil
 | |
| }
 | |
| 
 | |
| // DeleteCorrupted deletes all the head chunk files after the one which had the corruption
 | |
| // (including the corrupt file).
 | |
| func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
 | |
| 	err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
 | |
| 	cerr, ok := err.(*CorruptionErr)
 | |
| 	if !ok {
 | |
| 		return errors.Wrap(originalErr, "cannot handle error")
 | |
| 	}
 | |
| 
 | |
| 	// Delete all the head chunk files following the corrupt head chunk file.
 | |
| 	segs := []int{}
 | |
| 	cdm.readPathMtx.RLock()
 | |
| 	lastSeq := 0
 | |
| 	for seg := range cdm.mmappedChunkFiles {
 | |
| 		if seg >= cerr.FileIndex {
 | |
| 			segs = append(segs, seg)
 | |
| 		} else if seg > lastSeq {
 | |
| 			lastSeq = seg
 | |
| 		}
 | |
| 	}
 | |
| 	cdm.readPathMtx.RUnlock()
 | |
| 
 | |
| 	pendingDeletes, err := cdm.deleteFiles(segs)
 | |
| 	cdm.evtlPosMtx.Lock()
 | |
| 	if err == nil {
 | |
| 		cdm.evtlPos.setSeq(uint64(lastSeq))
 | |
| 	} else {
 | |
| 		// In case of error, set it to the last file number on the disk that was not deleted.
 | |
| 		cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
 | |
| 	}
 | |
| 	cdm.evtlPosMtx.Unlock()
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Size returns the size of the chunk files.
 | |
| func (cdm *ChunkDiskMapper) Size() (int64, error) {
 | |
| 	return fileutil.DirSize(cdm.dir.Name())
 | |
| }
 | |
| 
 | |
| func (cdm *ChunkDiskMapper) curFileSize() uint64 {
 | |
| 	return cdm.curFileOffset.Load()
 | |
| }
 | |
| 
 | |
| // Close closes all the open files in ChunkDiskMapper.
 | |
| // It is not longer safe to access chunks from this struct after calling Close.
 | |
| func (cdm *ChunkDiskMapper) Close() error {
 | |
| 	// Locking the eventual position lock blocks WriteChunk()
 | |
| 	cdm.evtlPosMtx.Lock()
 | |
| 	defer cdm.evtlPosMtx.Unlock()
 | |
| 
 | |
| 	if cdm.writeQueue != nil {
 | |
| 		cdm.writeQueue.stop()
 | |
| 	}
 | |
| 
 | |
| 	// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
 | |
| 	// The lock order should not be reversed here else it can cause deadlocks.
 | |
| 	cdm.writePathMtx.Lock()
 | |
| 	defer cdm.writePathMtx.Unlock()
 | |
| 	cdm.readPathMtx.Lock()
 | |
| 	defer cdm.readPathMtx.Unlock()
 | |
| 
 | |
| 	if cdm.closed {
 | |
| 		return nil
 | |
| 	}
 | |
| 	cdm.closed = true
 | |
| 
 | |
| 	errs := tsdb_errors.NewMulti(
 | |
| 		closeAllFromMap(cdm.closers),
 | |
| 		cdm.finalizeCurFile(),
 | |
| 		cdm.dir.Close(),
 | |
| 	)
 | |
| 	cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
 | |
| 	cdm.closers = map[int]io.Closer{}
 | |
| 
 | |
| 	return errs.Err()
 | |
| }
 | |
| 
 | |
| func closeAllFromMap(cs map[int]io.Closer) error {
 | |
| 	errs := tsdb_errors.NewMulti()
 | |
| 	for _, c := range cs {
 | |
| 		errs.Add(c.Close())
 | |
| 	}
 | |
| 	return errs.Err()
 | |
| }
 | |
| 
 | |
| const inBufferShards = 128 // 128 is a randomly chosen number.
 | |
| 
 | |
| // chunkBuffer is a thread safe lookup table for chunks by their ref.
 | |
| type chunkBuffer struct {
 | |
| 	inBufferChunks     [inBufferShards]map[ChunkDiskMapperRef]chunkenc.Chunk
 | |
| 	inBufferChunksMtxs [inBufferShards]sync.RWMutex
 | |
| }
 | |
| 
 | |
| func newChunkBuffer() *chunkBuffer {
 | |
| 	cb := &chunkBuffer{}
 | |
| 	for i := 0; i < inBufferShards; i++ {
 | |
| 		cb.inBufferChunks[i] = make(map[ChunkDiskMapperRef]chunkenc.Chunk)
 | |
| 	}
 | |
| 	return cb
 | |
| }
 | |
| 
 | |
| func (cb *chunkBuffer) put(ref ChunkDiskMapperRef, chk chunkenc.Chunk) {
 | |
| 	shardIdx := ref % inBufferShards
 | |
| 
 | |
| 	cb.inBufferChunksMtxs[shardIdx].Lock()
 | |
| 	cb.inBufferChunks[shardIdx][ref] = chk
 | |
| 	cb.inBufferChunksMtxs[shardIdx].Unlock()
 | |
| }
 | |
| 
 | |
| func (cb *chunkBuffer) get(ref ChunkDiskMapperRef) chunkenc.Chunk {
 | |
| 	shardIdx := ref % inBufferShards
 | |
| 
 | |
| 	cb.inBufferChunksMtxs[shardIdx].RLock()
 | |
| 	defer cb.inBufferChunksMtxs[shardIdx].RUnlock()
 | |
| 
 | |
| 	return cb.inBufferChunks[shardIdx][ref]
 | |
| }
 | |
| 
 | |
| func (cb *chunkBuffer) clear() {
 | |
| 	for i := 0; i < inBufferShards; i++ {
 | |
| 		cb.inBufferChunksMtxs[i].Lock()
 | |
| 		cb.inBufferChunks[i] = make(map[ChunkDiskMapperRef]chunkenc.Chunk)
 | |
| 		cb.inBufferChunksMtxs[i].Unlock()
 | |
| 	}
 | |
| }
 |