mirror of
https://github.com/hashicorp/vault.git
synced 2026-05-05 20:36:26 +02:00
physical/gcs: use separate client for updating locks (#9424)
* physical/gcs: use separate client for updating locks * Address review comments Co-authored-by: Calvin Leung Huang <cleung2010@gmail.com>
This commit is contained in:
parent
69a9b182db
commit
486ac45d18
@ -71,15 +71,23 @@ type Backend struct {
|
||||
// chunkSize is the chunk size to use for requests.
|
||||
chunkSize int
|
||||
|
||||
// client is the underlying API client for talking to gcs.
|
||||
client *storage.Client
|
||||
// client is the API client and permitPool is the allowed concurrent uses of
|
||||
// the client.
|
||||
client *storage.Client
|
||||
permitPool *physical.PermitPool
|
||||
|
||||
// haEnabled indicates if HA is enabled.
|
||||
haEnabled bool
|
||||
|
||||
// logger and permitPool are internal constructs
|
||||
logger log.Logger
|
||||
permitPool *physical.PermitPool
|
||||
// haClient is the API client. This is managed separately from the main client
|
||||
// because a flood of requests should not block refreshing the TTLs on the
|
||||
// lock.
|
||||
//
|
||||
// This value will be nil if haEnabled is false.
|
||||
haClient *storage.Client
|
||||
|
||||
// logger is an internal logger.
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// NewBackend constructs a Google Cloud Storage backend with the given
|
||||
@ -115,6 +123,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
|
||||
chunkSize = chunkSize * 1024
|
||||
|
||||
// HA configuration
|
||||
haClient := (*storage.Client)(nil)
|
||||
haEnabled := false
|
||||
haEnabledStr := os.Getenv(envHAEnabled)
|
||||
if haEnabledStr == "" {
|
||||
@ -127,6 +136,15 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
|
||||
return nil, errwrap.Wrapf("failed to parse HA enabled: {{err}}", err)
|
||||
}
|
||||
}
|
||||
if haEnabled {
|
||||
logger.Debug("creating client")
|
||||
var err error
|
||||
ctx := context.Background()
|
||||
haClient, err = storage.NewClient(ctx, option.WithUserAgent(useragent.String()))
|
||||
if err != nil {
|
||||
return nil, errwrap.Wrapf("failed to create HA storage client: {{err}}", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Max parallel
|
||||
maxParallel, err := extractInt(c["max_parallel"])
|
||||
@ -140,30 +158,24 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
|
||||
"ha_enabled", haEnabled,
|
||||
"max_parallel", maxParallel,
|
||||
)
|
||||
|
||||
logger.Debug("creating client")
|
||||
|
||||
// Client
|
||||
opts := []option.ClientOption{option.WithUserAgent(useragent.String())}
|
||||
if credentialsFile := c["credentials_file"]; credentialsFile != "" {
|
||||
logger.Warn("specifying credentials_file as an option is " +
|
||||
"deprecated. Please use the GOOGLE_APPLICATION_CREDENTIALS environment " +
|
||||
"variable or instance credentials instead.")
|
||||
opts = append(opts, option.WithCredentialsFile(credentialsFile))
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
client, err := storage.NewClient(ctx, opts...)
|
||||
client, err := storage.NewClient(ctx, option.WithUserAgent(useragent.String()))
|
||||
if err != nil {
|
||||
return nil, errwrap.Wrapf("failed to create storage client: {{err}}", err)
|
||||
}
|
||||
|
||||
return &Backend{
|
||||
bucket: bucket,
|
||||
haEnabled: haEnabled,
|
||||
chunkSize: chunkSize,
|
||||
client: client,
|
||||
permitPool: physical.NewPermitPool(maxParallel),
|
||||
logger: logger,
|
||||
|
||||
haEnabled: haEnabled,
|
||||
haClient: haClient,
|
||||
|
||||
logger: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@ -44,7 +44,7 @@ var (
|
||||
// metricLockUnlock is the metric to register for a lock delete.
|
||||
metricLockUnlock = []string{"gcs", "lock", "unlock"}
|
||||
|
||||
// metricLockGet is the metric to register for a lock get.
|
||||
// metricLockLock is the metric to register for a lock get.
|
||||
metricLockLock = []string{"gcs", "lock", "lock"}
|
||||
|
||||
// metricLockValue is the metric to register for a lock create/update.
|
||||
@ -194,7 +194,7 @@ func (l *Lock) Unlock() error {
|
||||
MetagenerationMatch: r.attrs.Metageneration,
|
||||
}
|
||||
|
||||
obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key)
|
||||
obj := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key)
|
||||
if err := obj.If(conds).Delete(ctx); err != nil {
|
||||
// If the pre-condition failed, it means that someone else has already
|
||||
// acquired the lock and we don't want to delete it.
|
||||
@ -324,10 +324,6 @@ OUTER:
|
||||
// - if key is empty or identity is the same or timestamp exceeds TTL
|
||||
// - update the lock to self
|
||||
func (l *Lock) writeLock() (bool, error) {
|
||||
// Pooling
|
||||
l.backend.permitPool.Acquire()
|
||||
defer l.backend.permitPool.Release()
|
||||
|
||||
// Create a transaction to read and the update (maybe)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@ -376,7 +372,7 @@ func (l *Lock) writeLock() (bool, error) {
|
||||
}
|
||||
|
||||
// Write the object
|
||||
obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key)
|
||||
obj := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key)
|
||||
w := obj.If(conds).NewWriter(ctx)
|
||||
w.ObjectAttrs.CacheControl = "no-cache; no-store; max-age=0"
|
||||
w.ObjectAttrs.Metadata = map[string]string{
|
||||
@ -395,12 +391,8 @@ func (l *Lock) writeLock() (bool, error) {
|
||||
|
||||
// get retrieves the value for the lock.
|
||||
func (l *Lock) get(ctx context.Context) (*LockRecord, error) {
|
||||
// Pooling
|
||||
l.backend.permitPool.Acquire()
|
||||
defer l.backend.permitPool.Release()
|
||||
|
||||
// Read
|
||||
attrs, err := l.backend.client.Bucket(l.backend.bucket).Object(l.key).Attrs(ctx)
|
||||
attrs, err := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key).Attrs(ctx)
|
||||
if err == storage.ErrObjectNotExist {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user