diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index a6b738c0f..8e4a4a701 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -72,6 +72,7 @@ func NewLifecycleSys() *LifecycleSys { } func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event string) madmin.TraceInfo { + sz, _ := oi.GetActualSize() return madmin.TraceInfo{ TraceType: madmin.TraceILM, Time: startTime, @@ -79,6 +80,7 @@ func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event FuncName: event, Duration: duration, Path: pathJoin(oi.Bucket, oi.Name), + Bytes: sz, Error: "", Message: getSource(4), Custom: map[string]string{"version-id": oi.VersionID}, diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 7f04fa66d..5ec05281b 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -2849,17 +2849,19 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object ReplicationProxyRequest: "false", }, }) + sz := roi.Size if err != nil { if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) { st.ReplicatedCount++ } else { st.FailedCount++ } + sz = 0 } else { st.ReplicatedCount++ st.ReplicatedSize += roi.Size } - traceFn(err) + traceFn(sz, err) select { case <-ctx.Done(): return @@ -2974,17 +2976,17 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt return nil } -func (s *replicationResyncer) trace(resyncID string, path string) func(err error) { +func (s *replicationResyncer) trace(resyncID string, path string) func(sz int64, err error) { startTime := time.Now() - return func(err error) { + return func(sz int64, err error) { duration := time.Since(startTime) if globalTrace.NumSubscribers(madmin.TraceReplicationResync) > 0 { - globalTrace.Publish(replicationResyncTrace(resyncID, startTime, duration, path, err)) + globalTrace.Publish(replicationResyncTrace(resyncID, startTime, duration, path, err, sz)) } } } 
-func replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo { +func replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error, sz int64) madmin.TraceInfo { var errStr string if err != nil { errStr = err.Error() @@ -2998,6 +3000,7 @@ func replicationResyncTrace(resyncID string, startTime time.Time, duration time. Duration: duration, Path: path, Error: errStr, + Bytes: sz, } } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 54e43b199..660d232f4 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -1065,6 +1065,7 @@ func healTrace(funcName healingMetric, startTime time.Time, bucket, object strin if result != nil { tr.Custom["version-id"] = result.VersionID tr.Custom["disks"] = strconv.Itoa(result.DiskCount) + tr.Bytes = result.ObjectSize } } if err != nil { diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index afc47c0bd..e13c77791 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -836,7 +836,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool if filterLifecycle(bi.Name, version.Name, version) { expired++ decommissioned++ - stopFn(errors.New("ILM expired object/version will be skipped")) + stopFn(version.Size, errors.New("ILM expired object/version will be skipped")) continue } @@ -846,7 +846,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool remainingVersions := len(fivs.Versions) - expired if version.Deleted && remainingVersions == 1 { decommissioned++ - stopFn(errors.New("DELETE marked object with no other non-current versions will be skipped")) + stopFn(version.Size, errors.New("DELETE marked object with no other non-current versions will be skipped")) continue } @@ -877,7 +877,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool err = nil } 
} - stopFn(err) + stopFn(version.Size, err) if err != nil { decomLogIf(ctx, err) failure = true @@ -902,12 +902,12 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool MTime: version.ModTime, UserDefined: version.Metadata, }); err != nil { - stopFn(err) + stopFn(version.Size, err) failure = true decomLogIf(ctx, err) continue } - stopFn(nil) + stopFn(version.Size, nil) failure = false break } @@ -925,14 +925,14 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool if isErrObjectNotFound(err) || isErrVersionNotFound(err) { // object deleted by the application, nothing to do here we move on. ignore = true - stopFn(nil) + stopFn(version.Size, nil) break } if err != nil && !ignore { // if usage-cache.bin is not readable log and ignore it. if bi.Name == minioMetaBucket && strings.Contains(version.Name, dataUsageCacheName) { ignore = true - stopFn(err) + stopFn(version.Size, err) decomLogIf(ctx, err) break } @@ -940,16 +940,16 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool if err != nil { failure = true decomLogIf(ctx, err) - stopFn(err) + stopFn(version.Size, err) continue } if err = z.decommissionObject(ctx, bi.Name, gr); err != nil { - stopFn(err) + stopFn(version.Size, err) failure = true decomLogIf(ctx, err) continue } - stopFn(nil) + stopFn(version.Size, nil) failure = false break } @@ -977,7 +977,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool NoAuditLog: true, }, ) - stopFn(err) + stopFn(0, err) auditLogDecom(ctx, "DecomDeleteObject", bi.Name, entry.name, "", err) if err != nil { decomLogIf(ctx, err) @@ -1038,7 +1038,7 @@ const ( decomMetricDecommissionRemoveObject ) -func decomTrace(d decomMetric, poolIdx int, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo { +func decomTrace(d decomMetric, poolIdx int, startTime time.Time, duration time.Duration, path string, err error, sz int64) 
madmin.TraceInfo { var errStr string if err != nil { errStr = err.Error() } @@ -1051,15 +1051,16 @@ func decomTrace(d decomMetric, poolIdx int, startTime time.Time, duration time.D Duration: duration, Path: path, Error: errStr, + Bytes: sz, } } -func (m *decomMetrics) log(d decomMetric, poolIdx int, paths ...string) func(err error) { +func (m *decomMetrics) log(d decomMetric, poolIdx int, paths ...string) func(sz int64, err error) { startTime := time.Now() - return func(err error) { + return func(sz int64, err error) { duration := time.Since(startTime) if globalTrace.NumSubscribers(madmin.TraceDecommission) > 0 { - globalTrace.Publish(decomTrace(d, poolIdx, startTime, duration, strings.Join(paths, " "), err)) + globalTrace.Publish(decomTrace(d, poolIdx, startTime, duration, strings.Join(paths, " "), err, sz)) } } } @@ -1092,10 +1093,10 @@ func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx i } stopFn := globalDecommissionMetrics.log(decomMetricDecommissionBucket, idx, bucket.Name) if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil { - stopFn(err) + stopFn(0, err) return err } - stopFn(nil) + stopFn(0, nil) z.poolMetaMutex.Lock() if z.poolMeta.BucketDone(idx, bucket) { diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go index 59b537b2a..f4327cb50 100644 --- a/cmd/erasure-server-pool-rebalance.go +++ b/cmd/erasure-server-pool-rebalance.go @@ -427,7 +427,7 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg) err := z.saveRebalanceStats(GlobalContext, poolIdx, rebalSaveStats) - stopFn(err) + stopFn(0, err) rebalanceLogIf(GlobalContext, err) if quit { @@ -456,7 +456,7 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket) if err = z.rebalanceBucket(ctx, bucket, poolIdx); 
err != nil { - stopFn(err) + stopFn(0, err) if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) { continue } @@ -464,7 +464,7 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) doneCh <- err return } - stopFn(nil) + stopFn(0, nil) z.bucketRebalanceDone(bucket, poolIdx) } @@ -692,24 +692,24 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, if isErrObjectNotFound(err) || isErrVersionNotFound(err) { // object deleted by the application, nothing to do here we move on. ignore = true - stopFn(nil) + stopFn(0, nil) break } if err != nil { failure = true rebalanceLogIf(ctx, err) - stopFn(err) + stopFn(0, err) continue } if err = z.rebalanceObject(ctx, bucket, gr); err != nil { failure = true rebalanceLogIf(ctx, err) - stopFn(err) + stopFn(version.Size, err) continue } - stopFn(nil) + stopFn(version.Size, nil) failure = false break } @@ -735,7 +735,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, NoAuditLog: true, }, ) - stopFn(err) + stopFn(0, err) auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err) if err != nil { rebalanceLogIf(ctx, err) @@ -935,7 +935,7 @@ func (z *erasureServerPools) StartRebalance() { go func(idx int) { stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx) err := z.rebalanceBuckets(ctx, idx) - stopfn(err) + stopfn(0, err) }(poolIdx) } } @@ -975,7 +975,7 @@ const ( rebalanceMetricSaveMetadata ) -func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo { +func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string, sz int64) madmin.TraceInfo { var errStr string if err != nil { errStr = err.Error() @@ -988,15 +988,16 @@ func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duratio Duration: duration, Path: 
path, Error: errStr, + Bytes: sz, } } -func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) { +func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(sz int64, err error) { startTime := time.Now() - return func(err error) { + return func(sz int64, err error) { duration := time.Since(startTime) if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 { - globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " "))) + globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " "), sz)) } } } diff --git a/cmd/ftp-server-driver.go b/cmd/ftp-server-driver.go index 4c64b1e19..b49b2170b 100644 --- a/cmd/ftp-server-driver.go +++ b/cmd/ftp-server-driver.go @@ -105,7 +105,7 @@ type ftpMetrics struct{} var globalFtpMetrics ftpMetrics -func ftpTrace(s *ftp.Context, startTime time.Time, source, objPath string, err error) madmin.TraceInfo { +func ftpTrace(s *ftp.Context, startTime time.Time, source, objPath string, err error, sz int64) madmin.TraceInfo { var errStr string if err != nil { errStr = err.Error() } @@ -114,25 +114,33 @@ func ftpTrace(s *ftp.Context, startTime time.Time, source, objPath string, err e TraceType: madmin.TraceFTP, Time: startTime, NodeName: globalLocalNodeName, - FuncName: fmt.Sprintf("ftp USER=%s COMMAND=%s PARAM=%s ISLOGIN=%t, Source=%s", s.Sess.LoginUser(), s.Cmd, s.Param, s.Sess.IsLogin(), source), + FuncName: s.Cmd, Duration: time.Since(startTime), Path: objPath, Error: errStr, + Bytes: sz, + Custom: map[string]string{ + "user": s.Sess.LoginUser(), + "cmd": s.Cmd, + "param": s.Param, + "login": fmt.Sprintf("%t", s.Sess.IsLogin()), + "source": source, + }, } } -func (m *ftpMetrics) log(s *ftp.Context, paths ...string) func(err error) { +func (m *ftpMetrics) log(s *ftp.Context, paths ...string) func(sz int64, err error) { startTime := time.Now() source := getSource(2) - return func(err error) { - 
globalTrace.Publish(ftpTrace(s, startTime, source, strings.Join(paths, " "), err)) + return func(sz int64, err error) { + globalTrace.Publish(ftpTrace(s, startTime, source, strings.Join(paths, " "), err, sz)) } } // Stat implements ftpDriver func (driver *ftpDriver) Stat(ctx *ftp.Context, objPath string) (fi os.FileInfo, err error) { stopFn := globalFtpMetrics.log(ctx, objPath) - defer stopFn(err) + defer stopFn(0, err) if objPath == SlashSeparator { return &minioFileInfo{ @@ -190,7 +198,7 @@ func (driver *ftpDriver) Stat(ctx *ftp.Context, objPath string) (fi os.FileInfo, // ListDir implements ftpDriver func (driver *ftpDriver) ListDir(ctx *ftp.Context, objPath string, callback func(os.FileInfo) error) (err error) { stopFn := globalFtpMetrics.log(ctx, objPath) - defer stopFn(err) + defer stopFn(0, err) clnt, err := driver.getMinIOClient(ctx) if err != nil { @@ -252,7 +260,7 @@ func (driver *ftpDriver) ListDir(ctx *ftp.Context, objPath string, callback func func (driver *ftpDriver) CheckPasswd(c *ftp.Context, username, password string) (ok bool, err error) { stopFn := globalFtpMetrics.log(c, username) - defer stopFn(err) + defer stopFn(0, err) if globalIAMSys.LDAPConfig.Enabled() { sa, _, err := globalIAMSys.getServiceAccount(context.Background(), username) @@ -376,7 +384,7 @@ func (driver *ftpDriver) getMinIOClient(ctx *ftp.Context) (*minio.Client, error) // DeleteDir implements ftpDriver func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, objPath string) (err error) { stopFn := globalFtpMetrics.log(ctx, objPath) - defer stopFn(err) + defer stopFn(0, err) bucket, prefix := path2BucketObject(objPath) if bucket == "" { @@ -426,7 +434,7 @@ func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, objPath string) (err error) // DeleteFile implements ftpDriver func (driver *ftpDriver) DeleteFile(ctx *ftp.Context, objPath string) (err error) { stopFn := globalFtpMetrics.log(ctx, objPath) - defer stopFn(err) + defer stopFn(0, err) bucket, object := path2BucketObject(objPath) 
if bucket == "" { @@ -444,7 +452,7 @@ func (driver *ftpDriver) DeleteFile(ctx *ftp.Context, objPath string) (err error // Rename implements ftpDriver func (driver *ftpDriver) Rename(ctx *ftp.Context, fromObjPath string, toObjPath string) (err error) { stopFn := globalFtpMetrics.log(ctx, fromObjPath, toObjPath) - defer stopFn(err) + defer stopFn(0, err) return NotImplemented{} } @@ -452,7 +460,7 @@ func (driver *ftpDriver) Rename(ctx *ftp.Context, fromObjPath string, toObjPath // MakeDir implements ftpDriver func (driver *ftpDriver) MakeDir(ctx *ftp.Context, objPath string) (err error) { stopFn := globalFtpMetrics.log(ctx, objPath) - defer stopFn(err) + defer stopFn(0, err) bucket, prefix := path2BucketObject(objPath) if bucket == "" { @@ -479,7 +487,7 @@ func (driver *ftpDriver) MakeDir(ctx *ftp.Context, objPath string) (err error) { // GetFile implements ftpDriver func (driver *ftpDriver) GetFile(ctx *ftp.Context, objPath string, offset int64) (n int64, rc io.ReadCloser, err error) { stopFn := globalFtpMetrics.log(ctx, objPath) - defer stopFn(err) + defer func() { stopFn(n, err) }() bucket, object := path2BucketObject(objPath) if bucket == "" { @@ -511,14 +519,14 @@ func (driver *ftpDriver) GetFile(ctx *ftp.Context, objPath string, offset int64) if err != nil { return 0, nil, err } - - return info.Size - offset, obj, nil + n = info.Size - offset + return n, obj, nil } // PutFile implements ftpDriver func (driver *ftpDriver) PutFile(ctx *ftp.Context, objPath string, data io.Reader, offset int64) (n int64, err error) { stopFn := globalFtpMetrics.log(ctx, objPath) - defer stopFn(err) + defer func() { stopFn(n, err) }() bucket, object := path2BucketObject(objPath) if bucket == "" { @@ -539,5 +547,6 @@ func (driver *ftpDriver) PutFile(ctx *ftp.Context, objPath string, data io.Reade ContentType: mimedb.TypeByExtension(path.Ext(object)), DisableContentSha256: true, }) - return info.Size, err + n = info.Size + return n, err } diff --git a/cmd/http-tracer.go b/cmd/http-tracer.go index 
439549edd..cbc2ce4ac 100644 --- a/cmd/http-tracer.go +++ b/cmd/http-tracer.go @@ -142,6 +142,7 @@ func httpTracerMiddleware(h http.Handler) http.Handler { Time: reqStartTime, Duration: reqEndTime.Sub(respRecorder.StartTime), Path: reqPath, + Bytes: int64(inputBytes + respRecorder.Size()), HTTP: &madmin.TraceHTTPStats{ ReqInfo: madmin.TraceRequestInfo{ Time: reqStartTime, diff --git a/cmd/sftp-server-driver.go b/cmd/sftp-server-driver.go index f28e15dfc..b255f2df3 100644 --- a/cmd/sftp-server-driver.go +++ b/cmd/sftp-server-driver.go @@ -53,7 +53,7 @@ type sftpMetrics struct{} var globalSftpMetrics sftpMetrics -func sftpTrace(s *sftp.Request, startTime time.Time, source string, user string, err error) madmin.TraceInfo { +func sftpTrace(s *sftp.Request, startTime time.Time, source string, user string, err error, sz int64) madmin.TraceInfo { var errStr string if err != nil { errStr = err.Error() @@ -62,18 +62,25 @@ func sftpTrace(s *sftp.Request, startTime time.Time, source string, user string, TraceType: madmin.TraceFTP, Time: startTime, NodeName: globalLocalNodeName, - FuncName: fmt.Sprintf("sftp USER=%s COMMAND=%s PARAM=%s, Source=%s", user, s.Method, s.Filepath, source), + FuncName: s.Method, Duration: time.Since(startTime), Path: s.Filepath, Error: errStr, + Bytes: sz, + Custom: map[string]string{ + "user": user, + "cmd": s.Method, + "param": s.Filepath, + "source": source, + }, } } -func (m *sftpMetrics) log(s *sftp.Request, user string) func(err error) { +func (m *sftpMetrics) log(s *sftp.Request, user string) func(sz int64, err error) { startTime := time.Now() source := getSource(2) - return func(err error) { - globalTrace.Publish(sftpTrace(s, startTime, source, user, err)) + return func(sz int64, err error) { + globalTrace.Publish(sftpTrace(s, startTime, source, user, err, sz)) } } @@ -194,8 +201,9 @@ func (f *sftpDriver) AccessKey() string { } func (f *sftpDriver) Fileread(r *sftp.Request) (ra io.ReaderAt, err error) { + // This is not timing the actual read 
operation, but the time it takes to prepare the reader. stopFn := globalSftpMetrics.log(r, f.AccessKey()) - defer stopFn(err) + defer stopFn(0, err) flags := r.Pflags() if !flags.Read { @@ -301,7 +309,12 @@ again: func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) { stopFn := globalSftpMetrics.log(r, f.AccessKey()) - defer stopFn(err) + defer func() { + if err != nil { + // If there is an error, we never started the goroutine. + stopFn(0, err) + } + }() flags := r.Pflags() if !flags.Write { @@ -336,10 +349,11 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) { } wa.wg.Add(1) go func() { - _, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{ + oi, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{ ContentType: mimedb.TypeByExtension(path.Ext(object)), DisableContentSha256: true, }) + stopFn(oi.Size, err) pr.CloseWithError(err) wa.wg.Done() }() @@ -348,7 +362,7 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) { func (f *sftpDriver) Filecmd(r *sftp.Request) (err error) { stopFn := globalSftpMetrics.log(r, f.AccessKey()) - defer stopFn(err) + defer stopFn(0, err) clnt, err := f.getMinIOClient() if err != nil { @@ -444,7 +458,7 @@ func (f listerAt) ListAt(ls []os.FileInfo, offset int64) (int, error) { func (f *sftpDriver) Filelist(r *sftp.Request) (la sftp.ListerAt, err error) { stopFn := globalSftpMetrics.log(r, f.AccessKey()) - defer stopFn(err) + defer stopFn(0, err) clnt, err := f.getMinIOClient() if err != nil {