mirror of
				https://github.com/minio/minio.git
				synced 2025-11-04 10:11:09 +01:00 
			
		
		
		
	A corner case can occur where the delete-marker was propagated but the metadata could not be updated on the primary. Sending a RemoveObject call with the Delete marker version would end up permanently deleting the version on target. Instead, perform a Stat on the delete-marker version on target and redo replication only if the delete-marker is missing on target.
		
			
				
	
	
		
			1431 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1431 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright (c) 2015-2021 MinIO, Inc.
 | 
						|
//
 | 
						|
// This file is part of MinIO Object Storage stack
 | 
						|
//
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package cmd
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"context"
 | 
						|
	"encoding/binary"
 | 
						|
	"errors"
 | 
						|
	"io/fs"
 | 
						|
	"math"
 | 
						|
	"math/rand"
 | 
						|
	"net/http"
 | 
						|
	"os"
 | 
						|
	"path"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/bits-and-blooms/bloom/v3"
 | 
						|
	"github.com/minio/madmin-go"
 | 
						|
	"github.com/minio/minio/internal/bucket/lifecycle"
 | 
						|
	"github.com/minio/minio/internal/bucket/replication"
 | 
						|
	"github.com/minio/minio/internal/color"
 | 
						|
	"github.com/minio/minio/internal/config/heal"
 | 
						|
	"github.com/minio/minio/internal/event"
 | 
						|
	"github.com/minio/minio/internal/hash"
 | 
						|
	"github.com/minio/minio/internal/logger"
 | 
						|
	"github.com/minio/pkg/console"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	dataScannerSleepPerFolder     = time.Millisecond                 // Time to wait between folders.
 | 
						|
	dataUsageUpdateDirCycles      = 16                               // Visit all folders every n cycles.
 | 
						|
	dataScannerCompactLeastObject = 500                              // Compact when there is less than this many objects in a branch.
 | 
						|
	dataScannerCompactAtChildren  = 10000                            // Compact when there are this many children in a branch.
 | 
						|
	dataScannerCompactAtFolders   = dataScannerCompactAtChildren / 4 // Compact when this many subfolders in a single folder.
 | 
						|
	dataScannerStartDelay         = 1 * time.Minute                  // Time to wait on startup and between cycles.
 | 
						|
 | 
						|
	healDeleteDangling    = true
 | 
						|
	healFolderIncludeProb = 32  // Include a clean folder one in n cycles.
 | 
						|
	healObjectSelectProb  = 512 // Overall probability of a file being scanned; one in n.
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	globalHealConfig heal.Config
 | 
						|
 | 
						|
	dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
 | 
						|
	// Sleeper values are updated when config is loaded.
 | 
						|
	scannerSleeper = newDynamicSleeper(10, 10*time.Second)
 | 
						|
	scannerCycle   = &safeDuration{
 | 
						|
		t: dataScannerStartDelay,
 | 
						|
	}
 | 
						|
)
 | 
						|
 | 
						|
// initDataScanner will start the scanner in the background.
 | 
						|
