From b106b1c131dadfd85d80f0b00355a134bb85bb4a Mon Sep 17 00:00:00 2001
From: Anis Elleuch
Date: Fri, 14 Jan 2022 19:33:08 +0100
Subject: [PATCH] lock: Fix decision when a lock needs to be removed (#14095)

The code was not properly deciding whether a lock needs to be removed
when it no longer has quorum. After this commit, a lock is forcefully
unlocked when the number of nodes reporting that they cannot find the
lock internally breaks the quorum, i.e. when more than
len(lockers) - quorum nodes answer that the lock is not found.

Simplify the code as well.
---
 internal/dsync/drwmutex.go   | 50 +++++++++++++++---------------------
 internal/dsync/dsync_test.go | 44 +++++++++++++++++++++++++++++--
 2 files changed, 63 insertions(+), 31 deletions(-)

diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go
index 9ce071560..6acbc04ef 100644
--- a/internal/dsync/drwmutex.go
+++ b/internal/dsync/drwmutex.go
@@ -234,8 +234,8 @@ func (dm *DRWMutex) startContinousLockRefresh(lockLossCallback func(), id, sourc
 			case <-refreshTimer.C:
 				refreshTimer.Reset(drwMutexRefreshInterval)
 
-				refreshed, err := refresh(ctx, dm.clnt, id, source, quorum)
-				if err == nil && !refreshed {
+				noQuorum, err := refreshLock(ctx, dm.clnt, id, source, quorum)
+				if err == nil && noQuorum {
 					// Clean the lock locally and in remote nodes
 					forceUnlock(ctx, dm.clnt, id)
 					// Execute the caller lock loss callback
@@ -273,10 +273,12 @@ func forceUnlock(ctx context.Context, ds *Dsync, id string) {
 
 type refreshResult struct {
 	offline   bool
-	succeeded bool
+	refreshed bool
 }
 
-func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (bool, error) {
+// refreshLock refreshes the given lock in all nodes; it returns true when the
+// number of nodes that cannot find the lock internally breaks the quorum.
+func refreshLock(ctx context.Context, ds *Dsync, id, source string, quorum int) (bool, error) {
 	restClnts, _ := ds.GetLockers()
 
 	// Create buffered channel of size equal to total number of nodes.
@@ -302,16 +304,12 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 			defer cancel()
 
 			refreshed, err := c.Refresh(ctx, args)
-			if refreshed && err == nil {
-				ch <- refreshResult{succeeded: true}
+			if err != nil {
+				ch <- refreshResult{offline: true}
+				log("dsync: Unable to call Refresh: %s for %#v at %s\n", err, args, c)
 			} else {
-				if err != nil {
-					ch <- refreshResult{offline: true}
-					log("dsync: Unable to call Refresh failed with %s for %#v at %s\n", err, args, c)
-				} else {
-					ch <- refreshResult{succeeded: false}
-					log("dsync: Refresh returned false for %#v at %s\n", args, c)
-				}
+				ch <- refreshResult{refreshed: refreshed}
+				log("dsync: Refresh returned %t for %#v at %s\n", refreshed, args, c)
 			}
 		}(index, c)
 	}
@@ -322,39 +320,32 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 	// b) received too many refreshed for quorum to be still possible
 	// c) timed out
 	//
-	i, refreshFailed, refreshSucceeded := 0, 0, 0
+	lockNotFound, lockRefreshed := 0, 0
 	done := false
 
-	for ; i < len(restClnts); i++ {
+	for i := 0; i < len(restClnts); i++ {
 		select {
-		case refresh := <-ch:
-			if refresh.offline {
+		case refreshResult := <-ch:
+			if refreshResult.offline {
 				continue
 			}
-			if refresh.succeeded {
-				refreshSucceeded++
+			if refreshResult.refreshed {
+				lockRefreshed++
 			} else {
-				refreshFailed++
+				lockNotFound++
 			}
-			if refreshFailed > quorum {
-				// We know that we are not going to succeed with refresh
+			if lockRefreshed >= quorum || lockNotFound > len(restClnts)-quorum {
 				done = true
 			}
 		case <-ctx.Done():
 			// Refreshing is canceled
 			return false, ctx.Err()
 		}
-
 		if done {
 			break
 		}
 	}
 
-	refreshQuorum := refreshSucceeded >= quorum
-	if !refreshQuorum {
-		refreshQuorum = refreshFailed < quorum
-	}
-
 	// We may have some unused results in ch, release them async.
 	go func() {
 		wg.Wait()
@@ -363,7 +354,8 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 		}
 	}()
 
-	return refreshQuorum, nil
+	noQuorum := lockNotFound > len(restClnts)-quorum
+	return noQuorum, nil
 }
 
 // lock tries to acquire the distributed lock, returning true or false.
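
The rule the new refreshLock applies can be read off the hunks above: offline
lockers are ignored, and the refresh is treated as lost only once the number of
lockers that explicitly answer "lock not found" exceeds len(restClnts) - quorum;
the loop also stops early as soon as either outcome is certain. The standalone
Go sketch below restates that rule outside the patch; noQuorumAfterRefresh and
its sample inputs are illustrative only and are not part of dsync.

package main

import "fmt"

// refreshResult mirrors the struct used in this patch: offline means the
// locker could not be reached, refreshed means it still knows the lock.
type refreshResult struct {
	offline   bool
	refreshed bool
}

// noQuorumAfterRefresh restates the decision made by refreshLock above:
// offline lockers are ignored, and quorum is considered lost only when more
// than len(results)-quorum lockers explicitly report the lock as not found.
func noQuorumAfterRefresh(results []refreshResult, quorum int) bool {
	lockNotFound, lockRefreshed := 0, 0
	for _, r := range results {
		if r.offline {
			continue
		}
		if r.refreshed {
			lockRefreshed++
		} else {
			lockNotFound++
		}
		// Stop early once either outcome is already decided.
		if lockRefreshed >= quorum || lockNotFound > len(results)-quorum {
			break
		}
	}
	return lockNotFound > len(results)-quorum
}

func main() {
	// 5 lockers with a quorum of 3: up to 2 "not found" replies are tolerated,
	// a 3rd one breaks the quorum and the lock gets force-unlocked.
	results := []refreshResult{
		{refreshed: true},
		{offline: true},      // offline lockers do not count against the lock
		{refreshed: false},   // lock not found
		{refreshed: false},   // lock not found
		{refreshed: false},   // lock not found
	}
	fmt.Println(noQuorumAfterRefresh(results, 3)) // prints: true
}
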
diff --git a/internal/dsync/dsync_test.go b/internal/dsync/dsync_test.go
index 5ba3aba0c..c2a381d0a 100644
--- a/internal/dsync/dsync_test.go
+++ b/internal/dsync/dsync_test.go
@@ -236,10 +236,46 @@ func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) {
 	time.Sleep(10 * time.Millisecond)
 }
 
-// Test refreshing lock
+// Test refreshing lock - refresh should always return true
+//
+func TestSuccessfulLockRefresh(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	dm := NewDRWMutex(ds, "aap")
+	contextCanceled := make(chan struct{})
+
+	ctx, cl := context.WithCancel(context.Background())
+	cancel := func() {
+		cl()
+		close(contextCanceled)
+	}
+
+	if !dm.GetLock(ctx, cancel, id, source, Options{Timeout: 5 * time.Minute}) {
+		t.Fatal("GetLock() should be successful")
+	}
+
+	timer := time.NewTimer(drwMutexRefreshInterval * 2)
+
+	select {
+	case <-contextCanceled:
+		t.Fatal("Lock context canceled which is not expected")
+	case <-timer.C:
+	}
+
+	// Should be safe operation in all cases
+	dm.Unlock()
+}
+
+// Test canceling context while quorum servers report lock not found
 func TestFailedRefreshLock(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
 	// Simulate Refresh RPC response to return no locking found
-	for i := range lockServers {
+	for i := range lockServers[:3] {
 		lockServers[i].setRefreshReply(false)
 		defer lockServers[i].setRefreshReply(true)
 	}
@@ -270,6 +306,10 @@ func TestFailedRefreshLock(t *testing.T) {
 
 // Test Unlock should not timeout
 func TestUnlockShouldNotTimeout(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
 	dm := NewDRWMutex(ds, "aap")
 
 	if !dm.GetLock(context.Background(), nil, id, source, Options{Timeout: 5 * time.Minute}) {
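
The updated tests rely on the same arithmetic: TestFailedRefreshLock flips only
the first three mock lock servers to "not found" via setRefreshReply(false),
while TestSuccessfulLockRefresh leaves every server reporting the lock as held.
Assuming the test harness runs 5 lock servers with a write quorum of 3 (an
assumption about the surrounding test setup, not something this patch states),
the short sketch below shows why three "not found" replies are needed to
trigger the lock-loss callback.

package main

import "fmt"

// tolerated returns how many "lock not found" replies the rule in refreshLock
// accepts before declaring quorum lost, for n lockers and the given quorum.
func tolerated(n, quorum int) int {
	return n - quorum
}

func main() {
	// Assumed values for the dsync test harness: 5 lock servers, quorum of 3.
	n, quorum := 5, 3
	fmt.Println(tolerated(n, quorum)) // prints: 2
	// TestFailedRefreshLock flips 3 servers to "not found" (> 2), so the
	// lock-loss callback must cancel the lock's context.
	// TestSuccessfulLockRefresh flips none, so the context must stay alive.
}
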