mirror of
				https://github.com/minio/minio.git
				synced 2025-10-26 13:51:30 +01:00 
			
		
		
		
	if the commit is still in pending or failed status This PR also does some minor code cleanup
		
			
				
	
	
		
			1198 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1198 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) 2015-2021 MinIO, Inc.
 | |
| //
 | |
| // This file is part of MinIO Object Storage stack
 | |
| //
 | |
| // This program is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Affero General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // This program is distributed in the hope that it will be useful
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| // GNU Affero General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Affero General Public License
 | |
| // along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package cmd
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	objectlock "github.com/minio/minio/internal/bucket/object/lock"
 | |
| 	"github.com/minio/minio/internal/color"
 | |
| 	"github.com/minio/minio/internal/config/cache"
 | |
| 	"github.com/minio/minio/internal/hash"
 | |
| 	xhttp "github.com/minio/minio/internal/http"
 | |
| 	"github.com/minio/minio/internal/logger"
 | |
| 	"github.com/minio/minio/internal/sync/errgroup"
 | |
| 	"github.com/minio/pkg/wildcard"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	cacheBlkSize          = 1 << 20
 | |
| 	cacheGCInterval       = time.Minute * 30
 | |
| 	writeBackStatusHeader = ReservedMetadataPrefixLower + "write-back-status"
 | |
| 	writeBackRetryHeader  = ReservedMetadataPrefixLower + "write-back-retry"
 | |
| )
 | |
| 
 | |
| type cacheCommitStatus string
 | |
| 
 | |
| const (
 | |
| 	// CommitPending - cache writeback with backend is pending.
 | |
| 	CommitPending cacheCommitStatus = "pending"
 | |
| 
 | |
| 	// CommitComplete - cache writeback completed ok.
 | |
| 	CommitComplete cacheCommitStatus = "complete"
 | |
| 
 | |
| 	// CommitFailed - cache writeback needs a retry.
 | |
| 	CommitFailed cacheCommitStatus = "failed"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// CommitWriteBack allows staging and write back of cached content for single object uploads
 | |
| 	CommitWriteBack string = "writeback"
 | |
| 	// CommitWriteThrough allows caching multipart uploads to disk synchronously
 | |
| 	CommitWriteThrough string = "writethrough"
 | |
| )
 | |
| 
 | |
| // String returns string representation of status
 | |
| func (s cacheCommitStatus) String() string {
 | |
| 	return string(s)
 | |
| }
 | |
| 
 | |
| // CacheStorageInfo - represents total, free capacity of
 | |
| // underlying cache storage.
 | |
| type CacheStorageInfo struct {
 | |
| 	Total uint64 // Total cache disk space.
 | |
| 	Free  uint64 // Free cache available space.
 | |
| }
 | |
| 
 | |
| // CacheObjectLayer implements primitives for cache object API layer.
 | |
| type CacheObjectLayer interface {
 | |
| 	// Object operations.
 | |
| 	GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
 | |
| 	GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error)
 | |
| 	DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error)
 | |
| 	PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	// Multipart operations.
 | |
| 	NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error)
 | |
| 	PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error)
 | |
| 	AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
 | |
| 	CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error)
 | |
| 
 | |
| 	// Storage operations.
 | |
| 	StorageInfo(ctx context.Context) CacheStorageInfo
 | |
| 	CacheStats() *CacheStats
 | |
| }
 | |
| 
 | |
| // Abstracts disk caching - used by the S3 layer
 | |
| type cacheObjects struct {
 | |
| 	// slice of cache drives
 | |
| 	cache []*diskCache
 | |
| 	// file path patterns to exclude from cache
 | |
| 	exclude []string
 | |
| 	// number of accesses after which to cache an object
 | |
| 	after int
 | |
| 	// commit objects in async manner
 | |
| 	commitWriteback    bool
 | |
| 	commitWritethrough bool
 | |
| 
 | |
| 	// if true migration is in progress from v1 to v2
 | |
| 	migrating bool
 | |
| 	// retry queue for writeback cache mode to reattempt upload to backend
 | |
| 	wbRetryCh chan ObjectInfo
 | |
| 	// Cache stats
 | |
| 	cacheStats *CacheStats
 | |
| 
 | |
| 	InnerGetObjectNInfoFn          func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
 | |
| 	InnerGetObjectInfoFn           func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	InnerDeleteObjectFn            func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	InnerPutObjectFn               func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	InnerCopyObjectFn              func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	InnerNewMultipartUploadFn      func(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error)
 | |
| 	InnerPutObjectPartFn           func(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error)
 | |
| 	InnerAbortMultipartUploadFn    func(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
 | |
| 	InnerCompleteMultipartUploadFn func(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | |
| 	InnerCopyObjectPartFn          func(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error)
 | |
| }
 | |
| 
 | |
| func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string, rs *HTTPRangeSpec) error {
 | |
| 	metadata := map[string]string{"etag": eTag}
 | |
| 	return dcache.SaveMetadata(ctx, bucket, object, metadata, size, rs, "", true)
 | |
| }
 | |
| 
 | |
| // Backend metadata could have changed through server side copy - reset cache metadata if that is the case
 | |
| func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *diskCache, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo, rs *HTTPRangeSpec) error {
 | |
| 
 | |
| 	bkMeta := make(map[string]string, len(bkObjectInfo.UserDefined))
 | |
| 	cacheMeta := make(map[string]string, len(cacheObjInfo.UserDefined))
 | |
| 	for k, v := range bkObjectInfo.UserDefined {
 | |
| 		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 | |
| 			// Do not need to send any internal metadata
 | |
| 			continue
 | |
| 		}
 | |
| 		bkMeta[http.CanonicalHeaderKey(k)] = v
 | |
| 	}
 | |
| 	for k, v := range cacheObjInfo.UserDefined {
 | |
| 		if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
 | |
| 			// Do not need to send any internal metadata
 | |
| 			continue
 | |
| 		}
 | |
| 		cacheMeta[http.CanonicalHeaderKey(k)] = v
 | |
| 	}
 | |
| 
 | |
| 	if !isMetadataSame(bkMeta, cacheMeta) ||
 | |
| 		bkObjectInfo.ETag != cacheObjInfo.ETag ||
 | |
| 		bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
 | |
| 		!bkObjectInfo.Expires.Equal(cacheObjInfo.Expires) {
 | |
| 		return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false)
 | |
| 	}
 | |
| 	return c.incHitsToMeta(ctx, dcache, bucket, object, cacheObjInfo.Size, cacheObjInfo.ETag, rs)
 | |
