From ef6304c5c2f0e962c4f75cf83aae6ac7b1073b5f Mon Sep 17 00:00:00 2001 From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com> Date: Tue, 24 Mar 2020 23:26:13 -0700 Subject: [PATCH] Improve connectDisks() performance (#9203) --- cmd/xl-sets.go | 133 +++++++++++++++++++++++++------------------- cmd/xl-v1-common.go | 9 +-- 2 files changed, 81 insertions(+), 61 deletions(-) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 05147e524..38147131d 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -108,23 +108,52 @@ type xlSets struct { mrfUploads map[string]int } -// isConnected - checks if the endpoint is connected or not. -func (s *xlSets) isConnected(endpointStr string) bool { +func isEndpointConnected(diskMap map[string]StorageAPI, endpoint string) bool { + disk := diskMap[endpoint] + if disk == nil { + return false + } + return disk.IsOnline() +} + +func (s *xlSets) getOnlineDisksCount() int { + s.xlDisksMu.RLock() + defer s.xlDisksMu.RUnlock() + count := 0 + for i := 0; i < s.setCount; i++ { + for j := 0; j < s.drivesPerSet; j++ { + disk := s.xlDisks[i][j] + if disk == nil { + continue + } + if !disk.IsOnline() { + continue + } + count++ + } + } + return count +} + +func (s *xlSets) getDiskMap() map[string]StorageAPI { + diskMap := make(map[string]StorageAPI) + s.xlDisksMu.RLock() defer s.xlDisksMu.RUnlock() for i := 0; i < s.setCount; i++ { for j := 0; j < s.drivesPerSet; j++ { - if s.xlDisks[i][j] == nil { + disk := s.xlDisks[i][j] + if disk == nil { continue } - if s.xlDisks[i][j].String() != endpointStr { + if !disk.IsOnline() { continue } - return s.xlDisks[i][j].IsOnline() + diskMap[disk.String()] = disk } } - return false + return diskMap } // Initializes a new StorageAPI from the endpoint argument, returns @@ -172,30 +201,11 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) { // connectDisksWithQuorum is same as connectDisks but waits // for quorum number of formatted disks to be online in any given sets. func (s *xlSets) connectDisksWithQuorum() { - var onlineDisks int - for onlineDisks < len(s.endpoints)/2 { - for i, endpoint := range s.endpoints { - if s.isConnected(s.endpointStrings[i]) { - continue - } - disk, format, err := connectEndpoint(endpoint) - if err != nil { - printEndpointError(endpoint, err) - continue - } - i, j, err := findDiskIndex(s.format, format) - if err != nil { - // Close the internal connection to avoid connection leaks. - disk.Close() - printEndpointError(endpoint, err) - continue - } - disk.SetDiskID(format.XL.This) - s.xlDisks[i][j] = disk - onlineDisks++ + for { + s.connectDisks() + if s.getOnlineDisksCount() > len(s.endpoints)/2 { + return } - // Sleep for a while - so that we don't go into - // 100% CPU when half the disks are online. time.Sleep(100 * time.Millisecond) } } @@ -203,33 +213,41 @@ func (s *xlSets) connectDisksWithQuorum() { // connectDisks - attempt to connect all the endpoints, loads format // and re-arranges the disks in proper position. func (s *xlSets) connectDisks() { + var wg sync.WaitGroup + diskMap := s.getDiskMap() for i, endpoint := range s.endpoints { - if s.isConnected(s.endpointStrings[i]) { + if isEndpointConnected(diskMap, s.endpointStrings[i]) { continue } - disk, format, err := connectEndpoint(endpoint) - if err != nil { - printEndpointError(endpoint, err) - continue - } - setIndex, diskIndex, err := findDiskIndex(s.format, format) - if err != nil { - // Close the internal connection to avoid connection leaks. - disk.Close() - printEndpointError(endpoint, err) - continue - } - disk.SetDiskID(format.XL.This) - s.xlDisksMu.Lock() - s.xlDisks[setIndex][diskIndex] = disk - s.xlDisksMu.Unlock() - - // Send a new disk connect event with a timeout - select { - case s.disksConnectEvent <- diskConnectInfo{setIndex: setIndex}: - case <-time.After(100 * time.Millisecond): - } + wg.Add(1) + go func(endpoint Endpoint) { + defer wg.Done() + disk, format, err := connectEndpoint(endpoint) + if err != nil { + printEndpointError(endpoint, err) + return + } + setIndex, diskIndex, err := findDiskIndex(s.format, format) + if err != nil { + // Close the internal connection to avoid connection leaks. + disk.Close() + printEndpointError(endpoint, err) + return + } + disk.SetDiskID(format.XL.This) + s.xlDisksMu.Lock() + s.xlDisks[setIndex][diskIndex] = disk + s.xlDisksMu.Unlock() + go func(setIndex int) { + // Send a new disk connect event with a timeout + select { + case s.disksConnectEvent <- diskConnectInfo{setIndex: setIndex}: + case <-time.After(100 * time.Millisecond): + } + }(setIndex) + }(endpoint) } + wg.Wait() } // monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected @@ -259,8 +277,8 @@ func (s *xlSets) GetLockers(setIndex int) func() []dsync.NetLocker { // GetDisks returns a closure for a given set, which provides list of disks per set. func (s *xlSets) GetDisks(setIndex int) func() []StorageAPI { return func() []StorageAPI { - s.xlDisksMu.Lock() - defer s.xlDisksMu.Unlock() + s.xlDisksMu.RLock() + defer s.xlDisksMu.RUnlock() disks := make([]StorageAPI, s.drivesPerSet) copy(disks, s.xlDisks[setIndex]) return disks @@ -272,13 +290,14 @@ const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs // Initialize new set of erasure coded sets. func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerSet int) (*xlSets, error) { endpointStrings := make([]string, len(endpoints)) - for _, endpoint := range endpoints { + for i, endpoint := range endpoints { if endpoint.IsLocal { - endpointStrings = append(endpointStrings, endpoint.Path) + endpointStrings[i] = endpoint.Path } else { - endpointStrings = append(endpointStrings, endpoint.String()) + endpointStrings[i] = endpoint.String() } } + // Initialize the XL sets instance. s := &xlSets{ sets: make([]*xlObjects, setCount), diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index 36c589f52..f258897bf 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -24,12 +24,13 @@ import ( ) // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice. -func (xl xlObjects) getLoadBalancedDisks() (disks []StorageAPI) { +func (xl xlObjects) getLoadBalancedDisks() (newDisks []StorageAPI) { + disks := xl.getDisks() // Based on the random shuffling return back randomized disks. - for _, i := range hashOrder(UTCNow().String(), len(xl.getDisks())) { - disks = append(disks, xl.getDisks()[i-1]) + for _, i := range hashOrder(UTCNow().String(), len(disks)) { + newDisks = append(newDisks, disks[i-1]) } - return disks + return newDisks } // This function does the following check, suppose