diff --git a/physical/dynamodb.go b/physical/dynamodb.go index e014385e74..963d7fe6e5 100644 --- a/physical/dynamodb.go +++ b/physical/dynamodb.go @@ -60,8 +60,9 @@ const ( // a DynamoDB table. It can be run in high-availability mode // as DynamoDB has locking capabilities. type DynamoDBBackend struct { - table string - client *dynamodb.DynamoDB + table string + client *dynamodb.DynamoDB + recovery bool } // DynamoDBRecord is the representation of a vault entry in @@ -79,6 +80,7 @@ type DynamoDBLock struct { value, key string held bool lock sync.Mutex + recovery bool } // newDynamoDBBackend constructs a DynamoDB backend. If the @@ -167,9 +169,15 @@ func newDynamoDBBackend(conf map[string]string) (Backend, error) { return nil, err } + recoveryMode := os.Getenv("RECOVERY_MODE") + if recoveryMode == "" { + recoveryMode = conf["recovery_mode"] + } + return &DynamoDBBackend{ - table: table, - client: client, + table: table, + client: client, + recovery: recoveryMode == "1", }, nil } @@ -318,9 +326,10 @@ func (d *DynamoDBBackend) List(prefix string) ([]string, error) { // LockWith is used for mutual exclusion based on the given key. func (d *DynamoDBBackend) LockWith(key, value string) (Lock, error) { return &DynamoDBLock{ - backend: d, - key: filepath.Join(filepath.Dir(key), DynamoDBLockPrefix+filepath.Base(key)), - value: value, + backend: d, + key: filepath.Join(filepath.Dir(key), DynamoDBLockPrefix+filepath.Base(key)), + value: value, + recovery: d.recovery, }, nil } @@ -460,6 +469,20 @@ func (l *DynamoDBLock) tryToLock(stop, success chan struct{}, errors chan error) if err, ok := err.(awserr.Error); ok && err.Code() != "ConditionalCheckFailedException" { errors <- err } + if l.recovery { + _, err := l.backend.client.DeleteItem(&dynamodb.DeleteItemInput{ + TableName: aws.String(l.backend.table), + Key: map[string]*dynamodb.AttributeValue{ + "Path": {S: aws.String(record.Path)}, + "Key": {S: aws.String(record.Key)}, + }, + }) + if err != nil { + errors <- fmt.Errorf("could not delete lock record: %s", err) + } else { + l.recovery = false + } + } } else { ticker.Stop() close(success)