mirror of
				https://github.com/minio/minio.git
				synced 2025-11-04 02:01:05 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			1116 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1116 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
 * Minio Cloud Storage, (C) 2018 Minio, Inc.
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 */
 | 
						|
 | 
						|
package cmd
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"io/ioutil"
 | 
						|
	"net/http"
 | 
						|
	"os"
 | 
						|
	"sort"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/djherbis/atime"
 | 
						|
 | 
						|
	"github.com/minio/minio/cmd/crypto"
 | 
						|
	"github.com/minio/minio/cmd/logger"
 | 
						|
	"github.com/minio/minio/pkg/hash"
 | 
						|
	"github.com/minio/minio/pkg/wildcard"
 | 
						|
)
 | 
						|
 | 
						|
// Note: a cache entry is only created when the cache disk has at least the
// object's size available as free space.

// cacheTrashDir is the literal name "trash"; presumably the directory used
// for discarded cache entries (its use is outside this chunk).
const cacheTrashDir = "trash"

// cacheCleanupInterval is the cache cleanup period, expressed in minutes.
const cacheCleanupInterval = 10
 | 
						|
 | 
						|
// abstract slice of cache drives backed by FS.
 | 
						|
type diskCache struct {
 | 
						|
	cfs []*cacheFSObjects
 | 
						|
}
 | 
						|
 | 
						|
// Abstracts disk caching - used by the S3 layer
 | 
						|
type cacheObjects struct {
 | 
						|
	// pointer to disk cache
 | 
						|
	cache *diskCache
 | 
						|
	// ListObjects pool management.
 | 
						|
	listPool *treeWalkPool
 | 
						|
	// file path patterns to exclude from cache
 | 
						|
	exclude []string
 | 
						|
	// Object functions pointing to the corresponding functions of backend implementation.
 | 
						|
	GetObjectNInfoFn          func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
 | 
						|
	GetObjectFn               func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
 | 
						|
	GetObjectInfoFn           func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | 
						|
	PutObjectFn               func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | 
						|
	DeleteObjectFn            func(ctx context.Context, bucket, object string) error
 | 
						|
	ListObjectsFn             func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
 | 
						|
	ListObjectsV2Fn           func(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
 | 
						|
	ListBucketsFn             func(ctx context.Context) (buckets []BucketInfo, err error)
 | 
						|
	GetBucketInfoFn           func(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error)
 | 
						|
	NewMultipartUploadFn      func(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error)
 | 
						|
	PutObjectPartFn           func(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader, opts ObjectOptions) (info PartInfo, err error)
 | 
						|
	AbortMultipartUploadFn    func(ctx context.Context, bucket, object, uploadID string) error
 | 
						|
	CompleteMultipartUploadFn func(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error)
 | 
						|
	DeleteBucketFn            func(ctx context.Context, bucket string) error
 | 
						|
}
 | 
						|
 | 
						|
// CacheStorageInfo reports the total and free capacity of the underlying
// cache storage.
type CacheStorageInfo struct {
	// Total is the total cache disk space.
	Total uint64
	// Free is the cache space that is still available.
	Free uint64
}
 | 
						|
 | 
						|
// CacheObjectLayer implements primitives for cache object API layer.
 | 
						|
type CacheObjectLayer interface {
 | 
						|
	// Bucket operations.
 | 
						|
	ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
 | 
						|
	ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
 | 
						|
	GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error)
 | 
						|
	ListBuckets(ctx context.Context) (buckets []BucketInfo, err error)
 | 
						|
	DeleteBucket(ctx context.Context, bucket string) error
 | 
						|
	// Object operations.
 | 
						|
	GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
 | 
						|
	GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
 | 
						|
	GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | 
						|
	PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
 | 
						|
	DeleteObject(ctx context.Context, bucket, object string) error
 | 
						|
 | 
						|
	// Multipart operations.
 | 
						|
	NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error)
 | 
						|
	PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader, opts ObjectOptions) (info PartInfo, err error)
 | 
						|
	AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error
 | 
						|
	CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error)
 | 
						|
 | 
						|
	// Storage operations.
 | 
						|
	StorageInfo(ctx context.Context) CacheStorageInfo
 | 
						|
}
 | 
						|
 | 
						|
// IsCacheable returns if the object should be saved in the cache.
 | 
						|
func (o ObjectInfo) IsCacheable() bool {
 | 
						|
	return !crypto.IsEncrypted(o.UserDefined)
 | 
						|
}
 | 
						|
 | 
						|
// backendDownError returns true if err is due to backend failure or faulty disk if in server mode
 | 
						|
func backendDownError(err error) bool {
 | 
						|
	_, backendDown := err.(BackendDown)
 | 
						|
	return backendDown || IsErr(err, baseErrs...)
 | 
						|
}
 | 
						|
 | 
						|
// get cache disk where object is currently cached for a GET operation. If object does not exist at that location,
 | 
						|
// treat the list of cache drives as a circular buffer and walk through them starting at hash index
 | 
						|
// until an online drive is found.If object is not found, fall back to the first online cache drive
 | 
						|
// closest to the hash index, so that object can be recached.
 | 
						|
