diff --git a/cmd/erasure-healing-common.go b/cmd/erasure-healing-common.go index a834f9b05..e8be444b3 100644 --- a/cmd/erasure-healing-common.go +++ b/cmd/erasure-healing-common.go @@ -25,9 +25,11 @@ import ( ) // commonTime returns a maximally occurring time from a list of time. -func commonTime(modTimes []time.Time) (modTime time.Time, count int) { +func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dataDir string) { var maxima int // Counter for remembering max occurrence of elements. - timeOccurenceMap := make(map[int64]int) + + timeOccurenceMap := make(map[int64]int, len(modTimes)) + dataDirOccurenceMap := make(map[string]int, len(dataDirs)) // Ignore the uuid sentinel and count the rest. for _, time := range modTimes { if time.Equal(timeSentinel) { @@ -36,6 +38,13 @@ func commonTime(modTimes []time.Time) (modTime time.Time, count int) { timeOccurenceMap[time.UnixNano()]++ } + for _, dataDir := range dataDirs { + if dataDir == "" { + continue + } + dataDirOccurenceMap[dataDir]++ + } + // Find the common cardinality from previously collected // occurrences of elements. for nano, count := range timeOccurenceMap { @@ -46,8 +55,18 @@ func commonTime(modTimes []time.Time) (modTime time.Time, count int) { } } + // Find the common cardinality from the previously collected + // occurrences of elements. + var dmaxima int + for ddataDir, count := range dataDirOccurenceMap { + if count > dmaxima { + dmaxima = count + dataDir = ddataDir + } + } + // Return the collected common uuid. - return modTime, maxima + return modTime, dataDir } // Beginning of unix time is treated as sentinel value here. @@ -101,24 +120,33 @@ func listObjectModtimes(partsMetadata []FileInfo, errs []error) (modTimes []time // - a slice of disks where disk having 'older' xl.meta (or nothing) // are set to nil. // - latest (in time) of the maximally occurring modTime(s). -func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time) { +func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time, dataDir string) { onlineDisks = make([]StorageAPI, len(disks)) // List all the file commit ids from parts metadata. modTimes := listObjectModtimes(partsMetadata, errs) + dataDirs := make([]string, len(partsMetadata)) + for idx, fi := range partsMetadata { + if errs[idx] != nil { + continue + } + dataDirs[idx] = fi.DataDir + } + // Reduce list of UUIDs to a single common value. - modTime, _ = commonTime(modTimes) + modTime, dataDir = commonTime(modTimes, dataDirs) // Create a new online disks slice, which have common uuid. for index, t := range modTimes { - if t.Equal(modTime) { + if partsMetadata[index].IsValid() && t.Equal(modTime) && partsMetadata[index].DataDir == dataDir { onlineDisks[index] = disks[index] } else { onlineDisks[index] = nil } } - return onlineDisks, modTime + + return onlineDisks, modTime, dataDir } // Returns the latest updated FileInfo files and error in case of failure. @@ -131,16 +159,24 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err // List all the file commit ids from parts metadata. modTimes := listObjectModtimes(partsMetadata, errs) + dataDirs := make([]string, len(partsMetadata)) + for idx, fi := range partsMetadata { + if errs[idx] != nil { + continue + } + dataDirs[idx] = fi.DataDir + } + // Count all latest updated FileInfo values var count int var latestFileInfo FileInfo // Reduce list of UUIDs to a single common value - i.e. the last updated Time - modTime, _ := commonTime(modTimes) + modTime, dataDir := commonTime(modTimes, dataDirs) // Interate through all the modTimes and count the FileInfo(s) with latest time. for index, t := range modTimes { - if t.Equal(modTime) && partsMetadata[index].IsValid() { + if partsMetadata[index].IsValid() && t.Equal(modTime) && dataDir == partsMetadata[index].DataDir { latestFileInfo = partsMetadata[index] count++ } diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index 6a190f1ab..541c110de 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -82,7 +82,7 @@ func TestCommonTime(t *testing.T) { // common modtime. Tests fail if modtime does not match. for i, testCase := range testCases { // Obtain a common mod time from modTimes slice. - ctime, _ := commonTime(testCase.times) + ctime, _ := commonTime(testCase.times, nil) if !testCase.time.Equal(ctime) { t.Fatalf("Test case %d, expect to pass but failed. Wanted modTime: %s, got modTime: %s\n", i+1, testCase.time, ctime) } @@ -181,8 +181,8 @@ func TestListOnlineDisks(t *testing.T) { z := obj.(*erasureServerPools) erasureDisks := z.serverPools[0].sets[0].getDisks() for i, test := range testCases { + test := test t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { - _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{}) if err != nil { t.Fatalf("Failed to putObject %v", err) @@ -196,7 +196,7 @@ func TestListOnlineDisks(t *testing.T) { for j := range partsMetadata { if errs[j] != nil { - t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j]) + t.Fatalf("expected error to be nil: %s", errs[j]) } partsMetadata[j].ModTime = test.modTimes[j] } @@ -215,8 +215,7 @@ func TestListOnlineDisks(t *testing.T) { tamperedIndex = index dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false) if dErr != nil { - t.Fatalf("Test %d: Failed to delete %s - %v", i+1, - filepath.Join(object, "part.1"), dErr) + t.Fatalf("Failed to delete %s - %v", filepath.Join(object, "part.1"), dErr) } break } @@ -242,19 +241,22 @@ func TestListOnlineDisks(t *testing.T) { } - onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs) + onlineDisks, modTime, dataDir := listOnlineDisks(erasureDisks, partsMetadata, test.errs) if !modTime.Equal(test.expectedTime) { - t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v", - i+1, test.expectedTime, modTime) + t.Fatalf("Expected modTime to be equal to %v but was found to be %v", + test.expectedTime, modTime) + } + if fi.DataDir != dataDir { + t.Fatalf("Expected dataDir to be equal to %v but was found to be %v", + fi.DataDir, dataDir) } - availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan) test.errs = newErrs if test._tamperBackend != noTamper { if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil { - t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data", - i+1, erasureDisks[tamperedIndex]) + t.Fatalf("disk (%v) with part.1 missing is not a disk with available data", + erasureDisks[tamperedIndex]) } } }) @@ -354,22 +356,22 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { z := obj.(*erasureServerPools) erasureDisks := z.serverPools[0].sets[0].getDisks() for i, test := range testCases { + test := test t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { - _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{}) if err != nil { t.Fatalf("Failed to putObject %v", err) } partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true) - _, err := getLatestFileInfo(ctx, partsMetadata, errs) + fi, err := getLatestFileInfo(ctx, partsMetadata, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo %v", err) } for j := range partsMetadata { if errs[j] != nil { - t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j]) + t.Fatalf("expected error to be nil: %s", errs[j]) } partsMetadata[j].ModTime = test.modTimes[j] } @@ -392,8 +394,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { tamperedIndex = index dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false) if dErr != nil { - t.Fatalf("Test %d: Failed to delete %s - %v", i+1, - pathJoin(object, xlStorageFormatFile), dErr) + t.Fatalf("Failed to delete %s - %v", pathJoin(object, xlStorageFormatFile), dErr) } break } @@ -424,10 +425,15 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { t.Fatalf("Failed to getLatestFileInfo %v", err) } - onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs) + onlineDisks, modTime, dataDir := listOnlineDisks(erasureDisks, partsMetadata, test.errs) if !modTime.Equal(test.expectedTime) { - t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v", - i+1, test.expectedTime, modTime) + t.Fatalf("Expected modTime to be equal to %v but was found to be %v", + test.expectedTime, modTime) + } + + if fi.DataDir != dataDir { + t.Fatalf("Expected dataDir to be equal to %v but was found to be %v", + fi.DataDir, dataDir) } availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan) @@ -435,8 +441,8 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { if test._tamperBackend != noTamper { if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil { - t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data", - i+1, erasureDisks[tamperedIndex]) + t.Fatalf("disk (%v) with part.1 missing is not a disk with available data", + erasureDisks[tamperedIndex]) } } }) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 8aadce618..ab7874c16 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -260,7 +260,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // List of disks having latest version of the object er.meta // (by modtime). - latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + latestDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) // List of disks having all parts as per latest er.meta. availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode) @@ -350,7 +350,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // Latest FileInfo for reference. If a valid metadata is not // present, it is as good as object not found. - latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks) + latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, result.DataBlocks) if err != nil { return result, toObjectErr(err, bucket, object, versionID) } @@ -471,7 +471,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s Algorithm: checksumAlgo, Hash: bitrotWriterSum(writers[i]), }) - if len(inlineBuffers) > 0 { + if len(inlineBuffers) > 0 && inlineBuffers[i] != nil { partsMetadata[i].Data = inlineBuffers[i].Bytes() } else { partsMetadata[i].Data = nil diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index 75c80c3f6..90f4a2770 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -148,11 +148,15 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve return metadataArray, g.Wait() } +// shuffleDisksAndPartsMetadataByIndex this function should be always used by GetObjectNInfo() +// and CompleteMultipartUpload code path, it is not meant to be used with PutObject, +// NewMultipartUpload metadata shuffling. func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) { shuffledDisks = make([]StorageAPI, len(disks)) shuffledPartsMetadata = make([]FileInfo, len(disks)) - var inconsistent int distribution := fi.Erasure.Distribution + + var inconsistent int for i, meta := range metaArr { if disks[i] == nil { // Assuming offline drives as inconsistent, @@ -161,6 +165,14 @@ func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo, inconsistent++ continue } + if !meta.IsValid() { + inconsistent++ + continue + } + if len(fi.Data) != len(meta.Data) { + inconsistent++ + continue + } // check if erasure distribution order matches the index // position if this is not correct we discard the disk // and move to collect others @@ -180,18 +192,36 @@ func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo, } // fall back to original distribution based order. - return shuffleDisksAndPartsMetadata(disks, metaArr, distribution) + return shuffleDisksAndPartsMetadata(disks, metaArr, fi) } -// Return shuffled partsMetadata depending on distribution. -func shuffleDisksAndPartsMetadata(disks []StorageAPI, partsMetadata []FileInfo, distribution []int) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) { - if distribution == nil { - return disks, partsMetadata - } +// Return shuffled partsMetadata depending on fi.Distribution. +// additional validation is attempted and invalid metadata is +// automatically skipped only when fi.ModTime is non-zero +// indicating that this is called during read-phase +func shuffleDisksAndPartsMetadata(disks []StorageAPI, partsMetadata []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) { shuffledDisks = make([]StorageAPI, len(disks)) shuffledPartsMetadata = make([]FileInfo, len(partsMetadata)) + distribution := fi.Erasure.Distribution + + init := fi.ModTime.IsZero() // Shuffle slice xl metadata for expected distribution. for index := range partsMetadata { + if disks[index] == nil { + continue + } + if !init && !partsMetadata[index].IsValid() { + // Check for parts metadata validity for only + // fi.ModTime is not empty - ModTime is always set, + // if object was ever written previously. + continue + } + if !init && len(fi.Data) != len(partsMetadata[index].Data) { + // Check for length of data parts only when + // fi.ModTime is not empty - ModTime is always set, + // if object was ever written previously. + continue + } blockIndex := distribution[index] shuffledPartsMetadata[blockIndex-1] = partsMetadata[index] shuffledDisks[blockIndex-1] = disks[index] diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index c9a99a6b6..c7bec4db1 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -232,15 +232,17 @@ func (fi FileInfo) ObjectToPartOffset(ctx context.Context, offset int64) (partIn return 0, 0, InvalidRange{} } -func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) { +func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, dataDir string, quorum int) (xmv FileInfo, e error) { metaHashes := make([]string, len(metaArr)) h := sha256.New() for i, meta := range metaArr { - if meta.IsValid() && meta.ModTime.Equal(modTime) { + if meta.IsValid() && meta.ModTime.Equal(modTime) && meta.DataDir == dataDir { for _, part := range meta.Parts { h.Write([]byte(fmt.Sprintf("part.%d", part.Number))) } h.Write([]byte(fmt.Sprintf("%v", meta.Erasure.Distribution))) + // make sure that length of Data is same + h.Write([]byte(fmt.Sprintf("%v", len(meta.Data)))) metaHashes[i] = hex.EncodeToString(h.Sum(nil)) h.Reset() } @@ -278,8 +280,8 @@ func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time. // pickValidFileInfo - picks one valid FileInfo content and returns from a // slice of FileInfo. -func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) { - return findFileInfoInQuorum(ctx, metaArr, modTime, quorum) +func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, dataDir string, quorum int) (xmv FileInfo, e error) { + return findFileInfoInQuorum(ctx, metaArr, modTime, dataDir, quorum) } // writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently. diff --git a/cmd/erasure-metadata_test.go b/cmd/erasure-metadata_test.go index d18eab792..b5bd79cb1 100644 --- a/cmd/erasure-metadata_test.go +++ b/cmd/erasure-metadata_test.go @@ -157,10 +157,11 @@ func TestObjectToPartOffset(t *testing.T) { } func TestFindFileInfoInQuorum(t *testing.T) { - getNFInfo := func(n int, quorum int, t int64) []FileInfo { + getNFInfo := func(n int, quorum int, t int64, dataDir string) []FileInfo { fi := newFileInfo("test", 8, 8) fi.AddObjectPart(1, "etag", 100, 100) fi.ModTime = time.Unix(t, 0) + fi.DataDir = dataDir fis := make([]FileInfo, n) for i := range fis { fis[i] = fi @@ -176,16 +177,19 @@ func TestFindFileInfoInQuorum(t *testing.T) { tests := []struct { fis []FileInfo modTime time.Time + dataDir string expectedErr error }{ { - fis: getNFInfo(16, 16, 1603863445), + fis: getNFInfo(16, 16, 1603863445, "36a21454-a2ca-11eb-bbaa-93a81c686f21"), modTime: time.Unix(1603863445, 0), + dataDir: "36a21454-a2ca-11eb-bbaa-93a81c686f21", expectedErr: nil, }, { - fis: getNFInfo(16, 7, 1603863445), + fis: getNFInfo(16, 7, 1603863445, "36a21454-a2ca-11eb-bbaa-93a81c686f21"), modTime: time.Unix(1603863445, 0), + dataDir: "36a21454-a2ca-11eb-bbaa-93a81c686f21", expectedErr: errErasureReadQuorum, }, } @@ -193,7 +197,7 @@ func TestFindFileInfoInQuorum(t *testing.T) { for _, test := range tests { test := test t.Run("", func(t *testing.T) { - _, err := findFileInfoInQuorum(context.Background(), test.fis, test.modTime, 8) + _, err := findFileInfoInQuorum(context.Background(), test.fis, test.modTime, test.dataDir, 8) if err != test.expectedErr { t.Errorf("Expected %s, got %s", test.expectedErr, err) } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index cc124d819..f78699012 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -66,10 +66,10 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object } // List all online disks. - _, modTime := listOnlineDisks(disks, metaArr, errs) + _, modTime, dataDir := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. - _, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum) + _, err = pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum) return err } @@ -283,74 +283,63 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec // operation(s) on the object. func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (string, error) { onlineDisks := er.getDisks() - parityBlocks := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass]) - if parityBlocks <= 0 { - parityBlocks = er.defaultParityCount + parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass]) + if parityDrives <= 0 { + parityDrives = er.defaultParityCount } - dataBlocks := len(onlineDisks) - parityBlocks - fi := newFileInfo(pathJoin(bucket, object), dataBlocks, parityBlocks) - + dataDrives := len(onlineDisks) - parityDrives // we now know the number of blocks this object needs for data and parity. // establish the writeQuorum using this data - writeQuorum := dataBlocks - if dataBlocks == parityBlocks { + writeQuorum := dataDrives + if dataDrives == parityDrives { writeQuorum++ } - if opts.UserDefined["content-type"] == "" { - contentType := mimedb.TypeByExtension(path.Ext(object)) - opts.UserDefined["content-type"] = contentType - } + // Initialize parts metadata + partsMetadata := make([]FileInfo, len(onlineDisks)) - // Calculate the version to be saved. + fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives) if opts.Versioned { fi.VersionID = opts.VersionID if fi.VersionID == "" { fi.VersionID = mustGetUUID() } } - fi.DataDir = mustGetUUID() - fi.ModTime = UTCNow() - fi.Metadata = cloneMSS(opts.UserDefined) + + // Initialize erasure metadata. + for index := range partsMetadata { + partsMetadata[index] = fi + } + + // Guess content-type from the extension if possible. + if opts.UserDefined["content-type"] == "" { + opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object)) + } + + modTime := opts.MTime + if opts.MTime.IsZero() { + modTime = UTCNow() + } + + onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi) + + // Fill all the necessary metadata. + // Update `xl.meta` content on each disks. + for index := range partsMetadata { + partsMetadata[index].Metadata = opts.UserDefined + partsMetadata[index].ModTime = modTime + } uploadID := mustGetUUID() uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) - tempUploadIDPath := uploadID - // Delete the tmp path later in case we fail to commit (ignore - // returned errors) - this will be a no-op in case of a commit - // success. - var online int - defer func() { - if online != len(onlineDisks) { - er.deleteObject(context.Background(), minioMetaTmpBucket, tempUploadIDPath, writeQuorum) - } - }() - - var partsMetadata = make([]FileInfo, len(onlineDisks)) - for i := range onlineDisks { - partsMetadata[i] = fi - } - - onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi.Erasure.Distribution) - - var err error // Write updated `xl.meta` to all disks. - onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, writeQuorum) - if err != nil { - return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath) - } - - // Attempt to rename temp upload object to actual upload path object - _, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, true, writeQuorum, nil) - if err != nil { + if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } - online = countOnlineDisks(onlineDisks) - // Return success. return uploadID, nil } @@ -435,10 +424,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } // List all online disks. - onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) + fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum) if err != nil { return pi, err } @@ -545,10 +534,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } // Get current highest version based on re-read partsMetadata. - onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs) + onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs) // Pick one from the first valid metadata. - fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) + fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum) if err != nil { return pi, err } @@ -633,10 +622,10 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) } - _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + _, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum) + fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, readQuorum) if err != nil { return result, err } @@ -682,10 +671,10 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) } - _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + _, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) + fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum) if err != nil { return result, err } @@ -787,7 +776,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str return oi, toObjectErr(reducedErr, bucket, object) } - onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) + + // Pick one from the first valid metadata. + fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum) + if err != nil { + return oi, err + } // Calculate full object size. var objectSize int64 @@ -795,12 +790,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // Calculate consolidated actual size. var objectActualSize int64 - // Pick one from the first valid metadata. - fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) - if err != nil { - return oi, err - } - // Order online disks in accordance with distribution order. // Order parts metadata in accordance with distribution order. onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index b12b1cc90..b8c734113 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -81,10 +81,10 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d } // List all online disks. - onlineDisks, modTime := listOnlineDisks(storageDisks, metaArr, errs) + onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, metaArr, errs) // Pick latest valid metadata. - fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) + fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum) if err != nil { return oi, toObjectErr(err, srcBucket, srcObject) } @@ -421,10 +421,10 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s } // List all online disks. - onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs) + onlineDisks, modTime, dataDir := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. - fi, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum) + fi, err = pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum) if err != nil { return fi, nil, nil, err } @@ -435,7 +435,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s missingBlocks++ continue } - if metaArr[i].IsValid() && metaArr[i].ModTime.Equal(fi.ModTime) { + if metaArr[i].IsValid() && metaArr[i].ModTime.Equal(fi.ModTime) && metaArr[i].DataDir == fi.DataDir { continue } missingBlocks++ @@ -658,7 +658,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st // Order disks according to erasure distribution var onlineDisks []StorageAPI - onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi.Erasure.Distribution) + onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi) erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { @@ -755,6 +755,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st } if len(inlineBuffers) > 0 && inlineBuffers[i] != nil { partsMetadata[i].Data = inlineBuffers[i].Bytes() + } else { + partsMetadata[i].Data = nil } partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize()) partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ @@ -783,9 +785,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st partsMetadata[index].Metadata = opts.UserDefined partsMetadata[index].Size = n partsMetadata[index].ModTime = modTime - if len(inlineBuffers) > 0 && inlineBuffers[index] != nil { - partsMetadata[index].Data = inlineBuffers[index].Bytes() - } } // Rename the successfully written temporary object to final location. @@ -1183,10 +1182,10 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s } // List all online disks. - _, modTime := listOnlineDisks(disks, metaArr, errs) + _, modTime, dataDir := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. - fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) + fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -1234,10 +1233,10 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin } // List all online disks. - _, modTime := listOnlineDisks(disks, metaArr, errs) + _, modTime, dataDir := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. - fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) + fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) }