Vault-1983: Use fairsharing to distribute workers between queues (#11789)

* preliminary fairshare prototype (untested)

* add tests for new fairshare infra - these are likely racy and may fail

* fix probable races in code and tests

* one more lock to fix for races

* fairsharing queue work distribution, tests, fixes, etc

* comment, shorten wait time

* typos and comments

* fix inverted worker count logic

* Update helper/fairshare/jobmanager.go

typo

* Update helper/fairshare/jobmanager.go

clarify comment

* move back to round robin between queues

* improvements from self review

* add job manager stress test
Authored by swayne275 on 2021-06-25 14:06:49 -06:00, committed by GitHub
parent 9fbd002207
commit ccddbb6192
4 changed files with 428 additions and 73 deletions

helper/fairshare/jobmanager.go

@@ -4,7 +4,9 @@ import (
"container/list"
"fmt"
"io/ioutil"
"math"
"sync"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
@@ -13,37 +15,33 @@ import (
"github.com/hashicorp/vault/sdk/helper/logging"
)
/*
Future Work:
- track workers per queue. this will involve things like:
- somehow wrap the Execute/OnFailure functions to increment counter when
they start running, and decrement when they stop running
-- put a queue.IncrementCounter() call at the beginning
-- call the provided work function in the middle
-- put a queue.DecrementCounter() call at the end
- job has a queueID or reference to the queue
- queue only removed when empty AND no workers
*/
type JobManager struct {
name string
queues map[string]*list.List
queuesIndex []string
lastQueueAccessed int
quit chan struct{}
newWork chan struct{} // must be buffered
workerPool *dispatcher
onceStart sync.Once
onceStop sync.Once
logger log.Logger
totalJobs int
metricSink *metricsutil.ClusterMetricSink
name string
queues map[string]*list.List
quit chan struct{}
newWork chan struct{} // must be buffered
workerPool *dispatcher
workerCount map[string]int
onceStart sync.Once
onceStop sync.Once
logger log.Logger
totalJobs int
metricSink *metricsutil.ClusterMetricSink
// waitgroup for testing stop functionality
wg sync.WaitGroup
// protects `queues`, `queuesIndex`, `lastQueueAccessed`
// protects `queues`, `workerCount`, `queuesIndex`, `lastQueueAccessed`
l sync.RWMutex
// track queues by index for round robin worker assignment
queuesIndex []string
lastQueueAccessed int
}
// NewJobManager creates a job manager, with an optional name
@@ -66,13 +64,14 @@ func NewJobManager(name string, numWorkers int, l log.Logger, metricSink *metric
j := JobManager{
name: name,
queues: make(map[string]*list.List),
queuesIndex: make([]string, 0),
lastQueueAccessed: -1,
quit: make(chan struct{}),
newWork: make(chan struct{}, 1),
workerPool: wp,
workerCount: make(map[string]int),
logger: l,
metricSink: metricSink,
queuesIndex: make([]string, 0),
lastQueueAccessed: -1,
}
j.logger.Trace("created job manager", "name", name, "pool_size", numWorkers)
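For orientation before the remaining hunks: a minimal usage sketch of the API this commit settles on (NewJobManager, Start, AddJob, Stop). The printJob type and queue IDs are hypothetical; any type satisfying the Job interface (defined in the worker pool file below) works, and a nil logger is tolerated per TestFairshare_nilLoggerJobManager.

package main

import (
	"fmt"

	"github.com/hashicorp/vault/helper/fairshare"
)

// printJob is a hypothetical Job implementation for illustration only.
type printJob struct{ name string }

func (p *printJob) Execute() error      { fmt.Println("ran", p.name); return nil }
func (p *printJob) OnFailure(err error) { fmt.Println("failed", p.name, "-", err) }

func main() {
	// four workers, shared fairly across however many queues appear
	j := fairshare.NewJobManager("example", 4, nil, nil)
	j.Start()
	defer j.Stop()

	// jobs are enqueued by queue ID; distinct IDs split the worker pool
	j.AddJob(&printJob{name: "a-1"}, "tenant-a")
	j.AddJob(&printJob{name: "b-1"}, "tenant-b")
}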
@@ -138,11 +137,12 @@ func (j *JobManager) GetPendingJobCount() int {
// GetWorkerCounts() returns a map of queue ID to number of active workers
func (j *JobManager) GetWorkerCounts() map[string]int {
// TODO implement with VLT-145
return nil
j.l.RLock()
defer j.l.RUnlock()
return j.workerCount
}
// GetWorkQueueLengths() returns a map of queue ID to number of active workers
// GetWorkQueueLengths() returns a map of queue ID to number of jobs in the queue
func (j *JobManager) GetWorkQueueLengths() map[string]int {
out := make(map[string]int)
@@ -156,20 +156,23 @@ func (j *JobManager) GetWorkQueueLengths() map[string]int {
return out
}
// getNextJob grabs the next job to be processed and prunes empty queues
func (j *JobManager) getNextJob() Job {
// getNextJob pops the next job to be processed and prunes empty queues
// it also returns the ID of the queue the job is associated with
func (j *JobManager) getNextJob() (Job, string) {
j.l.Lock()
defer j.l.Unlock()
if len(j.queues) == 0 {
return nil
return nil, ""
}
j.lastQueueAccessed = (j.lastQueueAccessed + 1) % len(j.queuesIndex)
queueID := j.queuesIndex[j.lastQueueAccessed]
queueID, canAssignWorker := j.getNextQueue()
if !canAssignWorker {
return nil, ""
}
jobElement := j.queues[queueID].Front()
out := j.queues[queueID].Remove(jobElement)
jobRaw := j.queues[queueID].Remove(jobElement)
j.totalJobs--
@@ -179,10 +182,81 @@ func (j *JobManager) getNextJob() Job {
}
if j.queues[queueID].Len() == 0 {
// we remove the empty queue, but we don't remove the worker count
// in case we are still working on previous jobs from this queue.
// worker count cleanup is handled in j.decrementWorkerCount
j.removeLastQueueAccessed()
}
return out.(Job)
return jobRaw.(Job), queueID
}
// returns the next queue to assign work from, and a bool indicating whether
// there is a queue that can have a worker assigned. if work can be assigned,
// j.lastQueueAccessed will be updated to that queue.
// note: this must be called with j.l held
func (j *JobManager) getNextQueue() (string, bool) {
var nextQueue string
var canAssignWorker bool
// ensure we loop through all existing queues until we find an eligible
// queue, if one exists.
queueIdx := j.nextQueueIndex(j.lastQueueAccessed)
for i := 0; i < len(j.queuesIndex); i++ {
potentialQueueID := j.queuesIndex[queueIdx]
if !j.queueWorkersSaturated(potentialQueueID) {
nextQueue = potentialQueueID
canAssignWorker = true
j.lastQueueAccessed = queueIdx
break
}
queueIdx = j.nextQueueIndex(queueIdx)
}
return nextQueue, canAssignWorker
}
// get the index of the next queue in round-robin order
// note: this must be called with j.l held
func (j *JobManager) nextQueueIndex(currentIdx int) int {
return (currentIdx + 1) % len(j.queuesIndex)
}
// returns true if there are already too many workers on this queue
// note: this must be called with j.l held (at least for read).
// note: we may want to eventually factor in queue length relative to num queues
func (j *JobManager) queueWorkersSaturated(queueID string) bool {
numActiveQueues := float64(len(j.queues))
numTotalWorkers := float64(j.workerPool.numWorkers)
maxWorkersPerQueue := math.Ceil(0.9 * numTotalWorkers / numActiveQueues)
numWorkersPerQueue := j.workerCount
return numWorkersPerQueue[queueID] >= int(maxWorkersPerQueue)
}
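To make the cap concrete: the tests below use 20 workers across 2 active queues, giving ceil(0.9 * 20 / 2) = 9 workers per queue, and 18 workers across 3 queues, giving ceil(0.9 * 18 / 3) = 6. A standalone sketch of that arithmetic (maxWorkersPerQueue is a hypothetical helper, not part of this diff):

package main

import (
	"fmt"
	"math"
)

// maxWorkersPerQueue mirrors the cap computed inside queueWorkersSaturated:
// roughly 90% of the pool split evenly among active queues, rounded up, so a
// single busy queue can never monopolize the entire worker pool.
func maxWorkersPerQueue(totalWorkers, activeQueues int) int {
	return int(math.Ceil(0.9 * float64(totalWorkers) / float64(activeQueues)))
}

func main() {
	fmt.Println(maxWorkersPerQueue(20, 2)) // 9, as in TestFairshare_queueWorkersSaturated
	fmt.Println(maxWorkersPerQueue(18, 3)) // 6, as in TestFairshare_getNextQueue
}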
// increment the worker count for this queue
func (j *JobManager) incrementWorkerCount(queueID string) {
j.l.Lock()
defer j.l.Unlock()
j.workerCount[queueID]++
}
// decrement the worker count for this queue
// this also removes worker tracking for this queue if needed
func (j *JobManager) decrementWorkerCount(queueID string) {
j.l.Lock()
defer j.l.Unlock()
j.workerCount[queueID]--
_, queueExists := j.queues[queueID]
if !queueExists && j.workerCount[queueID] < 1 {
delete(j.workerCount, queueID)
}
}
// assignWork continually loops, checking for new jobs and dispatching them to the
@@ -203,39 +277,56 @@ func (j *JobManager) assignWork() {
default:
}
job := j.getNextJob()
job, queueID := j.getNextJob()
if job != nil {
j.workerPool.dispatch(job)
j.workerPool.dispatch(job,
func() {
j.incrementWorkerCount(queueID)
},
func() {
j.decrementWorkerCount(queueID)
})
} else {
break
}
}
// listen for a wake-up when an emtpy job manager has been given
// new work
select {
case <-j.quit:
j.wg.Done()
return
case <-j.newWork:
break
// listen for wake-up when an empty job manager has been given work
case <-time.After(50 * time.Millisecond):
// periodically check if new workers can be assigned. with the
// fairsharing worker distribution it can be the case that there
// is work waiting, but no queues are eligible for another worker
}
}
}()
}
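A note on the wake-up path: newWork is created with a buffer of one (see NewJobManager above), which implies the producer side signals it best-effort so that repeated signals coalesce while the manager is already awake. A sketch of that pattern, assuming this is how AddJob signals the channel (the AddJob side is not shown in this diff):

// signalNewWork sends a non-blocking wake-up on a size-1 buffered channel;
// if a wake-up is already pending, the new signal coalesces with it.
func signalNewWork(newWork chan struct{}) {
	select {
	case newWork <- struct{}{}:
	default:
	}
}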
// addQueue generates a new queue if a queue for `queueID` doesn't exist
// note: this must be called with l held for write
// it also starts tracking workers on that queue, if not already tracked
// note: this must be called with j.l held for write
func (j *JobManager) addQueue(queueID string) {
if _, ok := j.queues[queueID]; !ok {
j.queues[queueID] = list.New()
j.queuesIndex = append(j.queuesIndex, queueID)
}
// it's possible the queue ran out of work and was pruned, but there were
// still workers operating on data formerly in that queue, which were still
// being tracked. if that is the case, we don't want to wipe out that worker
// count when the queue is re-initialized.
if _, ok := j.workerCount[queueID]; !ok {
j.workerCount[queueID] = 0
}
}
// removeLastQueueAccessed removes the queue and index map for the last queue
// accessed. It is to be used when the last queue accessed has emptied.
// note: this must be called with l held for write
// removes the queue and index tracker for the last queue accessed.
// it is to be used when the last queue accessed has emptied.
// note: this must be called with j.l held.
func (j *JobManager) removeLastQueueAccessed() {
if j.lastQueueAccessed == -1 || j.lastQueueAccessed > len(j.queuesIndex)-1 {
j.logger.Warn("call to remove queue out of bounds", "idx", j.lastQueueAccessed)

helper/fairshare/jobmanager_test.go

@@ -312,10 +312,10 @@ func TestJobManager_GetWorkQueueLengths(t *testing.T) {
j.AddJob(&job, queueID)
if _, ok := expected[queueID]; !ok {
expected[queueID] = 1
} else {
expected[queueID]++
expected[queueID] = 0
}
expected[queueID]++
}
pendingJobs := j.GetWorkQueueLengths()
@@ -327,10 +327,6 @@
func TestJobManager_removeLastQueueAccessed(t *testing.T) {
j := NewJobManager("job-mgr-test", 1, newTestLogger("jobmanager-test"), nil)
j.addQueue("queue-0")
j.addQueue("queue-1")
j.addQueue("queue-2")
testCases := []struct {
lastQueueAccessed int
updatedLastQueueAccessed int
@@ -376,6 +372,11 @@ func TestJobManager_removeLastQueueAccessed(t *testing.T) {
j.l.Lock()
defer j.l.Unlock()
j.addQueue("queue-0")
j.addQueue("queue-1")
j.addQueue("queue-2")
for _, tc := range testCases {
j.lastQueueAccessed = tc.lastQueueAccessed
j.removeLastQueueAccessed()
@@ -398,7 +399,7 @@
}
}
func TestJobManager_getNextJob(t *testing.T) {
func TestJobManager_EndToEnd(t *testing.T) {
testCases := []struct {
name string
queueID string
@@ -439,17 +440,11 @@ func TestJobManager_getNextJob(t *testing.T) {
expectedOrder := []string{"job-1", "job-2", "job-4", "job-3", "job-5"}
// use one worker to guarantee ordering
numWorkers := 1
resultsCh := make(chan string)
defer close(resultsCh)
j := NewJobManager("test-job-mgr", numWorkers, newTestLogger("jobmanager-test"), nil)
doneCh := make(chan struct{})
var mu sync.Mutex
order := make([]string, 0)
timeout := time.After(5 * time.Second)
go func() {
for {
@@ -475,6 +470,8 @@ func TestJobManager_getNextJob(t *testing.T) {
}
onFail := func(_ error) {}
// use one worker to guarantee ordering
j := NewJobManager("test-job-mgr", 1, newTestLogger("jobmanager-test"), nil)
for _, tc := range testCases {
wg.Add(1)
job := newTestJob(t, tc.name, ex, onFail)
@@ -484,11 +481,13 @@ func TestJobManager_getNextJob(t *testing.T) {
j.Start()
defer j.Stop()
doneCh := make(chan struct{})
go func() {
wg.Wait()
doneCh <- struct{}{}
}()
timeout := time.After(5 * time.Second)
select {
case <-doneCh:
break
@@ -503,9 +502,245 @@ func TestJobManager_getNextJob(t *testing.T) {
}
}
func TestFairshare_StressTest(t *testing.T) {
var wg sync.WaitGroup
ex := func(name string) error {
wg.Done()
return nil
}
onFail := func(_ error) {}
j := NewJobManager("test-job-mgr", 15, nil, nil)
j.Start()
defer j.Stop()
for i := 0; i < 3000; i++ {
wg.Add(1)
job := newTestJob(t, fmt.Sprintf("a-job-%d", i), ex, onFail)
j.AddJob(&job, "a")
}
for i := 0; i < 4000; i++ {
wg.Add(1)
job := newTestJob(t, fmt.Sprintf("b-job-%d", i), ex, onFail)
j.AddJob(&job, "b")
}
for i := 0; i < 3000; i++ {
wg.Add(1)
job := newTestJob(t, fmt.Sprintf("c-job-%d", i), ex, onFail)
j.AddJob(&job, "c")
}
doneCh := make(chan struct{})
go func() {
wg.Wait()
doneCh <- struct{}{}
}()
timeout := time.After(5 * time.Second)
select {
case <-doneCh:
break
case <-timeout:
t.Fatal("timed out")
}
}
func TestFairshare_nilLoggerJobManager(t *testing.T) {
j := NewJobManager("test-job-mgr", 1, nil, nil)
if j.logger == nil {
t.Error("logger not set up properly")
}
}
func TestFairshare_getNextQueue(t *testing.T) {
j := NewJobManager("test-job-mgr", 18, nil, nil)
for i := 0; i < 10; i++ {
job := newDefaultTestJob(t, fmt.Sprintf("job-%d", i))
j.AddJob(&job, "a")
j.AddJob(&job, "b")
j.AddJob(&job, "c")
}
j.l.Lock()
defer j.l.Unlock()
// fake out some number of workers with various remaining work scenarios
// no queue can be assigned more than 6 workers
j.workerCount["a"] = 1
j.workerCount["b"] = 2
j.workerCount["c"] = 5
expectedOrder := []string{"a", "b", "c", "a", "b", "a", "b", "a", "b", "a"}
for _, expectedQueueID := range expectedOrder {
queueID, canAssignWorker := j.getNextQueue()
if !canAssignWorker {
t.Fatalf("expected have work true, got false for queue %q", queueID)
}
if queueID != expectedQueueID {
t.Errorf("expected queueID %q, got %q", expectedQueueID, queueID)
}
// simulate a worker being added to that queue
j.workerCount[queueID]++
}
// queues are saturated with work, we shouldn't be able to find a queue
// eligible for a worker (and last accessed queue shouldn't update)
expectedLastQueueAccessed := j.lastQueueAccessed
queueID, canAssignWork := j.getNextQueue()
if canAssignWork {
t.Error("should not be able to assign work with all queues saturated")
}
if queueID != "" {
t.Errorf("expected no queueID, got %s", queueID)
}
if j.lastQueueAccessed != expectedLastQueueAccessed {
t.Errorf("expected no last queue accessed update. had %d, got %d", expectedLastQueueAccessed, j.lastQueueAccessed)
}
}
func TestJobManager_pruneEmptyQueues(t *testing.T) {
j := NewJobManager("test-job-mgr", 18, nil, nil)
// add a few jobs to test out queue pruning
// for test simplicity, we'll keep the number of workers per queue at 0
testJob := newDefaultTestJob(t, "job-0")
j.AddJob(&testJob, "a")
j.AddJob(&testJob, "a")
j.AddJob(&testJob, "b")
job, queueID := j.getNextJob()
if queueID != "a" || job == nil {
t.Fatalf("bad next job: queueID %s, job: %#v", queueID, job)
}
j.l.RLock()
if _, ok := j.queues["a"]; !ok {
t.Error("expected queue 'a' to exist")
}
if _, ok := j.queues["b"]; !ok {
t.Error("expected queue 'b' to exist")
}
j.l.RUnlock()
job, queueID = j.getNextJob()
if queueID != "b" || job == nil {
t.Fatalf("bad next job: queueID %s, job: %#v", queueID, job)
}
j.l.RLock()
if _, ok := j.queues["a"]; !ok {
t.Error("expected queue 'a' to exist")
}
if _, ok := j.queues["b"]; ok {
t.Error("expected queue 'b' to be pruned")
}
j.l.RUnlock()
job, queueID = j.getNextJob()
if queueID != "a" || job == nil {
t.Fatalf("bad next job: queueID %s, job: %#v", queueID, job)
}
j.l.RLock()
if _, ok := j.queues["a"]; ok {
t.Error("expected queue 'a' to be pruned")
}
if _, ok := j.queues["b"]; ok {
t.Error("expected queue 'b' to be pruned")
}
j.l.RUnlock()
job, queueID = j.getNextJob()
if job != nil {
t.Errorf("expected no more jobs (out of queues). queueID: %s, job: %#v", queueID, job)
}
}
func TestFairshare_WorkerCount_IncrementAndDecrement(t *testing.T) {
j := NewJobManager("test-job-mgr", 18, nil, nil)
job := newDefaultTestJob(t, "job-0")
j.AddJob(&job, "a")
j.AddJob(&job, "b")
j.AddJob(&job, "c")
// test to make sure increment works
j.incrementWorkerCount("a")
workerCounts := j.GetWorkerCounts()
if workerCounts["a"] != 1 {
t.Fatalf("expected 1 worker on 'a', got %d", workerCounts["a"])
}
if workerCounts["b"] != 0 {
t.Fatalf("expected 0 workers on 'b', got %d", workerCounts["b"])
}
if workerCounts["c"] != 0 {
t.Fatalf("expected 0 workers on 'c', got %d", workerCounts["c"])
}
// test to make sure decrement works (when there is still work for the queue)
j.decrementWorkerCount("a")
workerCounts = j.GetWorkerCounts()
if workerCounts["a"] != 0 {
t.Fatalf("expected 0 workers on 'a', got %d", workerCounts["a"])
}
// add a worker to queue "a" and remove all work to ensure worker count gets
// cleared out for "a"
j.incrementWorkerCount("a")
j.l.Lock()
delete(j.queues, "a")
j.l.Unlock()
j.decrementWorkerCount("a")
workerCounts = j.GetWorkerCounts()
if _, ok := workerCounts["a"]; ok {
t.Fatalf("expected no worker count for 'a', got %#v", workerCounts)
}
}
func TestFairshare_queueWorkersSaturated(t *testing.T) {
j := NewJobManager("test-job-mgr", 20, nil, nil)
job := newDefaultTestJob(t, "job-0")
j.AddJob(&job, "a")
j.AddJob(&job, "b")
// no more than 9 workers can be assigned to a single queue in this example
for i := 0; i < 8; i++ {
j.incrementWorkerCount("a")
j.incrementWorkerCount("b")
j.l.RLock()
if j.queueWorkersSaturated("a") {
j.l.RUnlock()
t.Fatalf("queue 'a' falsely saturated: %#v", j.GetWorkerCounts())
}
if j.queueWorkersSaturated("b") {
j.l.RUnlock()
t.Fatalf("queue 'b' falsely saturated: %#v", j.GetWorkerCounts())
}
j.l.RUnlock()
}
// adding the 9th and 10th workers should saturate the number of workers we
// can have per queue
for i := 8; i < 10; i++ {
j.incrementWorkerCount("a")
j.incrementWorkerCount("b")
j.l.RLock()
if !j.queueWorkersSaturated("a") {
j.l.RUnlock()
t.Fatalf("queue 'a' falsely unsaturated: %#v", j.GetWorkerCounts())
}
if !j.queueWorkersSaturated("b") {
j.l.RUnlock()
t.Fatalf("queue 'b' falsely unsaturated: %#v", j.GetWorkerCounts())
}
j.l.RUnlock()
}
}

helper/fairshare/workerpool.go

@@ -12,14 +12,28 @@ import (
// Job is an interface for jobs used with this job manager
type Job interface {
// Execute performs the work.
// It should be synchronous if a cleanupFn is provided.
Execute() error
// OnFailure handles the error resulting from a failed Execute().
// It should be synchronous if a cleanupFn is provided.
OnFailure(err error)
}
type initFn func()
type cleanupFn func()
type wrappedJob struct {
job Job
init initFn
cleanup cleanupFn
}
// worker represents a single worker in a pool
type worker struct {
name string
jobCh <-chan Job
jobCh <-chan wrappedJob
quit chan struct{}
logger log.Logger
@@ -37,10 +51,18 @@ func (w *worker) start() {
case <-w.quit:
w.wg.Done()
return
case job := <-w.jobCh:
err := job.Execute()
case wJob := <-w.jobCh:
if wJob.init != nil {
wJob.init()
}
err := wJob.job.Execute()
if err != nil {
job.OnFailure(err)
wJob.job.OnFailure(err)
}
if wJob.cleanup != nil {
wJob.cleanup()
}
}
}
@@ -52,7 +74,7 @@ type dispatcher struct {
name string
numWorkers int
workers []worker
jobCh chan Job
jobCh chan wrappedJob
onceStart sync.Once
onceStop sync.Once
quit chan struct{}
@@ -68,10 +90,17 @@ func newDispatcher(name string, numWorkers int, l log.Logger) *dispatcher {
return d
}
// dispatch dispatches a job to the worker pool
func (d *dispatcher) dispatch(job Job) {
// dispatch dispatches a job to the worker pool, with optional initialization
// and cleanup functions (useful for tracking job progress)
func (d *dispatcher) dispatch(job Job, init initFn, cleanup cleanupFn) {
wJob := wrappedJob{
init: init,
job: job,
cleanup: cleanup,
}
select {
case d.jobCh <- job:
case d.jobCh <- wJob:
case <-d.quit:
d.logger.Info("shutting down during dispatch")
}
@@ -123,7 +152,7 @@ func createDispatcher(name string, numWorkers int, l log.Logger) *dispatcher {
name: name,
numWorkers: numWorkers,
workers: make([]worker, 0),
jobCh: make(chan Job),
jobCh: make(chan wrappedJob),
quit: make(chan struct{}),
logger: l,
wg: &wg,
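Callers that don't need progress tracking pass nil for both hooks, as the updated tests below do; the JobManager's assignWork loop (above) supplies hooks that increment and decrement its per-queue worker counts. Hypothetical call sites showing both shapes:

d.dispatch(&job, nil, nil) // no tracking, as in the worker pool tests

d.dispatch(&job,
	func() { /* runs in the worker just before Execute */ },
	func() { /* runs after Execute (and OnFailure, on error) complete */ },
)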

helper/fairshare/workerpool_test.go

@@ -184,7 +184,7 @@ func TestFairshare_startWorker(t *testing.T) {
timeout := time.After(5 * time.Second)
wg.Add(1)
d.dispatch(&job)
d.dispatch(&job, nil, nil)
go func() {
wg.Wait()
doneCh <- struct{}{}
@@ -222,7 +222,7 @@ func TestFairshare_start(t *testing.T) {
for i := 0; i < numJobs; i++ {
job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail)
d.dispatch(&job)
d.dispatch(&job, nil, nil)
}
select {
@@ -318,7 +318,7 @@ func TestFairshare_dispatch(t *testing.T) {
go func() {
for _, id := range expectedIDs {
job := newTestJob(t, id, ex, onFail)
d.dispatch(&job)
d.dispatch(&job, nil, nil)
}
}()
@@ -376,7 +376,7 @@ func TestFairshare_jobFailure(t *testing.T) {
for i := 0; i < numJobs; i++ {
job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail)
d.dispatch(&job)
d.dispatch(&job, nil, nil)
}
select {