diff --git a/vendor/github.com/minio/dsync/dmutex.go b/vendor/github.com/minio/dsync/dmutex.go deleted file mode 100644 index 7fcdc0927..000000000 --- a/vendor/github.com/minio/dsync/dmutex.go +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dsync - -import ( - "log" - "math" - "math/rand" - "net/rpc" - "sync" - "time" -) - -const DMutexAcquireTimeout = 25 * time.Millisecond - -// A DMutex is a distributed mutual exclusion lock. -type DMutex struct { - Name string - locks []bool // Array of nodes that granted a lock - uids []string // Array of uids for verification of sending correct release messages - m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node - -} - -type Granted struct { - index int - locked bool - uid string -} - -// Connect to respective lock server nodes on the first Lock() call. -func connectLazy(dm *DMutex) { - if clnts == nil { - panic("rpc client connections weren't initialized.") - } - for i := range clnts { - if clnts[i].rpc != nil { - continue - } - - // Pass in unique path (as required by server.HandleHTTP(). - // Ignore failure to connect, the lock server node may join the - // cluster later. - clnt, err := rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i]) - if err != nil { - clnts[i].SetRPC(nil) - continue - } - clnts[i].SetRPC(clnt) - } -} - -// Lock locks dm. -// -// If the lock is already in use, the calling goroutine -// blocks until the mutex is available. -func (dm *DMutex) Lock() { - - // Shield Lock() with local mutex in order to prevent more than - // one broadcast going out at the same time from this node - dm.m.Lock() - defer dm.m.Unlock() - - runs, backOff := 1, 1 - - for { - connectLazy(dm) - - // create temp arrays on stack - locks := make([]bool, n) - ids := make([]string, n) - - // try to acquire the lock - success := lock(clnts, &locks, &ids, dm.Name) - if success { - // if success, copy array to object - dm.locks = make([]bool, n) - copy(dm.locks, locks[:]) - dm.uids = make([]string, n) - copy(dm.uids, ids[:]) - return - } - - // We timed out on the previous lock, incrementally wait for a longer back-off time, - // and try again afterwards - time.Sleep(time.Duration(backOff) * time.Millisecond) - - backOff += int(rand.Float64() * math.Pow(2, float64(runs))) - if backOff > 1024 { - backOff = backOff % 64 - - runs = 1 // reset runs - } else if runs < 10 { - runs++ - } - } -} - -func (dm *DMutex) tryLockTimeout() bool { - - // Shield Lock() with local mutex in order to prevent more than - // one broadcast going out at the same time from this node - dm.m.Lock() - defer dm.m.Unlock() - - // TODO: Implement reconnect - connectLazy(dm) - - // create temp arrays on stack - locks := make([]bool, n) - ids := make([]string, n) - - // try to acquire the lock - success := lock(clnts, &locks, &ids, dm.Name) - if success { - // if success, copy array to object - dm.locks = make([]bool, n) - copy(dm.locks, locks[:]) - dm.uids = make([]string, n) - copy(dm.uids, ids[:]) - } - return success -} - -// lock tries to acquire the distributed lock, returning true or false -// -func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string) bool { - - // Create buffered channel of quorum size - ch := make(chan Granted, n/2+1) - - for index, c := range clnts { - - // broadcast lock request to all nodes - go func(index int, c *RPCClient) { - // All client methods issuing RPCs are thread-safe and goroutine-safe, - // i.e. it is safe to call them from multiple concurrently running go routines. - var status bool - err := c.Call("Dsync.Lock", lockName, &status) - - locked, uid := false, "" - if err == nil { - locked = status - // TODO: Get UIOD again - uid = "" - } else { - // If rpc call failed due to connection related errors, reset rpc.Client object - // to trigger reconnect on subsequent Lock()/Unlock() requests to the same node. - if IsRPCError(err) { - clnts[index].SetRPC(nil) - } - // silently ignore error, retry later - } - - ch <- Granted{index: index, locked: locked, uid: uid} - - }(index, c) - } - - var wg sync.WaitGroup - wg.Add(1) - - quorum := false - - go func() { - - // Wait until we have received (minimally) quorum number of responses or timeout - i := 0 - done := false - timeout := time.After(DMutexAcquireTimeout) - - for ; i < n; i++ { - - select { - case grant := <-ch: - if grant.locked { - // Mark that this node has acquired the lock - (*locks)[grant.index] = true - (*uids)[grant.index] = grant.uid - } else { - done = true - //fmt.Println("one lock failed before quorum -- release locks acquired") - releaseAll(clnts, locks, uids, lockName) - } - - case <-timeout: - done = true - // timeout happened, maybe one of the nodes is slow, count - // number of locks to check whether we have quorum or not - if !quorumMet(locks) { - //fmt.Println("timed out -- release locks acquired") - releaseAll(clnts, locks, uids, lockName) - } - } - - if done { - break - } - } - - // Count locks in order to determine whterh we have quorum or not - quorum = quorumMet(locks) - - // Signal that we have the quorum - wg.Done() - - // Wait for the other responses and immediately release the locks - // (do not add them to the locks array because the DMutex could - // already has been unlocked again by the original calling thread) - for ; i < n; i++ { - grantToBeReleased := <-ch - if grantToBeReleased.locked { - // release lock - go sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.uid) - } - } - }() - - wg.Wait() - - return quorum -} - -// quorumMet determines whether we have acquired n/2+1 underlying locks or not -func quorumMet(locks *[]bool) bool { - - count := 0 - for _, locked := range *locks { - if locked { - count++ - } - } - - return count >= n/2+1 -} - -// releaseAll releases all locks that are marked as locked -func releaseAll(clnts []*RPCClient, locks *[]bool, ids *[]string, lockName string) { - - for lock := 0; lock < n; lock++ { - if (*locks)[lock] { - go sendRelease(clnts[lock], lockName, (*ids)[lock]) - (*locks)[lock] = false - (*ids)[lock] = "" - } - } - -} - -// hasLock returns whether or not a node participated in granting the lock -func (dm *DMutex) hasLock(node string) bool { - - for index, n := range nodes { - if n == node { - return dm.locks[index] - } - } - - return false -} - -// locked returns whether or not we have met the quorum -func (dm *DMutex) locked() bool { - - locks := make([]bool, n) - copy(locks[:], dm.locks[:]) - - return quorumMet(&locks) -} - -// Unlock unlocks dm. -// -// It is a run-time error if dm is not locked on entry to Unlock. -func (dm *DMutex) Unlock() { - - // Verify that we have the lock or panic otherwise (similar to sync.mutex) - if !dm.locked() { - panic("dsync: unlock of unlocked distributed mutex") - } - - // We don't need to wait until we have released all the locks (or the quorum) - // (a subsequent lock will retry automatically in case it would fail to get - // quorum) - for index, c := range clnts { - - if dm.locks[index] { - // broadcast lock release to all nodes the granted the lock - go sendRelease(c, dm.Name, dm.uids[index]) - - dm.locks[index] = false - } - } -} - -// sendRelease sends a release message to a node that previously granted a lock -func sendRelease(c *RPCClient, name, uid string) { - - // All client methods issuing RPCs are thread-safe and goroutine-safe, - // i.e. it is safe to call them from multiple concurrently running goroutines. - var status bool - // TODO: Send UID to server - if err := c.Call("Dsync.Unlock", name, &status); err != nil { - log.Fatal("Unlock on %s failed on client %v", name, c) - } -} diff --git a/vendor/github.com/minio/dsync/drwmutex.go b/vendor/github.com/minio/dsync/drwmutex.go index 2f9f2d3f3..4460b5a1f 100644 --- a/vendor/github.com/minio/dsync/drwmutex.go +++ b/vendor/github.com/minio/dsync/drwmutex.go @@ -17,127 +17,360 @@ package dsync import ( - "fmt" + "math" + "math/rand" + "net/rpc" "sync" + "time" ) -const maxReaders = 8 +const DRWMutexAcquireTimeout = 25 * time.Millisecond +// A DRWMutex is a distributed mutual exclusion lock. type DRWMutex struct { - rArray []*DMutex - rLockedArray []bool - w DMutex // held if there are pending writers - m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node - m2 sync.Mutex // Mutex to prevent multiple simultaneous locks from this node + Name string + locks []bool // Array of nodes that granted a lock + uids []string // Array of uids for verification of sending correct release messages + m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node } -func NewDRWMutex(name string) (drw *DRWMutex) { - - rArray := make([]*DMutex, maxReaders) - rLockedArray := make([]bool, maxReaders) - - for r := 0; r < maxReaders; r++ { - rArray[r] = &DMutex{Name: fmt.Sprintf("%s-r%d", name, r)} - } +type Granted struct { + index int + locked bool + uid string +} +func NewDRWMutex(name string) *DRWMutex { return &DRWMutex{ - rArray: rArray, - rLockedArray: rLockedArray, - w: DMutex{Name: name + "-w"}} + Name: name, + locks: make([]bool, dnodeCount), + uids: make([]string, dnodeCount), + } } -// RLock locks drw for reading. -func (drw *DRWMutex) RLock() { +// Connect to respective lock server nodes on the first Lock() call. +func connectLazy() { + if clnts == nil { + panic("rpc client connections weren't initialized.") + } + for i := range clnts { + if clnts[i].rpc != nil { + continue + } - drw.m.Lock() - defer drw.m.Unlock() + // Pass in unique path (as required by server.HandleHTTP(). + // Ignore failure to connect, the lock server node may join the + // cluster later. + clnt, err := rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i]) + if err != nil { + clnts[i].SetRPC(nil) + continue + } + clnts[i].SetRPC(clnt) + } +} - // Check if no write is active, block otherwise - // Can skip this? - drw.w.Lock() - drw.w.Unlock() +// RLock holds a read lock on dm. +// +// If the lock is already in use, the calling goroutine +// blocks until the mutex is available. +func (dm *DRWMutex) RLock() { + // Shield RLock() with local mutex in order to prevent more than + // one broadcast going out at the same time from this node + dm.m.Lock() + defer dm.m.Unlock() - // Lock either one of the reader locks - for i := 0; ; i++ { - drw.rLockedArray[i%maxReaders] = drw.rArray[i%maxReaders].tryLockTimeout() - if drw.rLockedArray[i%maxReaders] { + runs, backOff := 1, 1 + + for { + connectLazy() + + // create temp arrays on stack + locks := make([]bool, dnodeCount) + ids := make([]string, dnodeCount) + + // try to acquire the lock + isReadLock := true + success := lock(clnts, &locks, &ids, dm.Name, isReadLock) + if success { + // if success, copy array to object + copy(dm.locks, locks[:]) + copy(dm.uids, ids[:]) return } - } -} -// RUnlock undoes a single RLock call; -// it does not affect other simultaneous readers. -// It is a run-time error if rw is not locked for reading -// on entry to RUnlock. -func (drw *DRWMutex) RUnlock() { + // We timed out on the previous lock, incrementally wait for a longer back-off time, + // and try again afterwards + time.Sleep(time.Duration(backOff) * time.Millisecond) - drw.m.Lock() - defer drw.m.Unlock() + backOff += int(rand.Float64() * math.Pow(2, float64(runs))) + if backOff > 1024 { + backOff = backOff % 64 - // Unlock whichever readlock that was acquired) - for r := 0; r < maxReaders; r++ { - if drw.rLockedArray[r] { - drw.rArray[r].Unlock() - drw.rLockedArray[r] = false - // we only want to release a single read lock at a time - break + runs = 1 // reset runs + } else if runs < 10 { + runs++ } } } -// Lock locks rw for writing. -// If the lock is already locked for reading or writing, -// Lock blocks until the lock is available. -// To ensure that the lock eventually becomes available, -// a blocked Lock call excludes new readers from acquiring -// the lock. -func (drw *DRWMutex) Lock() { +// Lock locks dm. +// +// If the lock is already in use, the calling goroutine +// blocks until the mutex is available. +func (dm *DRWMutex) Lock() { - drw.m.Lock() - defer drw.m.Unlock() + // Shield Lock() with local mutex in order to prevent more than + // one broadcast going out at the same time from this node + dm.m.Lock() + defer dm.m.Unlock() - // First, resolve competition with other writers. - drw.w.Lock() + runs, backOff := 1, 1 - // Acquire all read locks. - var wg sync.WaitGroup - wg.Add(maxReaders) + for { + connectLazy() - for r := 0; r < maxReaders; r++ { - go func(r int) { - defer wg.Done() - drw.rArray[r].Lock() - drw.rLockedArray[r] = true - }(r) + // create temp arrays on stack + locks := make([]bool, dnodeCount) + ids := make([]string, dnodeCount) + + // try to acquire the lock + isReadLock := false + success := lock(clnts, &locks, &ids, dm.Name, isReadLock) + if success { + // if success, copy array to object + copy(dm.locks, locks[:]) + copy(dm.uids, ids[:]) + return + } + + // We timed out on the previous lock, incrementally wait for a longer back-off time, + // and try again afterwards + time.Sleep(time.Duration(backOff) * time.Millisecond) + + backOff += int(rand.Float64() * math.Pow(2, float64(runs))) + if backOff > 1024 { + backOff = backOff % 64 + + runs = 1 // reset runs + } else if runs < 10 { + runs++ + } } +} + +// lock tries to acquire the distributed lock, returning true or false +// +func lock(clnts []*RPCClient, locks *[]bool, uids *[]string, lockName string, isReadLock bool) bool { + + // Create buffered channel of quorum size + ch := make(chan Granted, dquorum) + + for index, c := range clnts { + + // broadcast lock request to all nodes + go func(index int, isReadLock bool, c *RPCClient) { + // All client methods issuing RPCs are thread-safe and goroutine-safe, + // i.e. it is safe to call them from multiple concurrently running go routines. + var status bool + var err error + if isReadLock { + err = c.Call("Dsync.RLock", lockName, &status) + } else { + err = c.Call("Dsync.Lock", lockName, &status) + } + + locked, uid := false, "" + if err == nil { + locked = status + // TODO: Get UIOD again + uid = "" + } else { + // If rpc call failed due to connection related errors, reset rpc.Client object + // to trigger reconnect on subsequent Lock()/Unlock() requests to the same node. + if IsRPCError(err) { + clnts[index].SetRPC(nil) + } + // silently ignore error, retry later + } + + ch <- Granted{index: index, locked: locked, uid: uid} + + }(index, isReadLock, c) + } + + var wg sync.WaitGroup + wg.Add(1) + + quorum := false + + go func(isReadLock bool) { + + // Wait until we have received (minimally) quorum number of responses or timeout + i := 0 + done := false + timeout := time.After(DRWMutexAcquireTimeout) + + for ; i < dnodeCount; i++ { + + select { + case grant := <-ch: + if grant.locked { + // Mark that this node has acquired the lock + (*locks)[grant.index] = true + (*uids)[grant.index] = grant.uid + } else { + done = true + //fmt.Println("one lock failed before quorum -- release locks acquired") + releaseAll(clnts, locks, uids, lockName, isReadLock) + } + + case <-timeout: + done = true + // timeout happened, maybe one of the nodes is slow, count + // number of locks to check whether we have quorum or not + if !quorumMet(locks) { + //fmt.Println("timed out -- release locks acquired") + releaseAll(clnts, locks, uids, lockName, isReadLock) + } + } + + if done { + break + } + } + + // Count locks in order to determine whterh we have quorum or not + quorum = quorumMet(locks) + + // Signal that we have the quorum + wg.Done() + + // Wait for the other responses and immediately release the locks + // (do not add them to the locks array because the DRWMutex could + // already has been unlocked again by the original calling thread) + for ; i < dnodeCount; i++ { + grantToBeReleased := <-ch + if grantToBeReleased.locked { + // release lock + sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.uid, isReadLock) + } + } + }(isReadLock) wg.Wait() + + return quorum } -// Unlock unlocks rw for writing. It is a run-time error if rw is -// not locked for writing on entry to Unlock. -// -// As with Mutexes, a locked RWMutex is not associated with a particular -// goroutine. One goroutine may RLock (Lock) an RWMutex and then -// arrange for another goroutine to RUnlock (Unlock) it. -func (drw *DRWMutex) Unlock() { +// quorumMet determines whether we have acquired n/2+1 underlying locks or not +func quorumMet(locks *[]bool) bool { - drw.m.Lock() - defer drw.m.Unlock() - - for r := 0; r < maxReaders; r++ { - if !drw.rLockedArray[r] { - panic("dsync: unlock of unlocked distributed rwmutex") + count := 0 + for _, locked := range *locks { + if locked { + count++ } } - // Unlock all read locks - for r := 0; r < maxReaders; r++ { - drw.rArray[r].Unlock() - drw.rLockedArray[r] = false + return count >= dquorum +} + +// releaseAll releases all locks that are marked as locked +func releaseAll(clnts []*RPCClient, locks *[]bool, ids *[]string, lockName string, isReadLock bool) { + + for lock := 0; lock < dnodeCount; lock++ { + if (*locks)[lock] { + sendRelease(clnts[lock], lockName, (*ids)[lock], isReadLock) + (*locks)[lock] = false + (*ids)[lock] = "" + } } - // Allow other writers to proceed. - drw.w.Unlock() +} + +// RUnlock releases a read lock held on dm. +// +// It is a run-time error if dm is not locked on entry to RUnlock. +func (dm *DRWMutex) RUnlock() { + // We don't panic like sync.Mutex, when an unlock is issued on an + // un-locked lock, since the lock rpc server may have restarted and + // "forgotten" about the lock. + + // We don't need to wait until we have released all the locks (or the quorum) + // (a subsequent lock will retry automatically in case it would fail to get + // quorum) + for index, c := range clnts { + + if dm.locks[index] { + // broadcast lock release to all nodes the granted the lock + isReadLock := true + sendRelease(c, dm.Name, dm.uids[index], isReadLock) + + dm.locks[index] = false + } + } +} + +// Unlock unlocks dm. +// +// It is a run-time error if dm is not locked on entry to Unlock. +func (dm *DRWMutex) Unlock() { + + // We don't panic like sync.Mutex, when an unlock is issued on an + // un-locked lock, since the lock rpc server may have restarted and + // "forgotten" about the lock. + + // We don't need to wait until we have released all the locks (or the quorum) + // (a subsequent lock will retry automatically in case it would fail to get + // quorum) + for index, c := range clnts { + + if dm.locks[index] { + // broadcast lock release to all nodes the granted the lock + isReadLock := false + sendRelease(c, dm.Name, dm.uids[index], isReadLock) + + dm.locks[index] = false + } + } +} + +// sendRelease sends a release message to a node that previously granted a lock +func sendRelease(c *RPCClient, name, uid string, isReadLock bool) { + + backOffArray := []time.Duration{30 * time.Second, 1 * time.Minute, 3 * time.Minute, 10 * time.Minute, 30 * time.Minute, 1 * time.Hour} + + go func(c *RPCClient, name, uid string) { + + for _, backOff := range backOffArray { + + // Make sure we are connected + connectLazy() + + // All client methods issuing RPCs are thread-safe and goroutine-safe, + // i.e. it is safe to call them from multiple concurrently running goroutines. + var status bool + var err error + // TODO: Send UID to server + if isReadLock { + if err = c.Call("Dsync.RUnlock", name, &status); err == nil { + // RUnlock delivered, exit out + return + } + } else { + if err = c.Call("Dsync.Unlock", name, &status); err == nil { + // Unlock delivered, exit out + return + } + } + + // If rpc call failed due to connection related errors, reset rpc.Client object + // to trigger reconnect on subsequent Lock()/Unlock() requests to the same node. + c.SetRPC(nil) + + // wait + time.Sleep(backOff) + } + }(c, name, uid) } diff --git a/vendor/github.com/minio/dsync/dsync.go b/vendor/github.com/minio/dsync/dsync.go index 66610bdf1..9172e1eef 100644 --- a/vendor/github.com/minio/dsync/dsync.go +++ b/vendor/github.com/minio/dsync/dsync.go @@ -23,16 +23,20 @@ const DebugPath = "/debug" const DefaultPath = "/rpc/dsync" -var n int +// Number of nodes participating in the distributed locking. +var dnodeCount int + +// List of nodes participating. var nodes []string + +// List of rpc paths, one per lock server. var rpcPaths []string + +// List of rpc client objects, one per lock server. var clnts []*RPCClient -func closeClients(clients []*RPCClient) { - for _, clnt := range clients { - clnt.Close() - } -} +// Simple majority based quorum, set to dNodeCount/2+1 +var dquorum int // SetNodesWithPath - initializes package-level global state variables such as // nodes, rpcPaths, clnts. @@ -41,7 +45,7 @@ func closeClients(clients []*RPCClient) { func SetNodesWithPath(nodeList []string, paths []string) (err error) { // Validate if number of nodes is within allowable range. - if n != 0 { + if dnodeCount != 0 { return errors.New("Cannot reinitialize dsync package") } else if len(nodeList) < 4 { return errors.New("Dsync not designed for less than 4 nodes") @@ -53,8 +57,9 @@ func SetNodesWithPath(nodeList []string, paths []string) (err error) { copy(nodes, nodeList[:]) rpcPaths = make([]string, len(paths)) copy(rpcPaths, paths[:]) - n = len(nodes) - clnts = make([]*RPCClient, n) + dnodeCount = len(nodes) + dquorum = dnodeCount/2 + 1 + clnts = make([]*RPCClient, dnodeCount) // Initialize node name and rpc path for each RPCClient object. for i := range clnts { clnts[i] = newClient(nodes[i], rpcPaths[i]) diff --git a/vendor/github.com/minio/dsync/rpc-client.go b/vendor/github.com/minio/dsync/rpc-client.go index b1f1ea7a3..fe87866b3 100644 --- a/vendor/github.com/minio/dsync/rpc-client.go +++ b/vendor/github.com/minio/dsync/rpc-client.go @@ -43,11 +43,6 @@ func (rpcClient *RPCClient) SetRPC(rpc *rpc.Client) { defer rpcClient.Unlock() rpcClient.rpc = rpc } -func (rpcClient *RPCClient) Close() error { - rpcClient.Lock() - defer rpcClient.Unlock() - return rpcClient.rpc.Close() -} func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error { rpcClient.Lock() diff --git a/vendor/vendor.json b/vendor/vendor.json index 0a0cf2e08..fdc4dfacb 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -98,10 +98,10 @@ "revisionTime": "2015-11-18T20:00:48-08:00" }, { - "checksumSHA1": "Ev8FdU+RSmpHQsLGzRpg5/ka7zE=", + "checksumSHA1": "kbVCnnU0gR/i8WA8Gs2I+/7kONY=", "path": "github.com/minio/dsync", - "revision": "b26292b87d023da097193c8fe624d4a159e0fd03", - "revisionTime": "2016-08-11T06:53:13Z" + "revision": "8f4819554f1f4fffc2e1c8c706b23e5c844997f4", + "revisionTime": "2016-08-17T23:34:37Z" }, { "path": "github.com/minio/go-homedir",