func initDataScanner(ctx context.Context, objAPI ObjectLayer) {
 | 
						|
	go func() {
 | 
						|
		// Run the data scanner in a loop
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				return
 | 
						|
			default:
 | 
						|
				runDataScanner(ctx, objAPI)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
type safeDuration struct {
 | 
						|
	sync.Mutex
 | 
						|
	t time.Duration
 | 
						|
}
 | 
						|
 | 
						|
func (s *safeDuration) Update(t time.Duration) {
 | 
						|
	s.Lock()
 | 
						|
	defer s.Unlock()
 | 
						|
	s.t = t
 | 
						|
}
 | 
						|
 | 
						|
func (s *safeDuration) Get() time.Duration {
 | 
						|
	s.Lock()
 | 
						|
	defer s.Unlock()
 | 
						|
	return s.t
 | 
						|
}
 | 
						|
 | 
						|
// runDataScanner will start a data scanner.
 | 
						|
// The function will block until the context is canceled.
 | 
						|
// There should only ever be one scanner running per cluster.
 | 
						|
func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
 | 
						|
	// Make sure only 1 scanner is running on the cluster.
 | 
						|
	locker := objAPI.NewNSLock(minioMetaBucket, "runDataScanner.lock")
 | 
						|
	var ctx context.Context
 | 
						|
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 | 
						|
	for {
 | 
						|
		lkctx, err := locker.GetLock(pctx, dataScannerLeaderLockTimeout)
 | 
						|
		if err != nil {
 | 
						|
			time.Sleep(time.Duration(r.Float64() * float64(scannerCycle.Get())))
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		ctx = lkctx.Context()
 | 
						|
		defer lkctx.Cancel()
 | 
						|
		break
 | 
						|
		// No unlock for "leader" lock.
 | 
						|
	}
 | 
						|
 | 
						|
	// Load current bloom cycle
 | 
						|
	nextBloomCycle := intDataUpdateTracker.current() + 1
 | 
						|
 | 
						|
	br, err := objAPI.GetObjectNInfo(ctx, dataUsageBucket, dataUsageBloomName, nil, http.Header{}, readLock, ObjectOptions{})
 | 
						|
	if err != nil {
 | 
						|
		if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) {
 | 
						|
			logger.LogIf(ctx, err)
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		if br.ObjInfo.Size == 8 {
 | 
						|
			if err = binary.Read(br, binary.LittleEndian, &nextBloomCycle); err != nil {
 | 
						|
				logger.LogIf(ctx, err)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		br.Close()
 | 
						|
	}
 | 
						|
 | 
						|
	scannerTimer := time.NewTimer(scannerCycle.Get())
 | 
						|
	defer scannerTimer.Stop()
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			return
 | 
						|
		case <-scannerTimer.C:
 | 
						|
			// Reset the timer for next cycle.
 | 
						|
			scannerTimer.Reset(scannerCycle.Get())
 | 
						|
 | 
						|
			if intDataUpdateTracker.debug {
 | 
						|
				console.Debugln("starting scanner cycle")
 | 
						|
			}
 | 
						|
 | 
						|
			// Wait before starting next cycle and wait on startup.
 | 
						|
			results := make(chan DataUsageInfo, 1)
 | 
						|
			go storeDataUsageInBackend(ctx, objAPI, results)
 | 
						|
			bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle)
 | 
						|
			logger.LogIf(ctx, err)
 | 
						|
			err = objAPI.NSScanner(ctx, bf, results, uint32(nextBloomCycle))
 | 
						|
			logger.LogIf(ctx, err)
 | 
						|
			if err == nil {
 | 
						|
				// Store new cycle...
 | 
						|
				nextBloomCycle++
 | 
						|
				var tmp [8]byte
 | 
						|
				binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle)
 | 
						|
				r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)))
 | 
						|
				if err != nil {
 | 
						|
					logger.LogIf(ctx, err)
 | 
						|
					continue
 | 
						|
				}
 | 
						|
 | 
						|
				_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r), ObjectOptions{})
 | 
						|
				if !isErrBucketNotFound(err) {
 | 
						|
					logger.LogIf(ctx, err)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type cachedFolder struct {
 | 
						|
	name              string
 | 
						|
	parent            *dataUsageHash
 | 
						|
	objectHealProbDiv uint32
 | 
						|
}
 | 
						|
 | 
						|
type folderScanner struct {
 | 
						|
	root        string
 | 
						|
	getSize     getSizeFn
 | 
						|
	oldCache    dataUsageCache
 | 
						|
	newCache    dataUsageCache
 | 
						|
	updateCache dataUsageCache
 | 
						|
	withFilter  *bloomFilter
 | 
						|
 | 
						|
	dataUsageScannerDebug bool
 | 
						|
	healFolderInclude     uint32 // Include a clean folder one in n cycles.
 | 
						|
	healObjectSelect      uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
 | 
						|
 | 
						|
	disks []StorageAPI
 | 
						|
 | 
						|
	// If set updates will be sent regularly to this channel.
 | 
						|
	// Will not be closed when returned.
 | 
						|
	updates    chan<- dataUsageEntry
 | 
						|
	lastUpdate time.Time
 | 
						|
}
 | 
						|
 | 
						|
type scannerStats struct {
 | 
						|
	// All fields must be accessed atomically and aligned.
 | 
						|
 | 
						|
	accTotalObjects  uint64
 | 
						|
	accTotalVersions uint64
 | 
						|
	accFolders       uint64
 | 
						|
	bucketsStarted   uint64
 | 
						|
	bucketsFinished  uint64
 | 
						|
	ilmChecks        uint64
 | 
						|
 | 
						|
	// actions records actions performed.
 | 
						|
	actions [lifecycle.ActionCount]uint64
 | 
						|
}
 | 
						|
 | 
						|
var globalScannerStats scannerStats
 | 
						|
 | 
						|
// Cache structure and compaction:
 | 
						|
//
 | 
						|
// A cache structure will be kept with a tree of usages.
 | 
						|
// The cache is a tree structure where each keeps track of its children.
 | 
						|
//
 | 
						|
// An uncompacted branch contains a count of the files only directly at the
 | 
						|
// branch level, and contains link to children branches or leaves.
 | 
						|
//
 | 
						|
// The leaves are "compacted" based on a number of properties.
 | 
						|
// A compacted leaf contains the totals of all files beneath it.
 | 
						|
//
 | 
						|
// A leaf is only scanned once every dataUsageUpdateDirCycles,
 | 
						|
// rarer if the bloom filter for the path is clean and no lifecycles are applied.
 | 
						|
// Skipped leaves have their totals transferred from the previous cycle.
 | 
						|
//
 | 
						|
// A clean leaf will be included once every healFolderIncludeProb for partial heal scans.
 | 
						|
// When selected there is a one in healObjectSelectProb that any object will be chosen for heal scan.
 | 
						|
//
 | 
						|
// Compaction happens when either:
 | 
						|
//
 | 
						|
// 1) The folder (and subfolders) contains less than dataScannerCompactLeastObject objects.
 | 
						|
// 2) The folder itself contains more than dataScannerCompactAtFolders folders.
 | 
						|
// 3) The folder only contains objects and no subfolders.
 | 
						|
//
 | 
						|
// A bucket root will never be compacted.
 | 
						|
//
 | 
						|
// Furthermore if a has more than dataScannerCompactAtChildren recursive children (uncompacted folders)
 | 
						|
// the tree will be recursively scanned and the branches with the least number of objects will be
 | 
						|
// compacted until the limit is reached.
 | 
						|
//
 | 
						|
// This ensures that any branch will never contain an unreasonable amount of other branches,
 | 
						|
// and also that small branches with few objects don't take up unreasonable amounts of space.
 | 
						|
// This keeps the cache size at a reasonable size for all buckets.
 | 
						|
//
 | 
						|
// Whenever a branch is scanned, it is assumed that it will be un-compacted
 | 
						|
// before it hits any of the above limits.
 | 
						|
// This will make the branch rebalance itself when scanned if the distribution of objects has changed.
 | 
						|
 | 
						|
// scanDataFolder will scanner the basepath+cache.Info.Name and return an updated cache.
 | 
						|
// The returned cache will always be valid, but may not be updated from the existing.
 | 
						|
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
 | 
						|
// If the supplied context is canceled the function will return at the first chance.
 | 
						|
func scanDataFolder(ctx context.Context, basePath string, cache dataUsageCache, getSize getSizeFn) (dataUsageCache, error) {
 | 
						|
	t := UTCNow()
 | 
						|
 | 
						|
	logPrefix := color.Green("data-usage: ")
 | 
						|
	logSuffix := color.Blue("- %v + %v", basePath, cache.Info.Name)
 | 
						|
	atomic.AddUint64(&globalScannerStats.bucketsStarted, 1)
 | 
						|
	defer func() {
 | 
						|
		atomic.AddUint64(&globalScannerStats.bucketsFinished, 1)
 | 
						|
	}()
 | 
						|
	if intDataUpdateTracker.debug {
 | 
						|
		defer func() {
 | 
						|
			console.Debugf(logPrefix+" Scanner time: %v %s\n", time.Since(t), logSuffix)
 | 
						|
		}()
 | 
						|
 | 
						|
	}
 | 
						|
 | 
						|
	switch cache.Info.Name {
 | 
						|
	case "", dataUsageRoot:
 | 
						|
		return cache, errors.New("internal error: root scan attempted")
 | 
						|
	}
 | 
						|
 | 
						|
	s := folderScanner{
 | 
						|
		root:                  basePath,
 | 
						|
		getSize:               getSize,
 | 
						|
		oldCache:              cache,
 | 
						|
		newCache:              dataUsageCache{Info: cache.Info},
 | 
						|
		updateCache:           dataUsageCache{Info: cache.Info},
 | 
						|
		dataUsageScannerDebug: intDataUpdateTracker.debug,
 | 
						|
		healFolderInclude:     0,
 | 
						|
		healObjectSelect:      0,
 | 
						|
		updates:               cache.Info.updates,
 | 
						|
	}
 | 
						|
 | 
						|
	// Add disks for set healing.
 | 
						|
	if len(cache.Disks) > 0 {
 | 
						|
		objAPI, ok := newObjectLayerFn().(*erasureServerPools)
 | 
						|
		if ok {
 | 
						|
			s.disks = objAPI.GetDisksID(cache.Disks...)
 | 
						|
			if len(s.disks) != len(cache.Disks) {
 | 
						|
				console.Debugf(logPrefix+"Missing disks, want %d, found %d. Cannot heal. %s\n", len(cache.Disks), len(s.disks), logSuffix)
 | 
						|
				s.disks = s.disks[:0]
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Enable healing in XL mode.
 | 
						|
	if globalIsErasure && !cache.Info.SkipHealing {
 | 
						|
		// Include a clean folder one in n cycles.
 | 
						|
		s.healFolderInclude = healFolderIncludeProb
 | 
						|
		// Do a heal check on an object once every n cycles. Must divide into healFolderInclude
 | 
						|
		s.healObjectSelect = healObjectSelectProb
 | 
						|
	}
 | 
						|
	if len(cache.Info.BloomFilter) > 0 {
 | 
						|
		s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
 | 
						|
		_, err := s.withFilter.ReadFrom(bytes.NewReader(cache.Info.BloomFilter))
 | 
						|
		if err != nil {
 | 
						|
			logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter")
 | 
						|
			s.withFilter = nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if s.dataUsageScannerDebug {
 | 
						|
		console.Debugf(logPrefix+"Start scanning. Bloom filter: %v %s\n", s.withFilter != nil, logSuffix)
 | 
						|
	}
 | 
						|
 | 
						|
	done := ctx.Done()
 | 
						|
	if s.dataUsageScannerDebug {
 | 
						|
		console.Debugf(logPrefix+"Cycle: %v, Entries: %v %s\n", cache.Info.NextCycle, len(cache.Cache), logSuffix)
 | 
						|
	}
 | 
						|
 | 
						|
	// Read top level in bucket.
 | 
						|
	select {
 | 
						|
	case <-done:
 | 
						|
		return cache, ctx.Err()
 | 
						|
	default:
 | 
						|
	}
 | 
						|
	root := dataUsageEntry{}
 | 
						|
	folder := cachedFolder{name: cache.Info.Name, objectHealProbDiv: 1}
 | 
						|
	err := s.scanFolder(ctx, folder, &root)
 | 
						|
	if err != nil {
 | 
						|
		// No useful information...
 | 
						|
		return cache, err
 | 
						|
	}
 | 
						|
 | 
						|
	if s.dataUsageScannerDebug {
 | 
						|
		console.Debugf(logPrefix+"Finished scanner, %v entries (%+v) %s \n", len(s.newCache.Cache), *s.newCache.sizeRecursive(s.newCache.Info.Name), logSuffix)
 | 
						|
	}
 | 
						|
	s.newCache.Info.LastUpdate = UTCNow()
 | 
						|
	s.newCache.Info.NextCycle = cache.Info.NextCycle
 | 
						|
	return s.newCache, nil
 | 
						|
}
 | 
						|
 | 
						|
// sendUpdate() should be called on a regular basis when the newCache contains more recent total than previously.
 | 
						|
// May or may not send an update upstream.
 | 
						|
func (f *folderScanner) sendUpdate() {
 | 
						|
	// Send at most an update every minute.
 | 
						|
	if f.updates == nil || time.Since(f.lastUpdate) < time.Minute {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if flat := f.updateCache.sizeRecursive(f.newCache.Info.Name); flat != nil {
 | 
						|
		select {
 | 
						|
		case f.updates <- *flat:
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		f.lastUpdate = time.Now()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// scanFolder will scan the provided folder.
 | 
						|
// Files found in the folders will be added to f.newCache.
 | 
						|
// If final is provided folders will be put into f.newFolders or f.existingFolders.
 | 
						|
// If final is not provided the folders found are returned from the function.
 | 
						|
func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, into *dataUsageEntry) error {
 | 
						|
	done := ctx.Done()
 | 
						|
	scannerLogPrefix := color.Green("folder-scanner:")
 | 
						|
	thisHash := hashPath(folder.name)
 | 
						|
	// Store initial compaction state.
 | 
						|
	wasCompacted := into.Compacted
 | 
						|
	atomic.AddUint64(&globalScannerStats.accFolders, 1)
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-done:
 | 
						|
			return ctx.Err()
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		existing, ok := f.oldCache.Cache[thisHash.Key()]
 | 
						|
		var abandonedChildren dataUsageHashMap
 | 
						|
		if !into.Compacted {
 | 
						|
			abandonedChildren = f.oldCache.findChildrenCopy(thisHash)
 | 
						|
		}
 | 
						|
 | 
						|
		// If there are lifecycle rules for the prefix, remove the filter.
 | 
						|
		filter := f.withFilter
 | 
						|
		_, prefix := path2BucketObjectWithBasePath(f.root, folder.name)
 | 
						|
		var activeLifeCycle *lifecycle.Lifecycle
 | 
						|
		if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) {
 | 
						|
			if f.dataUsageScannerDebug {
 | 
						|
				console.Debugf(scannerLogPrefix+" Prefix %q has active rules\n", prefix)
 | 
						|
			}
 | 
						|
			activeLifeCycle = f.oldCache.Info.lifeCycle
 | 
						|
			filter = nil
 | 
						|
		}
 | 
						|
		// If there are replication rules for the prefix, remove the filter.
 | 
						|
		var replicationCfg replicationConfig
 | 
						|
		if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) {
 | 
						|
			replicationCfg = f.oldCache.Info.replication
 | 
						|
			filter = nil
 | 
						|
		}
 | 
						|
		// Check if we can skip it due to bloom filter...
 | 
						|
		if filter != nil && ok && existing.Compacted {
 | 
						|
			// If folder isn't in filter and we have data, skip it completely.
 | 
						|
			if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
 | 
						|
				if f.healObjectSelect == 0 || !thisHash.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
 | 
						|
					f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
 | 
						|
					f.updateCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
 | 
						|
					if f.dataUsageScannerDebug {
 | 
						|
						console.Debugf(scannerLogPrefix+" Skipping non-updated folder: %v\n", folder.name)
 | 
						|
					}
 | 
						|
					return nil
 | 
						|
				}
 | 
						|
				if f.dataUsageScannerDebug {
 | 
						|
					console.Debugf(scannerLogPrefix+" Adding non-updated folder to heal check: %v\n", folder.name)
 | 
						|
				}
 | 
						|
				// If probability was already scannerHealFolderInclude, keep it.
 | 
						|
				folder.objectHealProbDiv = f.healFolderInclude
 | 
						|
			}
 | 
						|
		}
 | 
						|
		scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
 | 
						|
 | 
						|
		var existingFolders, newFolders []cachedFolder
 | 
						|
		var foundObjects bool
 | 
						|
		err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error {
 | 
						|
			// Parse
 | 
						|
			entName = pathClean(path.Join(folder.name, entName))
 | 
						|
			if entName == "" || entName == folder.name {
 | 
						|
				if f.dataUsageScannerDebug {
 | 
						|
					console.Debugf(scannerLogPrefix+" no entity (%s,%s)\n", f.root, entName)
 | 
						|
				}
 | 
						|
				return nil
 | 
						|
			}
 | 
						|
			bucket, prefix := path2BucketObjectWithBasePath(f.root, entName)
 | 
						|
			if bucket == "" {
 | 
						|
				if f.dataUsageScannerDebug {
 | 
						|
					console.Debugf(scannerLogPrefix+" no bucket (%s,%s)\n", f.root, entName)
 | 
						|
				}
 | 
						|
				return errDoneForNow
 | 
						|
			}
 | 
						|
 | 
						|
			if isReservedOrInvalidBucket(bucket, false) {
 | 
						|
				if f.dataUsageScannerDebug {
 | 
						|
					console.Debugf(scannerLogPrefix+" invalid bucket: %v, entry: %v\n", bucket, entName)
 | 
						|
				}
 | 
						|
				return errDoneForNow
 | 
						|
			}
 | 
						|
 | 
						|
			select {
 | 
						|
			case <-done:
 | 
						|
				return errDoneForNow
 | 
						|
			default:
 | 
						|
			}
 | 
						|
 | 
						|
			if typ&os.ModeDir != 0 {
 | 
						|
				h := hashPath(entName)
 | 
						|
				_, exists := f.oldCache.Cache[h.Key()]
 | 
						|
				if h == thisHash {
 | 
						|
					return nil
 | 
						|
				}
 | 
						|
				this := cachedFolder{name: entName, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
 | 
						|
				delete(abandonedChildren, h.Key()) // h.Key() already accounted for.
 | 
						|
				if exists {
 | 
						|
					existingFolders = append(existingFolders, this)
 | 
						|
					f.updateCache.copyWithChildren(&f.oldCache, h, &thisHash)
 | 
						|
				} else {
 | 
						|
					newFolders = append(newFolders, this)
 | 
						|
				}
 | 
						|
				return nil
 | 
						|
			}
 | 
						|
 | 
						|
			// Dynamic time delay.
 | 
						|
			wait := scannerSleeper.Timer(ctx)
 | 
						|
 | 
						|
			// Get file size, ignore errors.
 | 
						|
			item := scannerItem{
 | 
						|
				Path:        path.Join(f.root, entName),
 | 
						|
				Typ:         typ,
 | 
						|
				bucket:      bucket,
 | 
						|
				prefix:      path.Dir(prefix),
 | 
						|
				objectName:  path.Base(entName),
 | 
						|
				debug:       f.dataUsageScannerDebug,
 | 
						|
				lifeCycle:   activeLifeCycle,
 | 
						|
				replication: replicationCfg,
 | 
						|
				heal:        thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
 | 
						|
			}
 | 
						|
			// if the drive belongs to an erasure set
 | 
						|
			// that is already being healed, skip the
 | 
						|
			// healing attempt on this drive.
 | 
						|
			item.heal = item.heal && f.healObjectSelect > 0
 | 
						|
 | 
						|
			sz, err := f.getSize(item)
 | 
						|
			if err != nil {
 | 
						|
				wait() // wait to proceed to next entry.
 | 
						|
				if err != errSkipFile && f.dataUsageScannerDebug {
 | 
						|
					console.Debugf(scannerLogPrefix+" getSize \"%v/%v\" returned err: %v\n", bucket, item.objectPath(), err)
 | 
						|
				}
 | 
						|
				return nil
 | 
						|
			}
 | 
						|
 | 
						|
			// successfully read means we have a valid object.
 | 
						|
			foundObjects = true
 | 
						|
			// Remove filename i.e is the meta file to construct object name
 | 
						|
			item.transformMetaDir()
 | 
						|
 | 
						|
			// Object already accounted for, remove from heal map,
 | 
						|
			// simply because getSize() function already heals the
 | 
						|
			// object.
 | 
						|
			delete(abandonedChildren, path.Join(item.bucket, item.objectPath()))
 | 
						|
 | 
						|
			into.addSizes(sz)
 | 
						|
			into.Objects++
 | 
						|
 | 
						|
			wait() // wait to proceed to next entry.
 | 
						|
 | 
						|
			return nil
 | 
						|
		})
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		if foundObjects && globalIsErasure {
 | 
						|
			// If we found an object in erasure mode, we skip subdirs (only datadirs)...
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		// If we have many subfolders, compact ourself.
 | 
						|
		if !into.Compacted &&
 | 
						|
			f.newCache.Info.Name != folder.name &&
 | 
						|
			len(existingFolders)+len(newFolders) >= dataScannerCompactAtFolders {
 | 
						|
			into.Compacted = true
 | 
						|
			newFolders = append(newFolders, existingFolders...)
 | 
						|
			existingFolders = nil
 | 
						|
			if f.dataUsageScannerDebug {
 | 
						|
				console.Debugf(scannerLogPrefix+" Preemptively compacting: %v, entries: %v\n", folder.name, len(existingFolders)+len(newFolders))
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		scanFolder := func(folder cachedFolder) {
 | 
						|
			if contextCanceled(ctx) {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			dst := into
 | 
						|
			if !into.Compacted {
 | 
						|
				dst = &dataUsageEntry{Compacted: false}
 | 
						|
			}
 | 
						|
			if err := f.scanFolder(ctx, folder, dst); err != nil {
 | 
						|
				logger.LogIf(ctx, err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
			if !into.Compacted {
 | 
						|
				h := dataUsageHash(folder.name)
 | 
						|
				into.addChild(h)
 | 
						|
				// We scanned a folder, optionally send update.
 | 
						|
				f.updateCache.deleteRecursive(h)
 | 
						|
				f.updateCache.copyWithChildren(&f.newCache, h, folder.parent)
 | 
						|
				f.sendUpdate()
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// Transfer existing
 | 
						|
		if !into.Compacted {
 | 
						|
			for _, folder := range existingFolders {
 | 
						|
				h := hashPath(folder.name)
 | 
						|
				f.updateCache.copyWithChildren(&f.oldCache, h, folder.parent)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		// Scan new...
 | 
						|
		for _, folder := range newFolders {
 | 
						|
			h := hashPath(folder.name)
 | 
						|
			// Add new folders to the update tree so totals update for these.
 | 
						|
			if !into.Compacted {
 | 
						|
				var foundAny bool
 | 
						|
				parent := thisHash
 | 
						|
				for parent != hashPath(f.updateCache.Info.Name) {
 | 
						|
					e := f.updateCache.find(parent.Key())
 | 
						|
					if e == nil || e.Compacted {
 | 
						|
						foundAny = true
 | 
						|
						break
 | 
						|
					}
 | 
						|
					if next := f.updateCache.searchParent(parent); next == nil {
 | 
						|
						foundAny = true
 | 
						|
						break
 | 
						|
					} else {
 | 
						|
						parent = *next
 | 
						|
					}
 | 
						|
				}
 | 
						|
				if !foundAny {
 | 
						|
					// Add non-compacted empty entry.
 | 
						|
					f.updateCache.replaceHashed(h, &thisHash, dataUsageEntry{})
 | 
						|
				}
 | 
						|
			}
 | 
						|
			scanFolder(folder)
 | 
						|
			// Add new folders if this is new and we don't have existing.
 | 
						|
			if !into.Compacted {
 | 
						|
				parent := f.updateCache.find(thisHash.Key())
 | 
						|
				if parent != nil && !parent.Compacted {
 | 
						|
					f.updateCache.deleteRecursive(h)
 | 
						|
					f.updateCache.copyWithChildren(&f.newCache, h, &thisHash)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// Scan existing...
 | 
						|
		for _, folder := range existingFolders {
 | 
						|
			h := hashPath(folder.name)
 | 
						|
			// Check if we should skip scanning folder...
 | 
						|
			// We can only skip if we are not indexing into a compacted destination
 | 
						|
			// and the entry itself is compacted.
 | 
						|
			if !into.Compacted && f.oldCache.isCompacted(h) {
 | 
						|
				if !h.mod(f.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
 | 
						|
					if f.healObjectSelect == 0 || !h.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
 | 
						|
						// Transfer and add as child...
 | 
						|
						f.newCache.copyWithChildren(&f.oldCache, h, folder.parent)
 | 
						|
						into.addChild(h)
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					folder.objectHealProbDiv = dataUsageUpdateDirCycles
 | 
						|
				}
 | 
						|
			}
 | 
						|
			scanFolder(folder)
 | 
						|
		}
 | 
						|
 | 
						|
		// Scan for healing
 | 
						|
		if f.healObjectSelect == 0 || len(abandonedChildren) == 0 {
 | 
						|
			// If we are not heal scanning, return now.
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		objAPI, ok := newObjectLayerFn().(*erasureServerPools)
 | 
						|
		if !ok || len(f.disks) == 0 {
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		bgSeq, found := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
 | 
						|
		if !found {
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		// Whatever remains in 'abandonedChildren' are folders at this level
 | 
						|
		// that existed in the previous run but wasn't found now.
 | 
						|
		//
 | 
						|
		// This may be because of 2 reasons:
 | 
						|
		//
 | 
						|
		// 1) The folder/object was deleted.
 | 
						|
		// 2) We come from another disk and this disk missed the write.
 | 
						|
		//
 | 
						|
		// We therefore perform a heal check.
 | 
						|
		// If that doesn't bring it back we remove the folder and assume it was deleted.
 | 
						|
		// This means that the next run will not look for it.
 | 
						|
		// How to resolve results.
 | 
						|
		resolver := metadataResolutionParams{
 | 
						|
			dirQuorum: getReadQuorum(len(f.disks)),
 | 
						|
			objQuorum: getReadQuorum(len(f.disks)),
 | 
						|
			bucket:    "",
 | 
						|
			strict:    false,
 | 
						|
		}
 | 
						|
 | 
						|
		healObjectsPrefix := color.Green("healObjects:")
 | 
						|
		for k := range abandonedChildren {
 | 
						|
			bucket, prefix := path2BucketObject(k)
 | 
						|
			if f.dataUsageScannerDebug {
 | 
						|
				console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix)
 | 
						|
			}
 | 
						|
 | 
						|
			if bucket != resolver.bucket {
 | 
						|
				// Bucket might be missing as well with abandoned children.
 | 
						|
				// make sure it is created first otherwise healing won't proceed
 | 
						|
				// for objects.
 | 
						|
				_, _ = objAPI.HealBucket(ctx, bucket, madmin.HealOpts{})
 | 
						|
			}
 | 
						|
 | 
						|
			resolver.bucket = bucket
 | 
						|
 | 
						|
			foundObjs := false
 | 
						|
			ctx, cancel := context.WithCancel(ctx)
 | 
						|
 | 
						|
			err := listPathRaw(ctx, listPathRawOptions{
 | 
						|
				disks:          f.disks,
 | 
						|
				bucket:         bucket,
 | 
						|
				path:           prefix,
 | 
						|
				recursive:      true,
 | 
						|
				reportNotFound: true,
 | 
						|
				minDisks:       len(f.disks), // We want full consistency.
 | 
						|
				// Weird, maybe transient error.
 | 
						|
				agreed: func(entry metaCacheEntry) {
 | 
						|
					if f.dataUsageScannerDebug {
 | 
						|
						console.Debugf(healObjectsPrefix+" got agreement: %v\n", entry.name)
 | 
						|
					}
 | 
						|
				},
 | 
						|
				// Some disks have data for this.
 | 
						|
				partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
 | 
						|
					if f.dataUsageScannerDebug {
 | 
						|
						console.Debugf(healObjectsPrefix+" got partial, %d agreed, errs: %v\n", nAgreed, errs)
 | 
						|
					}
 | 
						|
 | 
						|
					entry, ok := entries.resolve(&resolver)
 | 
						|
					if !ok {
 | 
						|
						// check if we can get one entry atleast
 | 
						|
						// proceed to heal nonetheless, since
 | 
						|
						// this object might be dangling.
 | 
						|
						entry, _ = entries.firstFound()
 | 
						|
					}
 | 
						|
 | 
						|
					if f.dataUsageScannerDebug {
 | 
						|
						console.Debugf(healObjectsPrefix+" resolved to: %v, dir: %v\n", entry.name, entry.isDir())
 | 
						|
					}
 | 
						|
 | 
						|
					if entry.isDir() {
 | 
						|
						return
 | 
						|
					}
 | 
						|
 | 
						|
					// wait on timer per object.
 | 
						|
					wait := scannerSleeper.Timer(ctx)
 | 
						|
 | 
						|
					// We got an entry which we should be able to heal.
 | 
						|
					fiv, err := entry.fileInfoVersions(bucket)
 | 
						|
					if err != nil {
 | 
						|
						wait()
 | 
						|
						err := bgSeq.queueHealTask(healSource{
 | 
						|
							bucket:    bucket,
 | 
						|
							object:    entry.name,
 | 
						|
							versionID: "",
 | 
						|
						}, madmin.HealItemObject)
 | 
						|
						logger.LogIf(ctx, err)
 | 
						|
						foundObjs = foundObjs || err == nil
 | 
						|
						return
 | 
						|
					}
 | 
						|
 | 
						|
					for _, ver := range fiv.Versions {
 | 
						|
						// Sleep and reset.
 | 
						|
						wait()
 | 
						|
						wait = scannerSleeper.Timer(ctx)
 | 
						|
 | 
						|
						err := bgSeq.queueHealTask(healSource{
 | 
						|
							bucket:    bucket,
 | 
						|
							object:    fiv.Name,
 | 
						|
							versionID: ver.VersionID,
 | 
						|
						}, madmin.HealItemObject)
 | 
						|
						logger.LogIf(ctx, err)
 | 
						|
						foundObjs = foundObjs || err == nil
 | 
						|
					}
 | 
						|
				},
 | 
						|
				// Too many disks failed.
 | 
						|
				finished: func(errs []error) {
 | 
						|
					if f.dataUsageScannerDebug {
 | 
						|
						console.Debugf(healObjectsPrefix+" too many errors: %v\n", errs)
 | 
						|
					}
 | 
						|
					cancel()
 | 
						|
				},
 | 
						|
			})
 | 
						|
 | 
						|
			if f.dataUsageScannerDebug && err != nil && err != errFileNotFound {
 | 
						|
				console.Debugf(healObjectsPrefix+" checking returned value %v (%T)\n", err, err)
 | 
						|
			}
 | 
						|
 | 
						|
			// Add unless healing returned an error.
 | 
						|
			if foundObjs {
 | 
						|
				this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: 1}
 | 
						|
				scanFolder(this)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		break
 | 
						|
	}
 | 
						|
	if !wasCompacted {
 | 
						|
		f.newCache.replaceHashed(thisHash, folder.parent, *into)
 | 
						|
	}
 | 
						|
 | 
						|
	if !into.Compacted && f.newCache.Info.Name != folder.name {
 | 
						|
		flat := f.newCache.sizeRecursive(thisHash.Key())
 | 
						|
		flat.Compacted = true
 | 
						|
		var compact bool
 | 
						|
		if flat.Objects < dataScannerCompactLeastObject {
 | 
						|
			if f.dataUsageScannerDebug && flat.Objects > 1 {
 | 
						|
				// Disabled, rather chatty:
 | 
						|
				// console.Debugf(scannerLogPrefix+" Only %d objects, compacting %s -> %+v\n", flat.Objects, folder.name, flat)
 | 
						|
			}
 | 
						|
			compact = true
 | 
						|
		} else {
 | 
						|
			// Compact if we only have objects as children...
 | 
						|
			compact = true
 | 
						|
			for k := range into.Children {
 | 
						|
				if v, ok := f.newCache.Cache[k]; ok {
 | 
						|
					if len(v.Children) > 0 || v.Objects > 1 {
 | 
						|
						compact = false
 | 
						|
						break
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
			if f.dataUsageScannerDebug && compact {
 | 
						|
				// Disabled, rather chatty:
 | 
						|
				// console.Debugf(scannerLogPrefix+" Only objects (%d), compacting %s -> %+v\n", flat.Objects, folder.name, flat)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if compact {
 | 
						|
			f.newCache.deleteRecursive(thisHash)
 | 
						|
			f.newCache.replaceHashed(thisHash, folder.parent, *flat)
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
	// Compact if too many children...
 | 
						|
	if !into.Compacted {
 | 
						|
		f.newCache.reduceChildrenOf(thisHash, dataScannerCompactAtChildren, f.newCache.Info.Name != folder.name)
 | 
						|
	}
 | 
						|
	if _, ok := f.updateCache.Cache[thisHash.Key()]; !wasCompacted && ok {
 | 
						|
		// Replace if existed before.
 | 
						|
		if flat := f.newCache.sizeRecursive(thisHash.Key()); flat != nil {
 | 
						|
			f.updateCache.deleteRecursive(thisHash)
 | 
						|
			f.updateCache.replaceHashed(thisHash, folder.parent, *flat)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// scannerItem represents each file while walking.
 | 
						|
type scannerItem struct {
 | 
						|
	Path        string
 | 
						|
	bucket      string // Bucket.
 | 
						|
	prefix      string // Only the prefix if any, does not have final object name.
 | 
						|
	objectName  string // Only the object name without prefixes.
 | 
						|
	replication replicationConfig
 | 
						|
	lifeCycle   *lifecycle.Lifecycle
 | 
						|
	Typ         fs.FileMode
 | 
						|
	heal        bool // Has the object been selected for heal check?
 | 
						|
	debug       bool
 | 
						|
}
 | 
						|
 | 
						|
type sizeSummary struct {
 | 
						|
	totalSize       int64
 | 
						|
	versions        uint64
 | 
						|
	replicatedSize  int64
 | 
						|
	pendingSize     int64
 | 
						|
	failedSize      int64
 | 
						|
	replicaSize     int64
 | 
						|
	pendingCount    uint64
 | 
						|
	failedCount     uint64
 | 
						|
	replTargetStats map[string]replTargetSizeSummary
 | 
						|
	tiers           map[string]tierStats
 | 
						|
}
 | 
						|
 | 
						|
// replTargetSizeSummary holds summary of replication stats by target
 | 
						|
type replTargetSizeSummary struct {
 | 
						|
	replicatedSize int64
 | 
						|
	pendingSize    int64
 | 
						|
	failedSize     int64
 | 
						|
	pendingCount   uint64
 | 
						|
	failedCount    uint64
 | 
						|
}
 | 
						|
 | 
						|
type getSizeFn func(item scannerItem) (sizeSummary, error)
 | 
						|
 | 
						|
// transformMetaDir will transform a directory to prefix/file.ext
 | 
						|
func (i *scannerItem) transformMetaDir() {
 | 
						|
	split := strings.Split(i.prefix, SlashSeparator)
 | 
						|
	if len(split) > 1 {
 | 
						|
		i.prefix = path.Join(split[:len(split)-1]...)
 | 
						|
	} else {
 | 
						|
		i.prefix = ""
 | 
						|
	}
 | 
						|
	// Object name is last element
 | 
						|
	i.objectName = split[len(split)-1]
 | 
						|
}
 | 
						|
 | 
						|
var applyActionsLogPrefix = color.Green("applyActions:")
 | 
						|
var applyVersionActionsLogPrefix = color.Green("applyVersionActions:")
 | 
						|
 | 
						|
func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi ObjectInfo) (size int64) {
 | 
						|
	if i.debug {
 | 
						|
		if oi.VersionID != "" {
 | 
						|
			console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), oi.VersionID)
 | 
						|
		} else {
 | 
						|
			console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
 | 
						|
		}
 | 
						|
	}
 | 
						|
	healOpts := madmin.HealOpts{
 | 
						|
		Remove:   healDeleteDangling,
 | 
						|
		ScanMode: globalHealConfig.ScanMode(),
 | 
						|
	}
 | 
						|
	res, err := o.HealObject(ctx, i.bucket, i.objectPath(), oi.VersionID, healOpts)
 | 
						|
	if err != nil && !errors.Is(err, NotImplemented{}) {
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
		return 0
 | 
						|
	}
 | 
						|
	return res.ObjectSize
 | 
						|
}
 | 
						|
 | 
						|
func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi ObjectInfo) (applied bool, size int64) {
 | 
						|
	size, err := oi.GetActualSize()
 | 
						|
	if i.debug {
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
	}
 | 
						|
	if i.lifeCycle == nil {
 | 
						|
		if i.debug {
 | 
						|
			// disabled, very chatty:
 | 
						|
			// console.Debugf(applyActionsLogPrefix+" no lifecycle rules to apply: %q\n", i.objectPath())
 | 
						|
		}
 | 
						|
		return false, size
 | 
						|
	}
 | 
						|
 | 
						|
	atomic.AddUint64(&globalScannerStats.ilmChecks, 1)
 | 
						|
	versionID := oi.VersionID
 | 
						|
	action := i.lifeCycle.ComputeAction(
 | 
						|
		lifecycle.ObjectOpts{
 | 
						|
			Name:             i.objectPath(),
 | 
						|
			UserTags:         oi.UserTags,
 | 
						|
			ModTime:          oi.ModTime,
 | 
						|
			VersionID:        oi.VersionID,
 | 
						|
			DeleteMarker:     oi.DeleteMarker,
 | 
						|
			IsLatest:         oi.IsLatest,
 | 
						|
			NumVersions:      oi.NumVersions,
 | 
						|
			SuccessorModTime: oi.SuccessorModTime,
 | 
						|
			RestoreOngoing:   oi.RestoreOngoing,
 | 
						|
			RestoreExpires:   oi.RestoreExpires,
 | 
						|
			TransitionStatus: oi.TransitionedObject.Status,
 | 
						|
		})
 | 
						|
	if i.debug {
 | 
						|
		if versionID != "" {
 | 
						|
			console.Debugf(applyActionsLogPrefix+" lifecycle: %q (version-id=%s), Initial scan: %v\n", i.objectPath(), versionID, action)
 | 
						|
		} else {
 | 
						|
			console.Debugf(applyActionsLogPrefix+" lifecycle: %q Initial scan: %v\n", i.objectPath(), action)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	atomic.AddUint64(&globalScannerStats.actions[action], 1)
 | 
						|
 | 
						|
	switch action {
 | 
						|
	case lifecycle.DeleteAction, lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
 | 
						|
		return applyLifecycleAction(action, oi), 0
 | 
						|
	case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
 | 
						|
		return applyLifecycleAction(action, oi), size
 | 
						|
	default:
 | 
						|
		// No action.
 | 
						|
		return false, size
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// applyTierObjSweep removes remote object pending deletion and the free-version
 | 
						|
// tracking this information.
 | 
						|
func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
 | 
						|
	if !oi.TransitionedObject.FreeVersion {
 | 
						|
		// nothing to be done
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	ignoreNotFoundErr := func(err error) error {
 | 
						|
		switch {
 | 
						|
		case isErrVersionNotFound(err), isErrObjectNotFound(err):
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	// Remove the remote object
 | 
						|
	err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
 | 
						|
	if ignoreNotFoundErr(err) != nil {
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// Remove this free version
 | 
						|
	_, err = o.DeleteObject(ctx, oi.Bucket, oi.Name, ObjectOptions{
 | 
						|
		VersionID: oi.VersionID,
 | 
						|
	})
 | 
						|
	if err == nil {
 | 
						|
		auditLogLifecycle(ctx, oi, ILMFreeVersionDelete)
 | 
						|
	}
 | 
						|
	if ignoreNotFoundErr(err) != nil {
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
	}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured.
 | 
						|
// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return.
 | 
						|
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
 | 
						|
	if i.lifeCycle == nil {
 | 
						|
		return fivs, nil
 | 
						|
	}
 | 
						|
 | 
						|
	_, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
 | 
						|
	if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions
 | 
						|
		return fivs, nil
 | 
						|
	}
 | 
						|
 | 
						|
	overflowVersions := fivs[lim+1:]
 | 
						|
	// current version + most recent lim noncurrent versions
 | 
						|
	fivs = fivs[:lim+1]
 | 
						|
 | 
						|
	rcfg, _ := globalBucketObjectLockSys.Get(i.bucket)
 | 
						|
	toDel := make([]ObjectToDelete, 0, len(overflowVersions))
 | 
						|
	for _, fi := range overflowVersions {
 | 
						|
		obj := fi.ToObjectInfo(i.bucket, i.objectPath())
 | 
						|
		// skip versions with object locking enabled
 | 
						|
		if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
 | 
						|
			if i.debug {
 | 
						|
				if obj.VersionID != "" {
 | 
						|
					console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID)
 | 
						|
				} else {
 | 
						|
					console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
 | 
						|
				}
 | 
						|
			}
 | 
						|
			// add this version back to remaining versions for
 | 
						|
			// subsequent lifecycle policy applications
 | 
						|
			fivs = append(fivs, fi)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// NoncurrentDays not passed yet.
 | 
						|
		if time.Now().UTC().Before(lifecycle.ExpectedExpiryTime(obj.SuccessorModTime, days)) {
 | 
						|
			// add this version back to remaining versions for
 | 
						|
			// subsequent lifecycle policy applications
 | 
						|
			fivs = append(fivs, fi)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		toDel = append(toDel, ObjectToDelete{
 | 
						|
			ObjectName: fi.Name,
 | 
						|
			VersionID:  fi.VersionID,
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	globalExpiryState.enqueueByNewerNoncurrent(i.bucket, toDel)
 | 
						|
	return fivs, nil
 | 
						|
}
 | 
						|
 | 
						|
// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain
 | 
						|
// after applying lifecycle checks configured.
 | 
						|
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
 | 
						|
	return i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
 | 
						|
}
 | 
						|
 | 
						|
// applyActions will apply lifecycle checks on to a scanned item.
 | 
						|
// The resulting size on disk will always be returned.
 | 
						|
// The metadata will be compared to consensus on the object layer before any changes are applied.
 | 
						|
// If no metadata is supplied, -1 is returned if no action is taken.
 | 
						|
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) int64 {
 | 
						|
	applied, size := i.applyLifecycle(ctx, o, oi)
 | 
						|
	// For instance, an applied lifecycle means we remove/transitioned an object
 | 
						|
	// from the current deployment, which means we don't have to call healing
 | 
						|
	// routine even if we are asked to do via heal flag.
 | 
						|
	if !applied {
 | 
						|
		if i.heal {
 | 
						|
			size = i.applyHealing(ctx, o, oi)
 | 
						|
		}
 | 
						|
		// replicate only if lifecycle rules are not applied.
 | 
						|
		i.healReplication(ctx, o, oi.Clone(), sizeS)
 | 
						|
	}
 | 
						|
	return size
 | 
						|
}
 | 
						|
 | 
						|
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, obj ObjectInfo, debug bool) (action lifecycle.Action) {
 | 
						|
	action = lc.ComputeAction(obj.ToLifecycleOpts())
 | 
						|
	if debug {
 | 
						|
		console.Debugf(applyActionsLogPrefix+" lifecycle: Secondary scan: %v\n", action)
 | 
						|
	}
 | 
						|
 | 
						|
	if action == lifecycle.NoneAction {
 | 
						|
		return action
 | 
						|
	}
 | 
						|
 | 
						|
	switch action {
 | 
						|
	case lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredVersionAction:
 | 
						|
		// Defensive code, should never happen
 | 
						|
		if obj.VersionID == "" {
 | 
						|
			return lifecycle.NoneAction
 | 
						|
		}
 | 
						|
		if rcfg, _ := globalBucketObjectLockSys.Get(obj.Bucket); rcfg.LockEnabled {
 | 
						|
			locked := enforceRetentionForDeletion(ctx, obj)
 | 
						|
			if locked {
 | 
						|
				if debug {
 | 
						|
					if obj.VersionID != "" {
 | 
						|
						console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID)
 | 
						|
					} else {
 | 
						|
						console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
 | 
						|
					}
 | 
						|
				}
 | 
						|
				return lifecycle.NoneAction
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return action
 | 
						|
}
 | 
						|
 | 
						|
func applyTransitionRule(obj ObjectInfo) bool {
 | 
						|
	if obj.DeleteMarker {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	globalTransitionState.queueTransitionTask(obj)
 | 
						|
	return true
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func applyExpiryOnTransitionedObject(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject bool) bool {
 | 
						|
	action := expireObj
 | 
						|
	if restoredObject {
 | 
						|
		action = expireRestoredObj
 | 
						|
	}
 | 
						|
	if err := expireTransitionedObject(ctx, objLayer, &obj, obj.ToLifecycleOpts(), action); err != nil {
 | 
						|
		if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
 | 
						|
			return false
 | 
						|
		}
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	// Notification already sent in *expireTransitionedObject*, just return 'true' here.
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, applyOnVersion bool) bool {
 | 
						|
	opts := ObjectOptions{
 | 
						|
		Expiration: ExpirationOptions{Expire: true},
 | 
						|
	}
 | 
						|
 | 
						|
	if applyOnVersion {
 | 
						|
		opts.VersionID = obj.VersionID
 | 
						|
	}
 | 
						|
	if opts.VersionID == "" {
 | 
						|
		opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket)
 | 
						|
	}
 | 
						|
 | 
						|
	obj, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts)
 | 
						|
	if err != nil {
 | 
						|
		if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
 | 
						|
			return false
 | 
						|
		}
 | 
						|
		// Assume it is still there.
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	// Send audit for the lifecycle delete operation
 | 
						|
	auditLogLifecycle(ctx, obj, ILMExpiry)
 | 
						|
 | 
						|
	eventName := event.ObjectRemovedDelete
 | 
						|
	if obj.DeleteMarker {
 | 
						|
		eventName = event.ObjectRemovedDeleteMarkerCreated
 | 
						|
	}
 | 
						|
 | 
						|
	// Notify object deleted event.
 | 
						|
	sendEvent(eventArgs{
 | 
						|
		EventName:  eventName,
 | 
						|
		BucketName: obj.Bucket,
 | 
						|
		Object:     obj,
 | 
						|
		Host:       "Internal: [ILM-EXPIRY]",
 | 
						|
	})
 | 
						|
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// Apply object, object version, restored object or restored object version action on the given object
 | 
						|
func applyExpiryRule(obj ObjectInfo, restoredObject, applyOnVersion bool) bool {
 | 
						|
	globalExpiryState.enqueueByDays(obj, restoredObject, applyOnVersion)
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// Perform actions (removal or transitioning of objects), return true the action is successfully performed
 | 
						|
func applyLifecycleAction(action lifecycle.Action, obj ObjectInfo) (success bool) {
 | 
						|
	switch action {
 | 
						|
	case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
 | 
						|
		success = applyExpiryRule(obj, false, action == lifecycle.DeleteVersionAction)
 | 
						|
	case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
 | 
						|
		success = applyExpiryRule(obj, true, action == lifecycle.DeleteRestoredVersionAction)
 | 
						|
	case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
 | 
						|
		success = applyTransitionRule(obj)
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// objectPath returns the prefix and object name.
 | 
						|
func (i *scannerItem) objectPath() string {
 | 
						|
	return path.Join(i.prefix, i.objectName)
 | 
						|
}
 | 
						|
 | 
						|
// healReplication will heal a scanned item that has failed replication.
 | 
						|
func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) {
 | 
						|
	roi := getHealReplicateObjectInfo(oi, i.replication)
 | 
						|
 | 
						|
	if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
 | 
						|
		// heal delete marker replication failure or versioned delete replication failure
 | 
						|
		if oi.ReplicationStatus == replication.Pending ||
 | 
						|
			oi.ReplicationStatus == replication.Failed ||
 | 
						|
			oi.VersionPurgeStatus == Failed || oi.VersionPurgeStatus == Pending {
 | 
						|
			i.healReplicationDeletes(ctx, o, roi)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		// if replication status is Complete on DeleteMarker and existing object resync required
 | 
						|
		if roi.ExistingObjResync.mustResync() && (oi.ReplicationStatus == replication.Completed || oi.ReplicationStatus.Empty()) {
 | 
						|
			i.healReplicationDeletes(ctx, o, roi)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if roi.ExistingObjResync.mustResync() {
 | 
						|
		roi.OpType = replication.ExistingObjectReplicationType
 | 
						|
	}
 | 
						|
 | 
						|
	if roi.TargetStatuses != nil {
 | 
						|
		if sizeS.replTargetStats == nil {
 | 
						|
			sizeS.replTargetStats = make(map[string]replTargetSizeSummary)
 | 
						|
		}
 | 
						|
		for arn, tgtStatus := range roi.TargetStatuses {
 | 
						|
			tgtSizeS, ok := sizeS.replTargetStats[arn]
 | 
						|
			if !ok {
 | 
						|
				tgtSizeS = replTargetSizeSummary{}
 | 
						|
			}
 | 
						|
			switch tgtStatus {
 | 
						|
			case replication.Pending:
 | 
						|
				tgtSizeS.pendingCount++
 | 
						|
				tgtSizeS.pendingSize += oi.Size
 | 
						|
				sizeS.pendingCount++
 | 
						|
				sizeS.pendingSize += oi.Size
 | 
						|
			case replication.Failed:
 | 
						|
				tgtSizeS.failedSize += oi.Size
 | 
						|
				tgtSizeS.failedCount++
 | 
						|
				sizeS.failedSize += oi.Size
 | 
						|
				sizeS.failedCount++
 | 
						|
			case replication.Completed, "COMPLETE":
 | 
						|
				tgtSizeS.replicatedSize += oi.Size
 | 
						|
				sizeS.replicatedSize += oi.Size
 | 
						|
			}
 | 
						|
			sizeS.replTargetStats[arn] = tgtSizeS
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	switch oi.ReplicationStatus {
 | 
						|
	case replication.Pending, replication.Failed:
 | 
						|
		globalReplicationPool.queueReplicaTask(roi)
 | 
						|
		return
 | 
						|
	case replication.Replica:
 | 
						|
		sizeS.replicaSize += oi.Size
 | 
						|
	}
 | 
						|
	if roi.ExistingObjResync.mustResync() {
 | 
						|
		globalReplicationPool.queueReplicaTask(roi)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes.
 | 
						|
func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, roi ReplicateObjectInfo) {
 | 
						|
	// handle soft delete and permanent delete failures here.
 | 
						|
	if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
 | 
						|
		versionID := ""
 | 
						|
		dmVersionID := ""
 | 
						|
		if roi.VersionPurgeStatus.Empty() {
 | 
						|
			dmVersionID = roi.VersionID
 | 
						|
		} else {
 | 
						|
			versionID = roi.VersionID
 | 
						|
		}
 | 
						|
 | 
						|
		doi := DeletedObjectReplicationInfo{
 | 
						|
			DeletedObject: DeletedObject{
 | 
						|
				ObjectName:            roi.Name,
 | 
						|
				DeleteMarkerVersionID: dmVersionID,
 | 
						|
				VersionID:             versionID,
 | 
						|
				ReplicationState:      roi.getReplicationState(roi.Dsc.String(), versionID, true),
 | 
						|
				DeleteMarkerMTime:     DeleteMarkerMTime{roi.ModTime},
 | 
						|
				DeleteMarker:          roi.DeleteMarker,
 | 
						|
			},
 | 
						|
			Bucket: roi.Bucket,
 | 
						|
			OpType: replication.HealReplicationType,
 | 
						|
		}
 | 
						|
		if roi.ExistingObjResync.mustResync() {
 | 
						|
			doi.OpType = replication.ExistingObjectReplicationType
 | 
						|
			queueReplicateDeletesWrapper(doi, roi.ExistingObjResync)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		globalReplicationPool.queueReplicaDeleteTask(doi)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type dynamicSleeper struct {
 | 
						|
	mu sync.RWMutex
 | 
						|
 | 
						|
	// Sleep factor
 | 
						|
	factor float64
 | 
						|
 | 
						|
	// maximum sleep cap,
 | 
						|
	// set to <= 0 to disable.
 | 
						|
	maxSleep time.Duration
 | 
						|
 | 
						|
	// Don't sleep at all, if time taken is below this value.
 | 
						|
	// This is to avoid too small costly sleeps.
 | 
						|
	minSleep time.Duration
 | 
						|
 | 
						|
	// cycle will be closed
 | 
						|
	cycle chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
// newDynamicSleeper
 | 
						|
func newDynamicSleeper(factor float64, maxWait time.Duration) *dynamicSleeper {
 | 
						|
	return &dynamicSleeper{
 | 
						|
		factor:   factor,
 | 
						|
		cycle:    make(chan struct{}),
 | 
						|
		maxSleep: maxWait,
 | 
						|
		minSleep: 100 * time.Microsecond,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Timer returns a timer that has started.
 | 
						|
// When the returned function is called it will wait.
 | 
						|
func (d *dynamicSleeper) Timer(ctx context.Context) func() {
 | 
						|
	t := time.Now()
 | 
						|
	return func() {
 | 
						|
		doneAt := time.Now()
 | 
						|
		for {
 | 
						|
			// Grab current values
 | 
						|
			d.mu.RLock()
 | 
						|
			minWait, maxWait := d.minSleep, d.maxSleep
 | 
						|
			factor := d.factor
 | 
						|
			cycle := d.cycle
 | 
						|
			d.mu.RUnlock()
 | 
						|
			elapsed := doneAt.Sub(t)
 | 
						|
			// Don't sleep for really small amount of time
 | 
						|
			wantSleep := time.Duration(float64(elapsed) * factor)
 | 
						|
			if wantSleep <= minWait {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			if maxWait > 0 && wantSleep > maxWait {
 | 
						|
				wantSleep = maxWait
 | 
						|
			}
 | 
						|
			timer := time.NewTimer(wantSleep)
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				if !timer.Stop() {
 | 
						|
					<-timer.C
 | 
						|
				}
 | 
						|
				return
 | 
						|
			case <-timer.C:
 | 
						|
				return
 | 
						|
			case <-cycle:
 | 
						|
				if !timer.Stop() {
 | 
						|
					// We expired.
 | 
						|
					<-timer.C
 | 
						|
					return
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Sleep sleeps the specified time multiplied by the sleep factor.
 | 
						|
// If the factor is updated the sleep will be done again with the new factor.
 | 
						|
func (d *dynamicSleeper) Sleep(ctx context.Context, base time.Duration) {
 | 
						|
	for {
 | 
						|
		// Grab current values
 | 
						|
		d.mu.RLock()
 | 
						|
		minWait, maxWait := d.minSleep, d.maxSleep
 | 
						|
		factor := d.factor
 | 
						|
		cycle := d.cycle
 | 
						|
		d.mu.RUnlock()
 | 
						|
		// Don't sleep for really small amount of time
 | 
						|
		wantSleep := time.Duration(float64(base) * factor)
 | 
						|
		if wantSleep <= minWait {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		if maxWait > 0 && wantSleep > maxWait {
 | 
						|
			wantSleep = maxWait
 | 
						|
		}
 | 
						|
		timer := time.NewTimer(wantSleep)
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			if !timer.Stop() {
 | 
						|
				<-timer.C
 | 
						|
			}
 | 
						|
			return
 | 
						|
		case <-timer.C:
 | 
						|
			return
 | 
						|
		case <-cycle:
 | 
						|
			if !timer.Stop() {
 | 
						|
				// We expired.
 | 
						|
				<-timer.C
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Update the current settings and cycle all waiting.
 | 
						|
// Parameters are the same as in the contructor.
 | 
						|
func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
 | 
						|
	d.mu.Lock()
 | 
						|
	defer d.mu.Unlock()
 | 
						|
	if math.Abs(d.factor-factor) < 1e-10 && d.maxSleep == maxWait {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	// Update values and cycle waiting.
 | 
						|
	close(d.cycle)
 | 
						|
	d.factor = factor
 | 
						|
	d.maxSleep = maxWait
 | 
						|
	d.cycle = make(chan struct{})
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
const (
 | 
						|
	// ILMExpiry - audit trail for ILM expiry
 | 
						|
	ILMExpiry = "ilm:expiry"
 | 
						|
	// ILMFreeVersionDelete - audit trail for ILM free-version delete
 | 
						|
	ILMFreeVersionDelete = "ilm:free-version-delete"
 | 
						|
	// ILMTransition - audit trail for ILM transitioning.
 | 
						|
	ILMTransition = " ilm:transition"
 | 
						|
)
 | 
						|
 | 
						|
func auditLogLifecycle(ctx context.Context, oi ObjectInfo, trigger string) {
 | 
						|
	var apiName string
 | 
						|
	switch trigger {
 | 
						|
	case ILMExpiry:
 | 
						|
		apiName = "ILMExpiry"
 | 
						|
	case ILMFreeVersionDelete:
 | 
						|
		apiName = "ILMFreeVersionDelete"
 | 
						|
	case ILMTransition:
 | 
						|
		apiName = "ILMTransition"
 | 
						|
	}
 | 
						|
	auditLogInternal(ctx, oi.Bucket, oi.Name, AuditLogOptions{
 | 
						|
		Trigger:   trigger,
 | 
						|
		APIName:   apiName,
 | 
						|
		VersionID: oi.VersionID,
 | 
						|
	})
 | 
						|
}
 |