mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-25 14:31:01 +02:00 
			
		
		
		
	* Introduce out-of-order TSDB support This implementation is based on this design doc: https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing This commit adds support to accept out-of-order ("OOO") sample into the TSDB up to a configurable time allowance. If OOO is enabled, overlapping querying are automatically enabled. Most of the additions have been borrowed from https://github.com/grafana/mimir-prometheus/ Here is the list ist of the original commits cherry picked from mimir-prometheus into this branch: - 4b2198d7ec47d50989b7c2df66b7b207c32f7f6e - 2836e5513f1bc591535a859f5d41154a75e7c6bc - 00b379c3a5b1ec3799699b6242f300a2b3ea30f0 - ff0dc757587cada63ca948d2d5eb00bf090d63e0 - a632c73352a7e39d60b445700beb47d691549c3e - c6f3d4ab339ab80bbbce74c9946237ced01f0509 - 5e8406a1d4a50d0052bbee83e28ca3b3371408aa - abde1e0ba128936b9eb0224ee1551e56216ebd4a - e70e7698897bb03860bee0467c733fa44e14c9bd - df59320886e03a555d379ac4b0b3130f661407e0 Co-authored-by: Jesus Vazquez <jesus.vazquez@grafana.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * gofumpt files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add license header to missing files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO tests due to existing chunk disk mapper implementation Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix truncate int overflow Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add Sync method to the WAL and update tests Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * remove useless sync Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Update minOOOTime after truncating Head * Update minOOOTime after truncating Head Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix lint Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Add a unit test Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Load OutOfOrderTimeWindow only once per appender Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO Head LabelValues and PostingsForMatchers Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix replay of OOO mmap chunks Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Remove unnecessary err check Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Prevent panic with ApplyConfig Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run OOO compaction after restart if there is OOO data from WBL Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Apply Bartek's suggestions Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Refactor OOO compaction Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address comments and TODOs - Added a comment explaining why we need the allow overlapping compaction toggle - Clarified TSDBConfig OutOfOrderTimeWindow doc - Added an owner to all the TODOs in the code Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run go format Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix remaining review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix tests Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Change wbl reference when truncating ooo in TestHeadMinOOOTimeUpdate Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix TestWBLAndMmapReplay test failure on windows Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address most of the feedback Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Refactor the block meta for out of order Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix windows error Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
		
			
				
	
	
		
			647 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			647 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2017 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package chunks
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"encoding/binary"
 | |
| 	"fmt"
 | |
| 	"hash"
 | |
| 	"hash/crc32"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"strconv"
 | |
| 
 | |
| 	"github.com/pkg/errors"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | |
| 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 | |
| 	"github.com/prometheus/prometheus/tsdb/fileutil"
 | |
| )
 | |
| 
 | |
| // Segment header fields constants.
 | |
| const (
 | |
| 	// MagicChunks is 4 bytes at the head of a series file.
 | |
| 	MagicChunks = 0x85BD40DD
 | |
| 	// MagicChunksSize is the size in bytes of MagicChunks.
 | |
| 	MagicChunksSize          = 4
 | |
| 	chunksFormatV1           = 1
 | |
| 	ChunksFormatVersionSize  = 1
 | |
| 	segmentHeaderPaddingSize = 3
 | |
| 	// SegmentHeaderSize defines the total size of the header part.
 | |
| 	SegmentHeaderSize = MagicChunksSize + ChunksFormatVersionSize + segmentHeaderPaddingSize
 | |
| )
 | |
| 
 | |
| // Chunk fields constants.
 | |
| const (
 | |
| 	// MaxChunkLengthFieldSize defines the maximum size of the data length part.
 | |
| 	MaxChunkLengthFieldSize = binary.MaxVarintLen32
 | |
| 	// ChunkEncodingSize defines the size of the chunk encoding part.
 | |
| 	ChunkEncodingSize = 1
 | |
| )
 | |
| 
 | |
| // ChunkRef is a generic reference for reading chunk data. In prometheus it
 | |
