diff --git a/cmd/common-main.go b/cmd/common-main.go
index f34c9a3b1..d32fd727f 100644
--- a/cmd/common-main.go
+++ b/cmd/common-main.go
@@ -113,4 +113,34 @@ func handleCommonEnvVars() {
 	// or is not set to 'off', if MINIO_UPDATE is set to 'off' then
 	// in-place update is off.
 	globalInplaceUpdateDisabled = strings.EqualFold(os.Getenv("MINIO_UPDATE"), "off")
+
+	// Validate and store the storage class env variables only for XL/Dist XL setups
+	if globalIsXL {
+		var err error
+
+		// Check for environment variables and parse into storageClass struct
+		if ssc := os.Getenv(standardStorageClass); ssc != "" {
+			globalStandardStorageClass, err = parseStorageClass(ssc)
+			fatalIf(err, "Invalid value set in environment variable %s.", standardStorageClass)
+		}
+
+		if rrsc := os.Getenv(reducedRedundancyStorageClass); rrsc != "" {
+			globalRRStorageClass, err = parseStorageClass(rrsc)
+			fatalIf(err, "Invalid value set in environment variable %s.", reducedRedundancyStorageClass)
+		}
+
+		// Validation is done after parsing both the storage classes. This is needed because we need one
+		// storage class value to deduce the correct value of the other storage class.
+		if globalRRStorageClass.Scheme != "" {
+			err := validateRRSParity(globalRRStorageClass.Parity, globalStandardStorageClass.Parity)
+			fatalIf(err, "Invalid value set in environment variable %s.", reducedRedundancyStorageClass)
+			globalIsStorageClass = true
+		}
+
+		if globalStandardStorageClass.Scheme != "" {
+			err := validateSSParity(globalStandardStorageClass.Parity, globalRRStorageClass.Parity)
+			fatalIf(err, "Invalid value set in environment variable %s.", standardStorageClass)
+			globalIsStorageClass = true
+		}
+	}
 }
diff --git a/cmd/config-current.go b/cmd/config-current.go
index a4736a9ff..2946379c7 100644
--- a/cmd/config-current.go
+++ b/cmd/config-current.go
@@ -20,6 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
+	"strconv"
 	"sync"
 
 	"github.com/minio/minio/pkg/auth"
@@ -36,9 +37,9 @@ import (
 // 6. Make changes in config-current_test.go for any test change
 
 // Config version
-const serverConfigVersion = "21"
+const serverConfigVersion = "22"
 
-type serverConfig = serverConfigV21
+type serverConfig = serverConfigV22
 
 var (
 	// globalServerConfig server config.
@@ -103,6 +104,52 @@ func (s *serverConfig) SetBrowser(b bool) {
 	s.Browser = BrowserFlag(b)
 }
 
+func (s *serverConfig) SetStorageClass(standardClass, rrsClass storageClass) {
+	s.Lock()
+	defer s.Unlock()
+
+	// Set the values in the "Scheme:Parity" form that parseStorageClass reads back
+	s.StorageClass.Standard = standardClass.Scheme + ":" + strconv.Itoa(standardClass.Parity)
+	s.StorageClass.RRS = rrsClass.Scheme + ":" + strconv.Itoa(rrsClass.Parity)
+}
+
+func (s *serverConfig) GetStorageClass() (standardStorageClass, rrsStorageClass storageClass) {
+	s.RLock()
+	defer s.RUnlock()
+
+	var err error
+	var ssc storageClass
+	var rrsc storageClass
+
+	if s.StorageClass.Standard != "" {
+		// Parse the values read from config file into storageClass struct
+		ssc, err = parseStorageClass(s.StorageClass.Standard)
+		fatalIf(err, "Invalid value %s set in config.json", s.StorageClass.Standard)
+	}
+
+	if s.StorageClass.RRS != "" {
+		// Parse the values read from config file into storageClass struct
+		rrsc, err = parseStorageClass(s.StorageClass.RRS)
+		fatalIf(err, "Invalid value %s set in config.json", s.StorageClass.RRS)
+	}
+
+	// Validation is done after parsing both the storage classes. This is needed because we need one
+	// storage class value to deduce the correct value of the other storage class.
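For reference, a minimal standalone sketch (not part of this patch) of the shape the new version-22 storage class section takes once SetStorageClass persists values in the Scheme:Parity form that parseStorageClass reads back; the EC:5 and EC:2 values are illustrative only:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the storageClassConfig type added in cmd/storage-class.go.
type storageClassConfig struct {
	Standard string `json:"standard"`
	RRS      string `json:"rrs"`
}

func main() {
	// A trimmed-down config carrying only the fields relevant to this sketch.
	cfg := struct {
		Version      string             `json:"version"`
		StorageClass storageClassConfig `json:"storageclass"`
	}{
		Version: "22",
		StorageClass: storageClassConfig{
			Standard: "EC:5", // storageClass{Scheme: "EC", Parity: 5}
			RRS:      "EC:2", // storageClass{Scheme: "EC", Parity: 2}
		},
	}
	out, _ := json.MarshalIndent(cfg, "", "  ")
	// Prints the "storageclass" section as it would appear inside config.json.
	fmt.Println(string(out))
}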
+ if rrsc.Scheme != "" { + err := validateRRSParity(rrsc.Parity, ssc.Parity) + fatalIf(err, "Invalid value %s set in config.json", s.StorageClass.RRS) + globalIsStorageClass = true + } + + if ssc.Scheme != "" { + err := validateSSParity(ssc.Parity, rrsc.Parity) + fatalIf(err, "Invalid value %s set in config.json", s.StorageClass.Standard) + globalIsStorageClass = true + } + + return +} + // GetCredentials get current credentials. func (s *serverConfig) GetBrowser() bool { s.RLock() @@ -175,6 +222,10 @@ func newConfig() error { srvCfg.Domain = globalDomainName } + if globalIsStorageClass { + srvCfg.SetStorageClass(globalStandardStorageClass, globalRRStorageClass) + } + // hold the mutex lock before a new config is assigned. // Save the new config globally. // unlock the mutex. @@ -303,6 +354,10 @@ func loadConfig() error { srvCfg.Domain = globalDomainName } + if globalIsStorageClass { + srvCfg.SetStorageClass(globalStandardStorageClass, globalRRStorageClass) + } + // hold the mutex lock before a new config is assigned. globalServerConfigMu.Lock() globalServerConfig = srvCfg @@ -318,6 +373,9 @@ func loadConfig() error { if !globalIsEnvDomainName { globalDomainName = globalServerConfig.Domain } + if !globalIsStorageClass { + globalStandardStorageClass, globalRRStorageClass = globalServerConfig.GetStorageClass() + } globalServerConfigMu.Unlock() return nil diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go index 337ca7445..36bc17ba9 100644 --- a/cmd/config-migrate.go +++ b/cmd/config-migrate.go @@ -158,6 +158,10 @@ func migrateConfig() error { return err } fallthrough + case "21": + if err = migrateV21ToV22(); err != nil { + return err + } case serverConfigVersion: // No migration needed. this always points to current version. err = nil @@ -1704,3 +1708,110 @@ func migrateV20ToV21() error { log.Printf(configMigrateMSGTemplate, configFile, cv20.Version, srvConfig.Version) return nil } + +func migrateV21ToV22() error { + configFile := getConfigFile() + + cv21 := &serverConfigV21{} + _, err := quick.Load(configFile, cv21) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return fmt.Errorf("Unable to load config version ‘21’. %v", err) + } + if cv21.Version != "21" { + return nil + } + + // Copy over fields from V21 into V22 config struct + srvConfig := &serverConfigV22{ + Notify: ¬ifier{}, + } + srvConfig.Version = serverConfigVersion + srvConfig.Credential = cv21.Credential + srvConfig.Region = cv21.Region + if srvConfig.Region == "" { + // Region needs to be set for AWS Signature Version 4. + srvConfig.Region = globalMinioDefaultRegion + } + + // check and set notifiers config + if len(cv21.Notify.AMQP) == 0 { + srvConfig.Notify.AMQP = make(map[string]amqpNotify) + srvConfig.Notify.AMQP["1"] = amqpNotify{} + } else { + // New deliveryMode parameter is added for AMQP, + // default value is already 0, so nothing to + // explicitly migrate here. 
+ srvConfig.Notify.AMQP = cv21.Notify.AMQP + } + if len(cv21.Notify.ElasticSearch) == 0 { + srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify) + srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{ + Format: formatNamespace, + } + } else { + srvConfig.Notify.ElasticSearch = cv21.Notify.ElasticSearch + } + if len(cv21.Notify.Redis) == 0 { + srvConfig.Notify.Redis = make(map[string]redisNotify) + srvConfig.Notify.Redis["1"] = redisNotify{ + Format: formatNamespace, + } + } else { + srvConfig.Notify.Redis = cv21.Notify.Redis + } + if len(cv21.Notify.PostgreSQL) == 0 { + srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify) + srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{ + Format: formatNamespace, + } + } else { + srvConfig.Notify.PostgreSQL = cv21.Notify.PostgreSQL + } + if len(cv21.Notify.Kafka) == 0 { + srvConfig.Notify.Kafka = make(map[string]kafkaNotify) + srvConfig.Notify.Kafka["1"] = kafkaNotify{} + } else { + srvConfig.Notify.Kafka = cv21.Notify.Kafka + } + if len(cv21.Notify.NATS) == 0 { + srvConfig.Notify.NATS = make(map[string]natsNotify) + srvConfig.Notify.NATS["1"] = natsNotify{} + } else { + srvConfig.Notify.NATS = cv21.Notify.NATS + } + if len(cv21.Notify.Webhook) == 0 { + srvConfig.Notify.Webhook = make(map[string]webhookNotify) + srvConfig.Notify.Webhook["1"] = webhookNotify{} + } else { + srvConfig.Notify.Webhook = cv21.Notify.Webhook + } + if len(cv21.Notify.MySQL) == 0 { + srvConfig.Notify.MySQL = make(map[string]mySQLNotify) + srvConfig.Notify.MySQL["1"] = mySQLNotify{ + Format: formatNamespace, + } + } else { + srvConfig.Notify.MySQL = cv21.Notify.MySQL + } + if len(cv21.Notify.MQTT) == 0 { + srvConfig.Notify.MQTT = make(map[string]mqttNotify) + srvConfig.Notify.MQTT["1"] = mqttNotify{} + } else { + srvConfig.Notify.MQTT = cv21.Notify.MQTT + } + + // Load browser config from existing config in the file. + srvConfig.Browser = cv21.Browser + + // Load domain config from existing config in the file. + srvConfig.Domain = cv21.Domain + + if err = quick.Save(configFile, srvConfig); err != nil { + return fmt.Errorf("Failed to migrate config from ‘%s’ to ‘%s’. %v", cv21.Version, srvConfig.Version, err) + } + + log.Printf(configMigrateMSGTemplate, configFile, cv21.Version, srvConfig.Version) + return nil +} diff --git a/cmd/config-versions.go b/cmd/config-versions.go index 2db1d8612..6fa551dcb 100644 --- a/cmd/config-versions.go +++ b/cmd/config-versions.go @@ -530,3 +530,22 @@ type serverConfigV21 struct { // Notification queue configuration. Notify *notifier `json:"notify"` } + +// serverConfigV22 is just like version '21' with added support +// for StorageClass +type serverConfigV22 struct { + sync.RWMutex + Version string `json:"version"` + + // S3 API configuration. + Credential auth.Credentials `json:"credential"` + Region string `json:"region"` + Browser BrowserFlag `json:"browser"` + Domain string `json:"domain"` + + // Storage class configuration + StorageClass storageClassConfig `json:"storageclass"` + + // Notification queue configuration. + Notify *notifier `json:"notify"` +} diff --git a/cmd/globals.go b/cmd/globals.go index 575a15ac5..889510ed1 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -149,7 +149,6 @@ var ( globalIsEnvDomainName bool globalDomainName string // Root domain for virtual host style requests - // Add new variable global values here. 
globalListingTimeout = newDynamicTimeout( /*30*/ 600*time.Second /*5*/, 600*time.Second) // timeout for listing related ops globalObjectTimeout = newDynamicTimeout( /*1*/ 10*time.Minute /*10*/, 600*time.Second) // timeout for Object API related ops @@ -158,6 +157,16 @@ var ( // Keep connection active for clients actively using ListenBucketNotification. globalSNSConnAlive = 5 * time.Second // Send a whitespace every 5 seconds. + + // Storage classes + // Set to indicate if storage class is set up + globalIsStorageClass bool + // Set to store reduced redundancy storage class + globalRRStorageClass storageClass + // Set to store standard storage class + globalStandardStorageClass storageClass + + // Add new variable global values here. ) // global colors. diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index 1a7d7f80f..7dd137798 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -60,6 +60,7 @@ var supportedHeaders = []string{ "cache-control", "content-encoding", "content-disposition", + amzStorageClass, // Add more supported headers here. } @@ -116,7 +117,8 @@ func extractMetadataFromHeader(header http.Header) (map[string]string, error) { return nil, errors.Trace(errInvalidArgument) } metadata := make(map[string]string) - // Save standard supported headers. + + // Save all supported headers. for _, supportedHeader := range supportedHeaders { canonicalHeader := http.CanonicalHeaderKey(supportedHeader) // HTTP headers are case insensitive, look for both canonical @@ -127,6 +129,7 @@ func extractMetadataFromHeader(header http.Header) (map[string]string, error) { metadata[supportedHeader] = header.Get(supportedHeader) } } + // Go through all other headers for any additional headers that needs to be saved. for key := range header { if key != http.CanonicalHeaderKey(key) { diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 5f892e930..1f6e9f957 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -43,10 +43,10 @@ type StorageInfo struct { Type BackendType // Following fields are only meaningful if BackendType is Erasure. - OnlineDisks int // Online disks during server startup. - OfflineDisks int // Offline disks during server startup. - ReadQuorum int // Minimum disks required for successful read operations. - WriteQuorum int // Minimum disks required for successful write operations. + OnlineDisks int // Online disks during server startup. + OfflineDisks int // Offline disks during server startup. + standardSCParity int // Parity disks for currently configured Standard storage class. + rrSCParity int // Parity disks for currently configured Reduced Redundancy storage class. 
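Tying back to the handler change above that whitelists x-amz-storage-class: the requested class reaches the object layer as plain object metadata keyed by the lower-cased header name. A simplified standalone sketch of that case-insensitive pickup (the header value shown is the one this patch's tests send; real requests would also carry a signature):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Headers this sketch treats as "supported", as in extractMetadataFromHeader.
	supportedHeaders := []string{"content-type", "cache-control", "x-amz-storage-class"}

	h := http.Header{}
	// Clients may send any casing; http.Header canonicalizes keys on Set/Get.
	h.Set("X-Amz-Storage-Class", "MINIO_STORAGE_CLASS_RRS")

	metadata := make(map[string]string)
	for _, supported := range supportedHeaders {
		if v := h.Get(supported); v != "" { // Get is case-insensitive
			metadata[supported] = v
		}
	}

	// The object layer later switches on this metadata value in getDrivesCount.
	fmt.Println(metadata["x-amz-storage-class"]) // MINIO_STORAGE_CLASS_RRS
}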
} } diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index 981a0b738..f1a52eb12 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -238,7 +238,8 @@ func testPutObjectPartDiskNotFound(obj ObjectLayer, instanceType string, disks [ if err == nil { t.Fatalf("Test %s: expected to fail but passed instead", instanceType) } - expectedErr := InsufficientWriteQuorum{} + // as majority of xl.json are not available, we expect InsufficientReadQuorum while trying to fetch the object quorum + expectedErr := InsufficientReadQuorum{} if err.Error() != expectedErr.Error() { t.Fatalf("Test %s: expected error %s, got %s instead.", instanceType, expectedErr, err) } diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go index 7176d62dd..e5da23b0c 100644 --- a/cmd/server-startup-msg.go +++ b/cmd/server-startup-msg.go @@ -47,6 +47,16 @@ func printStartupMessage(apiEndPoints []string) { strippedAPIEndpoints := stripStandardPorts(apiEndPoints) + // Object layer is initialized then print StorageInfo. + objAPI := newObjectLayerFn() + if objAPI != nil { + printStorageInfo(objAPI.StorageInfo()) + // Storage class info only printed for Erasure backend + if objAPI.StorageInfo().Backend.Type == Erasure { + printStorageClassInfoMsg(objAPI.StorageInfo()) + } + } + // Prints credential, region and browser access. printServerCommonMsg(strippedAPIEndpoints) @@ -57,12 +67,6 @@ func printStartupMessage(apiEndPoints []string) { // Prints documentation message. printObjectAPIMsg() - // Object layer is initialized then print StorageInfo. - objAPI := newObjectLayerFn() - if objAPI != nil { - printStorageInfo(objAPI.StorageInfo()) - } - // SSL is configured reads certification chain, prints // authority and expiry. if globalIsSSL { @@ -173,18 +177,42 @@ func getStorageInfoMsg(storageInfo StorageInfo) string { humanize.IBytes(uint64(storageInfo.Total))) if storageInfo.Backend.Type == Erasure { diskInfo := fmt.Sprintf(" %d Online, %d Offline. 
", storageInfo.Backend.OnlineDisks, storageInfo.Backend.OfflineDisks) - if maxDiskFailures := storageInfo.Backend.ReadQuorum - storageInfo.Backend.OfflineDisks; maxDiskFailures >= 0 { - diskInfo += fmt.Sprintf("We can withstand [%d] drive failure(s).", maxDiskFailures) - } msg += colorBlue("\nStatus:") + fmt.Sprintf(getFormatStr(len(diskInfo), 8), diskInfo) } return msg } +func printStorageClassInfoMsg(storageInfo StorageInfo) { + standardClassMsg := getStandardStorageClassInfoMsg(storageInfo) + rrsClassMsg := getRRSStorageClassInfoMsg(storageInfo) + storageClassMsg := fmt.Sprintf(getFormatStr(len(standardClassMsg), 3), standardClassMsg) + fmt.Sprintf(getFormatStr(len(rrsClassMsg), 3), rrsClassMsg) + // Print storage class section only if data is present + if storageClassMsg != "" { + log.Println(colorBlue("Storage Class:")) + log.Println(storageClassMsg) + } +} + +func getStandardStorageClassInfoMsg(storageInfo StorageInfo) string { + var msg string + if maxDiskFailures := storageInfo.Backend.standardSCParity - storageInfo.Backend.OfflineDisks; maxDiskFailures >= 0 { + msg += fmt.Sprintf("Objects with Standard class can withstand [%d] drive failure(s).\n", maxDiskFailures) + } + return msg +} + +func getRRSStorageClassInfoMsg(storageInfo StorageInfo) string { + var msg string + if maxDiskFailures := storageInfo.Backend.rrSCParity - storageInfo.Backend.OfflineDisks; maxDiskFailures >= 0 { + msg += fmt.Sprintf("Objects with Reduced Redundancy class can withstand [%d] drive failure(s).\n", maxDiskFailures) + } + return msg +} + // Prints startup message of storage capacity and erasure information. func printStorageInfo(storageInfo StorageInfo) { - log.Println() log.Println(getStorageInfoMsg(storageInfo)) + log.Println() } // Prints certificate expiry date warning diff --git a/cmd/server-startup-msg_test.go b/cmd/server-startup-msg_test.go index aebfbaa26..5ee36663b 100644 --- a/cmd/server-startup-msg_test.go +++ b/cmd/server-startup-msg_test.go @@ -35,11 +35,11 @@ func TestStorageInfoMsg(t *testing.T) { Total: 10 * humanize.GiByte, Free: 2 * humanize.GiByte, Backend: struct { - Type BackendType - OnlineDisks int - OfflineDisks int - ReadQuorum int - WriteQuorum int + Type BackendType + OnlineDisks int + OfflineDisks int + standardSCParity int + rrSCParity int }{Erasure, 7, 1, 4, 5}, } @@ -155,3 +155,97 @@ func TestPrintStartupMessage(t *testing.T) { apiEndpoints := []string{"http://127.0.0.1:9000"} printStartupMessage(apiEndpoints) } + +func TestGetStandardStorageClassInfoMsg(t *testing.T) { + tests := []struct { + name string + args StorageInfo + want string + }{ + {"1", StorageInfo{ + Total: 20 * humanize.GiByte, + Free: 2 * humanize.GiByte, + Backend: struct { + Type BackendType + OnlineDisks int + OfflineDisks int + standardSCParity int + rrSCParity int + }{Erasure, 15, 1, 5, 3}, + }, "Objects with Standard class can withstand [4] drive failure(s).\n"}, + {"2", StorageInfo{ + Total: 30 * humanize.GiByte, + Free: 3 * humanize.GiByte, + Backend: struct { + Type BackendType + OnlineDisks int + OfflineDisks int + standardSCParity int + rrSCParity int + }{Erasure, 10, 0, 5, 3}, + }, "Objects with Standard class can withstand [5] drive failure(s).\n"}, + {"3", StorageInfo{ + Total: 15 * humanize.GiByte, + Free: 2 * humanize.GiByte, + Backend: struct { + Type BackendType + OnlineDisks int + OfflineDisks int + standardSCParity int + rrSCParity int + }{Erasure, 12, 3, 6, 2}, + }, "Objects with Standard class can withstand [3] drive failure(s).\n"}, + } + for _, tt := range tests { + if got := 
getStandardStorageClassInfoMsg(tt.args); got != tt.want { + t.Errorf("Test %s failed, expected %v, got %v", tt.name, tt.want, got) + } + } +} + +func TestGetRRSStorageClassInfoMsg(t *testing.T) { + tests := []struct { + name string + args StorageInfo + want string + }{ + {"1", StorageInfo{ + Total: 20 * humanize.GiByte, + Free: 2 * humanize.GiByte, + Backend: struct { + Type BackendType + OnlineDisks int + OfflineDisks int + standardSCParity int + rrSCParity int + }{Erasure, 15, 1, 5, 3}, + }, "Objects with Reduced Redundancy class can withstand [2] drive failure(s).\n"}, + {"2", StorageInfo{ + Total: 30 * humanize.GiByte, + Free: 3 * humanize.GiByte, + Backend: struct { + Type BackendType + OnlineDisks int + OfflineDisks int + standardSCParity int + rrSCParity int + }{Erasure, 16, 0, 5, 3}, + }, "Objects with Reduced Redundancy class can withstand [3] drive failure(s).\n"}, + {"3", StorageInfo{ + Total: 15 * humanize.GiByte, + Free: 2 * humanize.GiByte, + Backend: struct { + Type BackendType + OnlineDisks int + OfflineDisks int + standardSCParity int + rrSCParity int + }{Erasure, 12, 3, 6, 5}, + }, "Objects with Reduced Redundancy class can withstand [2] drive failure(s).\n"}, + } + for _, tt := range tests { + if got := getRRSStorageClassInfoMsg(tt.args); got != tt.want { + t.Errorf("Test %s failed, expected %v, got %v", tt.name, tt.want, got) + } + } +} diff --git a/cmd/storage-class.go b/cmd/storage-class.go new file mode 100644 index 000000000..34dc5c8f0 --- /dev/null +++ b/cmd/storage-class.go @@ -0,0 +1,187 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "errors" + "fmt" + "strconv" + "strings" +) + +const ( + // metadata entry for storage class + amzStorageClass = "x-amz-storage-class" + // Reduced redundancy storage class + reducedRedundancyStorageClass = "MINIO_STORAGE_CLASS_RRS" + // Standard storage class + standardStorageClass = "MINIO_STORAGE_CLASS_STANDARD" + // Supported storage class scheme is EC + supportedStorageClassScheme = "EC" + // Minimum parity disks + minimumParityDisks = 2 + defaultRRSParity = 2 +) + +// Struct to hold storage class +type storageClass struct { + Scheme string + Parity int +} + +type storageClassConfig struct { + Standard string `json:"standard"` + RRS string `json:"rrs"` +} + +// Parses given storageClassEnv and returns a storageClass structure. +// Supported Storage Class format is "Scheme:Number of parity disks". +// Currently only supported scheme is "EC". 
+func parseStorageClass(storageClassEnv string) (sc storageClass, err error) { + s := strings.Split(storageClassEnv, ":") + + // only two elements allowed in the string - "scheme" and "number of parity disks" + if len(s) > 2 { + return storageClass{}, errors.New("Too many sections in " + storageClassEnv) + } else if len(s) < 2 { + return storageClass{}, errors.New("Too few sections in " + storageClassEnv) + } + + // only allowed scheme is "EC" + if s[0] != supportedStorageClassScheme { + return storageClass{}, errors.New("Unsupported scheme " + s[0] + ". Supported scheme is EC") + } + + // Number of parity disks should be integer + parityDisks, err := strconv.Atoi(s[1]) + if err != nil { + return storageClass{}, err + } + + sc = storageClass{ + Scheme: s[0], + Parity: parityDisks, + } + + return sc, nil +} + +// Validates the parity disks for Reduced Redundancy storage class +func validateRRSParity(rrsParity, ssParity int) (err error) { + + // Reduced redundancy storage class is not supported for 4 disks erasure coded setup. + if len(globalEndpoints) == 4 && rrsParity != 0 { + return fmt.Errorf("Reduced redundancy storage class not supported for " + strconv.Itoa(len(globalEndpoints)) + " disk setup") + } + + // RRS parity disks should be greater than or equal to minimumParityDisks. Parity below minimumParityDisks is not recommended. + if rrsParity < minimumParityDisks { + return fmt.Errorf("Reduced redundancy storage class parity should be greater than or equal to " + strconv.Itoa(minimumParityDisks)) + } + + // Reduced redundancy implies lesser parity than standard storage class. So, RRS parity disks should be + // - less than N/2, if StorageClass parity is not set. + // - less than StorageClass Parity, if Storage class parity is set. + switch ssParity { + case 0: + if rrsParity >= len(globalEndpoints)/2 { + return fmt.Errorf("Reduced redundancy storage class parity disks should be less than " + strconv.Itoa(len(globalEndpoints)/2)) + } + default: + if rrsParity >= ssParity { + return fmt.Errorf("Reduced redundancy storage class parity disks should be less than " + strconv.Itoa(ssParity)) + } + } + + return nil +} + +// Validates the parity disks for Standard storage class +func validateSSParity(ssParity, rrsParity int) (err error) { + + // Standard storage class implies more parity than Reduced redundancy storage class. So, Standard storage parity disks should be + // - greater than or equal to 2, if RRS parity is not set. + // - greater than RRS Parity, if RRS parity is set. 
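A standalone worked example of how the two parity constraints above combine when both classes are configured, assuming a hypothetical 16-disk setup (the real checks read len(globalEndpoints)):

package main

import "fmt"

func main() {
	// Illustrative restatement of validateSSParity/validateRRSParity for the case
	// where both classes are set, on an assumed 16-disk deployment.
	const totalDisks = 16
	const minimumParityDisks = 2

	valid := func(ssParity, rrsParity int) bool {
		// Standard parity must be at least 2 and at most N/2.
		if ssParity < minimumParityDisks || ssParity > totalDisks/2 {
			return false
		}
		// RRS parity must be at least 2 and strictly below the standard parity.
		if rrsParity < minimumParityDisks || rrsParity >= ssParity {
			return false
		}
		return true
	}

	fmt.Println(valid(8, 4)) // true:  EC:8 standard, EC:4 reduced redundancy
	fmt.Println(valid(6, 6)) // false: RRS parity is not below the standard parity
	fmt.Println(valid(9, 2)) // false: standard parity exceeds N/2 (8)
	fmt.Println(valid(4, 1)) // false: RRS parity is below the minimum of 2
}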
+ switch rrsParity { + case 0: + if ssParity < minimumParityDisks { + return fmt.Errorf("Standard storage class parity disks should be greater than or equal to " + strconv.Itoa(minimumParityDisks)) + } + default: + if ssParity <= rrsParity { + return fmt.Errorf("Standard storage class parity disks should be greater than " + strconv.Itoa(rrsParity)) + } + } + + // Standard storage class parity should be less than or equal to N/2 + if ssParity > len(globalEndpoints)/2 { + return fmt.Errorf("Standard storage class parity disks should be less than or equal to " + strconv.Itoa(len(globalEndpoints)/2)) + } + + return nil +} + +// Returns the data and parity drive count based on storage class +// If storage class is set using the env vars MINIO_STORAGE_CLASS_RRS and MINIO_STORAGE_CLASS_STANDARD +// -- corresponding values are returned +// If storage class is not set using environment variables, default values are returned +// -- Default for Reduced Redundancy Storage class is, parity = 2 and data = N-Parity +// -- Default for Standard Storage class is, parity = N/2, data = N/2 +// If storage class is not present in metadata, default value is data = N/2, parity = N/2 +func getDrivesCount(sc string, disks []StorageAPI) (data, parity int) { + totalDisks := len(disks) + parity = totalDisks / 2 + switch sc { + case reducedRedundancyStorageClass: + if globalRRStorageClass.Parity != 0 { + // set the rrs parity if available + parity = globalRRStorageClass.Parity + } else { + // else fall back to default value + parity = defaultRRSParity + } + case standardStorageClass: + if globalStandardStorageClass.Parity != 0 { + // set the standard parity if available + parity = globalStandardStorageClass.Parity + } + } + // data is always totalDisks - parity + return totalDisks - parity, parity +} + +// Returns per object readQuorum and writeQuorum +// readQuorum is the minimum required disks to read data. +// writeQuorum is the minimum required disks to write data. +func objectQuorumFromMeta(xl xlObjects, partsMetaData []xlMetaV1, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) { + + // get the latest updated Metadata and a count of all the latest updated xlMeta(s) + latestXLMeta, count := getLatestXLMeta(partsMetaData, errs) + + // latestXLMeta is updated most recently. + // We implicitly assume that all the xlMeta(s) have same dataBlocks and parityBlocks. + // We now check that at least dataBlocks number of xlMeta is available. This means count + // should be greater than or equal to dataBlocks field of latestXLMeta. If not we throw read quorum error. + if count < latestXLMeta.Erasure.DataBlocks { + // This is the case when we can't reliably deduce object quorum + return 0, 0, errXLReadQuorum + } + + // Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks + // from latestXLMeta to get the quorum + return latestXLMeta.Erasure.DataBlocks, latestXLMeta.Erasure.DataBlocks + 1, nil +} diff --git a/cmd/storage-class_test.go b/cmd/storage-class_test.go new file mode 100644 index 000000000..d31450483 --- /dev/null +++ b/cmd/storage-class_test.go @@ -0,0 +1,355 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "errors" + "reflect" + "testing" +) + +func TestParseStorageClass(t *testing.T) { + ExecObjectLayerTest(t, testParseStorageClass) +} + +func testParseStorageClass(obj ObjectLayer, instanceType string, t TestErrHandler) { + tests := []struct { + name int + storageClassEnv string + wantSc storageClass + expectedError error + }{ + {1, "EC:3", storageClass{ + Scheme: "EC", + Parity: 3}, + nil}, + {2, "EC:4", storageClass{ + Scheme: "EC", + Parity: 4}, + nil}, + {3, "AB:4", storageClass{ + Scheme: "EC", + Parity: 4}, + errors.New("Unsupported scheme AB. Supported scheme is EC")}, + {4, "EC:4:5", storageClass{ + Scheme: "EC", + Parity: 4}, + errors.New("Too many sections in EC:4:5")}, + {5, "AB", storageClass{ + Scheme: "EC", + Parity: 4}, + errors.New("Too few sections in AB")}, + } + for _, tt := range tests { + gotSc, err := parseStorageClass(tt.storageClassEnv) + if err != nil && tt.expectedError == nil { + t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err) + return + } + if err == nil && tt.expectedError != nil { + t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err) + return + } + if tt.expectedError == nil && !reflect.DeepEqual(gotSc, tt.wantSc) { + t.Errorf("Test %d, Expected %v, got %v", tt.name, tt.wantSc, gotSc) + return + } + if tt.expectedError != nil && !reflect.DeepEqual(err, tt.expectedError) { + t.Errorf("Test %d, Expected %v, got %v", tt.name, tt.expectedError, err) + } + } +} + +func TestValidateRRSParity(t *testing.T) { + ExecObjectLayerTestWithDirs(t, testValidateRRSParity) +} + +func testValidateRRSParity(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { + // Reset global storage class flags + resetGlobalStorageEnvs() + // Set globalEndpoints for a single node XL setup. + globalEndpoints = mustGetNewEndpointList(dirs...) 
+ + tests := []struct { + name int + rrsParity int + ssParity int + expectedError error + }{ + {1, 2, 4, nil}, + {2, 1, 4, errors.New("Reduced redundancy storage class parity should be greater than or equal to 2")}, + {3, 7, 6, errors.New("Reduced redundancy storage class parity disks should be less than 6")}, + {4, 9, 0, errors.New("Reduced redundancy storage class parity disks should be less than 8")}, + {5, 3, 3, errors.New("Reduced redundancy storage class parity disks should be less than 3")}, + } + for _, tt := range tests { + err := validateRRSParity(tt.rrsParity, tt.ssParity) + if err != nil && tt.expectedError == nil { + t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err) + return + } + if err == nil && tt.expectedError != nil { + t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err) + return + } + if tt.expectedError != nil && !reflect.DeepEqual(err, tt.expectedError) { + t.Errorf("Test %d, Expected %v, got %v", tt.name, tt.expectedError, err) + } + } +} + +func TestValidateSSParity(t *testing.T) { + ExecObjectLayerTestWithDirs(t, testValidateSSParity) +} + +func testValidateSSParity(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { + // Reset global storage class flags + resetGlobalStorageEnvs() + // Set globalEndpoints for a single node XL setup. + globalEndpoints = mustGetNewEndpointList(dirs...) + + tests := []struct { + name int + ssParity int + rrsParity int + expectedError error + }{ + {1, 4, 2, nil}, + {2, 6, 5, nil}, + {3, 1, 0, errors.New("Standard storage class parity disks should be greater than or equal to 2")}, + {4, 4, 6, errors.New("Standard storage class parity disks should be greater than 6")}, + {5, 9, 0, errors.New("Standard storage class parity disks should be less than or equal to 8")}, + {6, 3, 3, errors.New("Standard storage class parity disks should be greater than 3")}, + } + for _, tt := range tests { + err := validateSSParity(tt.ssParity, tt.rrsParity) + if err != nil && tt.expectedError == nil { + t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err) + return + } + if err == nil && tt.expectedError != nil { + t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err) + return + } + if tt.expectedError != nil && !reflect.DeepEqual(err, tt.expectedError) { + t.Errorf("Test %d, Expected %v, got %v", tt.name, tt.expectedError, err) + } + } +} + +func TestGetDrivesCount(t *testing.T) { + ExecObjectLayerTestWithDirs(t, testGetDrivesCount) +} + +func testGetDrivesCount(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { + // Reset global storage class flags + resetGlobalStorageEnvs() + xl := obj.(*xlObjects) + + tests := []struct { + name int + sc string + disks []StorageAPI + expectedData int + expectedParity int + }{ + {1, reducedRedundancyStorageClass, xl.storageDisks, 14, 2}, + {2, standardStorageClass, xl.storageDisks, 8, 8}, + {3, "", xl.storageDisks, 8, 8}, + {4, reducedRedundancyStorageClass, xl.storageDisks, 9, 7}, + {5, standardStorageClass, xl.storageDisks, 10, 6}, + } + for _, tt := range tests { + // Set env var for test case 4 + if tt.name == 4 { + globalRRStorageClass.Parity = 7 + } + // Set env var for test case 5 + if tt.name == 5 { + globalStandardStorageClass.Parity = 6 + } + data, parity := getDrivesCount(tt.sc, tt.disks) + if data != tt.expectedData { + t.Errorf("Test %d, Expected data disks %d, got %d", tt.name, tt.expectedData, data) + return + } + if parity != tt.expectedParity { + t.Errorf("Test %d, Expected parity 
disks %d, got %d", tt.name, tt.expectedParity, parity) + return + } + } +} + +func TestObjectQuorumFromMeta(t *testing.T) { + ExecObjectLayerTestWithDirs(t, testObjectQuorumFromMeta) +} + +func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) { + // Reset global storage class flags + resetGlobalStorageEnvs() + bucket := getRandomBucketName() + + // make data with more than one part + partCount := 3 + data := bytes.Repeat([]byte("a"), int(globalPutPartSize)*partCount) + xl := obj.(*xlObjects) + xlDisks := xl.storageDisks + + err := obj.MakeBucketWithLocation(bucket, globalMinioDefaultRegion) + if err != nil { + t.Fatalf("Failed to make a bucket %v", err) + } + + // Object for test case 1 - No StorageClass defined, no MetaData in PutObject + object1 := "object1" + _, err = obj.PutObject(bucket, object1, mustGetHashReader(t, bytes.NewReader(data), int64(len(data)), "", ""), nil) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts1, errs1 := readAllXLMetadata(xlDisks, bucket, object1) + + // Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class + object2 := "object2" + metadata2 := make(map[string]string) + metadata2["x-amz-storage-class"] = reducedRedundancyStorageClass + _, err = obj.PutObject(bucket, object2, mustGetHashReader(t, bytes.NewReader(data), int64(len(data)), "", ""), metadata2) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts2, errs2 := readAllXLMetadata(xlDisks, bucket, object2) + + // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class + object3 := "object3" + metadata3 := make(map[string]string) + metadata3["x-amz-storage-class"] = standardStorageClass + _, err = obj.PutObject(bucket, object3, mustGetHashReader(t, bytes.NewReader(data), int64(len(data)), "", ""), metadata3) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts3, errs3 := readAllXLMetadata(xlDisks, bucket, object3) + + // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class + object4 := "object4" + metadata4 := make(map[string]string) + metadata4["x-amz-storage-class"] = standardStorageClass + globalStandardStorageClass = storageClass{ + Parity: 6, + Scheme: "EC", + } + + _, err = obj.PutObject(bucket, object4, mustGetHashReader(t, bytes.NewReader(data), int64(len(data)), "", ""), metadata4) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts4, errs4 := readAllXLMetadata(xlDisks, bucket, object4) + + // Object for test case 5 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting RRS Class + // Reset global storage class flags + resetGlobalStorageEnvs() + object5 := "object5" + metadata5 := make(map[string]string) + metadata5["x-amz-storage-class"] = reducedRedundancyStorageClass + globalRRStorageClass = storageClass{ + Parity: 2, + Scheme: "EC", + } + + _, err = obj.PutObject(bucket, object5, mustGetHashReader(t, bytes.NewReader(data), int64(len(data)), "", ""), metadata5) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts5, errs5 := readAllXLMetadata(xlDisks, bucket, object5) + + // Object for test case 6 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting Standard Storage Class + // Reset global storage class flags + resetGlobalStorageEnvs() + object6 := "object6" + metadata6 := make(map[string]string) + metadata6["x-amz-storage-class"] = 
standardStorageClass + globalRRStorageClass = storageClass{ + Parity: 2, + Scheme: "EC", + } + + _, err = obj.PutObject(bucket, object6, mustGetHashReader(t, bytes.NewReader(data), int64(len(data)), "", ""), metadata6) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts6, errs6 := readAllXLMetadata(xlDisks, bucket, object6) + + // Object for test case 7 - Standard StorageClass defined as Parity 5, MetaData in PutObject requesting RRS Class + // Reset global storage class flags + resetGlobalStorageEnvs() + object7 := "object7" + metadata7 := make(map[string]string) + metadata7["x-amz-storage-class"] = reducedRedundancyStorageClass + globalStandardStorageClass = storageClass{ + Parity: 5, + Scheme: "EC", + } + + _, err = obj.PutObject(bucket, object7, mustGetHashReader(t, bytes.NewReader(data), int64(len(data)), "", ""), metadata7) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + parts7, errs7 := readAllXLMetadata(xlDisks, bucket, object7) + + tests := []struct { + name int + xl xlObjects + parts []xlMetaV1 + errs []error + expectedReadQuorum int + expectedWriteQuorum int + expectedError error + }{ + {1, *xl, parts1, errs1, 8, 9, nil}, + {2, *xl, parts2, errs2, 14, 15, nil}, + {3, *xl, parts3, errs3, 8, 9, nil}, + {4, *xl, parts4, errs4, 10, 11, nil}, + {5, *xl, parts5, errs5, 14, 15, nil}, + {6, *xl, parts6, errs6, 8, 9, nil}, + {7, *xl, parts7, errs7, 14, 15, nil}, + } + for _, tt := range tests { + actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(tt.xl, tt.parts, tt.errs) + if tt.expectedError != nil && err == nil { + t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err) + return + } + if tt.expectedError == nil && err != nil { + t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err) + return + } + if tt.expectedReadQuorum != actualReadQuorum { + t.Errorf("Test %d, Expected Read Quorum %d, got %d", tt.name, tt.expectedReadQuorum, actualReadQuorum) + return + } + if tt.expectedWriteQuorum != actualWriteQuorum { + t.Errorf("Test %d, Expected Write Quorum %d, got %d", tt.name, tt.expectedWriteQuorum, actualWriteQuorum) + return + } + } +} diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 4e844516a..4a59d480d 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -489,6 +489,12 @@ func resetGlobalIsEnvs() { globalIsEnvCreds = false globalIsEnvBrowser = false globalIsEnvRegion = false + globalIsStorageClass = false +} + +func resetGlobalStorageEnvs() { + globalStandardStorageClass = storageClass{} + globalRRStorageClass = storageClass{} } // Resets all the globals used modified in tests. @@ -510,6 +516,8 @@ func resetTestGlobals() { resetGlobalIsXL() // Reset global isEnvCreds flag. resetGlobalIsEnvs() + // Reset global storage class flags + resetGlobalStorageEnvs() } // Configure the server for the test run. @@ -1968,6 +1976,9 @@ type objAPITestType func(obj ObjectLayer, instanceType string, bucketName string // Regular object test type. type objTestType func(obj ObjectLayer, instanceType string, t TestErrHandler) +// Special test type for test with directories +type objTestTypeWithDirs func(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) + // Special object test type for disk not found situations. 
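The read/write quorum expectations in the table above follow from the parity chosen per storage class on the 16-disk test setup; a standalone sketch of that arithmetic:

package main

import "fmt"

func main() {
	// 16 disks, as in the XL test setup above.
	const totalDisks = 16

	// Mirrors getDrivesCount + objectQuorumFromMeta: data = N - parity,
	// readQuorum = data blocks, writeQuorum = data blocks + 1.
	quorums := func(parity int) (readQuorum, writeQuorum int) {
		data := totalDisks - parity
		return data, data + 1
	}

	fmt.Println(quorums(8)) // 8 9:   default N/2 parity (tests 1, 3 and 6)
	fmt.Println(quorums(2)) // 14 15: reduced redundancy default parity (tests 2, 5 and 7)
	fmt.Println(quorums(6)) // 10 11: standard class configured as EC:6 (test 4)
}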
type objTestDiskNotFoundType func(obj ObjectLayer, instanceType string, dirs []string, t *testing.T) @@ -1999,6 +2010,31 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) { defer removeRoots(append(fsDirs, fsDir)) } +// ExecObjectLayerTestWithDirs - executes object layer tests. +// Creates single node and XL ObjectLayer instance and runs test for both the layers. +func ExecObjectLayerTestWithDirs(t TestErrHandler, objTest objTestTypeWithDirs) { + // initialize the server and obtain the credentials and root. + // credentials are necessary to sign the HTTP request. + rootPath, err := newTestConfig(globalMinioDefaultRegion) + if err != nil { + t.Fatal("Unexpected error", err) + } + defer os.RemoveAll(rootPath) + + objLayer, fsDir, err := prepareFS() + if err != nil { + t.Fatalf("Initialization of object layer failed for single node setup: %s", err) + } + + objLayer, fsDirs, err := prepareXL() + if err != nil { + t.Fatalf("Initialization of object layer failed for XL setup: %s", err) + } + // Executing the object layer tests for XL. + objTest(objLayer, XLTestStr, fsDirs, t) + defer removeRoots(append(fsDirs, fsDir)) +} + // ExecObjectLayerDiskAlteredTest - executes object layer tests while altering // disks in between tests. Creates XL ObjectLayer instance and runs test for XL layer. func ExecObjectLayerDiskAlteredTest(t *testing.T, objTest objTestDiskNotFoundType) { diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index e3d69670b..b780de09d 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -64,7 +64,8 @@ func (xl xlObjects) MakeBucketWithLocation(bucket, location string) error { // Wait for all make vol to finish. wg.Wait() - err := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum) + writeQuorum := len(xl.storageDisks)/2 + 1 + err := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, writeQuorum) if errors.Cause(err) == errXLWriteQuorum { // Purge successfully created buckets if we don't have writeQuorum. undoMakeBucket(xl.storageDisks, bucket) @@ -142,7 +143,8 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err // reduce to one error based on read quorum. // `nil` is deliberately passed for ignoredErrs // because these errors were already ignored. - return BucketInfo{}, reduceReadQuorumErrs(bucketErrs, nil, xl.readQuorum) + readQuorum := len(xl.storageDisks) / 2 + return BucketInfo{}, reduceReadQuorumErrs(bucketErrs, nil, readQuorum) } // GetBucketInfo - returns BucketInfo for a bucket. @@ -251,7 +253,8 @@ func (xl xlObjects) DeleteBucket(bucket string) error { // Wait for all the delete vols to finish. wg.Wait() - err := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, xl.writeQuorum) + writeQuorum := len(xl.storageDisks)/2 + 1 + err := reduceWriteQuorumErrs(dErrs, bucketOpIgnoredErrs, writeQuorum) if errors.Cause(err) == errXLWriteQuorum { xl.undoDeleteBucket(bucket) } diff --git a/cmd/xl-v1-healing-common.go b/cmd/xl-v1-healing-common.go index b88a468a8..5e0e964ef 100644 --- a/cmd/xl-v1-healing-common.go +++ b/cmd/xl-v1-healing-common.go @@ -121,6 +121,29 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) return onlineDisks, modTime } +// Returns one of the latest updated xlMeta files and count of total valid xlMeta(s) updated latest +func getLatestXLMeta(partsMetadata []xlMetaV1, errs []error) (xlMetaV1, int) { + // List all the file commit ids from parts metadata. 
+	modTimes := listObjectModtimes(partsMetadata, errs)
+
+	// Count all latest updated xlMeta values
+	var count int
+	var latestXLMeta xlMetaV1
+
+	// Reduce the list of modTimes to a single common value - i.e. the last updated time
+	modTime, _ := commonTime(modTimes)
+
+	// Iterate through all the modTimes and count the xlMeta(s) with latest time.
+	for index, t := range modTimes {
+		if t == modTime && partsMetadata[index].IsValid() {
+			latestXLMeta = partsMetadata[index]
+			count++
+		}
+	}
+	// Return one of the latest xlMetaData, and the count of latest updated xlMeta files
+	return latestXLMeta, count
+}
+
 // outDatedDisks - return disks which don't have the latest object (i.e xl.json).
 // disks that are offline are not 'marked' outdated.
 func outDatedDisks(disks, latestDisks []StorageAPI, errs []error, partsMetadata []xlMetaV1,
@@ -184,7 +207,11 @@ func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealObject
 	// Less than quorum erasure coded blocks of the object have the same create time.
 	// This object can't be healed with the information we have.
 	modTime, count := commonTime(listObjectModtimes(partsMetadata, errs))
-	if count < xl.readQuorum {
+
+	// get read quorum for this object
+	readQuorum, _, err := objectQuorumFromMeta(xl, partsMetadata, errs)
+
+	if count < readQuorum || err != nil {
 		return HealObjectInfo{
 			Status: quorumUnavailable,
 			MissingDataCount: 0,
@@ -217,7 +244,7 @@ func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealObject
 			disksMissing = true
 			fallthrough
 		case errFileNotFound:
-			if xlMeta.Erasure.Distribution[i]-1 < xl.dataBlocks {
+			if xlMeta.Erasure.Distribution[i]-1 < xlMeta.Erasure.DataBlocks {
 				missingDataCount++
 			} else {
 				missingParityCount++
diff --git a/cmd/xl-v1-healing-common_test.go b/cmd/xl-v1-healing-common_test.go
index adb792033..87964397b 100644
--- a/cmd/xl-v1-healing-common_test.go
+++ b/cmd/xl-v1-healing-common_test.go
@@ -364,7 +364,8 @@ func TestDisksWithAllParts(t *testing.T) {
 	}
 
 	partsMetadata, errs := readAllXLMetadata(xlDisks, bucket, object)
-	if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil {
+	readQuorum := len(xl.storageDisks) / 2
+	if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
 		t.Fatalf("Failed to read xl meta data %v", reducedErr)
 	}
 
diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go
index 2c718c2a0..8635bb80c 100644
--- a/cmd/xl-v1-healing.go
+++ b/cmd/xl-v1-healing.go
@@ -81,13 +81,16 @@ func (xl xlObjects) HealBucket(bucket string) error {
 		return err
 	}
 
+	// get write quorum for an object
+	writeQuorum := len(xl.storageDisks)/2 + 1
+
 	// Heal bucket.
-	if err := healBucket(xl.storageDisks, bucket, xl.writeQuorum); err != nil {
+	if err := healBucket(xl.storageDisks, bucket, writeQuorum); err != nil {
 		return err
 	}
 
 	// Proceed to heal bucket metadata.
-	return healBucketMetadata(xl.storageDisks, bucket, xl.readQuorum)
+	return healBucketMetadata(xl, bucket)
 }
 
 // Heal bucket - create buckets on disks where it does not exist.
@@ -139,17 +142,11 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error
 // Heals all the metadata associated for a given bucket, this function
 // heals `policy.json`, `notification.xml` and `listeners.json`.
-func healBucketMetadata(storageDisks []StorageAPI, bucket string, readQuorum int) error { +func healBucketMetadata(xlObj xlObjects, bucket string) error { healBucketMetaFn := func(metaPath string) error { - metaLock := globalNSMutex.NewNSLock(minioMetaBucket, metaPath) - if err := metaLock.GetRLock(globalHealingTimeout); err != nil { + if _, _, err := xlObj.HealObject(minioMetaBucket, metaPath); err != nil && !isErrObjectNotFound(err) { return err } - defer metaLock.RUnlock() - // Heals the given file at metaPath. - if _, _, err := healObject(storageDisks, minioMetaBucket, metaPath, readQuorum); err != nil && !isErrObjectNotFound(err) { - return err - } // Success. return nil } @@ -308,9 +305,9 @@ func (xl xlObjects) ListBucketsHeal() ([]BucketInfo, error) { // during startup i.e healing of buckets, bucket metadata (policy.json, // notification.xml, listeners.json) etc. Currently this function // supports quick healing of buckets, bucket metadata. -func quickHeal(storageDisks []StorageAPI, writeQuorum int, readQuorum int) error { +func quickHeal(xlObj xlObjects, writeQuorum int, readQuorum int) error { // List all bucket name occurrence from all disks. - _, bucketOcc, err := listAllBuckets(storageDisks) + _, bucketOcc, err := listAllBuckets(xlObj.storageDisks) if err != nil { return err } @@ -318,10 +315,10 @@ func quickHeal(storageDisks []StorageAPI, writeQuorum int, readQuorum int) error // All bucket names and bucket metadata that should be healed. for bucketName, occCount := range bucketOcc { // Heal bucket only if healing is needed. - if occCount != len(storageDisks) { + if occCount != len(xlObj.storageDisks) { // Heal bucket and then proceed to heal bucket metadata if any. - if err = healBucket(storageDisks, bucketName, writeQuorum); err == nil { - if err = healBucketMetadata(storageDisks, bucketName, readQuorum); err == nil { + if err = healBucket(xlObj.storageDisks, bucketName, writeQuorum); err == nil { + if err = healBucketMetadata(xlObj, bucketName); err == nil { continue } return err @@ -535,6 +532,15 @@ func healObject(storageDisks []StorageAPI, bucket, object string, quorum int) (i // and later the disk comes back up again, heal on the object // should delete it. func (xl xlObjects) HealObject(bucket, object string) (int, int, error) { + // Read metadata files from all the disks + partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object) + + // get read quorum for this object + readQuorum, _, err := objectQuorumFromMeta(xl, partsMetadata, errs) + if err != nil { + return 0, 0, err + } + // Lock the object before healing. objectLock := globalNSMutex.NewNSLock(bucket, object) if err := objectLock.GetRLock(globalHealingTimeout); err != nil { @@ -543,5 +549,5 @@ func (xl xlObjects) HealObject(bucket, object string) (int, int, error) { defer objectLock.RUnlock() // Heal the object. - return healObject(xl.storageDisks, bucket, object, xl.readQuorum) + return healObject(xl.storageDisks, bucket, object, readQuorum) } diff --git a/cmd/xl-v1-healing_test.go b/cmd/xl-v1-healing_test.go index 64b6a65a6..48ab81b18 100644 --- a/cmd/xl-v1-healing_test.go +++ b/cmd/xl-v1-healing_test.go @@ -333,8 +333,12 @@ func TestQuickHeal(t *testing.T) { } } + // figure out read and write quorum + readQuorum := len(xl.storageDisks) / 2 + writeQuorum := len(xl.storageDisks)/2 + 1 + // Heal the missing buckets. 
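Bucket operations are not tied to any single object's erasure layout, so (as in the test above) their quorums are now taken straight from the disk count; a minimal sketch of that arithmetic, assuming 16 disks:

package main

import "fmt"

func main() {
	disks := 16                // illustrative disk count
	readQuorum := disks / 2    // bucket-level read quorum
	writeQuorum := disks/2 + 1 // bucket-level write quorum
	fmt.Println(readQuorum, writeQuorum) // 8 9
}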
- if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil { + if err = quickHeal(*xl, writeQuorum, readQuorum); err != nil { t.Fatal(err) } @@ -351,7 +355,7 @@ func TestQuickHeal(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errUnformattedDisk) - if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != errUnformattedDisk { + if err = quickHeal(*xl, writeQuorum, readQuorum); err != errUnformattedDisk { t.Fatal(err) } @@ -368,7 +372,7 @@ func TestQuickHeal(t *testing.T) { } xl = obj.(*xlObjects) xl.storageDisks[0] = nil - if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil { + if err = quickHeal(*xl, writeQuorum, readQuorum); err != nil { t.Fatal("Got an unexpected error: ", err) } @@ -390,7 +394,7 @@ func TestQuickHeal(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errDiskNotFound) - if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil { + if err = quickHeal(*xl, writeQuorum, readQuorum); err != nil { t.Fatal("Got an unexpected error: ", err) } } @@ -533,7 +537,8 @@ func TestHealObjectXL(t *testing.T) { // Try healing now, expect to receive errDiskNotFound. _, _, err = obj.HealObject(bucket, object) - if errors.Cause(err) != errDiskNotFound { + // since majority of xl.jsons are not available, object quorum can't be read properly and error will be errXLReadQuorum + if errors.Cause(err) != errXLReadQuorum { t.Errorf("Expected %v but received %v", errDiskNotFound, err) } } diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 5eb81971b..904b8d868 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -396,7 +396,8 @@ func (xl xlObjects) readXLMetaParts(bucket, object string) (xlMetaParts []object } // If all errors were ignored, reduce to maximal occurrence // based on the read quorum. - return nil, reduceReadQuorumErrs(ignoredErrs, nil, xl.readQuorum) + readQuorum := len(xl.storageDisks) / 2 + return nil, reduceReadQuorumErrs(ignoredErrs, nil, readQuorum) } // readXLMetaStat - return xlMetaV1.Stat and xlMetaV1.Meta from one of the disks picked at random. @@ -423,7 +424,8 @@ func (xl xlObjects) readXLMetaStat(bucket, object string) (xlStat statInfo, xlMe } // If all errors were ignored, reduce to maximal occurrence // based on the read quorum. - return statInfo{}, nil, reduceReadQuorumErrs(ignoredErrs, nil, xl.readQuorum) + readQuorum := len(xl.storageDisks) / 2 + return statInfo{}, nil, reduceReadQuorumErrs(ignoredErrs, nil, readQuorum) } // deleteXLMetadata - deletes `xl.json` on a single disk. @@ -513,7 +515,7 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] } // writeSameXLMetadata - write `xl.json` on all disks in order. -func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum, readQuorum int) ([]StorageAPI, error) { +func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum int) ([]StorageAPI, error) { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(disks)) diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index c1ee34bc1..2486b0020 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -32,7 +32,7 @@ import ( ) // updateUploadJSON - add or remove upload ID info in all `uploads.json`. 
-func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, isRemove bool) error { +func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, writeQuorum int, isRemove bool) error { uploadsPath := path.Join(bucket, object, uploadsJSONFile) tmpUploadsPath := mustGetUUID() @@ -95,7 +95,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated // Wait for all the writes to finish. wg.Wait() - err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) if errors.Cause(err) == errXLWriteQuorum { // No quorum. Perform cleanup on the minority of disks // on which the operation succeeded. @@ -151,13 +151,13 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated } // addUploadID - add upload ID and its initiated time to 'uploads.json'. -func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time) error { - return xl.updateUploadJSON(bucket, object, uploadID, initiated, false) +func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time, writeQuorum int) error { + return xl.updateUploadJSON(bucket, object, uploadID, initiated, writeQuorum, false) } // removeUploadID - remove upload ID in 'uploads.json'. -func (xl xlObjects) removeUploadID(bucket, object string, uploadID string) error { - return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, true) +func (xl xlObjects) removeUploadID(bucket, object string, uploadID string, writeQuorum int) error { + return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, writeQuorum, true) } // Returns if the prefix is a multipart upload. @@ -228,7 +228,8 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf } // If all errors were ignored, reduce to maximal occurrence // based on the read quorum. - return FileInfo{}, reduceReadQuorumErrs(ignoredErrs, nil, xl.readQuorum) + readQuorum := len(xl.storageDisks) / 2 + return FileInfo{}, reduceReadQuorumErrs(ignoredErrs, nil, readQuorum) } // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks. @@ -499,7 +500,15 @@ func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMark // disks. `uploads.json` carries metadata regarding on-going multipart // operation(s) on the object. func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (string, error) { - xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks) + + dataBlocks, parityBlocks := getDrivesCount(meta[amzStorageClass], xl.storageDisks) + + xlMeta := newXLMetaV1(object, dataBlocks, parityBlocks) + + // we now know the number of blocks this object needs for data and parity. + // establish the writeQuorum using this data + writeQuorum := dataBlocks + 1 + // If not set default to "application/octet-stream" if meta["content-type"] == "" { contentType := "application/octet-stream" @@ -528,7 +537,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st tempUploadIDPath := uploadID // Write updated `xl.json` to all disks. 
- disks, err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum) + disks, err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, writeQuorum) if err != nil { return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath) } @@ -538,14 +547,14 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st defer xl.deleteObject(minioMetaTmpBucket, tempUploadIDPath) // Attempt to rename temp upload object to actual upload path object - _, rErr := renameObject(disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) + _, rErr := renameObject(disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum) if rErr != nil { return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } initiated := UTCNow() // Create or update 'uploads.json' - if err = xl.addUploadID(bucket, object, uploadID, initiated); err != nil { + if err = xl.addUploadID(bucket, object, uploadID, initiated, writeQuorum); err != nil { return "", err } // Return success. @@ -641,7 +650,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d // Read metadata associated with the object from all disks. partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath) - reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + + // get Quorum for this object + _, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs) + if err != nil { + return pi, toObjectErr(err, bucket, object) + } + + reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) if errors.Cause(reducedErr) == errXLWriteQuorum { preUploadIDLock.RUnlock() return pi, toObjectErr(reducedErr, bucket, object) @@ -669,7 +685,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d // Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete. defer xl.deleteObject(minioMetaTmpBucket, tmpPart) if data.Size() > 0 { - if pErr := xl.prepareFile(minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks); err != nil { + if pErr := xl.prepareFile(minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); err != nil { return pi, toObjectErr(pErr, bucket, object) } @@ -680,7 +696,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d return pi, toObjectErr(err, bucket, object) } buffer := make([]byte, xlMeta.Erasure.BlockSize, 2*xlMeta.Erasure.BlockSize) // alloc additional space for parity blocks created while erasure coding - file, err := storage.CreateFile(data, minioMetaTmpBucket, tmpPartPath, buffer, DefaultBitrotAlgorithm, xl.writeQuorum) + file, err := storage.CreateFile(data, minioMetaTmpBucket, tmpPartPath, buffer, DefaultBitrotAlgorithm, writeQuorum) if err != nil { return pi, toObjectErr(err, bucket, object) } @@ -705,14 +721,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d // Rename temporary part file to its final location. 
partPath := path.Join(uploadIDPath, partSuffix) - onlineDisks, err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, xl.writeQuorum) + onlineDisks, err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } // Read metadata again because it might be updated with parallel upload of another part. partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaMultipartBucket, uploadIDPath) - reducedErr = reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + reducedErr = reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) if errors.Cause(reducedErr) == errXLWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) } @@ -747,11 +763,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d tempXLMetaPath := newUUID // Writes a unique `xl.json` each disk carrying new checksum related information. - if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil { + if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, writeQuorum); err != nil { return pi, toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath) } - if _, err = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum); err != nil { + if _, err = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum); err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } @@ -904,7 +920,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Read metadata associated with the object from all disks. partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath) - reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + + // get Quorum for this object + _, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs) + if err != nil { + return oi, toObjectErr(err, bucket, object) + } + + reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) if errors.Cause(reducedErr) == errXLWriteQuorum { return oi, toObjectErr(reducedErr, bucket, object) } @@ -992,12 +1015,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } // Write unique `xl.json` for each disk. - if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil { + if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, writeQuorum); err != nil { return oi, toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath) } var rErr error - onlineDisks, rErr = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) + onlineDisks, rErr = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum) if rErr != nil { return oi, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } @@ -1025,7 +1048,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // NOTE: Do not use online disks slice here. 
// The reason is that existing object should be purged // regardless of `xl.json` status and rolled back in case of errors. - _, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) + _, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum) if err != nil { return oi, toObjectErr(err, bucket, object) } @@ -1045,7 +1068,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } // Rename the multipart object to final location. - if _, err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, xl.writeQuorum); err != nil { + if _, err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, writeQuorum); err != nil { return oi, toObjectErr(err, bucket, object) } @@ -1060,7 +1083,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload defer objectMPartPathLock.Unlock() // remove entry from uploads.json with quorum - if err = xl.removeUploadID(bucket, object, uploadID); err != nil { + if err = xl.removeUploadID(bucket, object, uploadID, writeQuorum); err != nil { return oi, toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object)) } @@ -1081,13 +1104,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } // Wrapper which removes all the uploaded parts. -func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error { +func (xl xlObjects) cleanupUploadedParts(uploadIDPath string, writeQuorum int) error { var errs = make([]error, len(xl.storageDisks)) var wg = &sync.WaitGroup{} - // Construct uploadIDPath. - uploadIDPath := path.Join(bucket, object, uploadID) - // Cleanup uploadID for all disks. for index, disk := range xl.storageDisks { if disk == nil { @@ -1108,7 +1128,7 @@ func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error // Wait for all the cleanups to finish. wg.Wait() - return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) + return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) } // abortMultipartUpload - wrapper for purging an ongoing multipart @@ -1116,8 +1136,20 @@ func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error // the directory at '.minio.sys/multipart/bucket/object/uploadID' holding // all the upload parts. func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err error) { + // Construct uploadIDPath. + uploadIDPath := path.Join(bucket, object, uploadID) + + // Read metadata associated with the object from all disks. + partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath) + + // get Quorum for this object + _, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs) + if err != nil { + return toObjectErr(err, bucket, object) + } + // Cleanup all uploaded parts. 
- if err = xl.cleanupUploadedParts(bucket, object, uploadID); err != nil { + if err = xl.cleanupUploadedParts(uploadIDPath, writeQuorum); err != nil { return toObjectErr(err, bucket, object) } @@ -1131,7 +1163,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e defer objectMPartPathLock.Unlock() // remove entry from uploads.json with quorum - if err = xl.removeUploadID(bucket, object, uploadID); err != nil { + if err = xl.removeUploadID(bucket, object, uploadID, writeQuorum); err != nil { return toObjectErr(err, bucket, object) } diff --git a/cmd/xl-v1-multipart_test.go b/cmd/xl-v1-multipart_test.go index 72dd9b431..0dcbf3045 100644 --- a/cmd/xl-v1-multipart_test.go +++ b/cmd/xl-v1-multipart_test.go @@ -142,19 +142,20 @@ func TestUpdateUploadJSON(t *testing.T) { } testCases := []struct { - uploadID string - initiated time.Time - isRemove bool - errVal error + uploadID string + initiated time.Time + writeQuorum int + isRemove bool + errVal error }{ - {"111abc", UTCNow(), false, nil}, - {"222abc", UTCNow(), false, nil}, - {"111abc", time.Time{}, true, nil}, + {"111abc", UTCNow(), 9, false, nil}, + {"222abc", UTCNow(), 10, false, nil}, + {"111abc", time.Time{}, 11, true, nil}, } xl := obj.(*xlObjects) for i, test := range testCases { - testErrVal := xl.updateUploadJSON(bucket, object, test.uploadID, test.initiated, test.isRemove) + testErrVal := xl.updateUploadJSON(bucket, object, test.uploadID, test.initiated, test.writeQuorum, test.isRemove) if testErrVal != test.errVal { t.Errorf("Test %d: Expected error value %v, but got %v", i+1, test.errVal, testErrVal) @@ -166,7 +167,7 @@ func TestUpdateUploadJSON(t *testing.T) { xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i].(*retryStorage), nil, errFaultyDisk) } - testErrVal := xl.updateUploadJSON(bucket, object, "222abc", UTCNow(), false) + testErrVal := xl.updateUploadJSON(bucket, object, "222abc", UTCNow(), 10, false) if testErrVal == nil || testErrVal.Error() != errXLWriteQuorum.Error() { t.Errorf("Expected write quorum error, but got: %v", testErrVal) } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 313f79c96..48ecaad29 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -34,7 +34,7 @@ import ( var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied) // prepareFile hints the bottom layer to optimize the creation of a new object -func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks int) error { +func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks, writeQuorum int) error { pErrs := make([]error, len(onlineDisks)) // Calculate the real size of the part in one disk. actualSize := xl.sizeOnDisk(size, blockSize, dataBlocks) @@ -49,7 +49,7 @@ func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks [ } } } - return reduceWriteQuorumErrs(pErrs, objectOpIgnoredErrs, xl.writeQuorum) + return reduceWriteQuorumErrs(pErrs, objectOpIgnoredErrs, writeQuorum) } /// Object Operations @@ -60,7 +60,14 @@ func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks [ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (oi ObjectInfo, e error) { // Read metadata associated with the object from all disks. 
metaArr, errs := readAllXLMetadata(xl.storageDisks, srcBucket, srcObject) - if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil { + + // get Quorum for this object + readQuorum, writeQuorum, err := objectQuorumFromMeta(xl, metaArr, errs) + if err != nil { + return oi, toObjectErr(err, srcBucket, srcObject) + } + + if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { return oi, toObjectErr(reducedErr, srcBucket, srcObject) } @@ -92,11 +99,11 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string tempObj := mustGetUUID() // Write unique `xl.json` for each disk. - if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { + if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil { return oi, toObjectErr(err, srcBucket, srcObject) } // Rename atomically `xl.json` from tmp location to destination for each disk. - if _, err = renameXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, xl.writeQuorum); err != nil { + if _, err = renameXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, writeQuorum); err != nil { return oi, toObjectErr(err, srcBucket, srcObject) } return xlMeta.ToObjectInfo(srcBucket, srcObject), nil @@ -154,7 +161,14 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) - if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil { + + // get Quorum for this object + readQuorum, _, err := objectQuorumFromMeta(xl, metaArr, errs) + if err != nil { + return toObjectErr(err, bucket, object) + } + + if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { return toObjectErr(reducedErr, bucket, object) } @@ -368,7 +382,7 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str // rename - common function that renamePart and renameObject use to rename // the respective underlying storage layer representations. -func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, quorum int) ([]StorageAPI, error) { +func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int) ([]StorageAPI, error) { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -399,9 +413,9 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, // Wait for all renames to finish. wg.Wait() - // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum + // We can safely allow RenameFile errors up to len(xl.storageDisks) - writeQuorum // otherwise return failure. Cleanup successful renames. - err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, quorum) + err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) if errors.Cause(err) == errXLWriteQuorum { // Undo all the partial rename operations. 
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isDir, errs) @@ -493,11 +507,17 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m } } } + // Get parity and data drive count based on storage class metadata + dataDrives, parityDrives := getDrivesCount(metadata[amzStorageClass], xl.storageDisks) + + // we now know the number of blocks this object needs for data and parity. + // writeQuorum is dataBlocks + 1 + writeQuorum := dataDrives + 1 // Initialize parts metadata partsMetadata := make([]xlMetaV1, len(xl.storageDisks)) - xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks) + xlMeta := newXLMetaV1(object, dataDrives, parityDrives) // Initialize xl meta. for index := range partsMetadata { @@ -541,7 +561,7 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m // This is only an optimization. var curPartReader io.Reader if curPartSize > 0 { - pErr := xl.prepareFile(minioMetaTmpBucket, tempErasureObj, curPartSize, storage.disks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks) + pErr := xl.prepareFile(minioMetaTmpBucket, tempErasureObj, curPartSize, storage.disks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum) if pErr != nil { return ObjectInfo{}, toObjectErr(pErr, bucket, object) } @@ -554,7 +574,7 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m } file, erasureErr := storage.CreateFile(curPartReader, minioMetaTmpBucket, - tempErasureObj, buffer, DefaultBitrotAlgorithm, xl.writeQuorum) + tempErasureObj, buffer, DefaultBitrotAlgorithm, writeQuorum) if erasureErr != nil { return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) } @@ -602,7 +622,7 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m // NOTE: Do not use online disks slice here. // The reason is that existing object should be purged // regardless of `xl.json` status and rolled back in case of errors. - _, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) + _, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -617,12 +637,12 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m } // Write unique `xl.json` for each disk. - if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { + if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } // Rename the successfully written temporary object to final location. - if _, err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum); err != nil { + if _, err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -659,6 +679,15 @@ func (xl xlObjects) deleteObject(bucket, object string) error { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} + // Read metadata associated with the object from all disks. + metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) + + // get Quorum for this object + _, writeQuorum, err := objectQuorumFromMeta(xl, metaArr, errs) + if err != nil { + return err + } + // Initialize list of errors. 
var dErrs = make([]error, len(xl.storageDisks)) @@ -680,7 +709,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error { // Wait for all routines to finish. wg.Wait() - return reduceWriteQuorumErrs(dErrs, objectOpIgnoredErrs, xl.writeQuorum) + return reduceWriteQuorumErrs(dErrs, objectOpIgnoredErrs, writeQuorum) } // DeleteObject - deletes an object, this call doesn't necessary reply diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index 307070acd..26e67757b 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -154,8 +154,9 @@ func TestXLDeleteObjectDiskNotFound(t *testing.T) { xl.storageDisks[8] = nil err = obj.DeleteObject(bucket, object) err = errors.Cause(err) - if err != toObjectErr(errXLWriteQuorum, bucket, object) { - t.Errorf("Expected deleteObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err) + // since majority of disks are not available, metaquorum is not achieved and hence errXLReadQuorum error + if err != toObjectErr(errXLReadQuorum, bucket, object) { + t.Errorf("Expected deleteObject to fail with %v, but failed with %v", toObjectErr(errXLReadQuorum, bucket, object), err) } // Cleanup backend directories removeRoots(fsDirs) diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 5b6bc4f5b..0c37e5136 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -50,10 +50,6 @@ const ( type xlObjects struct { mutex *sync.Mutex storageDisks []StorageAPI // Collection of initialized backend disks. - dataBlocks int // dataBlocks count caculated for erasure. - parityBlocks int // parityBlocks count calculated for erasure. - readQuorum int // readQuorum minimum required disks to read data. - writeQuorum int // writeQuorum minimum required disks to write data. // ListObjects pool management. listPool *treeWalkPool @@ -92,6 +88,7 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { return nil, errInvalidArgument } + // figure out readQuorum for erasure format.json readQuorum := len(storageDisks) / 2 writeQuorum := len(storageDisks)/2 + 1 @@ -101,9 +98,6 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { return nil, fmt.Errorf("Unable to recognize backend format, %s", err) } - // Calculate data and parity blocks. - dataBlocks, parityBlocks := len(newStorageDisks)/2, len(newStorageDisks)/2 - // Initialize list pool. listPool := newTreeWalkPool(globalLookupTimeout) @@ -111,8 +105,6 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { xl := &xlObjects{ mutex: &sync.Mutex{}, storageDisks: newStorageDisks, - dataBlocks: dataBlocks, - parityBlocks: parityBlocks, listPool: listPool, } @@ -144,11 +136,6 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { return nil, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err) } - // Figure out read and write quorum based on number of storage disks. - // READ and WRITE quorum is always set to (N/2) number of disks. - xl.readQuorum = readQuorum - xl.writeQuorum = writeQuorum - // If the number of offline servers is equal to the readQuorum // (i.e. the number of online servers also equals the // readQuorum), we cannot perform quick-heal (no @@ -160,7 +147,7 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { } // Perform a quick heal on the buckets and bucket metadata for any discrepancies. 
-	if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil {
+	if err = quickHeal(*xl, writeQuorum, readQuorum); err != nil {
 		return nil, err
 	}
 
@@ -258,13 +245,18 @@ func getStorageInfo(disks []StorageAPI) StorageInfo {
 	storageInfo.Backend.Type = Erasure
 	storageInfo.Backend.OnlineDisks = onlineDisks
 	storageInfo.Backend.OfflineDisks = offlineDisks
+
+	_, scParity := getDrivesCount(standardStorageClass, disks)
+	storageInfo.Backend.standardSCParity = scParity
+
+	_, rrSCparity := getDrivesCount(reducedRedundancyStorageClass, disks)
+	storageInfo.Backend.rrSCParity = rrSCparity
+
 	return storageInfo
 }
 
 // StorageInfo - returns underlying storage statistics.
 func (xl xlObjects) StorageInfo() StorageInfo {
 	storageInfo := getStorageInfo(xl.storageDisks)
-	storageInfo.Backend.ReadQuorum = xl.readQuorum
-	storageInfo.Backend.WriteQuorum = xl.writeQuorum
 	return storageInfo
 }
diff --git a/docs/erasure/storage-class/README.md b/docs/erasure/storage-class/README.md
new file mode 100644
index 000000000..28369a2c3
--- /dev/null
+++ b/docs/erasure/storage-class/README.md
@@ -0,0 +1,78 @@
+# Minio Storage Class Quickstart Guide [![Slack](https://slack.minio.io/slack?type=svg)](https://slack.minio.io)
+
+Minio server supports storage classes in erasure coding mode. This allows a configurable number of data and parity disks per object.
+
+## Overview
+
+Minio supports two storage classes, Reduced Redundancy class and Standard class. These classes can be defined using environment variables
+set before starting Minio server. After the data and parity disks for each storage class are defined using environment variables,
+you can set the storage class of an object via the request metadata field `x-amz-storage-class`. Minio server then honors the storage class by
+saving the object with the corresponding number of data and parity disks.
+
+### Standard storage class (SS)
+
+The Standard storage class implies more parity than the RRS class, so the SS parity disk count should be
+
+- Greater than or equal to 2, if RRS parity is not set.
+- Greater than the RRS parity, if it is set.
+
+Since parity blocks cannot outnumber data blocks, the Standard storage class parity cannot exceed N/2 (N being the total number of disks).
+
+The default parity for the Standard storage class is `N/2` (N is the total number of drives).
+
+### Reduced redundancy storage class (RRS)
+
+The Reduced Redundancy storage class implies less parity than the SS class, so the RRS parity disk count should be
+
+- Less than N/2, if SS parity is not set.
+- Less than the SS parity, if it is set.
+
+As parity below 2 is not recommended, the RR storage class is not supported on a 4 disk erasure coding setup.
+
+The default parity for the Reduced Redundancy storage class is `2`.
+
+## Get started with Storage Class
+
+### Set storage class
+
+The format for the storage class environment variables is as follows:
+
+`MINIO_STORAGE_CLASS_RRS=EC:parity`
+`MINIO_STORAGE_CLASS_STANDARD=EC:parity`
+
+For example, to set RRS parity to 2 and SS parity to 3 on an 8 disk erasure code setup:
+
+```sh
+export MINIO_STORAGE_CLASS_RRS=EC:2
+export MINIO_STORAGE_CLASS_STANDARD=EC:3
+```
+
+If the storage classes are not defined before starting Minio server, and a subsequent PutObject request carries the metadata field `x-amz-storage-class`
+with the value `MINIO_STORAGE_CLASS_RRS` or `MINIO_STORAGE_CLASS_STANDARD`, Minio server uses the default parity values.
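+
+The `EC:parity` values shown above are easy to sanity check before starting the server. The following standalone sketch (an illustration of the format and the parity bounds described in this guide, not Minio's internal parser; the helper name is hypothetical) parses such a value and reports the resulting data/parity split for a given disk count:
+
+```go
+package main
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+// parseEC parses a value of the form "EC:<parity>", as used by
+// MINIO_STORAGE_CLASS_STANDARD and MINIO_STORAGE_CLASS_RRS, and checks the
+// general bound that parity stays within [2, totalDisks/2].
+// Illustrative helper only; not part of Minio.
+func parseEC(value string, totalDisks int) (parity int, err error) {
+	parts := strings.Split(value, ":")
+	if len(parts) != 2 || parts[0] != "EC" {
+		return 0, fmt.Errorf("expected format EC:<parity>, got %q", value)
+	}
+	if parity, err = strconv.Atoi(parts[1]); err != nil {
+		return 0, fmt.Errorf("parity must be an integer: %v", err)
+	}
+	if parity < 2 || parity > totalDisks/2 {
+		return 0, fmt.Errorf("parity %d is outside the allowed range [2, %d]", parity, totalDisks/2)
+	}
+	return parity, nil
+}
+
+func main() {
+	totalDisks := 8
+	for _, v := range []string{"EC:2", "EC:3", "EC:7", "EC"} {
+		parity, err := parseEC(v, totalDisks)
+		if err != nil {
+			fmt.Println(v, "=> invalid:", err)
+			continue
+		}
+		fmt.Printf("%s => %d data / %d parity disks\n", v, totalDisks-parity, parity)
+	}
+}
+```
+
+Keep in mind that the relative constraints listed earlier still apply when both classes are set, for example RRS parity must stay below SS parity.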
+
+### Set metadata
+
+In the example below, `minio-go` is used to set the storage class to `MINIO_STORAGE_CLASS_RRS`. This means the object will be split across 6 data disks and 2 parity disks (as per the storage class set in the previous step).
+
+```go
+s3Client, err := minio.New("s3.amazonaws.com", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
+if err != nil {
+	log.Fatalln(err)
+}
+
+object, err := os.Open("my-testfile")
+if err != nil {
+	log.Fatalln(err)
+}
+defer object.Close()
+objectStat, err := object.Stat()
+if err != nil {
+	log.Fatalln(err)
+}
+
+n, err := s3Client.PutObject("my-bucketname", "my-objectname", object, objectStat.Size(), minio.PutObjectOptions{ContentType: "application/octet-stream", StorageClass: "MINIO_STORAGE_CLASS_RRS"})
+if err != nil {
+	log.Fatalln(err)
+}
+log.Println("Uploaded", "my-objectname", " of size: ", n, "Successfully.")
+```
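+
+### Data, parity and write quorum
+
+As a rough illustration (assuming the parity settings from the earlier example), the data/parity split per class, and the per-object write quorum the server derives from it (data drives + 1), work out as in the sketch below. The helper name is hypothetical; this is arithmetic for illustration, not the server implementation.
+
+```go
+package main
+
+import "fmt"
+
+// splitFor returns the data/parity drive split for the given parity count on
+// a setup with totalDisks drives, plus the write quorum (data drives + 1).
+// Illustration only; not Minio's implementation.
+func splitFor(totalDisks, parity int) (data, parityDrives, writeQuorum int) {
+	data = totalDisks - parity
+	return data, parity, data + 1
+}
+
+func main() {
+	// 8 disk setup with MINIO_STORAGE_CLASS_STANDARD=EC:3 and MINIO_STORAGE_CLASS_RRS=EC:2.
+	for _, sc := range []struct {
+		name   string
+		parity int
+	}{
+		{"standard", 3},
+		{"reduced redundancy", 2},
+	} {
+		d, p, wq := splitFor(8, sc.parity)
+		fmt.Printf("%s: %d data / %d parity disks, write quorum %d\n", sc.name, d, p, wq)
+	}
+}
+```
\ No newline at end of file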