diff --git a/main.go b/main.go index 47d94b46cb..8d596756ac 100644 --- a/main.go +++ b/main.go @@ -118,11 +118,7 @@ func NewPrometheus() *prometheus { PedanticChecks: *storagePedanticChecks, SyncStrategy: syncStrategy, } - memStorage, err := local.NewMemorySeriesStorage(o) - if err != nil { - glog.Error("Error opening memory series storage: ", err) - os.Exit(1) - } + memStorage := local.NewMemorySeriesStorage(o) var sampleAppender storage.SampleAppender var remoteStorageQueues []*remote.StorageQueueManager @@ -213,46 +209,63 @@ func NewPrometheus() *prometheus { } webService.QuitChan = make(chan struct{}) - p.reloadConfig() + if !p.reloadConfig() { + os.Exit(1) + } return p } -func (p *prometheus) reloadConfig() { +func (p *prometheus) reloadConfig() bool { glog.Infof("Loading configuration file %s", *configFile) conf, err := config.LoadFromFile(*configFile) if err != nil { glog.Errorf("Couldn't load configuration (-config.file=%s): %v", *configFile, err) glog.Errorf("Note: The configuration format has changed with version 0.14, please check the documentation.") - return + return false } p.webService.StatusHandler.ApplyConfig(conf) p.targetManager.ApplyConfig(conf) p.ruleManager.ApplyConfig(conf) + + return true } // Serve starts the Prometheus server. It returns after the server has been shut // down. The method installs an interrupt handler, allowing to trigger a // shutdown by sending SIGTERM to the process. func (p *prometheus) Serve() { + // Start all components. + if err := p.storage.Start(); err != nil { + glog.Error("Error opening memory series storage: ", err) + os.Exit(1) + } + defer p.storage.Stop() + + // The storage has to be fully initialized before registering Prometheus. 
+ registry.MustRegister(p) + for _, q := range p.remoteStorageQueues { go q.Run() + defer q.Stop() } + go p.ruleManager.Run() + defer p.ruleManager.Stop() + go p.notificationHandler.Run() + defer p.notificationHandler.Stop() + go p.targetManager.Run() + defer p.targetManager.Stop() - p.storage.Start() + defer p.queryEngine.Stop() - go func() { - err := p.webService.ServeForever(*pathPrefix) - if err != nil { - glog.Fatal(err) - } - }() + go p.webService.ServeForever(*pathPrefix) + // Wait for reload or termination signals. hup := make(chan os.Signal) signal.Notify(hup, syscall.SIGHUP) go func() { @@ -272,19 +285,6 @@ func (p *prometheus) Serve() { close(hup) - p.targetManager.Stop() - p.ruleManager.Stop() - p.queryEngine.Stop() - - if err := p.storage.Stop(); err != nil { - glog.Error("Error stopping local storage: ", err) - } - - for _, q := range p.remoteStorageQueues { - q.Stop() - } - - p.notificationHandler.Stop() glog.Info("See you next time!") } @@ -387,6 +387,5 @@ func main() { } p := NewPrometheus() - registry.MustRegister(p) p.Serve() } diff --git a/storage/local/interface.go b/storage/local/interface.go index 34ed30c7b5..60df2d2f70 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -48,7 +48,7 @@ type Storage interface { // Run the various maintenance loops in goroutines. Returns when the // storage is ready to use. Keeps everything running in the background // until Stop is called. - Start() + Start() error // Stop shuts down the Storage gracefully, flushes all pending // operations, stops all maintenance loops,and frees all resources. 
Stop() error diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 7b2a34cecd..96d0adf08a 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -268,10 +268,13 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync p.labelPairToFingerprints = labelPairToFingerprints p.labelNameToLabelValues = labelNameToLabelValues - go p.processIndexingQueue() return p, nil } +func (p *persistence) run() { + p.processIndexingQueue() +} + // Describe implements prometheus.Collector. func (p *persistence) Describe(ch chan<- *prometheus.Desc) { ch <- p.indexingQueueLength.Desc() diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 20bfe62160..00e36b792e 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -42,6 +42,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes dir.Close() t.Fatal(err) } + go p.run() return p, test.NewCallbackCloser(func() { p.close() dir.Close() diff --git a/storage/local/storage.go b/storage/local/storage.go index c6db45b2b8..56371ca777 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -82,6 +82,8 @@ type memorySeriesStorage struct { fpLocker *fingerprintLocker fpToSeries *seriesMap + options *MemorySeriesStorageOptions + loopStopping, loopStopped chan struct{} maxMemoryChunks int dropAfter time.Duration @@ -124,10 +126,12 @@ type MemorySeriesStorageOptions struct { // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. 
-func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { +func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { s := &memorySeriesStorage{ fpLocker: newFingerprintLocker(1024), + options: o, + loopStopping: make(chan struct{}), loopStopped: make(chan struct{}), maxMemoryChunks: o.MemoryChunks, @@ -185,9 +189,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { []string{seriesLocationLabel}, ), } + return s +} +// Start implements Storage. +func (s *memorySeriesStorage) Start() error { var syncStrategy syncStrategy - switch o.SyncStrategy { + switch s.options.SyncStrategy { case Never: syncStrategy = func() bool { return false } case Always: @@ -198,33 +206,32 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { panic("unknown sync strategy") } - p, err := newPersistence(o.PersistenceStoragePath, o.Dirty, o.PedanticChecks, syncStrategy) + p, err := newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy) if err != nil { - return nil, err + return err } s.persistence = p glog.Info("Loading series map and head chunks...") s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads() if err != nil { - return nil, err + return err } glog.Infof("%d series loaded.", s.fpToSeries.length()) s.numSeries.Set(float64(s.fpToSeries.length())) mapper, err := newFPMapper(s.fpToSeries, p) if err != nil { - return nil, err + return err } s.mapper = mapper - return s, nil -} + go s.persistence.run() -// Start implements Storage. -func (s *memorySeriesStorage) Start() { go s.handleEvictList() go s.loop() + + return nil } // Stop implements Storage. 
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 8ae4d3507e..16ec17b017 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -163,9 +163,8 @@ func TestLoop(t *testing.T) { CheckpointInterval: 250 * time.Millisecond, SyncStrategy: Adaptive, } - storage, err := NewMemorySeriesStorage(o) - if err != nil { - t.Fatalf("Error creating storage: %s", err) + storage := NewMemorySeriesStorage(o) + if err := storage.Start(); err != nil { + t.Fatalf("Error starting storage: %s", err) } - storage.Start() for _, s := range samples { @@ -731,9 +730,8 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { CheckpointInterval: time.Second, SyncStrategy: Adaptive, } - s, err := NewMemorySeriesStorage(o) - if err != nil { - b.Fatalf("Error creating storage: %s", err) + s := NewMemorySeriesStorage(o) + if err := s.Start(); err != nil { + b.Fatalf("Error starting storage: %s", err) } - s.Start() defer s.Stop() diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index a645bd1f51..21f6aa0080 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -48,14 +48,12 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (*memorySeriesStorage, tes CheckpointInterval: time.Hour, SyncStrategy: Adaptive, } - storage, err := NewMemorySeriesStorage(o) - if err != nil { + storage := NewMemorySeriesStorage(o) + if err := storage.Start(); err != nil { directory.Close() t.Fatalf("Error creating storage: %s", err) } - storage.Start() - closer := &testStorageCloser{ storage: storage, directory: directory, diff --git a/web/web.go b/web/web.go index ff0917b2de..a202c96c9b 100644 --- a/web/web.go +++ b/web/web.go @@ -58,7 +58,7 @@ type WebService struct { } // ServeForever serves the HTTP endpoints and only returns upon errors. 
-func (ws WebService) ServeForever(pathPrefix string) error { +func (ws WebService) ServeForever(pathPrefix string) { http.Handle("/favicon.ico", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.Error(w, "", 404) @@ -108,9 +108,16 @@ func (ws WebService) ServeForever(pathPrefix string) error { })) } - glog.Info("listening on ", *listenAddress) + glog.Infof("Listening on %s", *listenAddress) - return http.ListenAndServe(*listenAddress, nil) + // If we cannot bind to a port, retry after 30 seconds. + for { + err := http.ListenAndServe(*listenAddress, nil) + if err != nil { + glog.Errorf("Could not listen on %s: %s", *listenAddress, err) + } + time.Sleep(30 * time.Second) + } } func (ws WebService) quitHandler(w http.ResponseWriter, r *http.Request) {