diff --git a/helper/fairshare/jobmanager.go b/helper/fairshare/jobmanager.go index 9b9e924e83..a878d29fc4 100644 --- a/helper/fairshare/jobmanager.go +++ b/helper/fairshare/jobmanager.go @@ -4,7 +4,9 @@ import ( "container/list" "fmt" "io/ioutil" + "math" "sync" + "time" "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" @@ -13,37 +15,33 @@ import ( "github.com/hashicorp/vault/sdk/helper/logging" ) -/* -Future Work: -- track workers per queue. this will involve things like: - - somehow wrap the Execute/OnFailure functions to increment counter when - they start running, and decrement when they stop running - -- put a queue.IncrementCounter() call at the beginning - -- call the provided work function in the middle - -- put a queue.DecrementCounter() call at the end - - job has a queueID or reference to the queue -- queue only removed when empty AND no workers -*/ - type JobManager struct { - name string - queues map[string]*list.List - queuesIndex []string - lastQueueAccessed int - quit chan struct{} - newWork chan struct{} // must be buffered - workerPool *dispatcher - onceStart sync.Once - onceStop sync.Once - logger log.Logger - totalJobs int - metricSink *metricsutil.ClusterMetricSink + name string + queues map[string]*list.List + + quit chan struct{} + newWork chan struct{} // must be buffered + + workerPool *dispatcher + workerCount map[string]int + + onceStart sync.Once + onceStop sync.Once + + logger log.Logger + + totalJobs int + metricSink *metricsutil.ClusterMetricSink // waitgroup for testing stop functionality wg sync.WaitGroup - // protects `queues`, `queuesIndex`, `lastQueueAccessed` + // protects `queues`, `workerCount`, `queuesIndex`, `lastQueueAccessed` l sync.RWMutex + + // track queues by index for round robin worker assignment + queuesIndex []string + lastQueueAccessed int } // NewJobManager creates a job manager, with an optional name @@ -66,13 +64,14 @@ func NewJobManager(name string, numWorkers int, l log.Logger, metricSink *metric j := JobManager{ name: name, queues: make(map[string]*list.List), - queuesIndex: make([]string, 0), - lastQueueAccessed: -1, quit: make(chan struct{}), newWork: make(chan struct{}, 1), workerPool: wp, + workerCount: make(map[string]int), logger: l, metricSink: metricSink, + queuesIndex: make([]string, 0), + lastQueueAccessed: -1, } j.logger.Trace("created job manager", "name", name, "pool_size", numWorkers) @@ -138,11 +137,12 @@ func (j *JobManager) GetPendingJobCount() int { // GetWorkerCounts() returns a map of queue ID to number of active workers func (j *JobManager) GetWorkerCounts() map[string]int { - // TODO implement with VLT-145 - return nil + j.l.RLock() + defer j.l.RUnlock() + return j.workerCount } -// GetWorkQueueLengths() returns a map of queue ID to number of active workers +// GetWorkQueueLengths() returns a map of queue ID to number of jobs in the queue func (j *JobManager) GetWorkQueueLengths() map[string]int { out := make(map[string]int) @@ -156,20 +156,23 @@ func (j *JobManager) GetWorkQueueLengths() map[string]int { return out } -// getNextJob grabs the next job to be processed and prunes empty queues -func (j *JobManager) getNextJob() Job { +// getNextJob pops the next job to be processed and prunes empty queues +// it also returns the ID of the queue the job is associated with +func (j *JobManager) getNextJob() (Job, string) { j.l.Lock() defer j.l.Unlock() if len(j.queues) == 0 { - return nil + return nil, "" } - j.lastQueueAccessed = (j.lastQueueAccessed + 1) % len(j.queuesIndex) - queueID := 
j.queuesIndex[j.lastQueueAccessed] + queueID, canAssignWorker := j.getNextQueue() + if !canAssignWorker { + return nil, "" + } jobElement := j.queues[queueID].Front() - out := j.queues[queueID].Remove(jobElement) + jobRaw := j.queues[queueID].Remove(jobElement) j.totalJobs-- @@ -179,10 +182,81 @@ func (j *JobManager) getNextJob() Job { } if j.queues[queueID].Len() == 0 { + // we remove the empty queue, but we don't remove the worker count + // in case we are still working on previous jobs from this queue. + // worker count cleanup is handled in j.decrementWorkerCount j.removeLastQueueAccessed() } - return out.(Job) + return jobRaw.(Job), queueID +} + +// returns the next queue to assign work from, and a bool if there is a queue +// that can have a worker assigned. if there is work to be assigned, +// j.lastQueueAccessed will be updated to that queue. +// note: this must be called with j.l held +func (j *JobManager) getNextQueue() (string, bool) { + var nextQueue string + var canAssignWorker bool + + // ensure we loop through all existing queues until we find an eligible + // queue, if one exists. + queueIdx := j.nextQueueIndex(j.lastQueueAccessed) + for i := 0; i < len(j.queuesIndex); i++ { + potentialQueueID := j.queuesIndex[queueIdx] + + if !j.queueWorkersSaturated(potentialQueueID) { + nextQueue = potentialQueueID + canAssignWorker = true + j.lastQueueAccessed = queueIdx + break + } + + queueIdx = j.nextQueueIndex(queueIdx) + } + + return nextQueue, canAssignWorker +} + +// get the index of the next queue in round-robin order +// note: this must be called with j.l held +func (j *JobManager) nextQueueIndex(currentIdx int) int { + return (currentIdx + 1) % len(j.queuesIndex) +} + +// returns true if there are already too many workers on this queue +// note: this must be called with j.l held (at least for read). +// note: we may want to eventually factor in queue length relative to num queues +func (j *JobManager) queueWorkersSaturated(queueID string) bool { + numActiveQueues := float64(len(j.queues)) + numTotalWorkers := float64(j.workerPool.numWorkers) + maxWorkersPerQueue := math.Ceil(0.9 * numTotalWorkers / numActiveQueues) + + numWorkersPerQueue := j.workerCount + + return numWorkersPerQueue[queueID] >= int(maxWorkersPerQueue) +} + +// increment the worker count for this queue +func (j *JobManager) incrementWorkerCount(queueID string) { + j.l.Lock() + defer j.l.Unlock() + + j.workerCount[queueID]++ +} + +// decrement the worker count for this queue +// this also removes worker tracking for this queue if needed +func (j *JobManager) decrementWorkerCount(queueID string) { + j.l.Lock() + defer j.l.Unlock() + + j.workerCount[queueID]-- + + _, queueExists := j.queues[queueID] + if !queueExists && j.workerCount[queueID] < 1 { + delete(j.workerCount, queueID) + } } // assignWork continually loops checks for new jobs and dispatches them to the @@ -203,39 +277,56 @@ func (j *JobManager) assignWork() { default: } - job := j.getNextJob() + job, queueID := j.getNextJob() if job != nil { - j.workerPool.dispatch(job) + j.workerPool.dispatch(job, + func() { + j.incrementWorkerCount(queueID) + }, + func() { + j.decrementWorkerCount(queueID) + }) } else { break } } - // listen for a wake-up when an emtpy job manager has been given - // new work select { case <-j.quit: j.wg.Done() return case <-j.newWork: - break + // listen for wake-up when an empty job manager has been given work + case <-time.After(50 * time.Millisecond): + // periodically check if new workers can be assigned. 
with the + // fairsharing worker distribution it can be the case that there + // is work waiting, but no queues are eligible for another worker } } }() } // addQueue generates a new queue if a queue for `queueID` doesn't exist -// note: this must be called with l held for write +// it also starts tracking workers on that queue, if not already tracked +// note: this must be called with j.l held for write func (j *JobManager) addQueue(queueID string) { if _, ok := j.queues[queueID]; !ok { j.queues[queueID] = list.New() j.queuesIndex = append(j.queuesIndex, queueID) } + + // it's possible the queue ran out of work and was pruned, but there were + // still workers operating on data formerly in that queue, which were still + // being tracked. if that is the case, we don't want to wipe out that worker + // count when the queue is re-initialized. + if _, ok := j.workerCount[queueID]; !ok { + j.workerCount[queueID] = 0 + } } -// removeLastQueueAccessed removes the queue and index map for the last queue -// accessed. It is to be used when the last queue accessed has emptied. -// note: this must be called with l held for write +// removes the queue and index tracker for the last queue accessed. +// it is to be used when the last queue accessed has emptied. +// note: this must be called with j.l held. func (j *JobManager) removeLastQueueAccessed() { if j.lastQueueAccessed == -1 || j.lastQueueAccessed > len(j.queuesIndex)-1 { j.logger.Warn("call to remove queue out of bounds", "idx", j.lastQueueAccessed) diff --git a/helper/fairshare/jobmanager_test.go b/helper/fairshare/jobmanager_test.go index bab9db473d..d903147822 100644 --- a/helper/fairshare/jobmanager_test.go +++ b/helper/fairshare/jobmanager_test.go @@ -312,10 +312,10 @@ func TestJobManager_GetWorkQueueLengths(t *testing.T) { j.AddJob(&job, queueID) if _, ok := expected[queueID]; !ok { - expected[queueID] = 1 - } else { - expected[queueID]++ + expected[queueID] = 0 } + + expected[queueID]++ } pendingJobs := j.GetWorkQueueLengths() @@ -327,10 +327,6 @@ func TestJobManager_GetWorkQueueLengths(t *testing.T) { func TestJobManager_removeLastQueueAccessed(t *testing.T) { j := NewJobManager("job-mgr-test", 1, newTestLogger("jobmanager-test"), nil) - j.addQueue("queue-0") - j.addQueue("queue-1") - j.addQueue("queue-2") - testCases := []struct { lastQueueAccessed int updatedLastQueueAccessed int @@ -376,6 +372,11 @@ func TestJobManager_removeLastQueueAccessed(t *testing.T) { j.l.Lock() defer j.l.Unlock() + + j.addQueue("queue-0") + j.addQueue("queue-1") + j.addQueue("queue-2") + for _, tc := range testCases { j.lastQueueAccessed = tc.lastQueueAccessed j.removeLastQueueAccessed() @@ -398,7 +399,7 @@ func TestJobManager_removeLastQueueAccessed(t *testing.T) { } } -func TestJobManager_getNextJob(t *testing.T) { +func TestJobManager_EndToEnd(t *testing.T) { testCases := []struct { name string queueID string @@ -439,17 +440,11 @@ func TestJobManager_getNextJob(t *testing.T) { expectedOrder := []string{"job-1", "job-2", "job-4", "job-3", "job-5"} - // use one worker to guarantee ordering - numWorkers := 1 resultsCh := make(chan string) defer close(resultsCh) - j := NewJobManager("test-job-mgr", numWorkers, newTestLogger("jobmanager-test"), nil) - - doneCh := make(chan struct{}) var mu sync.Mutex order := make([]string, 0) - timeout := time.After(5 * time.Second) go func() { for { @@ -475,6 +470,8 @@ func TestJobManager_getNextJob(t *testing.T) { } onFail := func(_ error) {} + // use one worker to guarantee ordering + j := NewJobManager("test-job-mgr", 1, 
newTestLogger("jobmanager-test"), nil) for _, tc := range testCases { wg.Add(1) job := newTestJob(t, tc.name, ex, onFail) @@ -484,11 +481,13 @@ func TestJobManager_getNextJob(t *testing.T) { j.Start() defer j.Stop() + doneCh := make(chan struct{}) go func() { wg.Wait() doneCh <- struct{}{} }() + timeout := time.After(5 * time.Second) select { case <-doneCh: break @@ -503,9 +502,245 @@ func TestJobManager_getNextJob(t *testing.T) { } } +func TestFairshare_StressTest(t *testing.T) { + var wg sync.WaitGroup + ex := func(name string) error { + wg.Done() + return nil + } + onFail := func(_ error) {} + + j := NewJobManager("test-job-mgr", 15, nil, nil) + j.Start() + defer j.Stop() + + for i := 0; i < 3000; i++ { + wg.Add(1) + job := newTestJob(t, fmt.Sprintf("a-job-%d", i), ex, onFail) + j.AddJob(&job, "a") + } + for i := 0; i < 4000; i++ { + wg.Add(1) + job := newTestJob(t, fmt.Sprintf("b-job-%d", i), ex, onFail) + j.AddJob(&job, "b") + } + for i := 0; i < 3000; i++ { + wg.Add(1) + job := newTestJob(t, fmt.Sprintf("c-job-%d", i), ex, onFail) + j.AddJob(&job, "c") + } + + doneCh := make(chan struct{}) + go func() { + wg.Wait() + doneCh <- struct{}{} + }() + + timeout := time.After(5 * time.Second) + select { + case <-doneCh: + break + case <-timeout: + t.Fatal("timed out") + } +} + func TestFairshare_nilLoggerJobManager(t *testing.T) { j := NewJobManager("test-job-mgr", 1, nil, nil) if j.logger == nil { t.Error("logger not set up properly") } } + +func TestFairshare_getNextQueue(t *testing.T) { + j := NewJobManager("test-job-mgr", 18, nil, nil) + + for i := 0; i < 10; i++ { + job := newDefaultTestJob(t, fmt.Sprintf("job-%d", i)) + j.AddJob(&job, "a") + j.AddJob(&job, "b") + j.AddJob(&job, "c") + } + + j.l.Lock() + defer j.l.Unlock() + + // fake out some number of workers with various remaining work scenario + // no queue can be assigned more than 6 workers + j.workerCount["a"] = 1 + j.workerCount["b"] = 2 + j.workerCount["c"] = 5 + + expectedOrder := []string{"a", "b", "c", "a", "b", "a", "b", "a", "b", "a"} + + for _, expectedQueueID := range expectedOrder { + queueID, canAssignWorker := j.getNextQueue() + + if !canAssignWorker { + t.Fatalf("expected have work true, got false for queue %q", queueID) + } + if queueID != expectedQueueID { + t.Errorf("expected queueID %q, got %q", expectedQueueID, queueID) + } + + // simulate a worker being added to that queue + j.workerCount[queueID]++ + } + + // queues are saturated with work, we shouldn't be able to find a queue + // eligible for a worker (and last accessed queue shouldn't update) + expectedLastQueueAccessed := j.lastQueueAccessed + queueID, canAssignWork := j.getNextQueue() + if canAssignWork { + t.Error("should not be able to assign work with all queues saturated") + } + if queueID != "" { + t.Errorf("expected no queueID, got %s", queueID) + } + if j.lastQueueAccessed != expectedLastQueueAccessed { + t.Errorf("expected no last queue accessed update. 
had %d, got %d", expectedLastQueueAccessed, j.lastQueueAccessed) + } +} + +func TestJobManager_pruneEmptyQueues(t *testing.T) { + j := NewJobManager("test-job-mgr", 18, nil, nil) + + // add a few jobs to test out queue pruning + // for test simplicity, we'll keep the number of workers per queue at 0 + testJob := newDefaultTestJob(t, "job-0") + j.AddJob(&testJob, "a") + j.AddJob(&testJob, "a") + j.AddJob(&testJob, "b") + + job, queueID := j.getNextJob() + if queueID != "a" || job == nil { + t.Fatalf("bad next job: queueID %s, job: %#v", queueID, job) + } + + j.l.RLock() + if _, ok := j.queues["a"]; !ok { + t.Error("expected queue 'a' to exist") + } + if _, ok := j.queues["b"]; !ok { + t.Error("expected queue 'b' to exist") + } + j.l.RUnlock() + + job, queueID = j.getNextJob() + if queueID != "b" || job == nil { + t.Fatalf("bad next job: queueID %s, job: %#v", queueID, job) + } + + j.l.RLock() + if _, ok := j.queues["a"]; !ok { + t.Error("expected queue 'a' to exist") + } + if _, ok := j.queues["b"]; ok { + t.Error("expected queue 'b' to be pruned") + } + j.l.RUnlock() + + job, queueID = j.getNextJob() + if queueID != "a" || job == nil { + t.Fatalf("bad next job: queueID %s, job: %#v", queueID, job) + } + + j.l.RLock() + if _, ok := j.queues["a"]; ok { + t.Error("expected queue 'a' to be pruned") + } + if _, ok := j.queues["b"]; ok { + t.Error("expected queue 'b' to be pruned") + } + j.l.RUnlock() + + job, queueID = j.getNextJob() + if job != nil { + t.Errorf("expected no more jobs (out of queues). queueID: %s, job: %#v", queueID, job) + } +} + +func TestFairshare_WorkerCount_IncrementAndDecrement(t *testing.T) { + j := NewJobManager("test-job-mgr", 18, nil, nil) + + job := newDefaultTestJob(t, "job-0") + j.AddJob(&job, "a") + j.AddJob(&job, "b") + j.AddJob(&job, "c") + + // test to make sure increment works + j.incrementWorkerCount("a") + workerCounts := j.GetWorkerCounts() + if workerCounts["a"] != 1 { + t.Fatalf("expected 1 worker on 'a', got %d", workerCounts["a"]) + } + if workerCounts["b"] != 0 { + t.Fatalf("expected 0 workers on 'b', got %d", workerCounts["b"]) + } + if workerCounts["c"] != 0 { + t.Fatalf("expected 0 workers on 'c', got %d", workerCounts["c"]) + } + + // test to make sure decrement works (when there is still work for the queue) + j.decrementWorkerCount("a") + workerCounts = j.GetWorkerCounts() + if workerCounts["a"] != 0 { + t.Fatalf("expected 0 workers on 'a', got %d", workerCounts["a"]) + } + + // add a worker to queue "a" and remove all work to ensure worker count gets + // cleared out for "a" + j.incrementWorkerCount("a") + j.l.Lock() + delete(j.queues, "a") + j.l.Unlock() + + j.decrementWorkerCount("a") + workerCounts = j.GetWorkerCounts() + if _, ok := workerCounts["a"]; ok { + t.Fatalf("expected no worker count for 'a', got %#v", workerCounts) + } +} + +func TestFairshare_queueWorkersSaturated(t *testing.T) { + j := NewJobManager("test-job-mgr", 20, nil, nil) + + job := newDefaultTestJob(t, "job-0") + j.AddJob(&job, "a") + j.AddJob(&job, "b") + + // no more than 9 workers can be assigned to a single queue in this example + for i := 0; i < 8; i++ { + j.incrementWorkerCount("a") + j.incrementWorkerCount("b") + + j.l.RLock() + if j.queueWorkersSaturated("a") { + j.l.RUnlock() + t.Fatalf("queue 'a' falsely saturated: %#v", j.GetWorkerCounts()) + } + if j.queueWorkersSaturated("b") { + j.l.RUnlock() + t.Fatalf("queue 'b' falsely saturated: %#v", j.GetWorkerCounts()) + } + j.l.RUnlock() + } + + // adding the 9th and 10th workers should saturate the number of workers 
we + // can have per queue + for i := 8; i < 10; i++ { + j.incrementWorkerCount("a") + j.incrementWorkerCount("b") + + j.l.RLock() + if !j.queueWorkersSaturated("a") { + j.l.RUnlock() + t.Fatalf("queue 'a' falsely unsaturated: %#v", j.GetWorkerCounts()) + } + if !j.queueWorkersSaturated("b") { + j.l.RUnlock() + t.Fatalf("queue 'b' falsely unsaturated: %#v", j.GetWorkerCounts()) + } + j.l.RUnlock() + } +} diff --git a/helper/fairshare/workerpool.go b/helper/fairshare/workerpool.go index 3cc4009b8d..1db6ece329 100644 --- a/helper/fairshare/workerpool.go +++ b/helper/fairshare/workerpool.go @@ -12,14 +12,28 @@ import ( // Job is an interface for jobs used with this job manager type Job interface { + // Execute performs the work. + // It should be synchronous if a cleanupFn is provided. Execute() error + + // OnFailure handles the error resulting from a failed Execute(). + // It should be synchronous if a cleanupFn is provided. OnFailure(err error) } +type initFn func() +type cleanupFn func() + +type wrappedJob struct { + job Job + init initFn + cleanup cleanupFn +} + // worker represents a single worker in a pool type worker struct { name string - jobCh <-chan Job + jobCh <-chan wrappedJob quit chan struct{} logger log.Logger @@ -37,10 +51,18 @@ func (w *worker) start() { case <-w.quit: w.wg.Done() return - case job := <-w.jobCh: - err := job.Execute() + case wJob := <-w.jobCh: + if wJob.init != nil { + wJob.init() + } + + err := wJob.job.Execute() if err != nil { - job.OnFailure(err) + wJob.job.OnFailure(err) + } + + if wJob.cleanup != nil { + wJob.cleanup() } } } @@ -52,7 +74,7 @@ type dispatcher struct { name string numWorkers int workers []worker - jobCh chan Job + jobCh chan wrappedJob onceStart sync.Once onceStop sync.Once quit chan struct{} @@ -68,10 +90,17 @@ func newDispatcher(name string, numWorkers int, l log.Logger) *dispatcher { return d } -// dispatch dispatches a job to the worker pool -func (d *dispatcher) dispatch(job Job) { +// dispatch dispatches a job to the worker pool, with optional initialization +// and cleanup functions (useful for tracking job progress) +func (d *dispatcher) dispatch(job Job, init initFn, cleanup cleanupFn) { + wJob := wrappedJob{ + init: init, + job: job, + cleanup: cleanup, + } + select { - case d.jobCh <- job: + case d.jobCh <- wJob: case <-d.quit: d.logger.Info("shutting down during dispatch") } @@ -123,7 +152,7 @@ func createDispatcher(name string, numWorkers int, l log.Logger) *dispatcher { name: name, numWorkers: numWorkers, workers: make([]worker, 0), - jobCh: make(chan Job), + jobCh: make(chan wrappedJob), quit: make(chan struct{}), logger: l, wg: &wg, diff --git a/helper/fairshare/workerpool_test.go b/helper/fairshare/workerpool_test.go index 4732f531cb..a3c3f68a1e 100644 --- a/helper/fairshare/workerpool_test.go +++ b/helper/fairshare/workerpool_test.go @@ -184,7 +184,7 @@ func TestFairshare_startWorker(t *testing.T) { timeout := time.After(5 * time.Second) wg.Add(1) - d.dispatch(&job) + d.dispatch(&job, nil, nil) go func() { wg.Wait() doneCh <- struct{}{} @@ -222,7 +222,7 @@ func TestFairshare_start(t *testing.T) { for i := 0; i < numJobs; i++ { job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail) - d.dispatch(&job) + d.dispatch(&job, nil, nil) } select { @@ -318,7 +318,7 @@ func TestFairshare_dispatch(t *testing.T) { go func() { for _, id := range expectedIDs { job := newTestJob(t, id, ex, onFail) - d.dispatch(&job) + d.dispatch(&job, nil, nil) } }() @@ -376,7 +376,7 @@ func TestFairshare_jobFailure(t *testing.T) { for i := 0; i < 
numJobs; i++ { job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail) - d.dispatch(&job) + d.dispatch(&job, nil, nil) } select {
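
As a quick sanity check of the per-queue cap enforced by queueWorkersSaturated (a standalone sketch, not part of the patch; the helper name maxWorkersPerQueue is mine), the cap is ceil(0.9 * totalWorkers / activeQueues), which is where the "no more than 6 workers" and "9th worker saturates" figures in the tests above come from.

package main

import (
	"fmt"
	"math"
)

// maxWorkersPerQueue mirrors the cap computed in queueWorkersSaturated:
// a queue counts as saturated once it holds ceil(0.9 * totalWorkers / activeQueues)
// workers, which keeps part of the pool free for other queues.
func maxWorkersPerQueue(totalWorkers, activeQueues int) int {
	return int(math.Ceil(0.9 * float64(totalWorkers) / float64(activeQueues)))
}

func main() {
	// 18 workers across 3 active queues -> ceil(5.4) = 6, matching the
	// "no queue can be assigned more than 6 workers" note in TestFairshare_getNextQueue.
	fmt.Println(maxWorkersPerQueue(18, 3)) // 6

	// 20 workers across 2 active queues -> ceil(9.0) = 9, matching
	// TestFairshare_queueWorkersSaturated, where the 9th worker saturates a queue.
	fmt.Println(maxWorkersPerQueue(20, 2)) // 9
}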