diff --git a/http/handler.go b/http/handler.go index 2f0b31c8ce..69ed6bdc3a 100644 --- a/http/handler.go +++ b/http/handler.go @@ -1021,6 +1021,11 @@ func request(core *vault.Core, w http.ResponseWriter, rawReq *http.Request, r *l } resp.AddWarning("Timeout hit while waiting for local replicated cluster to apply primary's write; this client may encounter stale reads of values written during this operation.") } + if errwrap.Contains(err, consts.ErrOverloaded.Error()) { + logical.RespondWithStatusCode(resp, r, http.StatusServiceUnavailable) + respondError(w, http.StatusServiceUnavailable, err) + return resp, false, false + } if errwrap.Contains(err, consts.ErrStandby.Error()) { respondStandby(core, w, rawReq.URL) return resp, false, false diff --git a/sdk/helper/consts/error.go b/sdk/helper/consts/error.go index 5bd3f5e6e2..c7e2b51f4e 100644 --- a/sdk/helper/consts/error.go +++ b/sdk/helper/consts/error.go @@ -25,4 +25,7 @@ var ( // ErrInvalidWrappingToken is returned when checking for the validity of // a wrapping token that turns out to be invalid. ErrInvalidWrappingToken = errors.New("wrapping token is not valid or does not exist") + + // ErrOverloaded indicates the Vault server is at capacity. + ErrOverloaded = errors.New("overloaded, try again later") ) diff --git a/sdk/physical/inmem/inmem.go b/sdk/physical/inmem/inmem.go index 529949f4ac..2a9198f6af 100644 --- a/sdk/physical/inmem/inmem.go +++ b/sdk/physical/inmem/inmem.go @@ -65,13 +65,15 @@ type TransactionalInmemBackend struct { // Using Uber atomic because our SemGrep rules don't like the old pointer // trick we used above any more even though it's fine. The newer sync/atomic - // types are almost the same, but lack was to initialize them cleanly in New* + // types are almost the same, but lack ways to initialize them cleanly in New* // functions so sticking with what SemGrep likes for now. maxBatchEntries *uberAtomic.Int32 maxBatchSize *uberAtomic.Int32 largestBatchLen *uberAtomic.Uint64 largestBatchSize *uberAtomic.Uint64 + + transactionCompleteCh chan *txnCommitRequest } // NewInmem constructs a new in-memory backend @@ -362,9 +364,7 @@ func (t *TransactionalInmemBackend) Transaction(ctx context.Context, txns []*phy failGetInTxn := atomic.LoadUint32(t.failGetInTxn) size := uint64(0) for _, t := range txns { - // We use 2x key length to match the logic in WALBackend.persistWALs - // presumably this is attempting to account for some amount of encoding - // overhead. + // We use 2x key length to match the logic in WALBackend.persistWALs. size += uint64(2*len(t.Entry.Key) + len(t.Entry.Value)) if t.Operation == physical.GetOperation && failGetInTxn != 0 { return GetInTxnDisabledError @@ -378,7 +378,18 @@ func (t *TransactionalInmemBackend) Transaction(ctx context.Context, txns []*phy t.largestBatchLen.Store(uint64(len(txns))) } - return physical.GenericTransactionHandler(ctx, t, txns) + err := physical.GenericTransactionHandler(ctx, t, txns) + + // If we have a transactionCompleteCh set, we block on it before returning. + if t.transactionCompleteCh != nil { + req := &txnCommitRequest{ + txns: txns, + ch: make(chan struct{}), + } + t.transactionCompleteCh <- req + <-req.ch + } + return err } func (t *TransactionalInmemBackend) SetMaxBatchEntries(entries int) { @@ -396,3 +407,31 @@ func (t *TransactionalInmemBackend) TransactionLimits() (int, int) { func (t *TransactionalInmemBackend) BatchStats() (maxEntries uint64, maxSize uint64) { return t.largestBatchLen.Load(), t.largestBatchSize.Load() } + +// TxnCommitChan returns a channel that allows deterministic control of when +// transactions are executed. Each time `Transaction` is called on the backend, +// a txnCommitRequest is sent on the chan returned and then Transaction will +// block until Done is called on that request object. This allows tests to +// deterministically wait until a persist is actually in progress, as well as +// control when the persist completes. The returned chan is buffered with a +// length of 5 which should be enough to ensure that test code doesn't deadlock +// in normal operation since we typically only have one outstanding transaction +// at at time. +func (t *TransactionalInmemBackend) TxnCommitChan() <-chan *txnCommitRequest { + t.Lock() + defer t.Unlock() + + ch := make(chan *txnCommitRequest, 5) + t.transactionCompleteCh = ch + + return ch +} + +type txnCommitRequest struct { + txns []*physical.TxnEntry + ch chan struct{} +} + +func (r *txnCommitRequest) Commit() { + close(r.ch) +}