// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunks

import (
	"encoding/binary"
	"io/ioutil"
	"math/rand"
	"os"
	"testing"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/util/testutil"
)

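// TestHeadReadWriter_WriteChunk_Chunk_IterateChunks writes chunks across multiple
// head chunk files and verifies that (1) the bytes written to the first file match
// the expected on-disk format, (2) every chunk can be read back via Chunk(), and
// (3) IterateAllChunks replays all chunks in write order after a restart.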
func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
	hrw, close := testHeadReadWriter(t)
	defer func() {
		testutil.Ok(t, hrw.Close())
		close()
	}()

	expectedBytes := []byte{}
	nextChunkOffset := uint64(HeadChunkFileHeaderSize)
	chkCRC32 := newCRC32()

	type expectedDataType struct {
		seriesRef, chunkRef uint64
		mint, maxt          int64
		numSamples          uint16
		chunk               chunkenc.Chunk
	}
	expectedData := []expectedDataType{}

	var buf [MaxHeadChunkMetaSize]byte
	totalChunks := 0
	var firstFileName string
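	// Keep adding chunks until at least 3 head chunk files have been cut and the
	// most recent writes are still sitting in the in-memory write buffer.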
	for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
		addChunks := func(numChunks int) {
			for i := 0; i < numChunks; i++ {
				seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
				totalChunks++
				expectedData = append(expectedData, expectedDataType{
					seriesRef:  seriesRef,
					mint:       mint,
					maxt:       maxt,
					chunkRef:   chkRef,
					chunk:      chunk,
					numSamples: uint16(chunk.NumSamples()),
				})

				if hrw.curFileSequence != 1 {
					// We are checking the bytes written only for the first file.
					continue
				}

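				// Each chunk record in a head chunk file is laid out as:
				//   series ref <8 byte> | mint <8 byte> | maxt <8 byte> | encoding <1 byte> |
				//   uvarint(len(chunk data)) | chunk data | CRC32 <4 byte>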
				// Calculating the expected bytes written on disk for the first file.
				firstFileName = hrw.curFile.Name()
				testutil.Equals(t, chunkRef(1, nextChunkOffset), chkRef)

				bytesWritten := 0
				chkCRC32.Reset()

				binary.BigEndian.PutUint64(buf[bytesWritten:], seriesRef)
				bytesWritten += SeriesRefSize
				binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
				bytesWritten += MintMaxtSize
				binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
				bytesWritten += MintMaxtSize
				buf[bytesWritten] = byte(chunk.Encoding())
				bytesWritten += ChunkEncodingSize
				n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
				bytesWritten += n

				expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
				_, err := chkCRC32.Write(buf[:bytesWritten])
				testutil.Ok(t, err)
				expectedBytes = append(expectedBytes, chunk.Bytes()...)
				_, err = chkCRC32.Write(chunk.Bytes())
				testutil.Ok(t, err)

				expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)

				// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
				nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
			}
		}
		addChunks(100)
		testutil.Ok(t, hrw.CutNewFile())
		addChunks(10) // For chunks in in-memory buffer.
	}

	// Checking on-disk bytes for the first file.
	testutil.Assert(t, len(hrw.mmappedChunkFiles) == 3 && len(hrw.closers) == 3, "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles))

	actualBytes, err := ioutil.ReadFile(firstFileName)
	testutil.Ok(t, err)

	// Check header of the segment file.
	testutil.Equals(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize])))
	testutil.Equals(t, chunksFormatV1, int(actualBytes[MagicChunksSize]))

	// Remaining chunk data.
	fileEnd := HeadChunkFileHeaderSize + len(expectedBytes)
	testutil.Equals(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd])

	// Test for the next chunk header to be all 0s. That marks the end of the file.
	for _, b := range actualBytes[fileEnd : fileEnd+MaxHeadChunkMetaSize] {
		testutil.Equals(t, byte(0), b)
	}

	// Testing reading of chunks.
	for _, exp := range expectedData {
		actChunk, err := hrw.Chunk(exp.chunkRef)
		testutil.Ok(t, err)
		testutil.Equals(t, exp.chunk.Bytes(), actChunk.Bytes())
	}

	// Testing IterateAllChunks method.
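	// Close and re-open the mapper to simulate a restart before replaying the
	// chunks with IterateAllChunks.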
	dir := hrw.dir.Name()
	testutil.Ok(t, hrw.Close())
	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
	testutil.Ok(t, err)

	idx := 0
	err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
		t.Helper()

		expData := expectedData[idx]
		testutil.Equals(t, expData.seriesRef, seriesRef)
		testutil.Equals(t, expData.chunkRef, chunkRef)
		testutil.Equals(t, expData.mint, mint)
		testutil.Equals(t, expData.maxt, maxt)
		testutil.Equals(t, expData.numSamples, numSamples)

		actChunk, err := hrw.Chunk(expData.chunkRef)
		testutil.Ok(t, err)
		testutil.Equals(t, expData.chunk.Bytes(), actChunk.Bytes())

		idx++
		return nil
	})
	testutil.Ok(t, err)
	testutil.Equals(t, len(expectedData), idx)
}

// TestHeadReadWriter_Truncate tests that:
// * Truncation deletes the right files based on the time passed.
// * The active file is not deleted even if the passed time makes it eligible for deletion.
// * An empty current file does not lead to the creation of another file after truncation.
// * A non-empty current file leads to the creation of another file after truncation.
func TestHeadReadWriter_Truncate(t *testing.T) {
	hrw, close := testHeadReadWriter(t)
	defer func() {
		testutil.Ok(t, hrw.Close())
		close()
	}()

	timeRange := 0
	fileTimeStep := 100
	totalFiles := 7
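	// After the 1st truncation, files 1 and 2 are expected to be deleted, so the
	// remaining sequence starts at 3. After a restart and the 2nd truncation,
	// files up to 5 are deleted and the remaining sequence starts at 6.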
	startIndexAfter1stTruncation, startIndexAfter2ndTruncation := 3, 6
	filesDeletedAfter1stTruncation, filesDeletedAfter2ndTruncation := 2, 5
	var timeToTruncate, timeToTruncateAfterRestart int64

	addChunk := func() int {
		mint := timeRange + 1                // Just after the new file cut.
		maxt := timeRange + fileTimeStep - 1 // Just before the next file.

		// Write a chunk to set the maxt for the segment.
		_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
		testutil.Ok(t, err)

		timeRange += fileTimeStep

		return mint
	}

	cutFile := func(i int) {
		testutil.Ok(t, hrw.CutNewFile())

		mint := addChunk()

		if i == startIndexAfter1stTruncation {
			timeToTruncate = int64(mint)
		} else if i == startIndexAfter2ndTruncation {
			timeToTruncateAfterRestart = int64(mint)
		}
	}

	// Cut segments.
	for i := 1; i <= totalFiles; i++ {
		cutFile(i)
	}

	// Verifying the files.
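	// verifyFiles checks that exactly remainingFiles files exist on disk and are
	// tracked in memory, and that the remaining file sequences start at startIndex.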
	verifyFiles := func(remainingFiles, startIndex int) {
		t.Helper()

		files, err := ioutil.ReadDir(hrw.dir.Name())
		testutil.Ok(t, err)
		testutil.Equals(t, remainingFiles, len(files), "files on disk")
		testutil.Equals(t, remainingFiles, len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
		testutil.Equals(t, remainingFiles, len(hrw.closers), "closers")

		for i := 1; i <= totalFiles; i++ {
			_, ok := hrw.mmappedChunkFiles[i]
			if i < startIndex {
				testutil.Equals(t, false, ok)
			} else {
				testutil.Equals(t, true, ok)
			}
		}
	}

	// Verify the number of segments.
	verifyFiles(totalFiles, 1)

	// Truncating files.
	testutil.Ok(t, hrw.Truncate(timeToTruncate))
	totalFiles++ // Truncation creates a new file as the last file is not empty.
	verifyFiles(totalFiles-filesDeletedAfter1stTruncation, startIndexAfter1stTruncation)
	addChunk() // Add a chunk so that the new file is not truncated.

	dir := hrw.dir.Name()
	testutil.Ok(t, hrw.Close())

	// Restarted.
	var err error
	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
	testutil.Ok(t, err)

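	// IterateAllChunks replays the on-disk chunks and records the maxt of each
	// file (tracked via fileMaxtSet), which time-based truncation relies on.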
	testutil.Assert(t, !hrw.fileMaxtSet, "")
	testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
	testutil.Assert(t, hrw.fileMaxtSet, "")

	// Truncating files after restart. As the last file was empty, this creates no new files.
	testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart))
	verifyFiles(totalFiles-filesDeletedAfter2ndTruncation, startIndexAfter2ndTruncation)

	// First chunk after restart creates a new file.
	addChunk()
	totalFiles++

	// Truncating till the current time should not delete the current active file.
	testutil.Ok(t, hrw.Truncate(int64(timeRange+fileTimeStep)))
	verifyFiles(2, totalFiles) // One file is the active file and one was newly created.
}

// TestHeadReadWriter_Truncate_NoUnsequentialFiles tests
// that truncation leaves no unsequential files on disk, mainly for the following case:
// * There is an empty file in between the sequence while the truncation
//   deletes files only up to a sequence before that (i.e. it stops deleting
//   as soon as it finds a file that is not deletable).
// This tests https://github.com/prometheus/prometheus/issues/7412, where
// the truncation used to check all the files for deletion and would end up
// deleting empty files in between, breaking the sequence.
func TestHeadReadWriter_Truncate_NoUnsequentialFiles(t *testing.T) {
	hrw, close := testHeadReadWriter(t)
	defer func() {
		testutil.Ok(t, hrw.Close())
		close()
	}()

	timeRange := 0
	addChunk := func() {
		step := 100
		mint, maxt := timeRange+1, timeRange+step-1
		_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
		testutil.Ok(t, err)
		timeRange += step
	}
	emptyFile := func() {
		testutil.Ok(t, hrw.CutNewFile())
	}
	nonEmptyFile := func() {
		emptyFile()
		addChunk()
	}

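	// Create 6 files: 1, 2, 3 and 5 contain chunks, while 4 and 6 are left empty.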
	addChunk()     // 1. Created with the first chunk.
	nonEmptyFile() // 2.
	nonEmptyFile() // 3.
	emptyFile()    // 4.
	nonEmptyFile() // 5.
	emptyFile()    // 6.

	// Verifying the files.
	verifyFiles := func(remainingFiles []int) {
		t.Helper()

		files, err := ioutil.ReadDir(hrw.dir.Name())
		testutil.Ok(t, err)
		testutil.Equals(t, len(remainingFiles), len(files), "files on disk")
		testutil.Equals(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
		testutil.Equals(t, len(remainingFiles), len(hrw.closers), "closers")

		for _, i := range remainingFiles {
			_, ok := hrw.mmappedChunkFiles[i]
			testutil.Equals(t, true, ok)
		}
	}

	verifyFiles([]int{1, 2, 3, 4, 5, 6})

	// Truncating files till 2. It should not delete file 3 or anything after it,
	// even though files 4 and 6 are empty.
	file2Maxt := hrw.mmappedChunkFiles[2].maxt
	testutil.Ok(t, hrw.Truncate(file2Maxt+1))
	// As 6 was empty, it should not create another file.
	verifyFiles([]int{3, 4, 5, 6})

	addChunk()
	// Truncate creates another file as 6 is not empty now.
	testutil.Ok(t, hrw.Truncate(file2Maxt+1))
	verifyFiles([]int{3, 4, 5, 6, 7})

	dir := hrw.dir.Name()
	testutil.Ok(t, hrw.Close())

	// Restarting checks for unsequential files.
	var err error
	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
	testutil.Ok(t, err)
	verifyFiles([]int{3, 4, 5, 6, 7})
}

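// testHeadReadWriter sets up a ChunkDiskMapper over a fresh temporary directory
// and returns it along with a function that removes the directory. Callers are
// expected to close the mapper before calling the returned cleanup function.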
func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) {
	tmpdir, err := ioutil.TempDir("", "data")
	testutil.Ok(t, err)
	hrw, err = NewChunkDiskMapper(tmpdir, chunkenc.NewPool())
	testutil.Ok(t, err)
	testutil.Assert(t, !hrw.fileMaxtSet, "")
	testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
	testutil.Assert(t, hrw.fileMaxtSet, "")
	return hrw, func() {
		testutil.Ok(t, os.RemoveAll(tmpdir))
	}
}

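// randomChunk returns an XOR chunk holding a random number (up to 119) of random samples.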
func randomChunk(t *testing.T) chunkenc.Chunk {
	chunk := chunkenc.NewXORChunk()
	numSamples := rand.Int() % 120
	app, err := chunk.Appender()
	testutil.Ok(t, err)
	for i := 0; i < numSamples; i++ {
		app.Append(rand.Int63(), rand.Float64())
	}
	return chunk
}

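// createChunk writes a random chunk for a random series ref covering the time range
// (idx*1000, (idx+1)*1000] and returns everything needed to verify the write later.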
func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef uint64, chunkRef uint64, mint, maxt int64, chunk chunkenc.Chunk) {
	var err error
	seriesRef = uint64(rand.Int63())
	mint = int64(idx*1000 + 1)
	maxt = int64((idx + 1) * 1000)
	chunk = randomChunk(t)
	chunkRef, err = hrw.WriteChunk(seriesRef, mint, maxt, chunk)
	testutil.Ok(t, err)
	return
}