mirror of
				https://github.com/minio/minio.git
				synced 2025-11-04 10:11:09 +01:00 
			
		
		
		
	If site replication enabled across sites, replicate the SSE-C objects as well. These objects could be read from target sites using the same client encryption keys. Signed-off-by: Shubhendu Ram Tripathi <shubhendu@minio.io>
		
			
				
	
	
		
			295 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			295 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright (c) 2015-2021 MinIO, Inc.
 | 
						|
//
 | 
						|
// This file is part of MinIO Object Storage stack
 | 
						|
//
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package replication
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/xml"
 | 
						|
	"io"
 | 
						|
	"sort"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	errReplicationTooManyRules          = Errorf("Replication configuration allows a maximum of 1000 rules")
 | 
						|
	errReplicationNoRule                = Errorf("Replication configuration should have at least one rule")
 | 
						|
	errReplicationUniquePriority        = Errorf("Replication configuration has duplicate priority")
 | 
						|
	errRoleArnMissingLegacy             = Errorf("Missing required parameter `Role` in ReplicationConfiguration")
 | 
						|
	errDestinationArnMissing            = Errorf("Missing required parameter `Destination` in Replication rule")
 | 
						|
	errInvalidSourceSelectionCriteria   = Errorf("Invalid ReplicaModification status")
 | 
						|
	errRoleArnPresentForMultipleTargets = Errorf("`Role` should be empty in ReplicationConfiguration for multiple targets")
 | 
						|
)
 | 
						|
 | 
						|
// Config - replication configuration specified in
 | 
						|
// https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
 | 
						|
type Config struct {
 | 
						|
	XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"`
 | 
						|
	Rules   []Rule   `xml:"Rule" json:"Rules"`
 | 
						|
	// RoleArn is being reused for MinIO replication ARN
 | 
						|
	RoleArn string `xml:"Role" json:"Role"`
 | 
						|
}
 | 
						|
 | 
						|
// Maximum 2MiB size per replication config.
 | 
						|
const maxReplicationConfigSize = 2 << 20
 | 
						|
 | 
						|
// ParseConfig parses ReplicationConfiguration from xml
 | 
						|