func (c diskCache) getCachedFSLoc(ctx context.Context, bucket, object string) (*cacheFSObjects, error) {
 | 
						|
	index := c.hashIndex(bucket, object)
 | 
						|
	numDisks := len(c.cfs)
 | 
						|
	// save first online cache disk closest to the hint index
 | 
						|
	var firstOnlineDisk *cacheFSObjects
 | 
						|
	for k := 0; k < numDisks; k++ {
 | 
						|
		i := (index + k) % numDisks
 | 
						|
		if c.cfs[i] == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if c.cfs[i].IsOnline() {
 | 
						|
			if firstOnlineDisk == nil {
 | 
						|
				firstOnlineDisk = c.cfs[i]
 | 
						|
			}
 | 
						|
			if c.cfs[i].Exists(ctx, bucket, object) {
 | 
						|
				return c.cfs[i], nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if firstOnlineDisk != nil {
 | 
						|
		return firstOnlineDisk, nil
 | 
						|
	}
 | 
						|
	return nil, errDiskNotFound
 | 
						|
}
 | 
						|
 | 
						|
// choose a cache deterministically based on hash of bucket,object. The hash index is treated as
 | 
						|
// a hint. In the event that the cache drive at hash index is offline, treat the list of cache drives
 | 
						|
// as a circular buffer and walk through them starting at hash index until an online drive is found.
 | 
						|
func (c diskCache) getCacheFS(ctx context.Context, bucket, object string) (*cacheFSObjects, error) {
 | 
						|
	index := c.hashIndex(bucket, object)
 | 
						|
	numDisks := len(c.cfs)
 | 
						|
	for k := 0; k < numDisks; k++ {
 | 
						|
		i := (index + k) % numDisks
 | 
						|
		if c.cfs[i] == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if c.cfs[i].IsOnline() {
 | 
						|
			return c.cfs[i], nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil, errDiskNotFound
 | 
						|
}
 | 
						|
 | 
						|
// Compute a unique hash sum for bucket and object
 | 
						|
func (c diskCache) hashIndex(bucket, object string) int {
 | 
						|
	return crcHashMod(pathJoin(bucket, object), len(c.cfs))
 | 
						|
}
 | 
						|
 | 
						|
// construct a metadata k-v map
 | 
						|
func (c cacheObjects) getMetadata(objInfo ObjectInfo) map[string]string {
 | 
						|
	metadata := make(map[string]string)
 | 
						|
	metadata["etag"] = objInfo.ETag
 | 
						|
	metadata["content-type"] = objInfo.ContentType
 | 
						|
	metadata["content-encoding"] = objInfo.ContentEncoding
 | 
						|
 | 
						|
	for key, val := range objInfo.UserDefined {
 | 
						|
		metadata[key] = val
 | 
						|
	}
 | 
						|
	return metadata
 | 
						|
}
 | 
						|
 | 
						|
func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
 | 
						|
	if c.isCacheExclude(bucket, object) {
 | 
						|
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, writeLock, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	// fetch cacheFSObjects if object is currently cached or nearest available cache drive
 | 
						|
	dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object)
 | 
						|
	if err != nil {
 | 
						|
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, writeLock, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	cacheReader, cacheErr := dcache.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
 | 
						|
 | 
						|
	objInfo, err := c.GetObjectInfoFn(ctx, bucket, object, opts)
 | 
						|
	if backendDownError(err) && cacheErr == nil {
 | 
						|
		return cacheReader, nil
 | 
						|
	} else if err != nil {
 | 
						|
		if _, ok := err.(ObjectNotFound); ok {
 | 
						|
			// Delete cached entry if backend object was deleted.
 | 
						|
			dcache.Delete(ctx, bucket, object)
 | 
						|
		}
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if !objInfo.IsCacheable() || filterFromCache(objInfo.UserDefined) {
 | 
						|
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, writeLock, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	if cacheErr == nil {
 | 
						|
		if cacheReader.ObjInfo.ETag == objInfo.ETag && !isStaleCache(objInfo) {
 | 
						|
			// Object is not stale, so serve from cache
 | 
						|
			return cacheReader, nil
 | 
						|
		}
 | 
						|
		cacheReader.Close()
 | 
						|
		// Object is stale, so delete from cache
 | 
						|
		dcache.Delete(ctx, bucket, object)
 | 
						|
	}
 | 
						|
 | 
						|
	// Since we got here, we are serving the request from backend,
 | 
						|
	// and also adding the object to the cache.
 | 
						|
 | 
						|
	if rs != nil {
 | 
						|
		// We don't cache partial objects.
 | 
						|
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, writeLock, opts)
 | 
						|
	}
 | 
						|
	if !dcache.diskAvailable(objInfo.Size) {
 | 
						|
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, writeLock, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, writeLock, opts)
 | 
						|
	if bkErr != nil {
 | 
						|
		return nil, bkErr
 | 
						|
	}
 | 
						|
 | 
						|
	// Initialize pipe.
 | 
						|
	pipeReader, pipeWriter := io.Pipe()
 | 
						|
	teeReader := io.TeeReader(bkReader, pipeWriter)
 | 
						|
	hashReader, herr := hash.NewReader(pipeReader, bkReader.ObjInfo.Size, "", "", bkReader.ObjInfo.Size)
 | 
						|
	if herr != nil {
 | 
						|
		bkReader.Close()
 | 
						|
		return nil, herr
 | 
						|
	}
 | 
						|
 | 
						|
	go func() {
 | 
						|
		opts := ObjectOptions{}
 | 
						|
		putErr := dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(bkReader.ObjInfo), opts)
 | 
						|
		// close the write end of the pipe, so the error gets
 | 
						|
		// propagated to getObjReader
 | 
						|
		pipeWriter.CloseWithError(putErr)
 | 
						|
	}()
 | 
						|
 | 
						|
	cleanupBackend := func() { bkReader.Close() }
 | 
						|
	cleanupPipe := func() { pipeReader.Close() }
 | 
						|
	gr = NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, cleanupBackend, cleanupPipe)
 | 
						|
	return gr, nil
 | 
						|
}
 | 
						|
 | 
						|
// Uses cached-object to serve the request. If object is not cached it serves the request from the backend and also
 | 
						|
// stores it in the cache for serving subsequent requests.
 | 
						|
func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
 | 
						|
	GetObjectFn := c.GetObjectFn
 | 
						|
	GetObjectInfoFn := c.GetObjectInfoFn
 | 
						|
 | 
						|
	if c.isCacheExclude(bucket, object) {
 | 
						|
		return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
	}
 | 
						|
	// fetch cacheFSObjects if object is currently cached or nearest available cache drive
 | 
						|
	dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object)
 | 
						|
	if err != nil {
 | 
						|
		return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
	}
 | 
						|
	// stat object on backend
 | 
						|
	objInfo, err := GetObjectInfoFn(ctx, bucket, object, opts)
 | 
						|
	backendDown := backendDownError(err)
 | 
						|
	if err != nil && !backendDown {
 | 
						|
		if _, ok := err.(ObjectNotFound); ok {
 | 
						|
			// Delete the cached entry if backend object was deleted.
 | 
						|
			dcache.Delete(ctx, bucket, object)
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if !backendDown && !objInfo.IsCacheable() {
 | 
						|
		return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	if !backendDown && filterFromCache(objInfo.UserDefined) {
 | 
						|
		return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	cachedObjInfo, err := dcache.GetObjectInfo(ctx, bucket, object, opts)
 | 
						|
	if err == nil {
 | 
						|
		if backendDown {
 | 
						|
			// If the backend is down, serve the request from cache.
 | 
						|
			return dcache.Get(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
		}
 | 
						|
		if cachedObjInfo.ETag == objInfo.ETag && !isStaleCache(objInfo) {
 | 
						|
			return dcache.Get(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
		}
 | 
						|
		dcache.Delete(ctx, bucket, object)
 | 
						|
	}
 | 
						|
	if startOffset != 0 || length != objInfo.Size {
 | 
						|
		// We don't cache partial objects.
 | 
						|
		return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
	}
 | 
						|
	if !dcache.diskAvailable(objInfo.Size) {
 | 
						|
		return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
	}
 | 
						|
	// Initialize pipe.
 | 
						|
	pipeReader, pipeWriter := io.Pipe()
 | 
						|
	hashReader, err := hash.NewReader(pipeReader, objInfo.Size, "", "", objInfo.Size)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	go func() {
 | 
						|
		gerr := GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag, opts)
 | 
						|
		pipeWriter.CloseWithError(gerr) // Close writer explicitly signaling we wrote all data.
 | 
						|
	}()
 | 
						|
	err = dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(objInfo), opts)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	pipeReader.Close()
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Returns ObjectInfo from cache if available.
 | 
						|
func (c cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
 | 
						|
	getObjectInfoFn := c.GetObjectInfoFn
 | 
						|
	if c.isCacheExclude(bucket, object) {
 | 
						|
		return getObjectInfoFn(ctx, bucket, object, opts)
 | 
						|
	}
 | 
						|
	// fetch cacheFSObjects if object is currently cached or nearest available cache drive
 | 
						|
	dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object)
 | 
						|
	if err != nil {
 | 
						|
		return getObjectInfoFn(ctx, bucket, object, opts)
 | 
						|
	}
 | 
						|
	objInfo, err := getObjectInfoFn(ctx, bucket, object, opts)
 | 
						|
	if err != nil {
 | 
						|
		if _, ok := err.(ObjectNotFound); ok {
 | 
						|
			// Delete the cached entry if backend object was deleted.
 | 
						|
			dcache.Delete(ctx, bucket, object)
 | 
						|
			return ObjectInfo{}, err
 | 
						|
		}
 | 
						|
		if !backendDownError(err) {
 | 
						|
			return ObjectInfo{}, err
 | 
						|
		}
 | 
						|
		// when backend is down, serve from cache.
 | 
						|
		cachedObjInfo, cerr := dcache.GetObjectInfo(ctx, bucket, object, opts)
 | 
						|
		if cerr == nil {
 | 
						|
			return cachedObjInfo, nil
 | 
						|
		}
 | 
						|
		return ObjectInfo{}, BackendDown{}
 | 
						|
	}
 | 
						|
	// when backend is up, do a sanity check on cached object
 | 
						|
	cachedObjInfo, err := dcache.GetObjectInfo(ctx, bucket, object, opts)
 | 
						|
	if err != nil {
 | 
						|
		return objInfo, nil
 | 
						|
	}
 | 
						|
	if cachedObjInfo.ETag != objInfo.ETag {
 | 
						|
		// Delete the cached entry if the backend object was replaced.
 | 
						|
		dcache.Delete(ctx, bucket, object)
 | 
						|
	}
 | 
						|
	return objInfo, nil
 | 
						|
}
 | 
						|
 | 
						|
// Returns function "listDir" of the type listDirFunc.
 | 
						|
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
 | 
						|
// disks - list of fsObjects
 | 
						|
func listDirCacheFactory(isLeaf isLeafFunc, disks []*cacheFSObjects) listDirFunc {
 | 
						|
	listCacheDirs := func(bucket, prefixDir, prefixEntry string) (dirs []string) {
 | 
						|
		var entries []string
 | 
						|
		for _, disk := range disks {
 | 
						|
			// ignore disk-caches that might be missing/offline
 | 
						|
			if disk == nil {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			fs := disk.FSObjects
 | 
						|
			var err error
 | 
						|
			entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir))
 | 
						|
			if err != nil {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			// Filter entries that have the prefix prefixEntry.
 | 
						|
			entries = filterMatchingPrefix(entries, prefixEntry)
 | 
						|
			dirs = append(dirs, entries...)
 | 
						|
		}
 | 
						|
		return dirs
 | 
						|
	}
 | 
						|
 | 
						|
	// listDir - lists all the entries at a given prefix and given entry in the prefix.
 | 
						|
	listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) {
 | 
						|
		cacheEntries := listCacheDirs(bucket, prefixDir, prefixEntry)
 | 
						|
		for _, entry := range cacheEntries {
 | 
						|
			// Find elements in entries which are not in mergedEntries
 | 
						|
			idx := sort.SearchStrings(mergedEntries, entry)
 | 
						|
			// if entry is already present in mergedEntries don't add.
 | 
						|
			if idx < len(mergedEntries) && mergedEntries[idx] == entry {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			mergedEntries = append(mergedEntries, entry)
 | 
						|
			sort.Strings(mergedEntries)
 | 
						|
		}
 | 
						|
		return mergedEntries, false
 | 
						|
	}
 | 
						|
	return listDir
 | 
						|
}
 | 
						|
 | 
						|
// List all objects at prefix upto maxKeys, optionally delimited by '/' from the cache. Maintains the list pool
 | 
						|
// state for future re-entrant list requests.
 | 
						|
func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
 | 
						|
	var objInfos []ObjectInfo
 | 
						|
	var eof bool
 | 
						|
	var nextMarker string
 | 
						|
 | 
						|
	recursive := true
 | 
						|
	if delimiter == slashSeparator {
 | 
						|
		recursive = false
 | 
						|
	}
 | 
						|
	walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix, false})
 | 
						|
	if walkResultCh == nil {
 | 
						|
		endWalkCh = make(chan struct{})
 | 
						|
		isLeaf := func(bucket, object string) bool {
 | 
						|
			fs, err := c.cache.getCacheFS(ctx, bucket, object)
 | 
						|
			if err != nil {
 | 
						|
				return false
 | 
						|
			}
 | 
						|
			_, err = fs.getObjectInfo(ctx, bucket, object)
 | 
						|
			return err == nil
 | 
						|
		}
 | 
						|
 | 
						|
		isLeafDir := func(bucket, object string) bool {
 | 
						|
			fs, err := c.cache.getCacheFS(ctx, bucket, object)
 | 
						|
			if err != nil {
 | 
						|
				return false
 | 
						|
			}
 | 
						|
			return fs.isObjectDir(bucket, object)
 | 
						|
		}
 | 
						|
 | 
						|
		listDir := listDirCacheFactory(isLeaf, c.cache.cfs)
 | 
						|
		walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
 | 
						|
	}
 | 
						|
 | 
						|
	for i := 0; i < maxKeys; {
 | 
						|
		walkResult, ok := <-walkResultCh
 | 
						|
		if !ok {
 | 
						|
			// Closed channel.
 | 
						|
			eof = true
 | 
						|
			break
 | 
						|
		}
 | 
						|
		// For any walk error return right away.
 | 
						|
		if walkResult.err != nil {
 | 
						|
			return result, toObjectErr(walkResult.err, bucket, prefix)
 | 
						|
		}
 | 
						|
 | 
						|
		entry := walkResult.entry
 | 
						|
		var objInfo ObjectInfo
 | 
						|
		if hasSuffix(entry, slashSeparator) {
 | 
						|
			// Object name needs to be full path.
 | 
						|
			objInfo.Bucket = bucket
 | 
						|
			objInfo.Name = entry
 | 
						|
			objInfo.IsDir = true
 | 
						|
		} else {
 | 
						|
			// Set the Mode to a "regular" file.
 | 
						|
			var err error
 | 
						|
			fs, err := c.cache.getCacheFS(ctx, bucket, entry)
 | 
						|
			if err != nil {
 | 
						|
				// Ignore errDiskNotFound
 | 
						|
				if err == errDiskNotFound {
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				return result, toObjectErr(err, bucket, prefix)
 | 
						|
			}
 | 
						|
			objInfo, err = fs.getObjectInfo(ctx, bucket, entry)
 | 
						|
			if err != nil {
 | 
						|
				// Ignore ObjectNotFound error
 | 
						|
				if _, ok := err.(ObjectNotFound); ok {
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				return result, toObjectErr(err, bucket, prefix)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		nextMarker = objInfo.Name
 | 
						|
		objInfos = append(objInfos, objInfo)
 | 
						|
		i++
 | 
						|
		if walkResult.end {
 | 
						|
			eof = true
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	params := listParams{bucket, recursive, nextMarker, prefix, false}
 | 
						|
	if !eof {
 | 
						|
		c.listPool.Set(params, walkResultCh, endWalkCh)
 | 
						|
	}
 | 
						|
 | 
						|
	result = ListObjectsInfo{IsTruncated: !eof}
 | 
						|
	for _, objInfo := range objInfos {
 | 
						|
		result.NextMarker = objInfo.Name
 | 
						|
		if objInfo.IsDir && delimiter == slashSeparator {
 | 
						|
			result.Prefixes = append(result.Prefixes, objInfo.Name)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		result.Objects = append(result.Objects, objInfo)
 | 
						|
	}
 | 
						|
	return result, nil
 | 
						|
}
 | 
						|
 | 
						|
// listCacheV2Objects lists all blobs in bucket filtered by prefix from the cache
 | 
						|
func (c cacheObjects) listCacheV2Objects(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
 | 
						|
	loi, err := c.listCacheObjects(ctx, bucket, prefix, continuationToken, delimiter, maxKeys)
 | 
						|
	if err != nil {
 | 
						|
		return result, err
 | 
						|
	}
 | 
						|
 | 
						|
	listObjectsV2Info := ListObjectsV2Info{
 | 
						|
		IsTruncated:           loi.IsTruncated,
 | 
						|
		ContinuationToken:     continuationToken,
 | 
						|
		NextContinuationToken: loi.NextMarker,
 | 
						|
		Objects:               loi.Objects,
 | 
						|
		Prefixes:              loi.Prefixes,
 | 
						|
	}
 | 
						|
	return listObjectsV2Info, err
 | 
						|
}
 | 
						|
 | 
						|
// List all objects at prefix upto maxKeys., optionally delimited by '/'. Maintains the list pool
 | 
						|
// state for future re-entrant list requests. Retrieve from cache if backend is down
 | 
						|
func (c cacheObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
 | 
						|
 | 
						|
	listObjectsFn := c.ListObjectsFn
 | 
						|
 | 
						|
	result, err = listObjectsFn(ctx, bucket, prefix, marker, delimiter, maxKeys)
 | 
						|
	if err != nil {
 | 
						|
		if backendDownError(err) {
 | 
						|
			return c.listCacheObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// ListObjectsV2 lists all blobs in bucket filtered by prefix
 | 
						|
func (c cacheObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
 | 
						|
	listObjectsV2Fn := c.ListObjectsV2Fn
 | 
						|
 | 
						|
	result, err = listObjectsV2Fn(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter)
 | 
						|
	if err != nil {
 | 
						|
		if backendDownError(err) {
 | 
						|
			return c.listCacheV2Objects(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter)
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Lists all the buckets in the cache
 | 
						|
func (c cacheObjects) listBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
 | 
						|
	m := make(map[string]string)
 | 
						|
	for _, cache := range c.cache.cfs {
 | 
						|
		// ignore disk-caches that might be missing/offline
 | 
						|
		if cache == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		entries, err := cache.ListBuckets(ctx)
 | 
						|
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		for _, entry := range entries {
 | 
						|
			_, ok := m[entry.Name]
 | 
						|
			if !ok {
 | 
						|
				m[entry.Name] = entry.Name
 | 
						|
				buckets = append(buckets, entry)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// Sort bucket infos by bucket name.
 | 
						|
	sort.Sort(byBucketName(buckets))
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Returns list of buckets from cache or the backend. If the backend is down, buckets
 | 
						|
// available on cache are served.
 | 
						|
func (c cacheObjects) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
 | 
						|
	listBucketsFn := c.ListBucketsFn
 | 
						|
	buckets, err = listBucketsFn(ctx)
 | 
						|
	if err != nil {
 | 
						|
		if backendDownError(err) {
 | 
						|
			return c.listBuckets(ctx)
 | 
						|
		}
 | 
						|
		return []BucketInfo{}, err
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Returns bucket info from cache if backend is down.
 | 
						|
func (c cacheObjects) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
 | 
						|
	getBucketInfoFn := c.GetBucketInfoFn
 | 
						|
	bucketInfo, err = getBucketInfoFn(ctx, bucket)
 | 
						|
	if backendDownError(err) {
 | 
						|
		for _, cache := range c.cache.cfs {
 | 
						|
			// ignore disk-caches that might be missing/offline
 | 
						|
			if cache == nil {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			if bucketInfo, err = cache.GetBucketInfo(ctx, bucket); err == nil {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Delete Object deletes from cache as well if backend operation succeeds
 | 
						|
func (c cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) {
 | 
						|
	if err = c.DeleteObjectFn(ctx, bucket, object); err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if c.isCacheExclude(bucket, object) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	dcache, cerr := c.cache.getCachedFSLoc(ctx, bucket, object)
 | 
						|
	if cerr == nil {
 | 
						|
		_ = dcache.DeleteObject(ctx, bucket, object)
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Returns true if object should be excluded from cache
 | 
						|
func (c cacheObjects) isCacheExclude(bucket, object string) bool {
 | 
						|
	for _, pattern := range c.exclude {
 | 
						|
		matchStr := fmt.Sprintf("%s/%s", bucket, object)
 | 
						|
		if ok := wildcard.MatchSimple(pattern, matchStr); ok {
 | 
						|
			return true
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// PutObject - caches the uploaded object for single Put operations
 | 
						|
func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 | 
						|
	putObjectFn := c.PutObjectFn
 | 
						|
	dcache, err := c.cache.getCacheFS(ctx, bucket, object)
 | 
						|
	if err != nil {
 | 
						|
		// disk cache could not be located,execute backend call.
 | 
						|
		return putObjectFn(ctx, bucket, object, r, metadata, opts)
 | 
						|
	}
 | 
						|
	size := r.Size()
 | 
						|
 | 
						|
	// fetch from backend if there is no space on cache drive
 | 
						|
	if !dcache.diskAvailable(size) {
 | 
						|
		return putObjectFn(ctx, bucket, object, r, metadata, opts)
 | 
						|
	}
 | 
						|
	// fetch from backend if cache exclude pattern or cache-control
 | 
						|
	// directive set to exclude
 | 
						|
	if c.isCacheExclude(bucket, object) || filterFromCache(metadata) {
 | 
						|
		dcache.Delete(ctx, bucket, object)
 | 
						|
		return putObjectFn(ctx, bucket, object, r, metadata, opts)
 | 
						|
	}
 | 
						|
	objInfo = ObjectInfo{}
 | 
						|
	// Initialize pipe to stream data to backend
 | 
						|
	pipeReader, pipeWriter := io.Pipe()
 | 
						|
	hashReader, err := hash.NewReader(pipeReader, size, r.MD5HexString(), r.SHA256HexString(), r.ActualSize())
 | 
						|
	if err != nil {
 | 
						|
		return ObjectInfo{}, err
 | 
						|
	}
 | 
						|
	// Initialize pipe to stream data to cache
 | 
						|
	rPipe, wPipe := io.Pipe()
 | 
						|
	cHashReader, err := hash.NewReader(rPipe, size, r.MD5HexString(), r.SHA256HexString(), r.ActualSize())
 | 
						|
	if err != nil {
 | 
						|
		return ObjectInfo{}, err
 | 
						|
	}
 | 
						|
	oinfoCh := make(chan ObjectInfo)
 | 
						|
	errCh := make(chan error)
 | 
						|
	go func() {
 | 
						|
		oinfo, perr := putObjectFn(ctx, bucket, object, hashReader, metadata, opts)
 | 
						|
		if perr != nil {
 | 
						|
			pipeWriter.CloseWithError(perr)
 | 
						|
			wPipe.CloseWithError(perr)
 | 
						|
			close(oinfoCh)
 | 
						|
			errCh <- perr
 | 
						|
			return
 | 
						|
		}
 | 
						|
		close(errCh)
 | 
						|
		oinfoCh <- oinfo
 | 
						|
	}()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		if err = dcache.Put(ctx, bucket, object, cHashReader, metadata, opts); err != nil {
 | 
						|
			wPipe.CloseWithError(err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	mwriter := io.MultiWriter(pipeWriter, wPipe)
 | 
						|
	_, err = io.Copy(mwriter, r)
 | 
						|
	if err != nil {
 | 
						|
		err = <-errCh
 | 
						|
		return objInfo, err
 | 
						|
	}
 | 
						|
	pipeWriter.Close()
 | 
						|
	wPipe.Close()
 | 
						|
	objInfo = <-oinfoCh
 | 
						|
	return objInfo, err
 | 
						|
}
 | 
						|
 | 
						|
// NewMultipartUpload - Starts a new multipart upload operation to backend and cache.
 | 
						|
func (c cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error) {
 | 
						|
	newMultipartUploadFn := c.NewMultipartUploadFn
 | 
						|
 | 
						|
	if c.isCacheExclude(bucket, object) || filterFromCache(metadata) {
 | 
						|
		return newMultipartUploadFn(ctx, bucket, object, metadata, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	dcache, err := c.cache.getCacheFS(ctx, bucket, object)
 | 
						|
	if err != nil {
 | 
						|
		// disk cache could not be located,execute backend call.
 | 
						|
		return newMultipartUploadFn(ctx, bucket, object, metadata, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	uploadID, err = newMultipartUploadFn(ctx, bucket, object, metadata, opts)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	// create new multipart upload in cache with same uploadID
 | 
						|
	dcache.NewMultipartUpload(ctx, bucket, object, metadata, uploadID, opts)
 | 
						|
	return uploadID, err
 | 
						|
}
 | 
						|
 | 
						|
// PutObjectPart - uploads part to backend and cache simultaneously.
 | 
						|
func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader, opts ObjectOptions) (info PartInfo, err error) {
 | 
						|
	putObjectPartFn := c.PutObjectPartFn
 | 
						|
	dcache, err := c.cache.getCacheFS(ctx, bucket, object)
 | 
						|
	if err != nil {
 | 
						|
		// disk cache could not be located,execute backend call.
 | 
						|
		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	if c.isCacheExclude(bucket, object) {
 | 
						|
		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	// make sure cache has at least size space available
 | 
						|
	size := data.Size()
 | 
						|
	if !dcache.diskAvailable(size) {
 | 
						|
		select {
 | 
						|
		case dcache.purgeChan <- struct{}{}:
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 | 
						|
	}
 | 
						|
 | 
						|
	info = PartInfo{}
 | 
						|
	// Initialize pipe to stream data to backend
 | 
						|
	pipeReader, pipeWriter := io.Pipe()
 | 
						|
	hashReader, err := hash.NewReader(pipeReader, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize())
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	// Initialize pipe to stream data to cache
 | 
						|
	rPipe, wPipe := io.Pipe()
 | 
						|
	cHashReader, err := hash.NewReader(rPipe, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize())
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	pinfoCh := make(chan PartInfo)
 | 
						|
	errorCh := make(chan error)
 | 
						|
	go func() {
 | 
						|
		info, err = putObjectPartFn(ctx, bucket, object, uploadID, partID, hashReader, opts)
 | 
						|
		if err != nil {
 | 
						|
			close(pinfoCh)
 | 
						|
			pipeWriter.CloseWithError(err)
 | 
						|
			wPipe.CloseWithError(err)
 | 
						|
			errorCh <- err
 | 
						|
			return
 | 
						|
		}
 | 
						|
		close(errorCh)
 | 
						|
		pinfoCh <- info
 | 
						|
	}()
 | 
						|
	go func() {
 | 
						|
		if _, perr := dcache.PutObjectPart(ctx, bucket, object, uploadID, partID, cHashReader, opts); perr != nil {
 | 
						|
			wPipe.CloseWithError(perr)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	mwriter := io.MultiWriter(pipeWriter, wPipe)
 | 
						|
	_, err = io.Copy(mwriter, data)
 | 
						|
	if err != nil {
 | 
						|
		err = <-errorCh
 | 
						|
		return PartInfo{}, err
 | 
						|
	}
 | 
						|
	pipeWriter.Close()
 | 
						|
	wPipe.Close()
 | 
						|
	info = <-pinfoCh
 | 
						|
	return info, err
 | 
						|
}
 | 
						|
 | 
						|
// AbortMultipartUpload - aborts multipart upload on backend and cache.
 | 
						|
func (c cacheObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
 | 
						|
	abortMultipartUploadFn := c.AbortMultipartUploadFn
 | 
						|
 | 
						|
	if c.isCacheExclude(bucket, object) {
 | 
						|
		return abortMultipartUploadFn(ctx, bucket, object, uploadID)
 | 
						|
	}
 | 
						|
 | 
						|
	dcache, err := c.cache.getCacheFS(ctx, bucket, object)
 | 
						|
	if err != nil {
 | 
						|
		// disk cache could not be located,execute backend call.
 | 
						|
		return abortMultipartUploadFn(ctx, bucket, object, uploadID)
 | 
						|
	}
 | 
						|
	// execute backend operation
 | 
						|
	err = abortMultipartUploadFn(ctx, bucket, object, uploadID)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	// abort multipart upload on cache
 | 
						|
	dcache.AbortMultipartUpload(ctx, bucket, object, uploadID)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// CompleteMultipartUpload - completes multipart upload operation on backend and cache.
 | 
						|
func (c cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error) {
 | 
						|
	completeMultipartUploadFn := c.CompleteMultipartUploadFn
 | 
						|
 | 
						|
	if c.isCacheExclude(bucket, object) {
 | 
						|
		return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts)
 | 
						|
	}
 | 
						|
 | 
						|
	dcache, err := c.cache.getCacheFS(ctx, bucket, object)
 | 
						|
	if err != nil {
 | 
						|
		// disk cache could not be located,execute backend call.
 | 
						|
		return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts)
 | 
						|
	}
 | 
						|
	// perform backend operation
 | 
						|
	objInfo, err = completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	// create new multipart upload in cache with same uploadID
 | 
						|
	dcache.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts)
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// StorageInfo - returns underlying storage statistics.
 | 
						|
func (c cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo) {
 | 
						|
	var total, free uint64
 | 
						|
	for _, cfs := range c.cache.cfs {
 | 
						|
		if cfs == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		info, err := getDiskInfo(cfs.fsPath)
 | 
						|
		logger.GetReqInfo(ctx).AppendTags("cachePath", cfs.fsPath)
 | 
						|
		logger.LogIf(ctx, err)
 | 
						|
		total += info.Total
 | 
						|
		free += info.Free
 | 
						|
	}
 | 
						|
	return CacheStorageInfo{
 | 
						|
		Total: total,
 | 
						|
		Free:  free,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// DeleteBucket - marks bucket to be deleted from cache if bucket is deleted from backend.
 | 
						|
func (c cacheObjects) DeleteBucket(ctx context.Context, bucket string) (err error) {
 | 
						|
	deleteBucketFn := c.DeleteBucketFn
 | 
						|
	var toDel []*cacheFSObjects
 | 
						|
	for _, cfs := range c.cache.cfs {
 | 
						|
		// ignore disk-caches that might be missing/offline
 | 
						|
		if cfs == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if _, cerr := cfs.GetBucketInfo(ctx, bucket); cerr == nil {
 | 
						|
			toDel = append(toDel, cfs)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// perform backend operation
 | 
						|
	err = deleteBucketFn(ctx, bucket)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	// move bucket metadata and content to cache's trash dir
 | 
						|
	for _, d := range toDel {
 | 
						|
		d.moveBucketToTrash(ctx, bucket)
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// newCache initializes the cacheFSObjects for the "drives" specified in config.json
 | 
						|
// or the global env overrides.
 | 
						|
func newCache(config CacheConfig) (*diskCache, error) {
 | 
						|
	var cfsObjects []*cacheFSObjects
 | 
						|
	ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{})
 | 
						|
	formats, err := loadAndValidateCacheFormat(ctx, config.Drives)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	for i, dir := range config.Drives {
 | 
						|
		// skip cacheFSObjects creation for cache drives missing a format.json
 | 
						|
		if formats[i] == nil {
 | 
						|
			cfsObjects = append(cfsObjects, nil)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if err := checkAtimeSupport(dir); err != nil {
 | 
						|
			return nil, errors.New("Atime support required for disk caching")
 | 
						|
		}
 | 
						|
		cache, err := newCacheFSObjects(dir, config.Expiry, config.MaxUse)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		// Start the purging go-routine for entries that have expired
 | 
						|
		go cache.purge()
 | 
						|
 | 
						|
		// Start trash purge routine for deleted buckets.
 | 
						|
		go cache.purgeTrash()
 | 
						|
 | 
						|
		cfsObjects = append(cfsObjects, cache)
 | 
						|
	}
 | 
						|
	return &diskCache{cfs: cfsObjects}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Return error if Atime is disabled on the O/S
 | 
						|
func checkAtimeSupport(dir string) (err error) {
 | 
						|
	file, err := ioutil.TempFile(dir, "prefix")
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer os.Remove(file.Name())
 | 
						|
	finfo1, err := os.Stat(file.Name())
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if _, err = io.Copy(ioutil.Discard, file); err != io.EOF {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	finfo2, err := os.Stat(file.Name())
 | 
						|
 | 
						|
	if atime.Get(finfo2).Equal(atime.Get(finfo1)) {
 | 
						|
		return errors.New("Atime not supported")
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Returns cacheObjects for use by Server.
 | 
						|
func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) {
 | 
						|
	// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
 | 
						|
	dcache, err := newCache(config)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &cacheObjects{
 | 
						|
		cache:    dcache,
 | 
						|
		exclude:  config.Exclude,
 | 
						|
		listPool: newTreeWalkPool(globalLookupTimeout),
 | 
						|
		GetObjectFn: func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
 | 
						|
			return newObjectLayerFn().GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
 | 
						|
		},
 | 
						|
		GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
 | 
						|
			return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
 | 
						|
		},
 | 
						|
		GetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
 | 
						|
			return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
 | 
						|
		},
 | 
						|
		PutObjectFn: func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 | 
						|
			return newObjectLayerFn().PutObject(ctx, bucket, object, data, metadata, opts)
 | 
						|
		},
 | 
						|
		DeleteObjectFn: func(ctx context.Context, bucket, object string) error {
 | 
						|
			return newObjectLayerFn().DeleteObject(ctx, bucket, object)
 | 
						|
		},
 | 
						|
		ListObjectsFn: func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
 | 
						|
			return newObjectLayerFn().ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
 | 
						|
		},
 | 
						|
		ListObjectsV2Fn: func(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
 | 
						|
			return newObjectLayerFn().ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter)
 | 
						|
		},
 | 
						|
		ListBucketsFn: func(ctx context.Context) (buckets []BucketInfo, err error) {
 | 
						|
			return newObjectLayerFn().ListBuckets(ctx)
 | 
						|
		},
 | 
						|
		GetBucketInfoFn: func(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
 | 
						|
			return newObjectLayerFn().GetBucketInfo(ctx, bucket)
 | 
						|
		},
 | 
						|
		NewMultipartUploadFn: func(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error) {
 | 
						|
			return newObjectLayerFn().NewMultipartUpload(ctx, bucket, object, metadata, opts)
 | 
						|
		},
 | 
						|
		PutObjectPartFn: func(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader, opts ObjectOptions) (info PartInfo, err error) {
 | 
						|
			return newObjectLayerFn().PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
 | 
						|
		},
 | 
						|
		AbortMultipartUploadFn: func(ctx context.Context, bucket, object, uploadID string) error {
 | 
						|
			return newObjectLayerFn().AbortMultipartUpload(ctx, bucket, object, uploadID)
 | 
						|
		},
 | 
						|
		CompleteMultipartUploadFn: func(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error) {
 | 
						|
			return newObjectLayerFn().CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts)
 | 
						|
		},
 | 
						|
		DeleteBucketFn: func(ctx context.Context, bucket string) error {
 | 
						|
			return newObjectLayerFn().DeleteBucket(ctx, bucket)
 | 
						|
		},
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// cacheControl holds the caching settings extracted from an object's
// "cache-control" and "expires" metadata entries.
type cacheControl struct {
	// exclude is set when the cache-control value carries one of the
	// directives listed in cacheExcludeDirectives.
	exclude bool

	// expiry is the parsed "expires" value; it stays zero when the entry
	// is absent or cannot be parsed.
	expiry time.Time

	// Values of the max-age, s-maxage and min-fresh directives in seconds;
	// zero when the directive is absent.
	maxAge, sMaxAge, minFresh int
}
 | 
						|
 | 
						|
// cacheExcludeDirectives lists the cache-control directives that keep an
// object out of the cache.
var cacheExcludeDirectives = []string{"no-cache", "no-store", "must-revalidate"}
 | 
						|
 | 
						|
// returns true if cache exclude directives are set.
 | 
						|
func isCacheExcludeDirective(s string) bool {
 | 
						|
	for _, directive := range cacheExcludeDirectives {
 | 
						|
		if s == directive {
 | 
						|
			return true
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// returns struct with cache-control settings from user metadata.
 | 
						|
func getCacheControlOpts(m map[string]string) (c cacheControl, err error) {
 | 
						|
	var headerVal string
 | 
						|
	for k, v := range m {
 | 
						|
		if k == "cache-control" {
 | 
						|
			headerVal = v
 | 
						|
		}
 | 
						|
		if k == "expires" {
 | 
						|
			if e, err := http.ParseTime(v); err == nil {
 | 
						|
				c.expiry = e
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if headerVal == "" {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	headerVal = strings.ToLower(headerVal)
 | 
						|
	headerVal = strings.TrimSpace(headerVal)
 | 
						|
 | 
						|
	vals := strings.Split(headerVal, ",")
 | 
						|
	for _, val := range vals {
 | 
						|
		val = strings.TrimSpace(val)
 | 
						|
		p := strings.Split(val, "=")
 | 
						|
		if isCacheExcludeDirective(p[0]) {
 | 
						|
			c.exclude = true
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		if len(p) != 2 {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if p[0] == "max-age" ||
 | 
						|
			p[0] == "s-maxage" ||
 | 
						|
			p[0] == "min-fresh" {
 | 
						|
			i, err := strconv.Atoi(p[1])
 | 
						|
			if err != nil {
 | 
						|
				return c, err
 | 
						|
			}
 | 
						|
			if p[0] == "max-age" {
 | 
						|
				c.maxAge = i
 | 
						|
			}
 | 
						|
			if p[0] == "s-maxage" {
 | 
						|
				c.sMaxAge = i
 | 
						|
			}
 | 
						|
			if p[0] == "min-fresh" {
 | 
						|
				c.minFresh = i
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return c, nil
 | 
						|
}
 | 
						|
 | 
						|
// return true if metadata has a cache-control header
 | 
						|
// directive to exclude object from cache.
 | 
						|
func filterFromCache(m map[string]string) bool {
 | 
						|
	c, err := getCacheControlOpts(m)
 | 
						|
	if err != nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	return c.exclude
 | 
						|
}
 | 
						|
 | 
						|
// returns true if cache expiry conditions met in cache-control/expiry metadata.
 | 
						|
func isStaleCache(objInfo ObjectInfo) bool {
 | 
						|
	c, err := getCacheControlOpts(objInfo.UserDefined)
 | 
						|
	if err != nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	now := time.Now()
 | 
						|
	if c.sMaxAge > 0 && c.sMaxAge > int(now.Sub(objInfo.ModTime).Seconds()) {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	if c.maxAge > 0 && c.maxAge > int(now.Sub(objInfo.ModTime).Seconds()) {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	if !c.expiry.Equal(time.Time{}) && c.expiry.Before(time.Now()) {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	if c.minFresh > 0 && c.minFresh <= int(now.Sub(objInfo.ModTime).Seconds()) {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 |