/*
 * MinIO Cloud Storage, (C) 2020 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"context"
	"errors"
	"fmt"
	"path"
	"strings"
	"time"

	"github.com/minio/minio/cmd/logger"
)

type scanStatus uint8

const (
	scanStateNone scanStatus = iota
	scanStateStarted
	scanStateSuccess
	scanStateError

	// Time within which the initiator of a scan must have reported back.
	metacacheMaxRunningAge = time.Minute

	// metacacheBlockSize is the number of file/directory entries to have in each block.
	metacacheBlockSize = 5000

	// metacacheSharePrefix controls whether prefixes on dirty paths are always shared.
	// This will make `test/a` and `test/b` share listings if they are concurrent.
	// Enabling this will make cache sharing more likely and cause less IO,
	// but may cause additional latency to some calls.
	metacacheSharePrefix = false
)

//go:generate msgp -file $GOFILE -unexported

// metacache contains a tracked cache entry.
type metacache struct {
	id           string     `msg:"id"`
	bucket       string     `msg:"b"`
	root         string     `msg:"root"`
	recursive    bool       `msg:"rec"`
	filter       string     `msg:"flt"`
	status       scanStatus `msg:"stat"`
	fileNotFound bool       `msg:"fnf"`
	error        string     `msg:"err"`
	started      time.Time  `msg:"st"`
	ended        time.Time  `msg:"end"`
	lastUpdate   time.Time  `msg:"u"`
	lastHandout  time.Time  `msg:"lh"`
	startedCycle uint64     `msg:"stc"`
	endedCycle   uint64     `msg:"endc"`
	dataVersion  uint8      `msg:"v"`
}

func (m *metacache) finished() bool {
	return !m.ended.IsZero()
}

// matches returns whether the metacache matches the options given.
func (m *metacache) matches(o *listPathOptions, extend time.Duration) bool {
	if o == nil {
		return false
	}

	// Never return transient caches if there is no id.
	if m.status == scanStateError || m.status == scanStateNone || m.dataVersion != metacacheStreamVersion {
		o.debugf("cache %s state or stream version mismatch", m.id)
		return false
	}
	if m.startedCycle < o.OldestCycle {
		o.debugf("cache %s cycle too old", m.id)
		return false
	}

	// Root of what we are looking for must at least have the same prefix as the cached root.
	if !strings.HasPrefix(o.BaseDir, m.root) {
		o.debugf("cache %s prefix mismatch, cached:%v, want:%v", m.id, m.root, o.BaseDir)
		return false
	}
	if m.filter != "" && strings.HasPrefix(m.filter, o.FilterPrefix) {
		o.debugf("cache %s cannot be used because of filter %s", m.id, m.filter)
		return false
	}

	if o.Recursive && !m.recursive {
		o.debugf("cache %s not recursive", m.id)
		// If this is recursive the cached listing must be as well.
		return false
	}
	if o.Separator != slashSeparator && !m.recursive {
		o.debugf("cache %s not slashsep and not recursive", m.id)
		// A non-slash separator requires a recursive listing.
		return false
	}
	if !m.finished() && time.Since(m.lastUpdate) > metacacheMaxRunningAge {
		o.debugf("cache %s not running, time: %v", m.id, time.Since(m.lastUpdate))
		// Abandoned
		return false
	}

	if m.finished() && m.endedCycle <= o.OldestCycle {
		if extend <= 0 {
			// If the scan has ended, the oldest requested cycle must be less than the ended cycle.
			o.debugf("cache %s ended and cycle (%v) <= oldest allowed (%v)", m.id, m.endedCycle, o.OldestCycle)
			return false
		}
		if time.Since(m.lastUpdate) > metacacheMaxRunningAge+extend {
			// Cache ended within bloom cycle, but we can extend the life.
			o.debugf("cache %s ended (%v) and beyond extended life (%v)", m.id, m.lastUpdate, extend+metacacheMaxRunningAge)
			return false
		}
	}

	return true
}
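
// Illustrative sketch (not part of the original source): a caller deciding whether an
// existing metacache can serve an incoming listing might use matches roughly like this,
// assuming only the listPathOptions fields referenced above (BaseDir, Recursive,
// Separator, OldestCycle); the names 'cache' and 'opts' are hypothetical:
//
//	opts := listPathOptions{BaseDir: "prefix/folder/", Recursive: true, Separator: slashSeparator}
//	if cache.matches(&opts, metacacheMaxRunningAge) {
//		// Reuse the cached listing instead of starting a new scan.
//	}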

// worthKeeping indicates if the cache by itself is worth keeping.
func (m *metacache) worthKeeping(currentCycle uint64) bool {
	if m == nil {
		return false
	}
	cache := m
	switch {
	case !cache.finished() && time.Since(cache.lastUpdate) > metacacheMaxRunningAge:
		// Not finished and no update for metacacheMaxRunningAge, discard it.
		return false
	case cache.finished() && cache.startedCycle > currentCycle:
		// Start cycle is somehow bigger than the current cycle.
		return false
	case cache.finished() && time.Since(cache.lastHandout) > 48*time.Hour:
		// Keep only for 2 days. Fallback if crawler is clogged.
		return false
	case cache.finished() && currentCycle >= dataUsageUpdateDirCycles && cache.startedCycle < currentCycle-dataUsageUpdateDirCycles:
		// Cycle is too old to be valuable.
		return false
	case cache.status == scanStateError || cache.status == scanStateNone:
		// Remove failed listings after 10 minutes.
		return time.Since(cache.lastUpdate) < 10*time.Minute
	}
	return true
}
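
// Illustrative sketch (not part of the original source): a manager holding several
// tracked caches could use worthKeeping to prune stale entries each cycle; the
// 'caches' map and 'currentCycle' variable are hypothetical:
//
//	for id, c := range caches {
//		if !c.worthKeeping(currentCycle) {
//			delete(caches, id)
//		}
//	}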

// canBeReplacedBy returns whether this metacache can be replaced by 'other'.
// Both must pass the worthKeeping check.
func (m *metacache) canBeReplacedBy(other *metacache) bool {
	// If the other is older it can never replace.
	if other.started.Before(m.started) || m.id == other.id {
		return false
	}
	if other.status == scanStateNone || other.status == scanStateError {
		return false
	}
	if m.status == scanStateStarted && time.Since(m.lastUpdate) < metacacheMaxRunningAge {
		return false
	}
	// Keep it around a bit longer.
	if time.Since(m.lastHandout) < time.Hour || time.Since(m.lastUpdate) < metacacheMaxRunningAge {
		return false
	}

	// Go through recursive combinations.
	switch {
	case !m.recursive && !other.recursive:
		// If both are not recursive, the root must match.
		return m.root == other.root && strings.HasPrefix(m.filter, other.filter)
	case m.recursive && !other.recursive:
		// A recursive listing can never be replaced by a non-recursive one.
		return false
	case !m.recursive && other.recursive:
		// If other is recursive it must contain this root.
		return strings.HasPrefix(m.root, other.root) && other.filter == ""
	case m.recursive && other.recursive:
		// Similar if both are recursive.
		return strings.HasPrefix(m.root, other.root) && other.filter == ""
	}
	panic("should be unreachable")
}

// baseDirFromPrefix will return the base directory given an object path.
// For example, an object named prefix/folder/object.ext will return `prefix/folder/`.
func baseDirFromPrefix(prefix string) string {
	b := path.Dir(prefix)
	if b == "." || b == "./" || b == "/" {
		b = ""
	}
	if !strings.Contains(prefix, slashSeparator) {
		b = ""
	}
	if len(b) > 0 && !strings.HasSuffix(b, slashSeparator) {
		b += slashSeparator
	}
	return b
}
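
// Illustrative examples (not part of the original source), derived from the logic above:
//
//	baseDirFromPrefix("prefix/folder/object.ext") // "prefix/folder/"
//	baseDirFromPrefix("prefix/folder/")           // "prefix/folder/"
//	baseDirFromPrefix("object.ext")               // "" (no separator in the prefix)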

// update cache with new status.
// The updates are conditional so multiple callers can update with different states.
func (m *metacache) update(update metacache) {
	m.lastUpdate = UTCNow()

	if m.status == scanStateStarted && update.status == scanStateSuccess {
		m.ended = UTCNow()
		m.endedCycle = update.endedCycle
	}

	if m.status == scanStateStarted && update.status != scanStateStarted {
		m.status = update.status
	}

	if m.error == "" && update.error != "" {
		m.error = update.error
		m.status = scanStateError
		m.ended = UTCNow()
	}
	m.fileNotFound = m.fileNotFound || update.fileNotFound
}
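
// Illustrative sketch (not part of the original source): a scanner reporting a
// successful run could merge its result into the tracked entry like this; the
// 'cycle' value is hypothetical:
//
//	m.update(metacache{status: scanStateSuccess, endedCycle: cycle})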

// delete all cache data on disks.
func (m *metacache) delete(ctx context.Context) {
	if m.bucket == "" || m.id == "" {
		logger.LogIf(ctx, fmt.Errorf("metacache.delete: bucket (%s) or id (%s) empty", m.bucket, m.id))
	}
	objAPI := newObjectLayerFn()
	if objAPI == nil {
		logger.LogIf(ctx, errors.New("metacache.delete: no object layer"))
		return
	}
	ez, ok := objAPI.(*erasureServerPools)
	if !ok {
		logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerPools"))
		return
	}
	ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
}