From ef1d5fd8a24d184e90822bc64388cbfc8d7e2d02 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 5 Jun 2013 10:40:39 +0200 Subject: [PATCH] Introduce semaphores for tiered storage. This commit wraps the tiered storage access componnets in semaphores, since we can handle several concurrent memory reads. --- storage/metric/tiered.go | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index d13f8c263e..6d8e451878 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -17,7 +17,6 @@ import ( "fmt" "log" "sort" - "sync" "time" dto "github.com/prometheus/prometheus/model/generated" @@ -27,6 +26,7 @@ import ( "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/stats" "github.com/prometheus/prometheus/storage/raw/leveldb" + "sync" ) type chunk model.Values @@ -65,6 +65,7 @@ const ( // TieredStorage both persists samples and generates materialized views for // queries. type TieredStorage struct { + // mu is purely used for state transitions. mu sync.RWMutex // BUG(matt): This introduces a Law of Demeter violation. Ugh. @@ -81,6 +82,9 @@ type TieredStorage struct { draining chan chan<- bool state tieredStorageState + + memorySemaphore chan bool + diskSemaphore chan bool } // viewJob encapsulates a request to extract sample values from the datastore. @@ -92,13 +96,18 @@ type viewJob struct { stats *stats.TimerGroup } -func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) { +const ( + tieredDiskSemaphores = 1 + tieredMemorySemaphores = 5 +) + +func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (*TieredStorage, error) { diskStorage, err := NewLevelDBMetricPersistence(root) if err != nil { - return + return nil, err } - storage = &TieredStorage{ + s := &TieredStorage{ appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth), DiskStorage: diskStorage, draining: make(chan chan<- bool), @@ -106,8 +115,19 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn memoryArena: NewMemorySeriesStorage(), memoryTTL: memoryTTL, viewQueue: make(chan viewJob, viewQueueDepth), + + diskSemaphore: make(chan bool, tieredDiskSemaphores), + memorySemaphore: make(chan bool, tieredMemorySemaphores), } - return + + for i := 0; i < tieredDiskSemaphores; i++ { + s.diskSemaphore <- true + } + for i := 0; i < tieredMemorySemaphores; i++ { + s.memorySemaphore <- true + } + + return s, nil } // Enqueues Samples for storage. @@ -206,7 +226,8 @@ func (t *TieredStorage) Serve(started chan<- bool) { t.flushMemory(t.memoryTTL) case viewRequest := <-t.viewQueue: viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop() - t.renderView(viewRequest) + <-t.memorySemaphore + go t.renderView(viewRequest) case drainingDone := <-t.draining: t.Flush() drainingDone <- true @@ -303,6 +324,8 @@ func (t *TieredStorage) renderView(viewJob viewJob) { var err error begin := time.Now() defer func() { + t.memorySemaphore <- true + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure}) @@ -341,6 +364,11 @@ func (t *TieredStorage) renderView(viewJob viewJob) { // Conditionalize disk access. if diskFrontier == nil && diskPresent { if iterator == nil { + <-t.diskSemaphore + defer func() { + t.diskSemaphore <- true + }() + // Get a single iterator that will be used for all data extraction // below. iterator = t.DiskStorage.MetricSamples.NewIterator(true)