mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-11-04 02:11:01 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			324 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			324 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
 | 
						|
package chunks
 | 
						|
 | 
						|
import (
 | 
						|
	"math/rand"
 | 
						|
	"sync"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/stretchr/testify/require"
 | 
						|
	"go.uber.org/atomic"
 | 
						|
)
 | 
						|
 | 
						|
func (q *writeJobQueue) assertInvariants(t *testing.T) {
 | 
						|
	q.mtx.Lock()
 | 
						|
	defer q.mtx.Unlock()
 | 
						|
 | 
						|
	totalSize := 0
 | 
						|
	for s := q.first; s != nil; s = s.nextSegment {
 | 
						|
		require.NotNil(t, s.segment)
 | 
						|
 | 
						|
		// Next read index is lower or equal than next write index (we cannot past written jobs)
 | 
						|
		require.LessOrEqual(t, s.nextRead, s.nextWrite)
 | 
						|
 | 
						|
		// Number of unread elements in this segment.
 | 
						|
		totalSize += s.nextWrite - s.nextRead
 | 
						|
 | 
						|
		// First segment can be partially read, other segments were not read yet.
 | 
						|
		if s == q.first {
 | 
						|
			require.GreaterOrEqual(t, s.nextRead, 0)
 | 
						|
		} else {
 | 
						|
			require.Equal(t, 0, s.nextRead)
 | 
						|
		}
 | 
						|
 | 
						|
		// If first shard is empty (everything was read from it already), it must have extra capacity for
 | 
						|
		// additional elements, otherwise it would have been removed.
 | 
						|
		if s == q.first && s.nextRead == s.nextWrite {
 | 
						|
			require.Less(t, s.nextWrite, len(s.segment))
 | 
						|
		}
 | 
						|
 | 
						|
		// Segments in the middle are full.
 | 
						|
		if s != q.first && s != q.last {
 | 
						|
			require.Len(t, s.segment, s.nextWrite)
 | 
						|
		}
 | 
						|
		// Last segment must have at least one element, or we wouldn't have created it.
 | 
						|
		require.Positive(t, s.nextWrite)
 | 
						|
	}
 | 
						|
 | 
						|
	require.Equal(t, q.size, totalSize)
 | 
						|
}
 | 
						|
 | 
						|
