vault/helper/fairshare/jobmanager.go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package fairshare
import (
"container/list"
"fmt"
"io/ioutil"
"math"
"sync"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/sdk/helper/logging"
)
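// JobManager fairly distributes queued work across a shared worker pool:
// queues are visited in round-robin order (queuesIndex / lastQueueAccessed)
// and the number of active workers per queue is capped (see
// queueWorkersSaturated), so a single busy queue cannot starve the others.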
type JobManager struct {
name string
queues map[string]*list.List
quit chan struct{}
newWork chan struct{} // must be buffered
workerPool *dispatcher
workerCount map[string]int
onceStart sync.Once
onceStop sync.Once
logger log.Logger
totalJobs int
metricSink *metricsutil.ClusterMetricSink
// waitgroup for testing stop functionality
wg sync.WaitGroup
// protects `queues`, `workerCount`, `queuesIndex`, `lastQueueAccessed`
l sync.RWMutex
// track queues by index for round robin worker assignment
queuesIndex []string
lastQueueAccessed int
}
// NewJobManager creates a job manager, with an optional name
func NewJobManager(name string, numWorkers int, l log.Logger, metricSink *metricsutil.ClusterMetricSink) *JobManager {
if l == nil {
l = logging.NewVaultLoggerWithWriter(ioutil.Discard, log.NoLevel)
}
if name == "" {
guid, err := uuid.GenerateUUID()
if err != nil {
l.Warn("uuid generator failed, using 'no-uuid'", "err", err)
guid = "no-uuid"
}
name = fmt.Sprintf("jobmanager-%s", guid)
}
wp := newDispatcher(fmt.Sprintf("%s-dispatcher", name), numWorkers, l)
j := JobManager{
name: name,
queues: make(map[string]*list.List),
quit: make(chan struct{}),
newWork: make(chan struct{}, 1),
workerPool: wp,
workerCount: make(map[string]int),
logger: l,
metricSink: metricSink,
queuesIndex: make([]string, 0),
lastQueueAccessed: -1,
}
j.logger.Trace("created job manager", "name", name, "pool_size", numWorkers)
return &j
}
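// A minimal usage sketch (illustrative only; someJob stands in for any value
// implementing this package's Job interface, and the nil logger and metric
// sink are tolerated by the constructor above):
//
//	jm := NewJobManager("example", 10, nil, nil)
//	jm.Start()
//	jm.AddJob(someJob, "queue-a")
//	defer jm.Stop()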
// Start starts the job manager
// note: a given job manager cannot be restarted after it has been stopped
func (j *JobManager) Start() {
j.onceStart.Do(func() {
j.logger.Trace("starting job manager", "name", j.name)
j.workerPool.start()
j.assignWork()
})
}
// Stop stops the job manager asynchronously
func (j *JobManager) Stop() {
j.onceStop.Do(func() {
j.logger.Trace("terminating job manager...")
close(j.quit)
j.workerPool.stop()
})
}
// AddJob adds a job to the given queue, creating the queue if it doesn't exist
func (j *JobManager) AddJob(job Job, queueID string) {
j.l.Lock()
if len(j.queues) == 0 {
defer func() {
// newWork must be buffered to avoid deadlocks if work is added
// before the job manager is started
j.newWork <- struct{}{}
}()
}
defer j.l.Unlock()
if _, ok := j.queues[queueID]; !ok {
j.addQueue(queueID)
}
j.queues[queueID].PushBack(job)
j.totalJobs++
if j.metricSink != nil {
j.metricSink.AddSampleWithLabels([]string{j.name, "job_manager", "queue_length"}, float32(j.queues[queueID].Len()), []metrics.Label{{"queue_id", queueID}})
j.metricSink.AddSample([]string{j.name, "job_manager", "total_jobs"}, float32(j.totalJobs))
}
}
// GetPendingJobCount returns the total number of pending jobs in the job manager
func (j *JobManager) GetPendingJobCount() int {
j.l.RLock()
defer j.l.RUnlock()
cnt := 0
for _, q := range j.queues {
cnt += q.Len()
}
return cnt
}
// GetWorkerCounts returns a map of queue ID to number of active workers
func (j *JobManager) GetWorkerCounts() map[string]int {
j.l.RLock()
defer j.l.RUnlock()
// return a copy so callers cannot race with later updates to workerCount
out := make(map[string]int, len(j.workerCount))
for k, v := range j.workerCount {
out[k] = v
}
return out
}
// GetWorkQueueLengths returns a map of queue ID to number of jobs in the queue
func (j *JobManager) GetWorkQueueLengths() map[string]int {
out := make(map[string]int)
j.l.RLock()
defer j.l.RUnlock()
for k, v := range j.queues {
out[k] = v.Len()
}
return out
}
// getNextJob pops the next job to be processed and prunes empty queues
// it also returns the ID of the queue the job is associated with
func (j *JobManager) getNextJob() (Job, string) {
j.l.Lock()
defer j.l.Unlock()
if len(j.queues) == 0 {
return nil, ""
}
queueID, canAssignWorker := j.getNextQueue()
if !canAssignWorker {
return nil, ""
}
jobElement := j.queues[queueID].Front()
jobRaw := j.queues[queueID].Remove(jobElement)
j.totalJobs--
if j.metricSink != nil {
j.metricSink.AddSampleWithLabels([]string{j.name, "job_manager", "queue_length"}, float32(j.queues[queueID].Len()), []metrics.Label{{"queue_id", queueID}})
j.metricSink.AddSample([]string{j.name, "job_manager", "total_jobs"}, float32(j.totalJobs))
}
if j.queues[queueID].Len() == 0 {
// we remove the empty queue, but we don't remove the worker count
// in case we are still working on previous jobs from this queue.
// worker count cleanup is handled in j.decrementWorkerCount
j.removeLastQueueAccessed()
}
return jobRaw.(Job), queueID
}
// returns the next queue to assign work from, and whether any queue is
// currently eligible for another worker. if work can be assigned,
// j.lastQueueAccessed is updated to that queue.
// note: this must be called with j.l held
func (j *JobManager) getNextQueue() (string, bool) {
var nextQueue string
var canAssignWorker bool
// ensure we loop through all existing queues until we find an eligible
// queue, if one exists.
queueIdx := j.nextQueueIndex(j.lastQueueAccessed)
for i := 0; i < len(j.queuesIndex); i++ {
potentialQueueID := j.queuesIndex[queueIdx]
if !j.queueWorkersSaturated(potentialQueueID) {
nextQueue = potentialQueueID
canAssignWorker = true
j.lastQueueAccessed = queueIdx
break
}
queueIdx = j.nextQueueIndex(queueIdx)
}
return nextQueue, canAssignWorker
}
// get the index of the next queue in round-robin order
// note: this must be called with j.l held
func (j *JobManager) nextQueueIndex(currentIdx int) int {
return (currentIdx + 1) % len(j.queuesIndex)
}
// returns true if there are already too many workers on this queue
// note: this must be called with j.l held (at least for read).
// note: we may want to eventually factor in queue length relative to num queues
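// e.g. with 10 total workers and 3 active queues, the per-queue cap is ceil(0.9*10/3) = 3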
func (j *JobManager) queueWorkersSaturated(queueID string) bool {
numActiveQueues := float64(len(j.queues))
numTotalWorkers := float64(j.workerPool.numWorkers)
maxWorkersPerQueue := math.Ceil(0.9 * numTotalWorkers / numActiveQueues)
numWorkersPerQueue := j.workerCount
return numWorkersPerQueue[queueID] >= int(maxWorkersPerQueue)
}
// increment the worker count for this queue
func (j *JobManager) incrementWorkerCount(queueID string) {
j.l.Lock()
defer j.l.Unlock()
j.workerCount[queueID]++
}
// decrement the worker count for this queue
// this also removes worker tracking for this queue if needed
func (j *JobManager) decrementWorkerCount(queueID string) {
j.l.Lock()
defer j.l.Unlock()
j.workerCount[queueID]--
_, queueExists := j.queues[queueID]
if !queueExists && j.workerCount[queueID] < 1 {
delete(j.workerCount, queueID)
}
}
// assignWork runs a loop that continually checks for new jobs and dispatches
// them to the worker pool
func (j *JobManager) assignWork() {
j.wg.Add(1)
go func() {
// a ticker is used instead of time.After in the for-select loop below,
// since time.After would allocate a new timer on every iteration
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
for {
// assign work while there are jobs to distribute
select {
case <-j.quit:
j.wg.Done()
return
case <-j.newWork:
// keep the channel empty since we're already processing work
default:
}
job, queueID := j.getNextJob()
if job != nil {
j.workerPool.dispatch(job,
func() {
j.incrementWorkerCount(queueID)
},
func() {
j.decrementWorkerCount(queueID)
})
} else {
break
}
}
ticker.Reset(50 * time.Millisecond)
select {
case <-j.quit:
j.wg.Done()
return
case <-j.newWork:
// listen for wake-up when an empty job manager has been given work
case <-ticker.C:
// periodically check if new workers can be assigned. with the
// fairsharing worker distribution it can be the case that there
// is work waiting, but no queues are eligible for another worker
}
}
}()
}
// addQueue generates a new queue if a queue for `queueID` doesn't exist
// it also starts tracking workers on that queue, if not already tracked
// note: this must be called with j.l held for write
func (j *JobManager) addQueue(queueID string) {
if _, ok := j.queues[queueID]; !ok {
j.queues[queueID] = list.New()
j.queuesIndex = append(j.queuesIndex, queueID)
}
// it's possible the queue ran out of work and was pruned, but there were
// still workers operating on data formerly in that queue, which were still
// being tracked. if that is the case, we don't want to wipe out that worker
// count when the queue is re-initialized.
if _, ok := j.workerCount[queueID]; !ok {
j.workerCount[queueID] = 0
}
}
// removes the queue and index tracker for the last queue accessed.
// it should only be called once that queue has emptied.
// note: this must be called with j.l held.
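// e.g. if queuesIndex is [a b c] and queue a (index 0) has just emptied,
// the slice becomes [b c] and lastQueueAccessed is set to 1, so the next
// round-robin pick wraps back around to b.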
func (j *JobManager) removeLastQueueAccessed() {
if j.lastQueueAccessed == -1 || j.lastQueueAccessed > len(j.queuesIndex)-1 {
j.logger.Warn("call to remove queue out of bounds", "idx", j.lastQueueAccessed)
return
}
queueID := j.queuesIndex[j.lastQueueAccessed]
// remove the queue
delete(j.queues, queueID)
// remove the index for the queue
j.queuesIndex = append(j.queuesIndex[:j.lastQueueAccessed], j.queuesIndex[j.lastQueueAccessed+1:]...)
// correct the last queue accessed for round-robin ordering
if j.lastQueueAccessed > 0 {
j.lastQueueAccessed--
} else {
j.lastQueueAccessed = len(j.queuesIndex) - 1
}
}