func ParseConfig(reader io.Reader) (*Config, error) {
 | 
						|
	config := Config{}
 | 
						|
	if err := xml.NewDecoder(io.LimitReader(reader, maxReplicationConfigSize)).Decode(&config); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	// By default, set replica modification to enabled if unset.
 | 
						|
	for i := range config.Rules {
 | 
						|
		if len(config.Rules[i].SourceSelectionCriteria.ReplicaModifications.Status) == 0 {
 | 
						|
			config.Rules[i].SourceSelectionCriteria = SourceSelectionCriteria{
 | 
						|
				ReplicaModifications: ReplicaModifications{
 | 
						|
					Status: Enabled,
 | 
						|
				},
 | 
						|
			}
 | 
						|
		}
 | 
						|
		// Default DeleteReplication to disabled if unset.
 | 
						|
		if len(config.Rules[i].DeleteReplication.Status) == 0 {
 | 
						|
			config.Rules[i].DeleteReplication = DeleteReplication{
 | 
						|
				Status: Disabled,
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return &config, nil
 | 
						|
}
 | 
						|
 | 
						|
// Validate - validates the replication configuration
 | 
						|
func (c Config) Validate(bucket string, sameTarget bool) error {
 | 
						|
	// replication config can't have more than 1000 rules
 | 
						|
	if len(c.Rules) > 1000 {
 | 
						|
		return errReplicationTooManyRules
 | 
						|
	}
 | 
						|
	// replication config should have at least one rule
 | 
						|
	if len(c.Rules) == 0 {
 | 
						|
		return errReplicationNoRule
 | 
						|
	}
 | 
						|
 | 
						|
	// Validate all the rules in the replication config
 | 
						|
	targetMap := make(map[string]struct{})
 | 
						|
	priorityMap := make(map[string]struct{})
 | 
						|
	var legacyArn bool
 | 
						|
	for _, r := range c.Rules {
 | 
						|
		if _, ok := targetMap[r.Destination.Bucket]; !ok {
 | 
						|
			targetMap[r.Destination.Bucket] = struct{}{}
 | 
						|
		}
 | 
						|
		if err := r.Validate(bucket, sameTarget); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if _, ok := priorityMap[strconv.Itoa(r.Priority)]; ok {
 | 
						|
			return errReplicationUniquePriority
 | 
						|
		}
 | 
						|
		priorityMap[strconv.Itoa(r.Priority)] = struct{}{}
 | 
						|
 | 
						|
		if r.Destination.LegacyArn() {
 | 
						|
			legacyArn = true
 | 
						|
		}
 | 
						|
		if c.RoleArn == "" && !r.Destination.TargetArn() {
 | 
						|
			return errDestinationArnMissing
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// disallow combining old replication configuration which used RoleArn as target ARN with multiple
 | 
						|
	// destination replication
 | 
						|
	if c.RoleArn != "" && len(targetMap) > 1 {
 | 
						|
		return errRoleArnPresentForMultipleTargets
 | 
						|
	}
 | 
						|
	// validate RoleArn if destination used legacy ARN format.
 | 
						|
	if c.RoleArn == "" && legacyArn {
 | 
						|
		return errRoleArnMissingLegacy
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Types of replication
 | 
						|
const (
 | 
						|
	UnsetReplicationType Type = 0 + iota
 | 
						|
	ObjectReplicationType
 | 
						|
	DeleteReplicationType
 | 
						|
	MetadataReplicationType
 | 
						|
	HealReplicationType
 | 
						|
	ExistingObjectReplicationType
 | 
						|
	ResyncReplicationType
 | 
						|
	AllReplicationType
 | 
						|
)
 | 
						|
 | 
						|
// Valid returns true if replication type is set
 | 
						|
func (t Type) Valid() bool {
 | 
						|
	return t > 0
 | 
						|
}
 | 
						|
 | 
						|
// IsDataReplication returns true if content being replicated
 | 
						|
func (t Type) IsDataReplication() bool {
 | 
						|
	switch t {
 | 
						|
	case ObjectReplicationType, HealReplicationType, ExistingObjectReplicationType:
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// ObjectOpts provides information to deduce whether replication
 | 
						|
// can be triggered on the resultant object.
 | 
						|
type ObjectOpts struct {
 | 
						|
	Name           string
 | 
						|
	UserTags       string
 | 
						|
	VersionID      string
 | 
						|
	DeleteMarker   bool
 | 
						|
	SSEC           bool
 | 
						|
	OpType         Type
 | 
						|
	Replica        bool
 | 
						|
	ExistingObject bool
 | 
						|
	TargetArn      string
 | 
						|
}
 | 
						|
 | 
						|
// HasExistingObjectReplication returns true if any of the rule returns 'ExistingObjects' replication.
 | 
						|
func (c Config) HasExistingObjectReplication(arn string) (hasARN, isEnabled bool) {
 | 
						|
	for _, rule := range c.Rules {
 | 
						|
		if rule.Destination.ARN == arn || c.RoleArn == arn {
 | 
						|
			if !hasARN {
 | 
						|
				hasARN = true
 | 
						|
			}
 | 
						|
			if rule.ExistingObjectReplication.Status == Enabled {
 | 
						|
				return true, true
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return hasARN, false
 | 
						|
}
 | 
						|
 | 
						|
// FilterActionableRules returns the rules actions that need to be executed
 | 
						|
// after evaluating prefix/tag filtering
 | 
						|
func (c Config) FilterActionableRules(obj ObjectOpts) []Rule {
 | 
						|
	if obj.Name == "" && !(obj.OpType == ResyncReplicationType || obj.OpType == AllReplicationType) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	var rules []Rule
 | 
						|
	for _, rule := range c.Rules {
 | 
						|
		if rule.Status == Disabled {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		if obj.TargetArn != "" && rule.Destination.ARN != obj.TargetArn && c.RoleArn != obj.TargetArn {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		// Ignore other object level and prefix filters for resyncing target/listing bucket targets
 | 
						|
		if obj.OpType == ResyncReplicationType || obj.OpType == AllReplicationType {
 | 
						|
			rules = append(rules, rule)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if !strings.HasPrefix(obj.Name, rule.Prefix()) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if rule.Filter.TestTags(obj.UserTags) {
 | 
						|
			rules = append(rules, rule)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	sort.Slice(rules, func(i, j int) bool {
 | 
						|
		return rules[i].Priority > rules[j].Priority && rules[i].Destination.String() == rules[j].Destination.String()
 | 
						|
	})
 | 
						|
 | 
						|
	return rules
 | 
						|
}
 | 
						|
 | 
						|
// GetDestination returns destination bucket and storage class.
 | 
						|
func (c Config) GetDestination() Destination {
 | 
						|
	if len(c.Rules) > 0 {
 | 
						|
		return c.Rules[0].Destination
 | 
						|
	}
 | 
						|
	return Destination{}
 | 
						|
}
 | 
						|
 | 
						|
// Replicate returns true if the object should be replicated.
 | 
						|
func (c Config) Replicate(obj ObjectOpts) bool {
 | 
						|
	for _, rule := range c.FilterActionableRules(obj) {
 | 
						|
		if rule.Status == Disabled {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled {
 | 
						|
			return false
 | 
						|
		}
 | 
						|
		if obj.OpType == DeleteReplicationType {
 | 
						|
			switch {
 | 
						|
			case obj.VersionID != "":
 | 
						|
				// check MinIO extension for versioned deletes
 | 
						|
				return rule.DeleteReplication.Status == Enabled
 | 
						|
			default:
 | 
						|
				return rule.DeleteMarkerReplication.Status == Enabled
 | 
						|
			}
 | 
						|
		} // regular object/metadata replication
 | 
						|
		return rule.MetadataReplicate(obj)
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// HasActiveRules - returns whether replication policy has active rules
 | 
						|
// Optionally a prefix can be supplied.
 | 
						|
// If recursive is specified the function will also return true if any level below the
 | 
						|
// prefix has active rules. If no prefix is specified recursive is effectively true.
 | 
						|
func (c Config) HasActiveRules(prefix string, recursive bool) bool {
 | 
						|
	if len(c.Rules) == 0 {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	for _, rule := range c.Rules {
 | 
						|
		if rule.Status == Disabled {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if len(prefix) > 0 && len(rule.Filter.Prefix) > 0 {
 | 
						|
			// incoming prefix must be in rule prefix
 | 
						|
			if !recursive && !strings.HasPrefix(prefix, rule.Filter.Prefix) {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			// If recursive, we can skip this rule if it doesn't match the tested prefix or level below prefix
 | 
						|
			// does not match
 | 
						|
			if recursive && !strings.HasPrefix(rule.Prefix(), prefix) && !strings.HasPrefix(prefix, rule.Prefix()) {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// FilterTargetArns returns a slice of distinct target arns in the config
 | 
						|
func (c Config) FilterTargetArns(obj ObjectOpts) []string {
 | 
						|
	var arns []string
 | 
						|
 | 
						|
	tgtsMap := make(map[string]struct{})
 | 
						|
	rules := c.FilterActionableRules(obj)
 | 
						|
	for _, rule := range rules {
 | 
						|
		if rule.Status == Disabled {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if c.RoleArn != "" {
 | 
						|
			arns = append(arns, c.RoleArn) // use legacy RoleArn if present
 | 
						|
			return arns
 | 
						|
		}
 | 
						|
		if _, ok := tgtsMap[rule.Destination.ARN]; !ok {
 | 
						|
			tgtsMap[rule.Destination.ARN] = struct{}{}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	for k := range tgtsMap {
 | 
						|
		arns = append(arns, k)
 | 
						|
	}
 | 
						|
	return arns
 | 
						|
}
 |