func TestQueuePushPopSingleGoroutine(t *testing.T) {
 | 
						|
	seed := time.Now().UnixNano()
 | 
						|
	t.Log("seed:", seed)
 | 
						|
	r := rand.New(rand.NewSource(seed))
 | 
						|
 | 
						|
	const maxSize = 500
 | 
						|
	const maxIters = 50
 | 
						|
 | 
						|
	for maxCount := 1; maxCount < maxSize; maxCount++ {
 | 
						|
		queue := newWriteJobQueue(maxCount, 1+(r.Int()%maxCount))
 | 
						|
 | 
						|
		elements := 0 // total elements in the queue
 | 
						|
		lastWriteID := 0
 | 
						|
		lastReadID := 0
 | 
						|
 | 
						|
		for iter := 0; iter < maxIters; iter++ {
 | 
						|
			if elements < maxCount {
 | 
						|
				toWrite := r.Int() % (maxCount - elements)
 | 
						|
				if toWrite == 0 {
 | 
						|
					toWrite = 1
 | 
						|
				}
 | 
						|
 | 
						|
				for i := 0; i < toWrite; i++ {
 | 
						|
					lastWriteID++
 | 
						|
					require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(lastWriteID)}))
 | 
						|
 | 
						|
					elements++
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			if elements > 0 {
 | 
						|
				toRead := r.Int() % elements
 | 
						|
				if toRead == 0 {
 | 
						|
					toRead = 1
 | 
						|
				}
 | 
						|
 | 
						|
				for i := 0; i < toRead; i++ {
 | 
						|
					lastReadID++
 | 
						|
 | 
						|
					j, b := queue.pop()
 | 
						|
					require.True(t, b)
 | 
						|
					require.Equal(t, HeadSeriesRef(lastReadID), j.seriesRef)
 | 
						|
 | 
						|
					elements--
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			require.Equal(t, elements, queue.length())
 | 
						|
			queue.assertInvariants(t)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestQueuePushBlocksOnFullQueue(t *testing.T) {
 | 
						|
	queue := newWriteJobQueue(5, 5)
 | 
						|
 | 
						|
	pushTime := make(chan time.Time)
 | 
						|
	go func() {
 | 
						|
		require.True(t, queue.push(chunkWriteJob{seriesRef: 1}))
 | 
						|
		require.True(t, queue.push(chunkWriteJob{seriesRef: 2}))
 | 
						|
		require.True(t, queue.push(chunkWriteJob{seriesRef: 3}))
 | 
						|
		require.True(t, queue.push(chunkWriteJob{seriesRef: 4}))
 | 
						|
		require.True(t, queue.push(chunkWriteJob{seriesRef: 5}))
 | 
						|
		pushTime <- time.Now()
 | 
						|
		// This will block
 | 
						|
		require.True(t, queue.push(chunkWriteJob{seriesRef: 6}))
 | 
						|
		pushTime <- time.Now()
 | 
						|
	}()
 | 
						|
 | 
						|
	timeBeforePush := <-pushTime
 | 
						|
 | 
						|
	delay := 100 * time.Millisecond
 | 
						|
	select {
 | 
						|
	case <-time.After(delay):
 | 
						|
		// ok
 | 
						|
	case <-pushTime:
 | 
						|
		require.Fail(t, "didn't expect another push to proceed")
 | 
						|
	}
 | 
						|
 | 
						|
	popTime := time.Now()
 | 
						|
	j, b := queue.pop()
 | 
						|
	require.True(t, b)
 | 
						|
	require.Equal(t, HeadSeriesRef(1), j.seriesRef)
 | 
						|
 | 
						|
	timeAfterPush := <-pushTime
 | 
						|
 | 
						|
	require.GreaterOrEqual(t, timeAfterPush.Sub(popTime), time.Duration(0))
 | 
						|
	require.GreaterOrEqual(t, timeAfterPush.Sub(timeBeforePush), delay)
 | 
						|
}
 | 
						|
 | 
						|
func TestQueuePopBlocksOnEmptyQueue(t *testing.T) {
 | 
						|
	queue := newWriteJobQueue(5, 5)
 | 
						|
 | 
						|
	popTime := make(chan time.Time)
 | 
						|
	go func() {
 | 
						|
		j, b := queue.pop()
 | 
						|
		require.True(t, b)
 | 
						|
		require.Equal(t, HeadSeriesRef(1), j.seriesRef)
 | 
						|
 | 
						|
		popTime <- time.Now()
 | 
						|
 | 
						|
		// This will block
 | 
						|
		j, b = queue.pop()
 | 
						|
		require.True(t, b)
 | 
						|
		require.Equal(t, HeadSeriesRef(2), j.seriesRef)
 | 
						|
 | 
						|
		popTime <- time.Now()
 | 
						|
	}()
 | 
						|
 | 
						|
	queue.push(chunkWriteJob{seriesRef: 1})
 | 
						|
 | 
						|
	timeBeforePop := <-popTime
 | 
						|
 | 
						|
	delay := 100 * time.Millisecond
 | 
						|
	select {
 | 
						|
	case <-time.After(delay):
 | 
						|
		// ok
 | 
						|
	case <-popTime:
 | 
						|
		require.Fail(t, "didn't expect another pop to proceed")
 | 
						|
	}
 | 
						|
 | 
						|
	pushTime := time.Now()
 | 
						|
	require.True(t, queue.push(chunkWriteJob{seriesRef: 2}))
 | 
						|
 | 
						|
	timeAfterPop := <-popTime
 | 
						|
 | 
						|
	require.GreaterOrEqual(t, timeAfterPop.Sub(pushTime), time.Duration(0))
 | 
						|
	require.Greater(t, timeAfterPop.Sub(timeBeforePop), delay)
 | 
						|
}
 | 
						|
 | 
						|
func TestQueuePopUnblocksOnClose(t *testing.T) {
 | 
						|
	queue := newWriteJobQueue(5, 5)
 | 
						|
 | 
						|
	popTime := make(chan time.Time)
 | 
						|
	go func() {
 | 
						|
		j, b := queue.pop()
 | 
						|
		require.True(t, b)
 | 
						|
		require.Equal(t, HeadSeriesRef(1), j.seriesRef)
 | 
						|
 | 
						|
		popTime <- time.Now()
 | 
						|
 | 
						|
		// This will block until queue is closed.
 | 
						|
		j, b = queue.pop()
 | 
						|
		require.False(t, b)
 | 
						|
 | 
						|
		popTime <- time.Now()
 | 
						|
	}()
 | 
						|
 | 
						|
	queue.push(chunkWriteJob{seriesRef: 1})
 | 
						|
 | 
						|
	timeBeforePop := <-popTime
 | 
						|
 | 
						|
	delay := 100 * time.Millisecond
 | 
						|
	select {
 | 
						|
	case <-time.After(delay):
 | 
						|
		// ok
 | 
						|
	case <-popTime:
 | 
						|
		require.Fail(t, "didn't expect another pop to proceed")
 | 
						|
	}
 | 
						|
 | 
						|
	closeTime := time.Now()
 | 
						|
	queue.close()
 | 
						|
 | 
						|
	timeAfterPop := <-popTime
 | 
						|
 | 
						|
	require.GreaterOrEqual(t, timeAfterPop.Sub(closeTime), time.Duration(0))
 | 
						|
	require.GreaterOrEqual(t, timeAfterPop.Sub(timeBeforePop), delay)
 | 
						|
}
 | 
						|
 | 
						|
func TestQueuePopAfterCloseReturnsAllElements(t *testing.T) {
 | 
						|
	const count = 10
 | 
						|
 | 
						|
	queue := newWriteJobQueue(count, count)
 | 
						|
 | 
						|
	for i := 0; i < count; i++ {
 | 
						|
		require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(i)}))
 | 
						|
	}
 | 
						|
 | 
						|
	// close the queue before popping all elements.
 | 
						|
	queue.close()
 | 
						|
 | 
						|
	// No more pushing allowed after close.
 | 
						|
	require.False(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(11111)}))
 | 
						|
 | 
						|
	// Verify that we can still read all pushed elements.
 | 
						|
	for i := 0; i < count; i++ {
 | 
						|
		j, b := queue.pop()
 | 
						|
		require.True(t, b)
 | 
						|
		require.Equal(t, HeadSeriesRef(i), j.seriesRef)
 | 
						|
	}
 | 
						|
 | 
						|
	_, b := queue.pop()
 | 
						|
	require.False(t, b)
 | 
						|
}
 | 
						|
 | 
						|