| // is either a HeadChunkRef or BlockChunkRef, though other implementations
 | |
| // may have their own reference types.
 | |
| type ChunkRef uint64
 | |
| 
 | |
| // HeadSeriesRef refers to in-memory series.
 | |
| type HeadSeriesRef uint64
 | |
| 
 | |
| // HeadChunkRef packs a HeadSeriesRef and a ChunkID into a global 8 Byte ID.
 | |
| // The HeadSeriesRef and ChunkID may not exceed 5 and 3 bytes respectively.
 | |
| type HeadChunkRef uint64
 | |
| 
 | |
| func NewHeadChunkRef(hsr HeadSeriesRef, chunkID HeadChunkID) HeadChunkRef {
 | |
| 	if hsr > (1<<40)-1 {
 | |
| 		panic("series ID exceeds 5 bytes")
 | |
| 	}
 | |
| 	if chunkID > (1<<24)-1 {
 | |
| 		panic("chunk ID exceeds 3 bytes")
 | |
| 	}
 | |
| 	return HeadChunkRef(uint64(hsr<<24) | uint64(chunkID))
 | |
| }
 | |
| 
 | |
| func (p HeadChunkRef) Unpack() (HeadSeriesRef, HeadChunkID) {
 | |
| 	return HeadSeriesRef(p >> 24), HeadChunkID(p<<40) >> 40
 | |
| }
 | |
| 
 | |
| // HeadChunkID refers to a specific chunk in a series (memSeries) in the Head.
 | |
| // Each memSeries has its own monotonically increasing number to refer to its chunks.
 | |
| // If the HeadChunkID value is...
 | |
| //   - memSeries.firstChunkID+len(memSeries.mmappedChunks), it's the head chunk.
 | |
| //   - less than the above, but >= memSeries.firstID, then it's
 | |
| //     memSeries.mmappedChunks[i] where i = HeadChunkID - memSeries.firstID.
 | |
| //
 | |
| // Example:
 | |
| // assume a memSeries.firstChunkID=7 and memSeries.mmappedChunks=[p5,p6,p7,p8,p9].
 | |
| // | HeadChunkID value | refers to ...                                                                          |
 | |
| // |-------------------|----------------------------------------------------------------------------------------|
 | |
| // |               0-6 | chunks that have been compacted to blocks, these won't return data for queries in Head |
 | |
| // |              7-11 | memSeries.mmappedChunks[i] where i is 0 to 4.                                          |
 | |
| // |                12 | memSeries.headChunk                                                                    |
 | |
| type HeadChunkID uint64
 | |
| 
 | |
| // BlockChunkRef refers to a chunk within a persisted block.
 | |
| // The upper 4 bytes are for the segment index and
 | |
| // the lower 4 bytes are for the segment offset where the data starts for this chunk.
 | |
| type BlockChunkRef uint64
 | |
| 
 | |
| // NewBlockChunkRef packs the file index and byte offset into a BlockChunkRef.
 | |
| func NewBlockChunkRef(fileIndex, fileOffset uint64) BlockChunkRef {
 | |
| 	return BlockChunkRef(fileIndex<<32 | fileOffset)
 | |
| }
 | |
| 
 | |
| func (b BlockChunkRef) Unpack() (int, int) {
 | |
| 	sgmIndex := int(b >> 32)
 | |
| 	chkStart := int((b << 32) >> 32)
 | |
| 	return sgmIndex, chkStart
 | |
| }
 | |
| 
 | |
| // Meta holds information about a chunk of data.
 | |
