diff --git a/cmd/admin-router.go b/cmd/admin-router.go index f6d9acf69..5d9e37572 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -341,6 +341,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodGet).Path(adminVersion + "/list-jobs").HandlerFunc( adminMiddleware(adminAPI.ListBatchJobs)) + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/status-job").HandlerFunc( + adminMiddleware(adminAPI.BatchJobStatus)) + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/describe-job").HandlerFunc( adminMiddleware(adminAPI.DescribeBatchJob)) adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/cancel-job").HandlerFunc( diff --git a/cmd/batch-expire.go b/cmd/batch-expire.go index 1e137ed78..bdaeed648 100644 --- a/cmd/batch-expire.go +++ b/cmd/batch-expire.go @@ -514,7 +514,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo JobType: string(job.Type()), StartTime: job.Started, } - if err := ri.load(ctx, api, job); err != nil { + if err := ri.loadOrInit(ctx, api, job); err != nil { return err } diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index f96c5b7be..1929f3b86 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -28,6 +28,7 @@ import ( "math/rand" "net/http" "net/url" + "path/filepath" "runtime" "strconv" "strings" @@ -57,6 +58,11 @@ import ( var globalBatchConfig batch.Config +const ( + // Keep the completed/failed job stats 3 days before removing it + oldJobsExpiration = 3 * 24 * time.Hour +) + // BatchJobRequest this is an internal data structure not for external consumption. type BatchJobRequest struct { ID string `yaml:"-" json:"name"` @@ -262,7 +268,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay JobType: string(job.Type()), StartTime: job.Started, } - if err := ri.load(ctx, api, job); err != nil { + if err := ri.loadOrInit(ctx, api, job); err != nil { return err } if ri.Complete { @@ -722,60 +728,82 @@ const ( batchReplJobDefaultRetryDelay = 250 * time.Millisecond ) -func getJobReportPath(job BatchJobRequest) string { - var fileName string - switch { - case job.Replicate != nil: - fileName = batchReplName - case job.KeyRotate != nil: - fileName = batchKeyRotationName - case job.Expire != nil: - fileName = batchExpireName - } - return pathJoin(batchJobReportsPrefix, job.ID, fileName) -} - func getJobPath(job BatchJobRequest) string { return pathJoin(batchJobPrefix, job.ID) } +func (ri *batchJobInfo) getJobReportPath() (string, error) { + var fileName string + switch madmin.BatchJobType(ri.JobType) { + case madmin.BatchJobReplicate: + fileName = batchReplName + case madmin.BatchJobKeyRotate: + fileName = batchKeyRotationName + case madmin.BatchJobExpire: + fileName = batchExpireName + default: + return "", fmt.Errorf("unknown job type: %v", ri.JobType) + } + return pathJoin(batchJobReportsPrefix, ri.JobID, fileName), nil +} + +func (ri *batchJobInfo) loadOrInit(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { + err := ri.load(ctx, api, job) + if errors.Is(err, errNoSuchJob) { + switch { + case job.Replicate != nil: + ri.Version = batchReplVersionV1 + ri.RetryAttempts = batchReplJobDefaultRetries + if job.Replicate.Flags.Retry.Attempts > 0 { + ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts + } + case job.KeyRotate != nil: + ri.Version = batchKeyRotateVersionV1 + ri.RetryAttempts = batchKeyRotateJobDefaultRetries + if job.KeyRotate.Flags.Retry.Attempts > 0 { + ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts + } + case job.Expire != nil: + ri.Version = batchExpireVersionV1 + ri.RetryAttempts = batchExpireJobDefaultRetries + if job.Expire.Retry.Attempts > 0 { + ri.RetryAttempts = job.Expire.Retry.Attempts + } + } + return nil + } + return err +} + func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { + path, err := job.getJobReportPath() + if err != nil { + batchLogIf(ctx, err) + return err + } + return ri.loadByPath(ctx, api, path) +} + +func (ri *batchJobInfo) loadByPath(ctx context.Context, api ObjectLayer, path string) error { var format, version uint16 - switch { - case job.Replicate != nil: + switch filepath.Base(path) { + case batchReplName: version = batchReplVersionV1 format = batchReplFormat - case job.KeyRotate != nil: + case batchKeyRotationName: version = batchKeyRotateVersionV1 format = batchKeyRotationFormat - case job.Expire != nil: + case batchExpireName: version = batchExpireVersionV1 format = batchExpireFormat default: return errors.New("no supported batch job request specified") } - data, err := readConfig(ctx, api, getJobReportPath(job)) + + data, err := readConfig(ctx, api, path) if err != nil { if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) { - ri.Version = int(version) - switch { - case job.Replicate != nil: - ri.RetryAttempts = batchReplJobDefaultRetries - if job.Replicate.Flags.Retry.Attempts > 0 { - ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts - } - case job.KeyRotate != nil: - ri.RetryAttempts = batchKeyRotateJobDefaultRetries - if job.KeyRotate.Flags.Retry.Attempts > 0 { - ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts - } - case job.Expire != nil: - ri.RetryAttempts = batchExpireJobDefaultRetries - if job.Expire.Retry.Attempts > 0 { - ri.RetryAttempts = job.Expire.Retry.Attempts - } - } - return nil + return errNoSuchJob } return err } @@ -919,7 +947,12 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati if err != nil { return err } - return saveConfig(ctx, api, getJobReportPath(job), buf) + path, err := ri.getJobReportPath() + if err != nil { + batchLogIf(ctx, err) + return err + } + return saveConfig(ctx, api, path, buf) } ri.mu.Unlock() return nil @@ -971,7 +1004,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba JobType: string(job.Type()), StartTime: job.Started, } - if err := ri.load(ctx, api, job); err != nil { + if err := ri.loadOrInit(ctx, api, job); err != nil { return err } if ri.Complete { @@ -1434,10 +1467,24 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error { } func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) { - deleteConfig(ctx, api, getJobReportPath(j)) deleteConfig(ctx, api, getJobPath(j)) } +func (j BatchJobRequest) getJobReportPath() (string, error) { + var fileName string + switch { + case j.Replicate != nil: + fileName = batchReplName + case j.KeyRotate != nil: + fileName = batchKeyRotationName + case j.Expire != nil: + fileName = batchExpireName + default: + return "", errors.New("unknown job type") + } + return pathJoin(batchJobReportsPrefix, j.ID, fileName), nil +} + func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error { if j.Replicate == nil && j.KeyRotate == nil && j.Expire == nil { return errInvalidArgument @@ -1540,6 +1587,55 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request) batchLogIf(ctx, json.NewEncoder(w).Encode(&listResult)) } +// BatchJobStatus - returns the status of a batch job saved in the disk +func (a adminAPIHandlers) BatchJobStatus(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + objectAPI, _ := validateAdminReq(ctx, w, r, policy.ListBatchJobsAction) + if objectAPI == nil { + return + } + + jobID := r.Form.Get("jobId") + if jobID == "" { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, errInvalidArgument), r.URL) + return + } + + req := BatchJobRequest{ID: jobID} + if i := strings.Index(jobID, "-"); i > 0 { + switch madmin.BatchJobType(jobID[:i]) { + case madmin.BatchJobReplicate: + req.Replicate = &BatchJobReplicateV1{} + case madmin.BatchJobKeyRotate: + req.KeyRotate = &BatchJobKeyRotateV1{} + case madmin.BatchJobExpire: + req.Expire = &BatchJobExpire{} + default: + writeErrorResponseJSON(ctx, w, toAPIError(ctx, errors.New("job ID format unrecognized")), r.URL) + return + } + } + + ri := &batchJobInfo{} + if err := ri.load(ctx, objectAPI, req); err != nil { + if !errors.Is(err, errNoSuchJob) { + batchLogIf(ctx, err) + } + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + buf, err := json.Marshal(madmin.BatchJobStatus{LastMetric: ri.metric()}) + if err != nil { + batchLogIf(ctx, err) + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + w.Write(buf) +} + var errNoSuchJob = errors.New("no such job") // DescribeBatchJob returns the currently active batch job definition @@ -1631,7 +1727,7 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request) return } - job.ID = fmt.Sprintf("%s%s%d", shortuuid.New(), getKeySeparator(), GetProxyEndpointLocalIndex(globalProxyEndpoints)) + job.ID = fmt.Sprintf("%s-%s%s%d", job.Type(), shortuuid.New(), getKeySeparator(), GetProxyEndpointLocalIndex(globalProxyEndpoints)) job.User = user job.Started = time.Now() @@ -1720,9 +1816,54 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP } jpool.ResizeWorkers(workers) jpool.resume() + + go jpool.cleanupReports() + return jpool } +func (j *BatchJobPool) cleanupReports() { + randomWait := func() time.Duration { + // randomWait depends on the number of nodes to avoid triggering the cleanup at the same time + return time.Duration(rand.Float64() * float64(time.Duration(globalEndpoints.NEndpoints())*time.Hour)) + } + + t := time.NewTimer(randomWait()) + defer t.Stop() + + for { + select { + case <-GlobalContext.Done(): + return + case <-t.C: + results := make(chan itemOrErr[ObjectInfo], 100) + ctx, cancel := context.WithCancel(j.ctx) + defer cancel() + if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobReportsPrefix, results, WalkOptions{}); err != nil { + batchLogIf(j.ctx, err) + t.Reset(randomWait()) + continue + } + for result := range results { + if result.Err != nil { + batchLogIf(j.ctx, result.Err) + continue + } + ri := &batchJobInfo{} + if err := ri.loadByPath(ctx, j.objLayer, result.Item.Name); err != nil { + batchLogIf(ctx, err) + continue + } + if (ri.Complete || ri.Failed) && time.Since(ri.LastUpdate) > oldJobsExpiration { + deleteConfig(ctx, j.objLayer, result.Item.Name) + } + } + + t.Reset(randomWait()) + } + } +} + func (j *BatchJobPool) resume() { results := make(chan itemOrErr[ObjectInfo], 100) ctx, cancel := context.WithCancel(j.ctx) @@ -1986,7 +2127,7 @@ func (m *batchJobMetrics) purgeJobMetrics() { var toDeleteJobMetrics []string m.RLock() for id, metrics := range m.metrics { - if time.Since(metrics.LastUpdate) > 24*time.Hour && (metrics.Complete || metrics.Failed) { + if time.Since(metrics.LastUpdate) > oldJobsExpiration && (metrics.Complete || metrics.Failed) { toDeleteJobMetrics = append(toDeleteJobMetrics, id) } } diff --git a/cmd/batch-rotate.go b/cmd/batch-rotate.go index bf3a789b7..ccadaa50f 100644 --- a/cmd/batch-rotate.go +++ b/cmd/batch-rotate.go @@ -257,7 +257,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba JobType: string(job.Type()), StartTime: job.Started, } - if err := ri.load(ctx, api, job); err != nil { + if err := ri.loadOrInit(ctx, api, job); err != nil { return err } if ri.Complete {