From 757cf413cb630892752e6c5facafa2f9edcbb7fe Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Tue, 2 Jul 2024 09:17:52 +0100 Subject: [PATCH] Add batch status API (#19679) Currently the status of a completed or failed batch job is held only in memory, so a simple restart loses the status and the user no longer has any visibility into a job that was long running. In addition to the metrics, add a new API that reads the batch status from the drives. A batch job will be cleaned up three days after completion. Also add the batch type to the batch ID: the batch job request is removed immediately when the job finishes, after which the type of the batch job is no longer known, which makes it difficult to locate the job report. --- cmd/admin-router.go | 3 + cmd/batch-expire.go | 2 +- cmd/batch-handlers.go | 227 ++++++++++++++++++++++++++++++++++-------- cmd/batch-rotate.go | 2 +- 4 files changed, 189 insertions(+), 45 deletions(-) diff --git a/cmd/admin-router.go b/cmd/admin-router.go index f6d9acf69..5d9e37572 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -341,6 +341,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodGet).Path(adminVersion + "/list-jobs").HandlerFunc( adminMiddleware(adminAPI.ListBatchJobs)) + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/status-job").HandlerFunc( + adminMiddleware(adminAPI.BatchJobStatus)) + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/describe-job").HandlerFunc( adminMiddleware(adminAPI.DescribeBatchJob)) adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/cancel-job").HandlerFunc( diff --git a/cmd/batch-expire.go b/cmd/batch-expire.go index 1e137ed78..bdaeed648 100644 --- a/cmd/batch-expire.go +++ b/cmd/batch-expire.go @@ -514,7 +514,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo JobType: string(job.Type()), StartTime: job.Started, } - if err := ri.load(ctx, api, job); err != nil { + if err 
:= ri.loadOrInit(ctx, api, job); err != nil { return err } diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index f96c5b7be..1929f3b86 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -28,6 +28,7 @@ import ( "math/rand" "net/http" "net/url" + "path/filepath" "runtime" "strconv" "strings" @@ -57,6 +58,11 @@ import ( var globalBatchConfig batch.Config +const ( + // Keep the completed/failed job stats 3 days before removing it + oldJobsExpiration = 3 * 24 * time.Hour +) + // BatchJobRequest this is an internal data structure not for external consumption. type BatchJobRequest struct { ID string `yaml:"-" json:"name"` @@ -262,7 +268,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay JobType: string(job.Type()), StartTime: job.Started, } - if err := ri.load(ctx, api, job); err != nil { + if err := ri.loadOrInit(ctx, api, job); err != nil { return err } if ri.Complete { @@ -722,60 +728,82 @@ const ( batchReplJobDefaultRetryDelay = 250 * time.Millisecond ) -func getJobReportPath(job BatchJobRequest) string { - var fileName string - switch { - case job.Replicate != nil: - fileName = batchReplName - case job.KeyRotate != nil: - fileName = batchKeyRotationName - case job.Expire != nil: - fileName = batchExpireName - } - return pathJoin(batchJobReportsPrefix, job.ID, fileName) -} - func getJobPath(job BatchJobRequest) string { return pathJoin(batchJobPrefix, job.ID) } +func (ri *batchJobInfo) getJobReportPath() (string, error) { + var fileName string + switch madmin.BatchJobType(ri.JobType) { + case madmin.BatchJobReplicate: + fileName = batchReplName + case madmin.BatchJobKeyRotate: + fileName = batchKeyRotationName + case madmin.BatchJobExpire: + fileName = batchExpireName + default: + return "", fmt.Errorf("unknown job type: %v", ri.JobType) + } + return pathJoin(batchJobReportsPrefix, ri.JobID, fileName), nil +} + +func (ri *batchJobInfo) loadOrInit(ctx context.Context, api ObjectLayer, job BatchJobRequest) 
error { + err := ri.load(ctx, api, job) + if errors.Is(err, errNoSuchJob) { + switch { + case job.Replicate != nil: + ri.Version = batchReplVersionV1 + ri.RetryAttempts = batchReplJobDefaultRetries + if job.Replicate.Flags.Retry.Attempts > 0 { + ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts + } + case job.KeyRotate != nil: + ri.Version = batchKeyRotateVersionV1 + ri.RetryAttempts = batchKeyRotateJobDefaultRetries + if job.KeyRotate.Flags.Retry.Attempts > 0 { + ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts + } + case job.Expire != nil: + ri.Version = batchExpireVersionV1 + ri.RetryAttempts = batchExpireJobDefaultRetries + if job.Expire.Retry.Attempts > 0 { + ri.RetryAttempts = job.Expire.Retry.Attempts + } + } + return nil + } + return err +} + func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { + path, err := job.getJobReportPath() + if err != nil { + batchLogIf(ctx, err) + return err + } + return ri.loadByPath(ctx, api, path) +} + +func (ri *batchJobInfo) loadByPath(ctx context.Context, api ObjectLayer, path string) error { var format, version uint16 - switch { - case job.Replicate != nil: + switch filepath.Base(path) { + case batchReplName: version = batchReplVersionV1 format = batchReplFormat - case job.KeyRotate != nil: + case batchKeyRotationName: version = batchKeyRotateVersionV1 format = batchKeyRotationFormat - case job.Expire != nil: + case batchExpireName: version = batchExpireVersionV1 format = batchExpireFormat default: return errors.New("no supported batch job request specified") } - data, err := readConfig(ctx, api, getJobReportPath(job)) + + data, err := readConfig(ctx, api, path) if err != nil { if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) { - ri.Version = int(version) - switch { - case job.Replicate != nil: - ri.RetryAttempts = batchReplJobDefaultRetries - if job.Replicate.Flags.Retry.Attempts > 0 { - ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts - } - case 
job.KeyRotate != nil: - ri.RetryAttempts = batchKeyRotateJobDefaultRetries - if job.KeyRotate.Flags.Retry.Attempts > 0 { - ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts - } - case job.Expire != nil: - ri.RetryAttempts = batchExpireJobDefaultRetries - if job.Expire.Retry.Attempts > 0 { - ri.RetryAttempts = job.Expire.Retry.Attempts - } - } - return nil + return errNoSuchJob } return err } @@ -919,7 +947,12 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati if err != nil { return err } - return saveConfig(ctx, api, getJobReportPath(job), buf) + path, err := ri.getJobReportPath() + if err != nil { + batchLogIf(ctx, err) + return err + } + return saveConfig(ctx, api, path, buf) } ri.mu.Unlock() return nil @@ -971,7 +1004,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba JobType: string(job.Type()), StartTime: job.Started, } - if err := ri.load(ctx, api, job); err != nil { + if err := ri.loadOrInit(ctx, api, job); err != nil { return err } if ri.Complete { @@ -1434,10 +1467,24 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error { } func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) { - deleteConfig(ctx, api, getJobReportPath(j)) deleteConfig(ctx, api, getJobPath(j)) } +func (j BatchJobRequest) getJobReportPath() (string, error) { + var fileName string + switch { + case j.Replicate != nil: + fileName = batchReplName + case j.KeyRotate != nil: + fileName = batchKeyRotationName + case j.Expire != nil: + fileName = batchExpireName + default: + return "", errors.New("unknown job type") + } + return pathJoin(batchJobReportsPrefix, j.ID, fileName), nil +} + func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error { if j.Replicate == nil && j.KeyRotate == nil && j.Expire == nil { return errInvalidArgument @@ -1540,6 +1587,55 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request) batchLogIf(ctx, 
json.NewEncoder(w).Encode(&listResult)) } +// BatchJobStatus - returns the status of a batch job saved in the disk +func (a adminAPIHandlers) BatchJobStatus(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + objectAPI, _ := validateAdminReq(ctx, w, r, policy.ListBatchJobsAction) + if objectAPI == nil { + return + } + + jobID := r.Form.Get("jobId") + if jobID == "" { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) + return + } + + req := BatchJobRequest{ID: jobID} + if i := strings.Index(jobID, "-"); i > 0 { + switch madmin.BatchJobType(jobID[:i]) { + case madmin.BatchJobReplicate: + req.Replicate = &BatchJobReplicateV1{} + case madmin.BatchJobKeyRotate: + req.KeyRotate = &BatchJobKeyRotateV1{} + case madmin.BatchJobExpire: + req.Expire = &BatchJobExpire{} + default: + writeErrorResponseJSON(ctx, w, toAPIError(ctx, errors.New("job ID format unrecognized")), r.URL) + return + } + } + + ri := &batchJobInfo{} + if err := ri.load(ctx, objectAPI, req); err != nil { + if !errors.Is(err, errNoSuchJob) { + batchLogIf(ctx, err) + } + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + buf, err := json.Marshal(madmin.BatchJobStatus{LastMetric: ri.metric()}) + if err != nil { + batchLogIf(ctx, err) + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + w.Write(buf) +} + var errNoSuchJob = errors.New("no such job") // DescribeBatchJob returns the currently active batch job definition @@ -1631,7 +1727,7 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request) return } - job.ID = fmt.Sprintf("%s%s%d", shortuuid.New(), getKeySeparator(), GetProxyEndpointLocalIndex(globalProxyEndpoints)) + job.ID = fmt.Sprintf("%s-%s%s%d", job.Type(), shortuuid.New(), getKeySeparator(), GetProxyEndpointLocalIndex(globalProxyEndpoints)) job.User = user job.Started = time.Now() @@ -1720,9 +1816,54 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP } 
jpool.ResizeWorkers(workers) jpool.resume() + + go jpool.cleanupReports() + return jpool } +func (j *BatchJobPool) cleanupReports() { + randomWait := func() time.Duration { + // randomWait depends on the number of nodes to avoid triggering the cleanup at the same time + return time.Duration(rand.Float64() * float64(time.Duration(globalEndpoints.NEndpoints())*time.Hour)) + } + + t := time.NewTimer(randomWait()) + defer t.Stop() + + for { + select { + case <-GlobalContext.Done(): + return + case <-t.C: + results := make(chan itemOrErr[ObjectInfo], 100) + ctx, cancel := context.WithCancel(j.ctx) + defer cancel() + if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobReportsPrefix, results, WalkOptions{}); err != nil { + batchLogIf(j.ctx, err) + t.Reset(randomWait()) + continue + } + for result := range results { + if result.Err != nil { + batchLogIf(j.ctx, result.Err) + continue + } + ri := &batchJobInfo{} + if err := ri.loadByPath(ctx, j.objLayer, result.Item.Name); err != nil { + batchLogIf(ctx, err) + continue + } + if (ri.Complete || ri.Failed) && time.Since(ri.LastUpdate) > oldJobsExpiration { + deleteConfig(ctx, j.objLayer, result.Item.Name) + } + } + + t.Reset(randomWait()) + } + } +} + func (j *BatchJobPool) resume() { results := make(chan itemOrErr[ObjectInfo], 100) ctx, cancel := context.WithCancel(j.ctx) @@ -1986,7 +2127,7 @@ func (m *batchJobMetrics) purgeJobMetrics() { var toDeleteJobMetrics []string m.RLock() for id, metrics := range m.metrics { - if time.Since(metrics.LastUpdate) > 24*time.Hour && (metrics.Complete || metrics.Failed) { + if time.Since(metrics.LastUpdate) > oldJobsExpiration && (metrics.Complete || metrics.Failed) { toDeleteJobMetrics = append(toDeleteJobMetrics, id) } } diff --git a/cmd/batch-rotate.go b/cmd/batch-rotate.go index bf3a789b7..ccadaa50f 100644 --- a/cmd/batch-rotate.go +++ b/cmd/batch-rotate.go @@ -257,7 +257,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba JobType: 
string(job.Type()), StartTime: job.Started, } - if err := ri.load(ctx, api, job); err != nil { + if err := ri.loadOrInit(ctx, api, job); err != nil { return err } if ri.Complete {