| }
 | |
| 
 | |
| // DeleteObject clears cache entry if backend delete operation succeeds
 | |
| func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 | |
| 	if objInfo, err = c.InnerDeleteObjectFn(ctx, bucket, object, opts); err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	if c.isCacheExclude(bucket, object) || c.skipCache() {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	dcache, cerr := c.getCacheLoc(bucket, object)
 | |
| 	if cerr != nil {
 | |
| 		return objInfo, cerr
 | |
| 	}
 | |
| 	dcache.Delete(ctx, bucket, object)
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // DeleteObjects batch deletes objects in slice, and clears any cached entries
 | |
| func (c *cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
 | |
| 	errs := make([]error, len(objects))
 | |
| 	objInfos := make([]ObjectInfo, len(objects))
 | |
| 	for idx, object := range objects {
 | |
| 		opts.VersionID = object.VersionID
 | |
| 		objInfos[idx], errs[idx] = c.DeleteObject(ctx, bucket, object.ObjectName, opts)
 | |
| 	}
 | |
| 	deletedObjects := make([]DeletedObject, len(objInfos))
 | |
| 	for idx := range errs {
 | |
| 		if errs[idx] != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		if objInfos[idx].DeleteMarker {
 | |
| 			deletedObjects[idx] = DeletedObject{
 | |
| 				DeleteMarker:          objInfos[idx].DeleteMarker,
 | |
| 				DeleteMarkerVersionID: objInfos[idx].VersionID,
 | |
| 			}
 | |
| 			continue
 | |
| 		}
 | |
| 		deletedObjects[idx] = DeletedObject{
 | |
| 			ObjectName: objInfos[idx].Name,
 | |
| 			VersionID:  objInfos[idx].VersionID,
 | |
| 		}
 | |
| 	}
 | |
| 	return deletedObjects, errs
 | |
| }
 | |
| 
 | |
| // construct a metadata k-v map
 | |
| func getMetadata(objInfo ObjectInfo) map[string]string {
 | |
| 	metadata := make(map[string]string, len(objInfo.UserDefined)+4)
 | |
| 	metadata["etag"] = objInfo.ETag
 | |
| 	metadata["content-type"] = objInfo.ContentType
 | |
| 	if objInfo.ContentEncoding != "" {
 | |
| 		metadata["content-encoding"] = objInfo.ContentEncoding
 | |
| 	}
 | |
| 	if !objInfo.Expires.Equal(timeSentinel) {
 | |
| 		metadata["expires"] = objInfo.Expires.Format(http.TimeFormat)
 | |
| 	}
 | |
| 	metadata["last-modified"] = objInfo.ModTime.Format(http.TimeFormat)
 | |
| 	for k, v := range objInfo.UserDefined {
 | |
| 		metadata[k] = v
 | |
| 	}
 | |
| 	return metadata
 | |
| }
 | |
| 
 | |
| // marks cache hit
 | |
| func (c *cacheObjects) incCacheStats(size int64) {
 | |
| 	c.cacheStats.incHit()
 | |
| 	c.cacheStats.incBytesServed(size)
 | |
| }
 | |
| 
 | |
| func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
 | |
| 	if c.isCacheExclude(bucket, object) || c.skipCache() {
 | |
| 		return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
 | |
| 	}
 | |
| 	var cc *cacheControl
 | |
| 	var cacheObjSize int64
 | |
| 	// fetch diskCache if object is currently cached or nearest available cache drive
 | |
| 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
 | |
| 	if err != nil {
 | |
| 		return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
 | |
| 	}
 | |
| 
 | |
| 	cacheReader, numCacheHits, cacheErr := dcache.Get(ctx, bucket, object, rs, h, opts)
 | |
| 	if cacheErr == nil {
 | |
| 		cacheObjSize = cacheReader.ObjInfo.Size
 | |
| 		if rs != nil {
 | |
| 			if _, len, err := rs.GetOffsetLength(cacheObjSize); err == nil {
 | |
| 				cacheObjSize = len
 | |
| 			}
 | |
| 		}
 | |
| 		cc = cacheControlOpts(cacheReader.ObjInfo)
 | |
| 		if cc != nil && (!cc.isStale(cacheReader.ObjInfo.ModTime) ||
 | |
| 			cc.onlyIfCached) {
 | |
| 			// This is a cache hit, mark it so
 | |
| 			bytesServed := cacheReader.ObjInfo.Size
 | |
| 			if rs != nil {
 | |
| 				if _, len, err := rs.GetOffsetLength(bytesServed); err == nil {
 | |
| 					bytesServed = len
 | |
| 				}
 | |
| 			}
 | |
| 			c.cacheStats.incHit()
 | |
| 			c.cacheStats.incBytesServed(bytesServed)
 | |
| 			c.incHitsToMeta(ctx, dcache, bucket, object, cacheReader.ObjInfo.Size, cacheReader.ObjInfo.ETag, rs)
 | |
| 			return cacheReader, nil
 | |
| 		}
 | |
| 		if cc != nil && cc.noStore {
 | |
| 			cacheReader.Close()
 | |
| 			c.cacheStats.incMiss()
 | |
| 			bReader, err := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
 | |
| 			bReader.ObjInfo.CacheLookupStatus = CacheHit
 | |
| 			bReader.ObjInfo.CacheStatus = CacheMiss
 | |
| 			return bReader, err
 | |
| 		}
 | |
| 		// serve cached content without ETag verification if writeback commit is not yet complete
 | |
| 		if skipETagVerification(cacheReader.ObjInfo.UserDefined) {
 | |
| 			return cacheReader, nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	objInfo, err := c.InnerGetObjectInfoFn(ctx, bucket, object, opts)
 | |
| 	if backendDownError(err) && cacheErr == nil {
 | |
| 		c.incCacheStats(cacheObjSize)
 | |
| 		return cacheReader, nil
 | |
| 	} else if err != nil {
 | |
| 		if cacheErr == nil {
 | |
| 			cacheReader.Close()
 | |
| 		}
 | |
| 		if _, ok := err.(ObjectNotFound); ok {
 | |
| 			if cacheErr == nil {
 | |
| 				// Delete cached entry if backend object
 | |
| 				// was deleted.
 | |
| 				dcache.Delete(ctx, bucket, object)
 | |
| 			}
 | |
| 		}
 | |
| 		c.cacheStats.incMiss()
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if !objInfo.IsCacheable() {
 | |
| 		if cacheErr == nil {
 | |
| 			cacheReader.Close()
 | |
| 		}
 | |
| 		c.cacheStats.incMiss()
 | |
| 		return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
 | |
| 	}
 | |
| 	// skip cache for objects with locks
 | |
| 	objRetention := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
 | |
| 	legalHold := objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined)
 | |
| 	if objRetention.Mode.Valid() || legalHold.Status.Valid() {
 | |
| 		if cacheErr == nil {
 | |
| 			cacheReader.Close()
 | |
| 		}
 | |
| 		c.cacheStats.incMiss()
 | |
| 		return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
 | |
| 	}
 | |
| 	if cacheErr == nil {
 | |
| 		// if ETag matches for stale cache entry, serve from cache
 | |
| 		if cacheReader.ObjInfo.ETag == objInfo.ETag {
 | |
| 			// Update metadata in case server-side copy might have changed object metadata
 | |
| 			c.updateMetadataIfChanged(ctx, dcache, bucket, object, objInfo, cacheReader.ObjInfo, rs)
 | |
| 			c.incCacheStats(cacheObjSize)
 | |
| 			return cacheReader, nil
 | |
| 		}
 | |
| 		cacheReader.Close()
 | |
| 		// Object is stale, so delete from cache
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 	}
 | |
| 
 | |
| 	// Reaching here implies cache miss
 | |
| 	c.cacheStats.incMiss()
 | |
| 
 | |
| 	bkReader, bkErr := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
 | |
| 
 | |
| 	if bkErr != nil {
 | |
| 		return bkReader, bkErr
 | |
| 	}
 | |
| 	// If object has less hits than configured cache after, just increment the hit counter
 | |
| 	// but do not cache it.
 | |
| 	if numCacheHits < c.after {
 | |
| 		c.incHitsToMeta(ctx, dcache, bucket, object, objInfo.Size, objInfo.ETag, rs)
 | |
| 		return bkReader, bkErr
 | |
| 	}
 | |
| 
 | |
| 	// Record if cache has a hit that was invalidated by ETag verification
 | |
| 	if cacheErr == nil {
 | |
| 		bkReader.ObjInfo.CacheLookupStatus = CacheHit
 | |
| 	}
 | |
| 
 | |
| 	// Check if we can add it without exceeding total cache size.
 | |
| 	if !dcache.diskSpaceAvailable(objInfo.Size) {
 | |
| 		return bkReader, bkErr
 | |
| 	}
 | |
| 
 | |
| 	if rs != nil && !dcache.enableRange {
 | |
| 		go func() {
 | |
| 			// if range caching is disabled, download entire object.
 | |
| 			rs = nil
 | |
| 			// fill cache in the background for range GET requests
 | |
| 			bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, rs, h, lockType, opts)
 | |
| 			if bErr != nil {
 | |
| 				return
 | |
| 			}
 | |
| 			defer bReader.Close()
 | |
| 			oi, _, _, err := dcache.statRange(GlobalContext, bucket, object, rs)
 | |
| 			// avoid cache overwrite if another background routine filled cache
 | |
| 			if err != nil || oi.ETag != bReader.ObjInfo.ETag {
 | |
| 				// use a new context to avoid locker prematurely timing out operation when the GetObjectNInfo returns.
 | |
| 				dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{
 | |
| 					UserDefined: getMetadata(bReader.ObjInfo),
 | |
| 				}, false, false)
 | |
| 				return
 | |
| 			}
 | |
| 		}()
 | |
| 		return bkReader, bkErr
 | |
| 	}
 | |
| 
 | |
| 	// Initialize pipe.
 | |
| 	pr, pw := io.Pipe()
 | |
| 	var wg sync.WaitGroup
 | |
| 	teeReader := io.TeeReader(bkReader, pw)
 | |
| 	userDefined := getMetadata(bkReader.ObjInfo)
 | |
| 	wg.Add(1)
 | |
| 	go func() {
 | |
| 		_, putErr := dcache.Put(ctx, bucket, object,
 | |
| 			io.LimitReader(pr, bkReader.ObjInfo.Size),
 | |
| 			bkReader.ObjInfo.Size, rs, ObjectOptions{
 | |
| 				UserDefined: userDefined,
 | |
| 			}, false, false)
 | |
| 		// close the read end of the pipe, so the error gets
 | |
| 		// propagated to teeReader
 | |
| 		pr.CloseWithError(putErr)
 | |
| 		wg.Done()
 | |
| 	}()
 | |
| 	cleanupBackend := func() {
 | |
| 		pw.CloseWithError(bkReader.Close())
 | |
| 		wg.Wait()
 | |
| 	}
 | |
| 	return NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, opts, cleanupBackend)
 | |