| type Meta struct {
 | |
| 	// Ref and Chunk hold either a reference that can be used to retrieve
 | |
| 	// chunk data or the data itself.
 | |
| 	// If Chunk is nil, call ChunkReader.Chunk(Meta.Ref) to get the chunk and assign it to the Chunk field
 | |
| 	Ref   ChunkRef
 | |
| 	Chunk chunkenc.Chunk
 | |
| 
 | |
| 	// Time range the data covers.
 | |
| 	// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
 | |
| 	MinTime, MaxTime int64
 | |
| 
 | |
| 	// OOOLastRef, OOOLastMinTime and OOOLastMaxTime are kept as markers for
 | |
| 	// overlapping chunks.
 | |
| 	// These fields point to the last created out of order Chunk (the head) that existed
 | |
| 	// when Series() was called and was overlapping.
 | |
| 	// Series() and Chunk() method responses should be consistent for the same
 | |
| 	// query even if new data is added in between the calls.
 | |
| 	OOOLastRef                     ChunkRef
 | |
| 	OOOLastMinTime, OOOLastMaxTime int64
 | |
| }
 | |
| 
 | |
| // Iterator iterates over the chunks of a single time series.
 | |
| type Iterator interface {
 | |
| 	// At returns the current meta.
 | |
| 	// It depends on implementation if the chunk is populated or not.
 | |
| 	At() Meta
 | |
| 	// Next advances the iterator by one.
 | |
| 	Next() bool
 | |
| 	// Err returns optional error if Next is false.
 | |
| 	Err() error
 | |
| }
 | |
| 
 | |
| // writeHash writes the chunk encoding and raw data into the provided hash.
 | |
| func (cm *Meta) writeHash(h hash.Hash, buf []byte) error {
 | |
| 	buf = append(buf[:0], byte(cm.Chunk.Encoding()))
 | |
| 	if _, err := h.Write(buf[:1]); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // OverlapsClosedInterval Returns true if the chunk overlaps [mint, maxt].
 | |
| func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
 | |
| 	// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
 | |
| 	return cm.MinTime <= maxt && mint <= cm.MaxTime
 | |
| }
 | |
| 
 | |
| var errInvalidSize = fmt.Errorf("invalid size")
 | |
| 
 | |
| var castagnoliTable *crc32.Table
 | |
| 
 | |
| func init() {
 | |
| 	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
 | |
| }
 | |
| 
 | |
| // newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
 | |
| // polynomial may be easily changed in one location at a later time, if necessary.
 | |
| func newCRC32() hash.Hash32 {
 | |
| 	return crc32.New(castagnoliTable)
 | |
| }
 | |
| 
 | |
| // Check if the CRC of data matches that stored in sum, computed when the chunk was stored.
 | |