func TestQueuePushPopManyGoroutines(t *testing.T) {
 | 
						|
	const readGoroutines = 5
 | 
						|
	const writeGoroutines = 10
 | 
						|
	const writes = 500
 | 
						|
 | 
						|
	queue := newWriteJobQueue(1024, 64)
 | 
						|
 | 
						|
	// Reading goroutine
 | 
						|
	refsMx := sync.Mutex{}
 | 
						|
	refs := map[HeadSeriesRef]bool{}
 | 
						|
 | 
						|
	readersWG := sync.WaitGroup{}
 | 
						|
	for i := 0; i < readGoroutines; i++ {
 | 
						|
		readersWG.Add(1)
 | 
						|
 | 
						|
		go func() {
 | 
						|
			defer readersWG.Done()
 | 
						|
 | 
						|
			for j, ok := queue.pop(); ok; j, ok = queue.pop() {
 | 
						|
				refsMx.Lock()
 | 
						|
				refs[j.seriesRef] = true
 | 
						|
				refsMx.Unlock()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
 | 
						|
	id := atomic.Uint64{}
 | 
						|
 | 
						|
	writersWG := sync.WaitGroup{}
 | 
						|
	for i := 0; i < writeGoroutines; i++ {
 | 
						|
		writersWG.Add(1)
 | 
						|
 | 
						|
		go func() {
 | 
						|
			defer writersWG.Done()
 | 
						|
 | 
						|
			for i := 0; i < writes; i++ {
 | 
						|
				ref := id.Inc()
 | 
						|
 | 
						|
				require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(ref)}))
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
 | 
						|
	// Wait until all writes are done.
 | 
						|
	writersWG.Wait()
 | 
						|
 | 
						|
	// Close the queue and wait for reading to be done.
 | 
						|
	queue.close()
 | 
						|
	readersWG.Wait()
 | 
						|
 | 
						|
	// Check if we have all expected values
 | 
						|
	require.Len(t, refs, writeGoroutines*writes)
 | 
						|
}
 | 
						|
 | 
						|
func TestQueueSegmentIsKeptEvenIfEmpty(t *testing.T) {
 | 
						|
	queue := newWriteJobQueue(1024, 64)
 | 
						|
 | 
						|
	require.True(t, queue.push(chunkWriteJob{seriesRef: 1}))
 | 
						|
	_, b := queue.pop()
 | 
						|
	require.True(t, b)
 | 
						|
 | 
						|
	require.NotNil(t, queue.first)
 | 
						|
	require.Equal(t, 1, queue.first.nextRead)
 | 
						|
	require.Equal(t, 1, queue.first.nextWrite)
 | 
						|
}
 |