mirror of
				https://github.com/minio/minio.git
				synced 2025-10-31 00:01:27 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			359 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			359 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|  * MinIO Cloud Storage, (C) 2016-2019 MinIO, Inc.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package cmd
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/hex"
 | |
| 	"fmt"
 | |
| 	"net/http"
 | |
| 	"sort"
 | |
| 	"time"
 | |
| 
 | |
| 	xhttp "github.com/minio/minio/cmd/http"
 | |
| 	"github.com/minio/minio/cmd/logger"
 | |
| 	"github.com/minio/minio/pkg/bucket/replication"
 | |
| 	"github.com/minio/minio/pkg/sync/errgroup"
 | |
| 	"github.com/minio/sha256-simd"
 | |
| )
 | |
| 
 | |
| const erasureAlgorithm = "rs-vandermonde"
 | |
| 
 | |
| // byObjectPartNumber is a collection satisfying sort.Interface.
 | |
| type byObjectPartNumber []ObjectPartInfo
 | |
| 
 | |
| func (t byObjectPartNumber) Len() int           { return len(t) }
 | |
| func (t byObjectPartNumber) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
 | |
| func (t byObjectPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
 | |
| 
 | |
| // AddChecksumInfo adds a checksum of a part.
 | |
| func (e *ErasureInfo) AddChecksumInfo(ckSumInfo ChecksumInfo) {
 | |
| 	for i, sum := range e.Checksums {
 | |
| 		if sum.PartNumber == ckSumInfo.PartNumber {
 | |
| 			e.Checksums[i] = ckSumInfo
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	e.Checksums = append(e.Checksums, ckSumInfo)
 | |
| }
 | |
| 
 | |
| // GetChecksumInfo - get checksum of a part.
 | |
| func (e ErasureInfo) GetChecksumInfo(partNumber int) (ckSum ChecksumInfo) {
 | |
| 	for _, sum := range e.Checksums {
 | |
| 		if sum.PartNumber == partNumber {
 | |
| 			// Return the checksum
 | |
| 			return sum
 | |
| 		}
 | |
| 	}
 | |
| 	return ChecksumInfo{}
 | |
| }
 | |
| 
 | |
| // ShardFileSize - returns final erasure size from original size.
 | |
| func (e ErasureInfo) ShardFileSize(totalLength int64) int64 {
 | |
| 	if totalLength == 0 {
 | |
| 		return 0
 | |
| 	}
 | |
| 	if totalLength == -1 {
 | |
| 		return -1
 | |
| 	}
 | |
| 	numShards := totalLength / e.BlockSize
 | |
| 	lastBlockSize := totalLength % e.BlockSize
 | |
| 	lastShardSize := ceilFrac(lastBlockSize, int64(e.DataBlocks))
 | |
| 	return numShards*e.ShardSize() + lastShardSize
 | |
| }
 | |
| 
 | |
// ShardSize - returns actual shard size from erasure blockSize.
func (e ErasureInfo) ShardSize() int64 {
	// Each data shard holds ceil(BlockSize / DataBlocks) bytes.
	return ceilFrac(e.BlockSize, int64(e.DataBlocks))
}
 | |
| 
 | |
| // IsValid - tells if erasure info fields are valid.
 | |
| func (fi FileInfo) IsValid() bool {
 | |
| 	if fi.Deleted {
 | |
| 		// Delete marker has no data, no need to check
 | |
| 		// for erasure coding information
 | |
| 		return true
 | |
| 	}
 | |
| 	dataBlocks := fi.Erasure.DataBlocks
 | |
| 	parityBlocks := fi.Erasure.ParityBlocks
 | |
| 	correctIndexes := (fi.Erasure.Index > 0 &&
 | |
| 		fi.Erasure.Index <= dataBlocks+parityBlocks &&
 | |
| 		len(fi.Erasure.Distribution) == (dataBlocks+parityBlocks))
 | |
| 	return ((dataBlocks >= parityBlocks) &&
 | |
| 		(dataBlocks != 0) && (parityBlocks != 0) &&
 | |
| 		correctIndexes)
 | |
| }
 | |
| 
 | |
// ToObjectInfo - Converts metadata to object info.
// Maps the on-disk FileInfo fields (version, size, metadata keys,
// replication/transition state) onto the API-facing ObjectInfo.
func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
	object = decodeDirObject(object)
	versionID := fi.VersionID
	if globalBucketVersioningSys.Enabled(bucket) && versionID == "" {
		// Versioning-enabled bucket with no explicit version is
		// reported under the special "null" version ID.
		versionID = nullVersionID
	}

	objInfo := ObjectInfo{
		IsDir:           HasSuffix(object, SlashSeparator),
		Bucket:          bucket,
		Name:            object,
		VersionID:       versionID,
		IsLatest:        fi.IsLatest,
		DeleteMarker:    fi.Deleted,
		Size:            fi.Size,
		ModTime:         fi.ModTime,
		Legacy:          fi.XLV1,
		ContentType:     fi.Metadata["content-type"],
		ContentEncoding: fi.Metadata["content-encoding"],
	}
	// Update expires
	var (
		t time.Time
		e error
	)
	if exp, ok := fi.Metadata["expires"]; ok {
		// Only a well-formed HTTP date is honored; on parse failure
		// the error is dropped and Expires stays the zero time.
		if t, e = time.Parse(http.TimeFormat, exp); e == nil {
			objInfo.Expires = t.UTC()
		}
	}
	objInfo.backendType = BackendErasure

	// Extract etag from metadata.
	objInfo.ETag = extractETag(fi.Metadata)

	// Add user tags to the object info
	objInfo.UserTags = fi.Metadata[xhttp.AmzObjectTagging]

	// Add replication status to the object info
	objInfo.ReplicationStatus = replication.StatusType(fi.Metadata[xhttp.AmzBucketReplicationStatus])
	if fi.Deleted {
		// Delete markers carry their replication status separately.
		objInfo.ReplicationStatus = replication.StatusType(fi.DeleteMarkerReplicationStatus)
	}

	objInfo.TransitionStatus = fi.TransitionStatus

	// etag/md5Sum has already been extracted. We need to
	// remove to avoid it from appearing as part of
	// response headers. e.g, X-Minio-* or X-Amz-*.
	// Tags have also been extracted, we remove that as well.
	objInfo.UserDefined = cleanMetadata(fi.Metadata)

	// All the parts per object.
	objInfo.Parts = fi.Parts

	// Update storage class
	if sc, ok := fi.Metadata[xhttp.AmzStorageClass]; ok {
		objInfo.StorageClass = sc
	} else {
		// No explicit class in metadata - report the server default.
		objInfo.StorageClass = globalMinioDefaultStorageClass
	}
	objInfo.VersionPurgeStatus = fi.VersionPurgeStatus
	// set restore status for transitioned object
	if ongoing, exp, err := parseRestoreHeaderFromMeta(fi.Metadata); err == nil {
		objInfo.RestoreOngoing = ongoing
		objInfo.RestoreExpires = exp
	}
	// Success.
	return objInfo
}
 | |
| 
 | |
| // objectPartIndex - returns the index of matching object part number.
 | |
| func objectPartIndex(parts []ObjectPartInfo, partNumber int) int {
 | |
| 	for i, part := range parts {
 | |
| 		if partNumber == part.Number {
 | |
| 			return i
 | |
| 		}
 | |
| 	}
 | |
| 	return -1
 | |
| }
 | |
| 
 | |
| // AddObjectPart - add a new object part in order.
 | |
| func (fi *FileInfo) AddObjectPart(partNumber int, partETag string, partSize int64, actualSize int64) {
 | |
| 	partInfo := ObjectPartInfo{
 | |
| 		Number:     partNumber,
 | |
| 		ETag:       partETag,
 | |
| 		Size:       partSize,
 | |
| 		ActualSize: actualSize,
 | |
| 	}
 | |
| 
 | |
| 	// Update part info if it already exists.
 | |
| 	for i, part := range fi.Parts {
 | |
| 		if partNumber == part.Number {
 | |
| 			fi.Parts[i] = partInfo
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Proceed to include new part info.
 | |
| 	fi.Parts = append(fi.Parts, partInfo)
 | |
| 
 | |
| 	// Parts in FileInfo should be in sorted order by part number.
 | |
| 	sort.Sort(byObjectPartNumber(fi.Parts))
 | |
| }
 | |
| 
 | |
| // ObjectToPartOffset - translate offset of an object to offset of its individual part.
 | |
| func (fi FileInfo) ObjectToPartOffset(ctx context.Context, offset int64) (partIndex int, partOffset int64, err error) {
 | |
| 	if offset == 0 {
 | |
| 		// Special case - if offset is 0, then partIndex and partOffset are always 0.
 | |
| 		return 0, 0, nil
 | |
| 	}
 | |
| 	partOffset = offset
 | |
| 	// Seek until object offset maps to a particular part offset.
 | |
| 	for i, part := range fi.Parts {
 | |
| 		partIndex = i
 | |
| 		// Offset is smaller than size we have reached the proper part offset.
 | |
| 		if partOffset < part.Size {
 | |
| 			return partIndex, partOffset, nil
 | |
| 		}
 | |
| 		// Continue to towards the next part.
 | |
| 		partOffset -= part.Size
 | |
| 	}
 | |
| 	logger.LogIf(ctx, InvalidRange{})
 | |
| 	// Offset beyond the size of the object return InvalidRange.
 | |
| 	return 0, 0, InvalidRange{}
 | |
| }
 | |
| 
 | |
| func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) {
 | |
| 	metaHashes := make([]string, len(metaArr))
 | |
| 	h := sha256.New()
 | |
| 	for i, meta := range metaArr {
 | |
| 		if meta.IsValid() && meta.ModTime.Equal(modTime) {
 | |
| 			for _, part := range meta.Parts {
 | |
| 				h.Write([]byte(fmt.Sprintf("part.%d", part.Number)))
 | |
| 			}
 | |
| 			h.Write([]byte(fmt.Sprintf("%v", meta.Erasure.Distribution)))
 | |
| 			metaHashes[i] = hex.EncodeToString(h.Sum(nil))
 | |
| 			h.Reset()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	metaHashCountMap := make(map[string]int)
 | |
| 	for _, hash := range metaHashes {
 | |
| 		if hash == "" {
 | |
| 			continue
 | |
| 		}
 | |
| 		metaHashCountMap[hash]++
 | |
| 	}
 | |
| 
 | |
| 	maxHash := ""
 | |
| 	maxCount := 0
 | |
| 	for hash, count := range metaHashCountMap {
 | |
| 		if count > maxCount {
 | |
| 			maxCount = count
 | |
| 			maxHash = hash
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if maxCount < quorum {
 | |
| 		return FileInfo{}, errErasureReadQuorum
 | |
| 	}
 | |
| 
 | |
| 	for i, hash := range metaHashes {
 | |
| 		if hash == maxHash {
 | |
| 			return metaArr[i], nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return FileInfo{}, errErasureReadQuorum
 | |
| }
 | |
| 
 | |
// pickValidFileInfo - picks one valid FileInfo content and returns from a
// slice of FileInfo.
func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) {
	// Thin wrapper: quorum selection is delegated entirely to
	// findFileInfoInQuorum.
	return findFileInfoInQuorum(ctx, metaArr, modTime, quorum)
}
 | |
| 
 | |
| // Rename metadata content to destination location for each disk concurrently.
 | |
| func renameFileInfo(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) ([]StorageAPI, error) {
 | |
| 	ignoredErr := []error{errFileNotFound}
 | |
| 
 | |
| 	g := errgroup.WithNErrs(len(disks))
 | |
| 
 | |
| 	// Rename file on all underlying storage disks.
 | |
| 	for index := range disks {
 | |
| 		index := index
 | |
| 		g.Go(func() error {
 | |
| 			if disks[index] == nil {
 | |
| 				return errDiskNotFound
 | |
| 			}
 | |
| 			if err := disks[index].RenameData(ctx, srcBucket, srcEntry, "", dstBucket, dstEntry); err != nil {
 | |
| 				if !IsErrIgnored(err, ignoredErr...) {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		}, index)
 | |
| 	}
 | |
| 
 | |
| 	// Wait for all renames to finish.
 | |
| 	errs := g.Wait()
 | |
| 
 | |
| 	// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
 | |
| 	// otherwise return failure. Cleanup successful renames.
 | |
| 	err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum)
 | |
| 	return evalDisks(disks, errs), err
 | |
| }
 | |
| 
 | |
| // writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.
 | |
| func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) {
 | |
| 	g := errgroup.WithNErrs(len(disks))
 | |
| 
 | |
| 	// Start writing `xl.meta` to all disks in parallel.
 | |
| 	for index := range disks {
 | |
| 		index := index
 | |
| 		g.Go(func() error {
 | |
| 			if disks[index] == nil {
 | |
| 				return errDiskNotFound
 | |
| 			}
 | |
| 			// Pick one FileInfo for a disk at index.
 | |
| 			files[index].Erasure.Index = index + 1
 | |
| 			return disks[index].WriteMetadata(ctx, bucket, prefix, files[index])
 | |
| 		}, index)
 | |
| 	}
 | |
| 
 | |
| 	// Wait for all the routines.
 | |
| 	mErrs := g.Wait()
 | |
| 
 | |
| 	err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
 | |
| 	return evalDisks(disks, mErrs), err
 | |
| }
 | |
| 
 | |
| // Returns per object readQuorum and writeQuorum
 | |
| // readQuorum is the min required disks to read data.
 | |
| // writeQuorum is the min required disks to write data.
 | |
| func objectQuorumFromMeta(ctx context.Context, er erasureObjects, partsMetaData []FileInfo, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) {
 | |
| 	// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
 | |
| 	latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
 | |
| 	if err != nil {
 | |
| 		return 0, 0, err
 | |
| 	}
 | |
| 
 | |
| 	dataBlocks := latestFileInfo.Erasure.DataBlocks
 | |
| 	parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
 | |
| 	if parityBlocks == 0 {
 | |
| 		parityBlocks = dataBlocks
 | |
| 	}
 | |
| 
 | |
| 	writeQuorum := dataBlocks
 | |
| 	if dataBlocks == parityBlocks {
 | |
| 		writeQuorum = dataBlocks + 1
 | |
| 	}
 | |
| 
 | |
| 	// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
 | |
| 	// from latestFileInfo to get the quorum
 | |
| 	return dataBlocks, writeQuorum, nil
 | |
| }
 |