| func checkCRC32(data, sum []byte) error {
 | |
| 	got := crc32.Checksum(data, castagnoliTable)
 | |
| 	// This combination of shifts is the inverse of digest.Sum() in go/src/hash/crc32.
 | |
| 	want := uint32(sum[0])<<24 + uint32(sum[1])<<16 + uint32(sum[2])<<8 + uint32(sum[3])
 | |
| 	if got != want {
 | |
| 		return errors.Errorf("checksum mismatch expected:%x, actual:%x", want, got)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Writer implements the ChunkWriter interface for the standard
 | |
| // serialization format.
 | |
| type Writer struct {
 | |
| 	dirFile *os.File
 | |
| 	files   []*os.File
 | |
| 	wbuf    *bufio.Writer
 | |
| 	n       int64
 | |
| 	crc32   hash.Hash
 | |
| 	buf     [binary.MaxVarintLen32]byte
 | |
| 
 | |
| 	segmentSize int64
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	// DefaultChunkSegmentSize is the default chunks segment size.
 | |
| 	DefaultChunkSegmentSize = 512 * 1024 * 1024
 | |
| )
 | |
| 
 | |
| // NewWriterWithSegSize returns a new writer against the given directory
 | |
| // and allows setting a custom size for the segments.
 | |
| func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
 | |
| 	return newWriter(dir, segmentSize)
 | |
| }
 | |
| 
 | |
| // NewWriter returns a new writer against the given directory
 | |
| // using the default segment size.
 | |
| func NewWriter(dir string) (*Writer, error) {
 | |
| 	return newWriter(dir, DefaultChunkSegmentSize)
 | |
| }
 | |
| 
 | |
| func newWriter(dir string, segmentSize int64) (*Writer, error) {
 | |
| 	if segmentSize <= 0 {
 | |
| 		segmentSize = DefaultChunkSegmentSize
 | |
| 	}
 | |
| 
 | |
| 	if err := os.MkdirAll(dir, 0o777); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	dirFile, err := fileutil.OpenDir(dir)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return &Writer{
 | |
| 		dirFile:     dirFile,
 | |
| 		n:           0,
 | |
| 		crc32:       newCRC32(),
 | |
| 		segmentSize: segmentSize,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (w *Writer) tail() *os.File {
 | |
| 	if len(w.files) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	return w.files[len(w.files)-1]
 | |
| }
 | |
| 
 | |
| // finalizeTail writes all pending data to the current tail file,
 | |
| // truncates its size, and closes it.
 | |
| func (w *Writer) finalizeTail() error {
 | |
| 	tf := w.tail()
 | |
| 	if tf == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if err := w.wbuf.Flush(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := tf.Sync(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// As the file was pre-allocated, we truncate any superfluous zero bytes.
 | |
| 	off, err := tf.Seek(0, io.SeekCurrent)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := tf.Truncate(off); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return tf.Close()
 | |
| }
 | |
| 
 | |
| func (w *Writer) cut() error {
 | |
| 	// Sync current tail to disk and close.
 | |
| 	if err := w.finalizeTail(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	n, f, _, err := cutSegmentFile(w.dirFile, MagicChunks, chunksFormatV1, w.segmentSize)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	w.n = int64(n)
 | |
| 
 | |
| 	w.files = append(w.files, f)
 | |
| 	if w.wbuf != nil {
 | |
| 		w.wbuf.Reset(f)
 | |
| 	} else {
 | |
| 		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, allocSize int64) (headerSize int, newFile *os.File, seq int, returnErr error) {
 | |
| 	p, seq, err := nextSequenceFile(dirFile.Name())
 | |
| 	if err != nil {
 | |
| 		return 0, nil, 0, errors.Wrap(err, "next sequence file")
 | |
| 	}
 | |
| 	ptmp := p + ".tmp"
 | |
| 	f, err := os.OpenFile(ptmp, os.O_WRONLY|os.O_CREATE, 0o666)
 | |
| 	if err != nil {
 | |
| 		return 0, nil, 0, errors.Wrap(err, "open temp file")
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		if returnErr != nil {
 | |
| 			errs := tsdb_errors.NewMulti(returnErr)
 | |
| 			if f != nil {
 | |
| 				errs.Add(f.Close())
 | |
| 			}
 | |
| 			// Calling RemoveAll on a non-existent file does not return error.
 | |
| 			errs.Add(os.RemoveAll(ptmp))
 | |
| 			returnErr = errs.Err()
 | |
| 		}
 | |
| 	}()
 | |
| 	if allocSize > 0 {
 | |
| 		if err = fileutil.Preallocate(f, allocSize, true); err != nil {
 | |
| 			return 0, nil, 0, errors.Wrap(err, "preallocate")
 | |
| 		}
 | |
| 	}
 | |
| 	if err = dirFile.Sync(); err != nil {
 | |
| 		return 0, nil, 0, errors.Wrap(err, "sync directory")
 | |
| 	}
 | |
| 
 | |
| 	// Write header metadata for new file.
 | |
| 	metab := make([]byte, SegmentHeaderSize)
 | |
| 	binary.BigEndian.PutUint32(metab[:MagicChunksSize], magicNumber)
 | |
| 	metab[4] = chunksFormat
 | |
| 
 | |
| 	n, err := f.Write(metab)
 | |
| 	if err != nil {
 | |
| 		return 0, nil, 0, errors.Wrap(err, "write header")
 | |
| 	}
 | |
| 	if err := f.Close(); err != nil {
 | |
| 		return 0, nil, 0, errors.Wrap(err, "close temp file")
 | |
| 	}
 | |
| 	f = nil
 | |
| 
 | |
| 	if err := fileutil.Rename(ptmp, p); err != nil {
 | |
| 		return 0, nil, 0, errors.Wrap(err, "replace file")
 | |
| 	}
 | |
| 
 | |
| 	f, err = os.OpenFile(p, os.O_WRONLY, 0o666)
 | |
| 	if err != nil {
 | |
| 		return 0, nil, 0, errors.Wrap(err, "open final file")
 | |
| 	}
 | |
| 	// Skip header for further writes.
 | |
| 	if _, err := f.Seek(int64(n), 0); err != nil {
 | |
| 		return 0, nil, 0, errors.Wrap(err, "seek in final file")
 | |
| 	}
 | |
| 	return n, f, seq, nil
 | |
| }
 | |
| 
 | |
| func (w *Writer) write(b []byte) error {
 | |
| 	n, err := w.wbuf.Write(b)
 | |
| 	w.n += int64(n)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // WriteChunks writes as many chunks as possible to the current segment,
 | |
| // cuts a new segment when the current segment is full and
 | |
| // writes the rest of the chunks in the new segment.
 | |
| func (w *Writer) WriteChunks(chks ...Meta) error {
 | |
| 	var (
 | |
| 		batchSize  = int64(0)
 | |
| 		batchStart = 0
 | |
| 		batches    = make([][]Meta, 1)
 | |
| 		batchID    = 0
 | |
| 		firstBatch = true
 | |
| 	)
 | |
| 
 | |
| 	for i, chk := range chks {
 | |
| 		// Each chunk contains: data length + encoding + the data itself + crc32
 | |
| 		chkSize := int64(MaxChunkLengthFieldSize) // The data length is a variable length field so use the maximum possible value.
 | |
| 		chkSize += ChunkEncodingSize              // The chunk encoding.
 | |
| 		chkSize += int64(len(chk.Chunk.Bytes()))  // The data itself.
 | |
| 		chkSize += crc32.Size                     // The 4 bytes of crc32.
 | |
| 		batchSize += chkSize
 | |
| 
 | |
| 		// Cut a new batch when it is not the first chunk(to avoid empty segments) and
 | |
| 		// the batch is too large to fit in the current segment.
 | |
| 		cutNewBatch := (i != 0) && (batchSize+SegmentHeaderSize > w.segmentSize)
 | |
| 
 | |
| 		// When the segment already has some data than
 | |
| 		// the first batch size calculation should account for that.
 | |
| 		if firstBatch && w.n > SegmentHeaderSize {
 | |
| 			cutNewBatch = batchSize+w.n > w.segmentSize
 | |
| 			if cutNewBatch {
 | |
| 				firstBatch = false
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if cutNewBatch {
 | |
| 			batchStart = i
 | |
| 			batches = append(batches, []Meta{})
 | |
| 			batchID++
 | |
| 			batchSize = chkSize
 | |
| 		}
 | |
| 		batches[batchID] = chks[batchStart : i+1]
 | |
| 	}
 | |
| 
 | |
| 	// Create a new segment when one doesn't already exist.
 | |
| 	if w.n == 0 {
 | |
| 		if err := w.cut(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for i, chks := range batches {
 | |
| 		if err := w.writeChunks(chks); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		// Cut a new segment only when there are more chunks to write.
 | |
| 		// Avoid creating a new empty segment at the end of the write.
 | |
| 		if i < len(batches)-1 {
 | |
| 			if err := w.cut(); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // writeChunks writes the chunks into the current segment irrespective
 | |
| // of the configured segment size limit. A segment should have been already
 | |
| // started before calling this.
 | |
| func (w *Writer) writeChunks(chks []Meta) error {
 | |
| 	if len(chks) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	seq := uint64(w.seq())
 | |
| 	for i := range chks {
 | |
| 		chk := &chks[i]
 | |
| 
 | |
| 		chk.Ref = ChunkRef(NewBlockChunkRef(seq, uint64(w.n)))
 | |
| 
 | |
| 		n := binary.PutUvarint(w.buf[:], uint64(len(chk.Chunk.Bytes())))
 | |
| 
 | |
| 		if err := w.write(w.buf[:n]); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		w.buf[0] = byte(chk.Chunk.Encoding())
 | |
| 		if err := w.write(w.buf[:1]); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if err := w.write(chk.Chunk.Bytes()); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		w.crc32.Reset()
 | |
| 		if err := chk.writeHash(w.crc32, w.buf[:]); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if err := w.write(w.crc32.Sum(w.buf[:0])); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (w *Writer) seq() int {
 | |
| 	return len(w.files) - 1
 | |
| }
 | |
| 
 | |
| func (w *Writer) Close() error {
 | |
| 	if err := w.finalizeTail(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// close dir file (if not windows platform will fail on rename)
 | |
| 	return w.dirFile.Close()
 | |
| }
 | |
| 
 | |
| // ByteSlice abstracts a byte slice.
 | |
| type ByteSlice interface {
 | |
| 	Len() int
 | |
| 	Range(start, end int) []byte
 | |
| }
 | |
| 
 | |
| type realByteSlice []byte
 | |
| 
 | |
| func (b realByteSlice) Len() int {
 | |
| 	return len(b)
 | |
| }
 | |
| 
 | |
| func (b realByteSlice) Range(start, end int) []byte {
 | |
| 	return b[start:end]
 | |
| }
 | |
| 
 | |
| // Reader implements a ChunkReader for a serialized byte stream
 | |
| // of series data.
 | |
| type Reader struct {
 | |
| 	// The underlying bytes holding the encoded series data.
 | |
| 	// Each slice holds the data for a different segment.
 | |
| 	bs   []ByteSlice
 | |
| 	cs   []io.Closer // Closers for resources behind the byte slices.
 | |
| 	size int64       // The total size of bytes in the reader.
 | |
| 	pool chunkenc.Pool
 | |
| }
 | |
| 
 | |
| func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
 | |
| 	cr := Reader{pool: pool, bs: bs, cs: cs}
 | |
| 	for i, b := range cr.bs {
 | |
| 		if b.Len() < SegmentHeaderSize {
 | |
| 			return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
 | |
| 		}
 | |
| 		// Verify magic number.
 | |
| 		if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
 | |
| 			return nil, errors.Errorf("invalid magic number %x", m)
 | |
| 		}
 | |
| 
 | |
| 		// Verify chunk format version.
 | |
| 		if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
 | |
| 			return nil, errors.Errorf("invalid chunk format version %d", v)
 | |
| 		}
 | |
| 		cr.size += int64(b.Len())
 | |
| 	}
 | |
| 	return &cr, nil
 | |
| }
 | |
| 
 | |
| // NewDirReader returns a new Reader against sequentially numbered files in the
 | |
| // given directory.
 | |
| func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
 | |
| 	files, err := sequenceFiles(dir)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if pool == nil {
 | |
| 		pool = chunkenc.NewPool()
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		bs []ByteSlice
 | |
| 		cs []io.Closer
 | |
| 	)
 | |
| 	for _, fn := range files {
 | |
| 		f, err := fileutil.OpenMmapFile(fn)
 | |
| 		if err != nil {
 | |
| 			return nil, tsdb_errors.NewMulti(
 | |
| 				errors.Wrap(err, "mmap files"),
 | |
| 				tsdb_errors.CloseAll(cs),
 | |
| 			).Err()
 | |
| 		}
 | |
| 		cs = append(cs, f)
 | |
| 		bs = append(bs, realByteSlice(f.Bytes()))
 | |
| 	}
 | |
| 
 | |
| 	reader, err := newReader(bs, cs, pool)
 | |
| 	if err != nil {
 | |
| 		return nil, tsdb_errors.NewMulti(
 | |
| 			err,
 | |
| 			tsdb_errors.CloseAll(cs),
 | |
| 		).Err()
 | |
| 	}
 | |
| 	return reader, nil
 | |
| }
 | |
| 
 | |
| func (s *Reader) Close() error {
 | |
| 	return tsdb_errors.CloseAll(s.cs)
 | |
| }
 | |
| 
 | |
| // Size returns the size of the chunks.
 | |
| func (s *Reader) Size() int64 {
 | |
| 	return s.size
 | |
| }
 | |
| 
 | |
| // Chunk returns a chunk from a given reference.
 | |
| func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) {
 | |
| 	sgmIndex, chkStart := BlockChunkRef(meta.Ref).Unpack()
 | |
| 
 | |
| 	if sgmIndex >= len(s.bs) {
 | |
| 		return nil, errors.Errorf("segment index %d out of range", sgmIndex)
 | |
| 	}
 | |
| 
 | |
| 	sgmBytes := s.bs[sgmIndex]
 | |
| 
 | |
| 	if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() {
 | |
| 		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
 | |
| 	}
 | |
| 	// With the minimum chunk length this should never cause us reading
 | |
| 	// over the end of the slice.
 | |
| 	c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize)
 | |
| 	chkDataLen, n := binary.Uvarint(c)
 | |
| 	if n <= 0 {
 | |
| 		return nil, errors.Errorf("reading chunk length failed with %d", n)
 | |
| 	}
 | |
| 
 | |
| 	chkEncStart := chkStart + n
 | |
| 	chkEnd := chkEncStart + ChunkEncodingSize + int(chkDataLen) + crc32.Size
 | |
| 	chkDataStart := chkEncStart + ChunkEncodingSize
 | |
| 	chkDataEnd := chkEnd - crc32.Size
 | |
| 
 | |
| 	if chkEnd > sgmBytes.Len() {
 | |
| 		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
 | |
| 	}
 | |
| 
 | |
| 	sum := sgmBytes.Range(chkDataEnd, chkEnd)
 | |
| 	if err := checkCRC32(sgmBytes.Range(chkEncStart, chkDataEnd), sum); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	chkData := sgmBytes.Range(chkDataStart, chkDataEnd)
 | |
| 	chkEnc := sgmBytes.Range(chkEncStart, chkEncStart+ChunkEncodingSize)[0]
 | |
| 	return s.pool.Get(chunkenc.Encoding(chkEnc), chkData)
 | |
| }
 | |
| 
 | |
| func nextSequenceFile(dir string) (string, int, error) {
 | |
| 	files, err := os.ReadDir(dir)
 | |
| 	if err != nil {
 | |
| 		return "", 0, err
 | |
| 	}
 | |
| 
 | |
| 	i := uint64(0)
 | |
| 	for _, f := range files {
 | |
| 		j, err := strconv.ParseUint(f.Name(), 10, 64)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		// It is not necessary that we find the files in number order,
 | |
| 		// for example with '1000000' and '200000', '1000000' would come first.
 | |
| 		// Though this is a very very race case, we check anyway for the max id.
 | |
| 		if j > i {
 | |
| 			i = j
 | |
| 		}
 | |
| 	}
 | |
| 	return segmentFile(dir, int(i+1)), int(i + 1), nil
 | |
| }
 | |
| 
 | |
| func segmentFile(baseDir string, index int) string {
 | |
| 	return filepath.Join(baseDir, fmt.Sprintf("%0.6d", index))
 | |
| }
 | |
| 
 | |
| func sequenceFiles(dir string) ([]string, error) {
 | |
| 	files, err := os.ReadDir(dir)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var res []string
 | |
| 	for _, fi := range files {
 | |
| 		if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		res = append(res, filepath.Join(dir, fi.Name()))
 | |
| 	}
 | |
| 	return res, nil
 | |
| }
 |