| }
 | |
| 
 | |
| // Returns ObjectInfo from cache if available.
 | |
| func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
 | |
| 	getObjectInfoFn := c.InnerGetObjectInfoFn
 | |
| 
 | |
| 	if c.isCacheExclude(bucket, object) || c.skipCache() {
 | |
| 		return getObjectInfoFn(ctx, bucket, object, opts)
 | |
| 	}
 | |
| 
 | |
| 	// fetch diskCache if object is currently cached or nearest available cache drive
 | |
| 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
 | |
| 	if err != nil {
 | |
| 		return getObjectInfoFn(ctx, bucket, object, opts)
 | |
| 	}
 | |
| 	var cc *cacheControl
 | |
| 	// if cache control setting is valid, avoid HEAD operation to backend
 | |
| 	cachedObjInfo, _, cerr := dcache.Stat(ctx, bucket, object)
 | |
| 	if cerr == nil {
 | |
| 		cc = cacheControlOpts(cachedObjInfo)
 | |
| 		if cc == nil || (cc != nil && !cc.isStale(cachedObjInfo.ModTime)) {
 | |
| 			// This is a cache hit, mark it so
 | |
| 			c.cacheStats.incHit()
 | |
| 			return cachedObjInfo, nil
 | |
| 		}
 | |
| 		// serve cache metadata without ETag verification if writeback commit is not yet complete
 | |
| 		if skipETagVerification(cachedObjInfo.UserDefined) {
 | |
| 			return cachedObjInfo, nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	objInfo, err := getObjectInfoFn(ctx, bucket, object, opts)
 | |
| 	if err != nil {
 | |
| 		if _, ok := err.(ObjectNotFound); ok {
 | |
| 			// Delete the cached entry if backend object was deleted.
 | |
| 			dcache.Delete(ctx, bucket, object)
 | |
| 			c.cacheStats.incMiss()
 | |
| 			return ObjectInfo{}, err
 | |
| 		}
 | |
| 		if !backendDownError(err) {
 | |
| 			c.cacheStats.incMiss()
 | |
| 			return ObjectInfo{}, err
 | |
| 		}
 | |
| 		if cerr == nil {
 | |
| 			// This is a cache hit, mark it so
 | |
| 			c.cacheStats.incHit()
 | |
| 			return cachedObjInfo, nil
 | |
| 		}
 | |
| 		c.cacheStats.incMiss()
 | |
| 		return ObjectInfo{}, BackendDown{}
 | |
| 	}
 | |
| 	// Reaching here implies cache miss
 | |
| 	c.cacheStats.incMiss()
 | |
| 	// when backend is up, do a sanity check on cached object
 | |
| 	if cerr != nil {
 | |
| 		return objInfo, nil
 | |
| 	}
 | |
| 	if cachedObjInfo.ETag != objInfo.ETag {
 | |
| 		// Delete the cached entry if the backend object was replaced.
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 	}
 | |
| 	return objInfo, nil
 | |
| }
 | |
| 
 | |
| // CopyObject reverts to backend after evicting any stale cache entries
 | |
| func (c *cacheObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
 | |
| 	copyObjectFn := c.InnerCopyObjectFn
 | |
| 	if c.isCacheExclude(srcBucket, srcObject) || c.skipCache() {
 | |
| 		return copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
 | |
| 	}
 | |
| 	if srcBucket != dstBucket || srcObject != dstObject {
 | |
| 		return copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
 | |
| 	}
 | |
| 	// fetch diskCache if object is currently cached or nearest available cache drive
 | |
| 	dcache, err := c.getCacheToLoc(ctx, srcBucket, srcObject)
 | |
| 	if err != nil {
 | |
| 		return copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
 | |
| 	}
 | |
| 	// if currently cached, evict old entry and revert to backend.
 | |
| 	if cachedObjInfo, _, cerr := dcache.Stat(ctx, srcBucket, srcObject); cerr == nil {
 | |
| 		cc := cacheControlOpts(cachedObjInfo)
 | |
| 		if cc == nil || !cc.isStale(cachedObjInfo.ModTime) {
 | |
| 			dcache.Delete(ctx, srcBucket, srcObject)
 | |
| 		}
 | |
| 	}
 | |
| 	return copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
 | |
| }
 | |
| 
 | |
| // StorageInfo - returns underlying storage statistics.
 | |
| func (c *cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo) {
 | |
| 	var total, free uint64
 | |
| 	for _, cache := range c.cache {
 | |
| 		if cache == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		info, err := getDiskInfo(cache.dir)
 | |
| 		logger.GetReqInfo(ctx).AppendTags("cachePath", cache.dir)
 | |
| 		logger.LogIf(ctx, err)
 | |
| 		total += info.Total
 | |
| 		free += info.Free
 | |
| 	}
 | |
| 	return CacheStorageInfo{
 | |
| 		Total: total,
 | |
| 		Free:  free,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // CacheStats - returns underlying storage statistics.
 | |
| func (c *cacheObjects) CacheStats() (cs *CacheStats) {
 | |
| 	return c.cacheStats
 | |
| }
 | |
| 
 | |
| // skipCache() returns true if cache migration is in progress
 | |
| func (c *cacheObjects) skipCache() bool {
 | |
| 	return c.migrating
 | |
| }
 | |
| 
 | |
| // Returns true if object should be excluded from cache
 | |
| func (c *cacheObjects) isCacheExclude(bucket, object string) bool {
 | |
| 	// exclude directories from cache
 | |
| 	if strings.HasSuffix(object, SlashSeparator) {
 | |
| 		return true
 | |
| 	}
 | |
| 	for _, pattern := range c.exclude {
 | |
| 		matchStr := fmt.Sprintf("%s/%s", bucket, object)
 | |
| 		if ok := wildcard.MatchSimple(pattern, matchStr); ok {
 | |
| 			return true
 | |
| 		}
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // choose a cache deterministically based on hash of bucket,object. The hash index is treated as
 | |
| // a hint. In the event that the cache drive at hash index is offline, treat the list of cache drives
 | |
| // as a circular buffer and walk through them starting at hash index until an online drive is found.
 | |
| func (c *cacheObjects) getCacheLoc(bucket, object string) (*diskCache, error) {
 | |
| 	index := c.hashIndex(bucket, object)
 | |
| 	numDisks := len(c.cache)
 | |
| 	for k := 0; k < numDisks; k++ {
 | |
| 		i := (index + k) % numDisks
 | |
| 		if c.cache[i] == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		if c.cache[i].IsOnline() {
 | |
| 			return c.cache[i], nil
 | |
| 		}
 | |
| 	}
 | |
| 	return nil, errDiskNotFound
 | |
| }
 | |
| 
 | |
| // get cache disk where object is currently cached for a GET operation. If object does not exist at that location,
 | |
| // treat the list of cache drives as a circular buffer and walk through them starting at hash index
 | |
| // until an online drive is found.If object is not found, fall back to the first online cache drive
 | |
| // closest to the hash index, so that object can be re-cached.
 | |
| func (c *cacheObjects) getCacheToLoc(ctx context.Context, bucket, object string) (*diskCache, error) {
 | |
| 	index := c.hashIndex(bucket, object)
 | |
| 
 | |
| 	numDisks := len(c.cache)
 | |
| 	// save first online cache disk closest to the hint index
 | |
| 	var firstOnlineDisk *diskCache
 | |
| 	for k := 0; k < numDisks; k++ {
 | |
| 		i := (index + k) % numDisks
 | |
| 		if c.cache[i] == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		if c.cache[i].IsOnline() {
 | |
| 			if firstOnlineDisk == nil {
 | |
| 				firstOnlineDisk = c.cache[i]
 | |
| 			}
 | |
| 			if c.cache[i].Exists(ctx, bucket, object) {
 | |
| 				return c.cache[i], nil
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if firstOnlineDisk != nil {
 | |
| 		return firstOnlineDisk, nil
 | |
| 	}
 | |
| 	return nil, errDiskNotFound
 | |
| }
 | |
| 
 | |
| // Compute a unique hash sum for bucket and object
 | |
| func (c *cacheObjects) hashIndex(bucket, object string) int {
 | |
| 	return crcHashMod(pathJoin(bucket, object), len(c.cache))
 | |
| }
 | |
| 
 | |
| // newCache initializes the cacheFSObjects for the "drives" specified in config.json
 | |
| // or the global env overrides.
 | |
| func newCache(config cache.Config) ([]*diskCache, bool, error) {
 | |
| 	var caches []*diskCache
 | |
| 	ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{})
 | |
| 	formats, migrating, err := loadAndValidateCacheFormat(ctx, config.Drives)
 | |
| 	if err != nil {
 | |
| 		return nil, false, err
 | |
| 	}
 | |
| 	for i, dir := range config.Drives {
 | |
| 		// skip diskCache creation for cache drives missing a format.json
 | |
| 		if formats[i] == nil {
 | |
| 			caches = append(caches, nil)
 | |
| 			continue
 | |
| 		}
 | |
| 		if err := checkAtimeSupport(dir); err != nil {
 | |
| 			return nil, false, errors.New("Atime support required for disk caching")
 | |
| 		}
 | |
| 
 | |
| 		cache, err := newDiskCache(ctx, dir, config)
 | |
| 		if err != nil {
 | |
| 			return nil, false, err
 | |
| 		}
 | |
| 		caches = append(caches, cache)
 | |
| 	}
 | |
| 	return caches, migrating, nil
 | |
| }
 | |
| 
 | |
| func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) {
 | |
| 	logStartupMessage(color.Blue("Cache migration initiated ...."))
 | |
| 
 | |
| 	g := errgroup.WithNErrs(len(c.cache))
 | |
| 	for index, dc := range c.cache {
 | |
| 		if dc == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		index := index
 | |
| 		g.Go(func() error {
 | |
| 			// start migration from V1 to V2
 | |
| 			return migrateOldCache(ctx, c.cache[index])
 | |
| 		}, index)
 | |
| 	}
 | |
| 
 | |
| 	errCnt := 0
 | |
| 	for _, err := range g.Wait() {
 | |
| 		if err != nil {
 | |
| 			errCnt++
 | |
| 			logger.LogIf(ctx, err)
 | |
| 			continue
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if errCnt > 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// update migration status
 | |
| 	c.migrating = false
 | |
| 	logStartupMessage(color.Blue("Cache migration completed successfully."))
 | |
| }
 | |
| 
 | |
| // PutObject - caches the uploaded object for single Put operations
 | |
| func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 | |
| 	putObjectFn := c.InnerPutObjectFn
 | |
| 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
 | |
| 	if err != nil {
 | |
| 		// disk cache could not be located,execute backend call.
 | |
| 		return putObjectFn(ctx, bucket, object, r, opts)
 | |
| 	}
 | |
| 	size := r.Size()
 | |
| 	if c.skipCache() {
 | |
| 		return putObjectFn(ctx, bucket, object, r, opts)
 | |
| 	}
 | |
| 
 | |
| 	// fetch from backend if there is no space on cache drive
 | |
| 	if !dcache.diskSpaceAvailable(size) {
 | |
| 		return putObjectFn(ctx, bucket, object, r, opts)
 | |
| 	}
 | |
| 
 | |
| 	if opts.ServerSideEncryption != nil {
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return putObjectFn(ctx, bucket, object, r, opts)
 | |
| 	}
 | |
| 
 | |
| 	// skip cache for objects with locks
 | |
| 	objRetention := objectlock.GetObjectRetentionMeta(opts.UserDefined)
 | |
| 	legalHold := objectlock.GetObjectLegalHoldMeta(opts.UserDefined)
 | |
| 	if objRetention.Mode.Valid() || legalHold.Status.Valid() {
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return putObjectFn(ctx, bucket, object, r, opts)
 | |
| 	}
 | |
| 
 | |
| 	// fetch from backend if cache exclude pattern or cache-control
 | |
| 	// directive set to exclude
 | |
| 	if c.isCacheExclude(bucket, object) {
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return putObjectFn(ctx, bucket, object, r, opts)
 | |
| 	}
 | |
| 	if c.commitWriteback {
 | |
| 		oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false, true)
 | |
| 		if err != nil {
 | |
| 			return ObjectInfo{}, err
 | |
| 		}
 | |
| 		go c.uploadObject(GlobalContext, oi)
 | |
| 		return oi, nil
 | |
| 	}
 | |
| 	if !c.commitWritethrough {
 | |
| 		objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
 | |
| 		if err == nil {
 | |
| 			go func() {
 | |
| 				// fill cache in the background
 | |
| 				bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
 | |
| 				if bErr != nil {
 | |
| 					return
 | |
| 				}
 | |
| 				defer bReader.Close()
 | |
| 				oi, _, err := dcache.Stat(GlobalContext, bucket, object)
 | |
| 				// avoid cache overwrite if another background routine filled cache
 | |
| 				if err != nil || oi.ETag != bReader.ObjInfo.ETag {
 | |
| 					dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false, true)
 | |
| 				}
 | |
| 			}()
 | |
| 		}
 | |
| 		return objInfo, err
 | |
| 	}
 | |
| 	cLock, lkctx, cerr := dcache.GetLockContext(GlobalContext, bucket, object)
 | |
| 	if cerr != nil {
 | |
| 		return putObjectFn(ctx, bucket, object, r, opts)
 | |
| 	}
 | |
| 	defer cLock.Unlock(lkctx.Cancel)
 | |
| 	// Initialize pipe to stream data to backend
 | |
| 	pipeReader, pipeWriter := io.Pipe()
 | |
| 	hashReader, err := hash.NewReader(pipeReader, size, "", "", r.ActualSize())
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	// Initialize pipe to stream data to cache
 | |
| 	rPipe, wPipe := io.Pipe()
 | |
| 	infoCh := make(chan ObjectInfo)
 | |
| 	errorCh := make(chan error)
 | |
| 	go func() {
 | |
| 		info, err := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader), opts)
 | |
| 		if err != nil {
 | |
| 			close(infoCh)
 | |
| 			pipeReader.CloseWithError(err)
 | |
| 			rPipe.CloseWithError(err)
 | |
| 			errorCh <- err
 | |
| 			return
 | |
| 		}
 | |
| 		close(errorCh)
 | |
| 		infoCh <- info
 | |
| 	}()
 | |
| 
 | |
| 	go func() {
 | |
| 		_, err := dcache.put(lkctx.Context(), bucket, object, rPipe, r.Size(), nil, opts, false, false)
 | |
| 		if err != nil {
 | |
| 			rPipe.CloseWithError(err)
 | |
| 			return
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	mwriter := cacheMultiWriter(pipeWriter, wPipe)
 | |
| 	_, err = io.Copy(mwriter, r)
 | |
| 	pipeWriter.Close()
 | |
| 	wPipe.Close()
 | |
| 
 | |
| 	if err != nil {
 | |
| 		err = <-errorCh
 | |
| 		return ObjectInfo{}, err
 | |
| 	}
 | |
| 	info := <-infoCh
 | |
| 	if cerr = dcache.updateMetadata(lkctx.Context(), bucket, object, info.ETag, info.ModTime, info.Size); cerr != nil {
 | |
| 		dcache.delete(bucket, object)
 | |
| 	}
 | |
| 	return info, err
 | |
| }
 | |
| 
 | |
| // upload cached object to backend in async commit mode.
 | |
| func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
 | |
| 	dcache, err := c.getCacheToLoc(ctx, oi.Bucket, oi.Name)
 | |
| 	if err != nil {
 | |
| 		// disk cache could not be located.
 | |
| 		logger.LogIf(ctx, fmt.Errorf("Could not upload %s/%s to backend: %w", oi.Bucket, oi.Name, err))
 | |
| 		return
 | |
| 	}
 | |
| 	cReader, _, bErr := dcache.Get(ctx, oi.Bucket, oi.Name, nil, http.Header{}, ObjectOptions{})
 | |
| 	if bErr != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	defer cReader.Close()
 | |
| 
 | |
| 	if cReader.ObjInfo.ETag != oi.ETag {
 | |
| 		return
 | |
| 	}
 | |
| 	st := cacheCommitStatus(oi.UserDefined[writeBackStatusHeader])
 | |
| 	if st == CommitComplete || st.String() == "" {
 | |
| 		return
 | |
| 	}
 | |
| 	hashReader, err := hash.NewReader(cReader, oi.Size, "", "", oi.Size)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	var opts ObjectOptions
 | |
| 	opts.UserDefined = make(map[string]string)
 | |
| 	opts.UserDefined[xhttp.ContentMD5] = oi.UserDefined["content-md5"]
 | |
| 	objInfo, err := c.InnerPutObjectFn(ctx, oi.Bucket, oi.Name, NewPutObjReader(hashReader), opts)
 | |
| 	wbCommitStatus := CommitComplete
 | |
| 	if err != nil {
 | |
| 		wbCommitStatus = CommitFailed
 | |
| 	}
 | |
| 
 | |
| 	meta := cloneMSS(cReader.ObjInfo.UserDefined)
 | |
| 	retryCnt := 0
 | |
| 	if wbCommitStatus == CommitFailed {
 | |
| 		retryCnt, _ = strconv.Atoi(meta[writeBackRetryHeader])
 | |
| 		retryCnt++
 | |
| 		meta[writeBackRetryHeader] = strconv.Itoa(retryCnt)
 | |
| 	} else {
 | |
| 		delete(meta, writeBackRetryHeader)
 | |
| 	}
 | |
| 	meta[writeBackStatusHeader] = wbCommitStatus.String()
 | |
| 	meta["etag"] = oi.ETag
 | |
| 	dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, objInfo.Size, nil, "", false)
 | |
| 	if retryCnt > 0 {
 | |
| 		// slow down retries
 | |
| 		time.Sleep(time.Second * time.Duration(retryCnt%10+1))
 | |
| 		c.queueWritebackRetry(oi)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *cacheObjects) queueWritebackRetry(oi ObjectInfo) {
 | |
| 	select {
 | |
| 	case <-GlobalContext.Done():
 | |
| 		return
 | |
| 	case c.wbRetryCh <- oi:
 | |
| 		c.uploadObject(GlobalContext, oi)
 | |
| 	default:
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Returns cacheObjects for use by Server.
 | |
| func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjectLayer, error) {
 | |
| 	// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
 | |
| 	cache, migrateSw, err := newCache(config)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	c := &cacheObjects{
 | |
| 		cache:              cache,
 | |
| 		exclude:            config.Exclude,
 | |
| 		after:              config.After,
 | |
| 		migrating:          migrateSw,
 | |
| 		commitWriteback:    config.CacheCommitMode == CommitWriteBack,
 | |
| 		commitWritethrough: config.CacheCommitMode == CommitWriteThrough,
 | |
| 
 | |
| 		cacheStats: newCacheStats(),
 | |
| 		InnerGetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
 | |
| 			return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
 | |
| 		},
 | |
| 		InnerGetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
 | |
| 			return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
 | |
| 		},
 | |
| 		InnerDeleteObjectFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
 | |
| 			return newObjectLayerFn().DeleteObject(ctx, bucket, object, opts)
 | |
| 		},
 | |
| 		InnerPutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 | |
| 			return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
 | |
| 		},
 | |
| 		InnerCopyObjectFn: func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
 | |
| 			return newObjectLayerFn().CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
 | |
| 		},
 | |
| 		InnerNewMultipartUploadFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) {
 | |
| 			return newObjectLayerFn().NewMultipartUpload(ctx, bucket, object, opts)
 | |
| 		},
 | |
| 		InnerPutObjectPartFn: func(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) {
 | |
| 			return newObjectLayerFn().PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
 | |
| 		},
 | |
| 		InnerAbortMultipartUploadFn: func(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
 | |
| 			return newObjectLayerFn().AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
 | |
| 		},
 | |
| 		InnerCompleteMultipartUploadFn: func(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 | |
| 			return newObjectLayerFn().CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
 | |
| 		},
 | |
| 		InnerCopyObjectPartFn: func(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error) {
 | |
| 			return newObjectLayerFn().CopyObjectPart(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 | |
| 		},
 | |
| 	}
 | |
| 	c.cacheStats.GetDiskStats = func() []CacheDiskStats {
 | |
| 		cacheDiskStats := make([]CacheDiskStats, len(c.cache))
 | |
| 		for i := range c.cache {
 | |
| 			dcache := c.cache[i]
 | |
| 			cacheDiskStats[i] = CacheDiskStats{}
 | |
| 			if dcache != nil {
 | |
| 				info, err := getDiskInfo(dcache.dir)
 | |
| 				logger.LogIf(ctx, err)
 | |
| 				cacheDiskStats[i].UsageSize = info.Used
 | |
| 				cacheDiskStats[i].TotalCapacity = info.Total
 | |
| 				cacheDiskStats[i].Dir = dcache.stats.Dir
 | |
| 				atomic.StoreInt32(&cacheDiskStats[i].UsageState, atomic.LoadInt32(&dcache.stats.UsageState))
 | |
| 				atomic.StoreUint64(&cacheDiskStats[i].UsagePercent, atomic.LoadUint64(&dcache.stats.UsagePercent))
 | |
| 			}
 | |
| 		}
 | |
| 		return cacheDiskStats
 | |
| 	}
 | |
| 	if migrateSw {
 | |
| 		go c.migrateCacheFromV1toV2(ctx)
 | |
| 	}
 | |
| 	go c.gc(ctx)
 | |
| 	if c.commitWriteback {
 | |
| 		c.wbRetryCh = make(chan ObjectInfo, 10000)
 | |
| 		go func() {
 | |
| 			<-GlobalContext.Done()
 | |
| 			close(c.wbRetryCh)
 | |
| 		}()
 | |
| 		go c.queuePendingWriteback(ctx)
 | |
| 	}
 | |
| 
 | |
| 	return c, nil
 | |
| }
 | |
