Overload error support for Adaptive Overload Protection (Enterprise) (#26688)

* Overload error support for Enterprise

* Remove TODO comment
This commit is contained in:
Paul Banks 2024-04-29 22:11:23 +01:00 committed by GitHub
parent 5b845c83ff
commit c839854483
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 52 additions and 5 deletions

View File

@ -1021,6 +1021,11 @@ func request(core *vault.Core, w http.ResponseWriter, rawReq *http.Request, r *l
}
resp.AddWarning("Timeout hit while waiting for local replicated cluster to apply primary's write; this client may encounter stale reads of values written during this operation.")
}
if errwrap.Contains(err, consts.ErrOverloaded.Error()) {
logical.RespondWithStatusCode(resp, r, http.StatusServiceUnavailable)
respondError(w, http.StatusServiceUnavailable, err)
return resp, false, false
}
if errwrap.Contains(err, consts.ErrStandby.Error()) {
respondStandby(core, w, rawReq.URL)
return resp, false, false

View File

@ -25,4 +25,7 @@ var (
// ErrInvalidWrappingToken is returned when checking for the validity of
// a wrapping token that turns out to be invalid.
ErrInvalidWrappingToken = errors.New("wrapping token is not valid or does not exist")
// ErrOverloaded indicates the Vault server is at capacity.
ErrOverloaded = errors.New("overloaded, try again later")
)

View File

@ -65,13 +65,15 @@ type TransactionalInmemBackend struct {
// Using Uber atomic because our SemGrep rules don't like the old pointer
// trick we used above any more even though it's fine. The newer sync/atomic
// types are almost the same, but lack was to initialize them cleanly in New*
// types are almost the same, but lack ways to initialize them cleanly in New*
// functions so sticking with what SemGrep likes for now.
maxBatchEntries *uberAtomic.Int32
maxBatchSize *uberAtomic.Int32
largestBatchLen *uberAtomic.Uint64
largestBatchSize *uberAtomic.Uint64
transactionCompleteCh chan *txnCommitRequest
}
// NewInmem constructs a new in-memory backend
@ -362,9 +364,7 @@ func (t *TransactionalInmemBackend) Transaction(ctx context.Context, txns []*phy
failGetInTxn := atomic.LoadUint32(t.failGetInTxn)
size := uint64(0)
for _, t := range txns {
// We use 2x key length to match the logic in WALBackend.persistWALs
// presumably this is attempting to account for some amount of encoding
// overhead.
// We use 2x key length to match the logic in WALBackend.persistWALs.
size += uint64(2*len(t.Entry.Key) + len(t.Entry.Value))
if t.Operation == physical.GetOperation && failGetInTxn != 0 {
return GetInTxnDisabledError
@ -378,7 +378,18 @@ func (t *TransactionalInmemBackend) Transaction(ctx context.Context, txns []*phy
t.largestBatchLen.Store(uint64(len(txns)))
}
return physical.GenericTransactionHandler(ctx, t, txns)
err := physical.GenericTransactionHandler(ctx, t, txns)
// If we have a transactionCompleteCh set, we block on it before returning.
if t.transactionCompleteCh != nil {
req := &txnCommitRequest{
txns: txns,
ch: make(chan struct{}),
}
t.transactionCompleteCh <- req
<-req.ch
}
return err
}
func (t *TransactionalInmemBackend) SetMaxBatchEntries(entries int) {
@ -396,3 +407,31 @@ func (t *TransactionalInmemBackend) TransactionLimits() (int, int) {
func (t *TransactionalInmemBackend) BatchStats() (maxEntries uint64, maxSize uint64) {
return t.largestBatchLen.Load(), t.largestBatchSize.Load()
}
// TxnCommitChan installs and returns a channel that gives tests
// deterministic control over when transactions are executed. Once this
// has been called, every invocation of Transaction on the backend sends
// a *txnCommitRequest on the returned channel and then blocks until
// Commit is called on that request. This lets test code observe that a
// persist is actually in progress and decide exactly when it completes.
// The channel is buffered with a capacity of 5, which should be enough
// to keep test code from deadlocking in normal operation since there is
// typically only one outstanding transaction at a time.
func (t *TransactionalInmemBackend) TxnCommitChan() <-chan *txnCommitRequest {
	t.Lock()
	defer t.Unlock()
	t.transactionCompleteCh = make(chan *txnCommitRequest, 5)
	return t.transactionCompleteCh
}
// txnCommitRequest is sent on the channel returned by TxnCommitChan each
// time Transaction is called on the backend. The Transaction call blocks
// until Commit is invoked on the request, giving tests deterministic
// control over when the persist completes.
type txnCommitRequest struct {
	// txns is the batch of transaction entries being committed.
	txns []*physical.TxnEntry
	// ch is closed by Commit to unblock the waiting Transaction call.
	ch chan struct{}
}
// Commit unblocks the Transaction call that produced this request by
// closing its signal channel. It must be called at most once per
// request; a second call would panic on the double close.
func (r *txnCommitRequest) Commit() {
	close(r.ch)
}