| 
 | |
| func (c *cacheObjects) gc(ctx context.Context) {
 | |
| 	ticker := time.NewTicker(cacheGCInterval)
 | |
| 
 | |
| 	defer ticker.Stop()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			if c.migrating {
 | |
| 				continue
 | |
| 			}
 | |
| 			for _, dcache := range c.cache {
 | |
| 				if dcache != nil {
 | |
| 					// Check if there is disk.
 | |
| 					// Will queue a GC scan if at high watermark.
 | |
| 					dcache.diskSpaceAvailable(0)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // queues any pending or failed async commits when server restarts
 | |
| func (c *cacheObjects) queuePendingWriteback(ctx context.Context) {
 | |
| 	for _, dcache := range c.cache {
 | |
| 		if dcache != nil {
 | |
| 			for {
 | |
| 				select {
 | |
| 				case <-ctx.Done():
 | |
| 					return
 | |
| 				case oi, ok := <-dcache.retryWritebackCh:
 | |
| 					if !ok {
 | |
| 						goto next
 | |
| 					}
 | |
| 					c.queueWritebackRetry(oi)
 | |
| 				default:
 | |
| 					time.Sleep(time.Second * 1)
 | |
| 				}
 | |
| 			}
 | |
| 		next:
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewMultipartUpload - Starts a new multipart upload operation to backend - if writethrough mode is enabled, starts caching the multipart.
 | |
| func (c *cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) {
 | |
| 	newMultipartUploadFn := c.InnerNewMultipartUploadFn
 | |
| 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
 | |
| 	if err != nil {
 | |
| 		// disk cache could not be located,execute backend call.
 | |
| 		return newMultipartUploadFn(ctx, bucket, object, opts)
 | |
| 	}
 | |
| 	if c.skipCache() {
 | |
| 		return newMultipartUploadFn(ctx, bucket, object, opts)
 | |
| 	}
 | |
| 
 | |
| 	if opts.ServerSideEncryption != nil { // avoid caching encrypted objects
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return newMultipartUploadFn(ctx, bucket, object, opts)
 | |
| 	}
 | |
| 
 | |
| 	// skip cache for objects with locks
 | |
| 	objRetention := objectlock.GetObjectRetentionMeta(opts.UserDefined)
 | |
| 	legalHold := objectlock.GetObjectLegalHoldMeta(opts.UserDefined)
 | |
| 	if objRetention.Mode.Valid() || legalHold.Status.Valid() {
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return newMultipartUploadFn(ctx, bucket, object, opts)
 | |
| 	}
 | |
| 
 | |
| 	// fetch from backend if cache exclude pattern or cache-control
 | |
| 	// directive set to exclude
 | |
| 	if c.isCacheExclude(bucket, object) {
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return newMultipartUploadFn(ctx, bucket, object, opts)
 | |
| 	}
 | |
| 	if !c.commitWritethrough && !c.commitWriteback {
 | |
| 		return newMultipartUploadFn(ctx, bucket, object, opts)
 | |
| 	}
 | |
| 
 | |
| 	// perform multipart upload on backend and cache simultaneously
 | |
| 	uploadID, err = newMultipartUploadFn(ctx, bucket, object, opts)
 | |
| 	dcache.NewMultipartUpload(GlobalContext, bucket, object, uploadID, opts)
 | |
| 	return uploadID, err
 | |
| }
 | |
| 
 | |
| // PutObjectPart streams part to cache concurrently if writethrough mode is enabled. Otherwise redirects the call to remote
 | |
| func (c *cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) {
 | |
| 	putObjectPartFn := c.InnerPutObjectPartFn
 | |
| 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
 | |
| 	if err != nil {
 | |
| 		// disk cache could not be located,execute backend call.
 | |
| 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | |
| 	}
 | |
| 
 | |
| 	if !c.commitWritethrough && !c.commitWriteback {
 | |
| 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | |
| 	}
 | |
| 	if c.skipCache() {
 | |
| 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | |
| 	}
 | |
| 	size := data.Size()
 | |
| 
 | |
| 	// avoid caching part if space unavailable
 | |
| 	if !dcache.diskSpaceAvailable(size) {
 | |
| 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | |
| 	}
 | |
| 
 | |
| 	if opts.ServerSideEncryption != nil {
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | |
| 	}
 | |
| 
 | |
| 	// skip cache for objects with locks
 | |
| 	objRetention := objectlock.GetObjectRetentionMeta(opts.UserDefined)
 | |
| 	legalHold := objectlock.GetObjectLegalHoldMeta(opts.UserDefined)
 | |
| 	if objRetention.Mode.Valid() || legalHold.Status.Valid() {
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | |
| 	}
 | |
| 
 | |
| 	// fetch from backend if cache exclude pattern or cache-control
 | |
| 	// directive set to exclude
 | |
| 	if c.isCacheExclude(bucket, object) {
 | |
| 		dcache.Delete(ctx, bucket, object)
 | |
| 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | |
| 	}
 | |
| 
 | |
| 	info = PartInfo{}
 | |
| 	// Initialize pipe to stream data to backend
 | |
| 	pipeReader, pipeWriter := io.Pipe()
 | |
| 	hashReader, err := hash.NewReader(pipeReader, size, "", "", data.ActualSize())
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	// Initialize pipe to stream data to cache
 | |
| 	rPipe, wPipe := io.Pipe()
 | |
| 	pinfoCh := make(chan PartInfo)
 | |
| 	cinfoCh := make(chan PartInfo)
 | |
| 
 | |
| 	errorCh := make(chan error)
 | |
| 	go func() {
 | |
| 		info, err = putObjectPartFn(ctx, bucket, object, uploadID, partID, NewPutObjReader(hashReader), opts)
 | |
| 		if err != nil {
 | |
| 			close(pinfoCh)
 | |
| 			pipeReader.CloseWithError(err)
 | |
| 			rPipe.CloseWithError(err)
 | |
| 			errorCh <- err
 | |
| 			return
 | |
| 		}
 | |
| 		close(errorCh)
 | |
| 		pinfoCh <- info
 | |
| 	}()
 | |
| 	go func() {
 | |
| 		pinfo, perr := dcache.PutObjectPart(GlobalContext, bucket, object, uploadID, partID, rPipe, data.Size(), opts)
 | |
| 		if perr != nil {
 | |
| 			rPipe.CloseWithError(perr)
 | |
| 			close(cinfoCh)
 | |
| 			// clean up upload
 | |
| 			dcache.AbortUpload(bucket, object, uploadID)
 | |
| 			return
 | |
| 		}
 | |
| 		cinfoCh <- pinfo
 | |
| 	}()
 | |
| 
 | |
| 	mwriter := cacheMultiWriter(pipeWriter, wPipe)
 | |
| 	_, err = io.Copy(mwriter, data)
 | |
| 	pipeWriter.Close()
 | |
| 	wPipe.Close()
 | |
| 
 | |
| 	if err != nil {
 | |
| 		err = <-errorCh
 | |
| 		return PartInfo{}, err
 | |
| 	}
 | |
| 	info = <-pinfoCh
 | |
| 	cachedInfo := <-cinfoCh
 | |
| 	if info.PartNumber == cachedInfo.PartNumber {
 | |
| 		cachedInfo.ETag = info.ETag
 | |
| 		cachedInfo.LastModified = info.LastModified
 | |
| 		dcache.SavePartMetadata(GlobalContext, bucket, object, uploadID, partID, cachedInfo)
 | |
| 	}
 | |
| 	return info, err
 | |
| }
 | |
| 
 | |
| // CopyObjectPart behaves similar to PutObjectPart - caches part to upload dir if writethrough mode is enabled.
 | |
| func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error) {
 | |
| 	copyObjectPartFn := c.InnerCopyObjectPartFn
 | |
| 	dcache, err := c.getCacheToLoc(ctx, dstBucket, dstObject)
 | |
| 	if err != nil {
 | |
| 		// disk cache could not be located,execute backend call.
 | |
| 		return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 | |
| 	}
 | |
| 
 | |
| 	if !c.commitWritethrough && !c.commitWriteback {
 | |
| 		return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 | |
| 	}
 | |
| 	if err := dcache.uploadIDExists(dstBucket, dstObject, uploadID); err != nil {
 | |
| 		return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 | |
| 	}
 | |
| 	partInfo, err := copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 | |
| 	if err != nil {
 | |
| 		return pi, toObjectErr(err, dstBucket, dstObject)
 | |
| 	}
 | |
| 	go func() {
 | |
| 		isSuffixLength := false
 | |
| 		if startOffset < 0 {
 | |
| 			isSuffixLength = true
 | |
| 		}
 | |
| 
 | |
| 		rs := &HTTPRangeSpec{
 | |
| 			IsSuffixLength: isSuffixLength,
 | |
| 			Start:          startOffset,
 | |
| 			End:            startOffset + length,
 | |
| 		}
 | |
| 		// fill cache in the background
 | |
| 		bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, srcBucket, srcObject, rs, http.Header{}, readLock, ObjectOptions{})
 | |
| 		if bErr != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		defer bReader.Close()
 | |
| 		// avoid cache overwrite if another background routine filled cache
 | |
| 		dcache.PutObjectPart(GlobalContext, dstBucket, dstObject, uploadID, partID, bReader, length, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
 | |
| 	}()
 | |
| 	// Success.
 | |
| 	return partInfo, nil
 | |
| }
 | |
| 
 | |
| // CompleteMultipartUpload - completes multipart upload operation on the backend. If writethrough mode is enabled, this also
 | |
| // finalizes the upload saved in cache multipart dir.
 | |
| func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
 | |
| 	completeMultipartUploadFn := c.InnerCompleteMultipartUploadFn
 | |
| 	if !c.commitWritethrough && !c.commitWriteback {
 | |
| 		return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts)
 | |
| 	}
 | |
| 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
 | |
| 	if err != nil {
 | |
| 		// disk cache could not be located,execute backend call.
 | |
| 		return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts)
 | |
| 	}
 | |
| 
 | |
| 	// perform multipart upload on backend and cache simultaneously
 | |
| 	oi, err = completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts)
 | |
| 	if err == nil {
 | |
| 		// fill cache in the background
 | |
| 		go func() {
 | |
| 			_, err := dcache.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, oi, opts)
 | |
| 			if err != nil {
 | |
| 				// fill cache in the background
 | |
| 				bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
 | |
| 				if bErr != nil {
 | |
| 					return
 | |
| 				}
 | |
| 				defer bReader.Close()
 | |
| 				oi, _, err := dcache.Stat(GlobalContext, bucket, object)
 | |
| 				// avoid cache overwrite if another background routine filled cache
 | |
| 				if err != nil || oi.ETag != bReader.ObjInfo.ETag {
 | |
| 					dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false, false)
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // AbortMultipartUpload - aborts multipart upload on backend and cache.
 | |
| func (c *cacheObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
 | |
| 	abortMultipartUploadFn := c.InnerAbortMultipartUploadFn
 | |
| 	if !c.commitWritethrough && !c.commitWriteback {
 | |
| 		return abortMultipartUploadFn(ctx, bucket, object, uploadID, opts)
 | |
| 	}
 | |
| 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
 | |
| 	if err != nil {
 | |
| 		// disk cache could not be located,execute backend call.
 | |
| 		return abortMultipartUploadFn(ctx, bucket, object, uploadID, opts)
 | |
| 	}
 | |
| 	if err = dcache.uploadIDExists(bucket, object, uploadID); err != nil {
 | |
| 		return toObjectErr(err, bucket, object, uploadID)
 | |
| 	}
 | |
| 
 | |
| 	// execute backend operation
 | |
| 	err = abortMultipartUploadFn(ctx, bucket, object, uploadID, opts)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// abort multipart upload on cache
 | |
| 	go dcache.AbortUpload(bucket, object, uploadID)
 | |
| 	return nil
 | |
| }
 |