From 951acf561c4a888b2c85920bfe3f70911d7b3fbf Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Thu, 13 May 2021 19:20:45 -0700 Subject: [PATCH] Add support for syncing replica modifications (#11104) when bidirectional replication is set up. If ReplicaModifications is enabled in the replication configuration, sync metadata updates to source if replication rules are met. By default, if this configuration is unset, MinIO automatically sync's metadata updates on replica back to the source. --- cmd/bucket-replication.go | 15 ++-- cmd/object-handlers.go | 32 ++++---- docs/bucket/replication/README.md | 10 +++ pkg/bucket/replication/replication.go | 17 ++++- pkg/bucket/replication/replication_test.go | 23 ++++-- pkg/bucket/replication/rule.go | 20 ++++- pkg/bucket/replication/rule_test.go | 68 +++++++++++++++++ .../replication/sourceselectioncriteria.go | 76 +++++++++++++++++++ 8 files changed, 230 insertions(+), 31 deletions(-) create mode 100644 pkg/bucket/replication/rule_test.go create mode 100644 pkg/bucket/replication/sourceselectioncriteria.go diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index e5ea6463f..37010eae1 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -95,28 +95,28 @@ func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object strin if permErr != ErrNone { return } - return mustReplicater(ctx, bucket, object, meta, replStatus) + return mustReplicater(ctx, bucket, object, meta, replStatus, false) } // mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in // a synchronous manner. -func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) (replicate bool, sync bool) { +func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, metadataOnly bool) (replicate bool, sync bool) { if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone { return } - return mustReplicater(ctx, bucket, object, meta, replStatus) + return mustReplicater(ctx, bucket, object, meta, replStatus, metadataOnly) } // mustReplicater returns 2 booleans - true if object meets replication criteria and true if replication is to be done in // a synchronous manner. -func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string) (replicate bool, sync bool) { +func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string, metadataOnly bool) (replicate bool, sync bool) { if globalIsGateway { return replicate, sync } if rs, ok := meta[xhttp.AmzBucketReplicationStatus]; ok { replStatus = rs } - if replication.StatusType(replStatus) == replication.Replica { + if replication.StatusType(replStatus) == replication.Replica && !metadataOnly { return replicate, sync } cfg, err := getReplicationConfig(ctx, bucket) @@ -124,8 +124,9 @@ func mustReplicater(ctx context.Context, bucket, object string, meta map[string] return replicate, sync } opts := replication.ObjectOpts{ - Name: object, - SSEC: crypto.SSEC.IsEncrypted(meta), + Name: object, + SSEC: crypto.SSEC.IsEncrypted(meta), + Replica: replication.StatusType(replStatus) == replication.Replica, } tagStr, ok := meta[xhttp.AmzObjectTagging] if ok { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 2c7354342..c4739ac03 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1285,7 +1285,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re if rs := r.Header.Get(xhttp.AmzBucketReplicationStatus); rs != "" { srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs } - if ok, _ := mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()); ok { + if ok, _ := mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String(), srcInfo.metadataOnly); ok { srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } // Store the preserved compression metadata. @@ -1387,7 +1387,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re objInfo.ETag = getDecryptedETag(r.Header, objInfo, false) response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) - if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate { + if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String(), objInfo.metadataOnly); replicate { scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } @@ -1634,7 +1634,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok { + if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok { metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { @@ -1721,7 +1721,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req objInfo.ETag = objInfo.ETag + "-1" } } - if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate { + if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, "", false); replicate { scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } @@ -1960,7 +1960,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h return } - if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok { + if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok { metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -2016,8 +2016,9 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h return } - if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate { + if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, "", false); replicate { scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) + } } @@ -2130,7 +2131,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok { + if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok { metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } // We need to preserve the encryption headers set in EncryptRequest, @@ -3120,7 +3121,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } setPutObjHeaders(w, objInfo, false) - if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate { + if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), false); replicate { scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } @@ -3381,7 +3382,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r return } objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status)) - replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") + replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), true) if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -3560,7 +3561,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = "" objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = "" } - replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") + replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), true) if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -3752,7 +3753,12 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h return } - replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, "") + objInfo, err := objAPI.GetObjectInfo(ctx, bucket, object, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, objInfo.ReplicationStatus.String(), true) if replicate { opts.UserDefined = make(map[string]string) opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() @@ -3761,7 +3767,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h tagsStr := tags.String() // Put object tags - objInfo, err := objAPI.PutObjectTags(ctx, bucket, object, tagsStr, opts) + objInfo, err = objAPI.PutObjectTags(ctx, bucket, object, tagsStr, opts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -3829,7 +3835,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, "") + replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, oi.ReplicationStatus.String(), true) if replicate { opts.UserDefined = make(map[string]string) opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() diff --git a/docs/bucket/replication/README.md b/docs/bucket/replication/README.md index dc28dda70..a0f47f0c0 100644 --- a/docs/bucket/replication/README.md +++ b/docs/bucket/replication/README.md @@ -132,6 +132,11 @@ The replication configuration can now be added to the source bucket by applying "Destination": { "Bucket": "arn:aws:s3:::destbucket", "StorageClass": "STANDARD" + }, + "SourceSelectionCriteria": { + "ReplicaModifications": { + "Status": "Enabled" + } } } ] @@ -150,6 +155,11 @@ To perform bi-directional replication, repeat the above process on the target si ![head](https://raw.githubusercontent.com/minio/minio/master/docs/bucket/replication/HEAD_bucket_replication.png) +## Replica Modification sync +If bi-directional replication is set up between two clusters, any metadata update on the REPLICA object is by default reflected back in the source object when `ReplicaModifications` status in the `SourceSelectionCriteria` is `Enabled`. In MinIO, this is enabled by default. If a metadata update is performed on the "REPLICA" object, its `X-Amz-Replication-Status` will change from `PENDING` to `COMPLETE` or `FAILED`, and the source object version will show `X-Amz-Replication-Status` of `REPLICA` once the replication operation is complete. + +The replication configuration in use on a bucket can be viewed using the `mc replicate export alias/bucket` command. + ## MinIO Extension ### Replicating Deletes diff --git a/pkg/bucket/replication/replication.go b/pkg/bucket/replication/replication.go index db0e4db19..1f1187221 100644 --- a/pkg/bucket/replication/replication.go +++ b/pkg/bucket/replication/replication.go @@ -58,6 +58,7 @@ var ( errReplicationUniquePriority = Errorf("Replication configuration has duplicate priority") errReplicationDestinationMismatch = Errorf("The destination bucket must be same for all rules") errRoleArnMissing = Errorf("Missing required parameter `Role` in ReplicationConfiguration") + errInvalidSourceSelectionCriteria = Errorf("Invalid ReplicaModification status") ) // Config - replication configuration specified in @@ -78,6 +79,16 @@ func ParseConfig(reader io.Reader) (*Config, error) { if err := xml.NewDecoder(io.LimitReader(reader, maxReplicationConfigSize)).Decode(&config); err != nil { return nil, err } + // By default, set replica modification to enabled if unset. + for i := range config.Rules { + if len(config.Rules[i].SourceSelectionCriteria.ReplicaModifications.Status) == 0 { + config.Rules[i].SourceSelectionCriteria = SourceSelectionCriteria{ + ReplicaModifications: ReplicaModifications{ + Status: Enabled, + }, + } + } + } return &config, nil } @@ -136,6 +147,7 @@ type ObjectOpts struct { DeleteMarker bool SSEC bool OpType Type + Replica bool } // FilterActionableRules returns the rules actions that need to be executed @@ -187,9 +199,8 @@ func (c Config) Replicate(obj ObjectOpts) bool { default: return rule.DeleteMarkerReplication.Status == Enabled } - } else { // regular object/metadata replication - return true - } + } // regular object/metadata replication + return rule.MetadataReplicate(obj) } return false } diff --git a/pkg/bucket/replication/replication_test.go b/pkg/bucket/replication/replication_test.go index 0593dd70c..13a79e747 100644 --- a/pkg/bucket/replication/replication_test.go +++ b/pkg/bucket/replication/replication_test.go @@ -140,7 +140,7 @@ func TestParseAndValidateReplicationConfig(t *testing.T) { } func TestReplicate(t *testing.T) { cfgs := []Config{ - { //Config0 - Replication config has no filters, all replication enabled + { // Config0 - Replication config has no filters, all replication enabled Rules: []Rule{ { Status: Enabled, @@ -151,7 +151,7 @@ func TestReplicate(t *testing.T) { }, }, }, - { //Config1 - Replication config has no filters, delete,delete-marker replication disabled + { // Config1 - Replication config has no filters, delete,delete-marker replication disabled Rules: []Rule{ { Status: Enabled, @@ -162,7 +162,7 @@ func TestReplicate(t *testing.T) { }, }, }, - { //Config2 - Replication config has filters and more than 1 matching rule, delete,delete-marker replication disabled + { // Config2 - Replication config has filters and more than 1 matching rule, delete,delete-marker replication disabled Rules: []Rule{ { Status: Enabled, @@ -180,7 +180,7 @@ func TestReplicate(t *testing.T) { }, }, }, - { //Config3 - Replication config has filters and no overlapping rules + { // Config3 - Replication config has filters and no overlapping rules Rules: []Rule{ { Status: Enabled, @@ -198,6 +198,17 @@ func TestReplicate(t *testing.T) { }, }, }, + { // Config4 - Replication config has filters and SourceSelectionCriteria Disabled + Rules: []Rule{ + { + Status: Enabled, + Priority: 2, + DeleteMarkerReplication: DeleteMarkerReplication{Status: Enabled}, + DeleteReplication: DeleteReplication{Status: Enabled}, + SourceSelectionCriteria: SourceSelectionCriteria{ReplicaModifications: ReplicaModifications{Status: Disabled}}, + }, + }, + }, } testCases := []struct { opts ObjectOpts @@ -249,7 +260,9 @@ func TestReplicate(t *testing.T) { {ObjectOpts{Name: "abc/c4test", DeleteMarker: true, OpType: DeleteReplicationType}, cfgs[3], true}, //36. matches rule 2 - DeleteMarker replication allowed by rule {ObjectOpts{Name: "abc/c4test", DeleteMarker: true, VersionID: "vid", OpType: DeleteReplicationType}, cfgs[3], false}, //37. matches rule 2 - DeleteReplication disallowed by rule for permanent delete of DeleteMarker {ObjectOpts{Name: "abc/c4test", VersionID: "vid", OpType: DeleteReplicationType}, cfgs[3], false}, //38. matches rule 2 - DeleteReplication disallowed by rule for permanent delete of version - + // using config 4 - with replica modification sync disabled. + {ObjectOpts{Name: "xy/c5test", UserTags: "k1=v1", Replica: true}, cfgs[4], false}, //39. replica syncing disabled, this object is a replica + {ObjectOpts{Name: "xa/c5test", UserTags: "k1=v1", Replica: false}, cfgs[4], true}, //40. replica syncing disabled, this object is NOT a replica } for i, testCase := range testCases { diff --git a/pkg/bucket/replication/rule.go b/pkg/bucket/replication/rule.go index 97432dd3b..b272fdd48 100644 --- a/pkg/bucket/replication/rule.go +++ b/pkg/bucket/replication/rule.go @@ -98,9 +98,10 @@ type Rule struct { Priority int `xml:"Priority" json:"Priority"` DeleteMarkerReplication DeleteMarkerReplication `xml:"DeleteMarkerReplication" json:"DeleteMarkerReplication"` // MinIO extension to replicate versioned deletes - DeleteReplication DeleteReplication `xml:"DeleteReplication" json:"DeleteReplication"` - Destination Destination `xml:"Destination" json:"Destination"` - Filter Filter `xml:"Filter" json:"Filter"` + DeleteReplication DeleteReplication `xml:"DeleteReplication" json:"DeleteReplication"` + Destination Destination `xml:"Destination" json:"Destination"` + SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"` + Filter Filter `xml:"Filter" json:"Filter"` } var ( @@ -192,6 +193,10 @@ func (r Rule) Validate(bucket string, sameTarget bool) error { if err := r.DeleteReplication.Validate(); err != nil { return err } + if err := r.SourceSelectionCriteria.Validate(); err != nil { + return err + } + if r.Priority < 0 { return errPriorityMissing } @@ -200,3 +205,12 @@ func (r Rule) Validate(bucket string, sameTarget bool) error { } return nil } + +// MetadataReplicate returns true if object is not a replica or in the case of replicas, +// replica modification sync is enabled. +func (r Rule) MetadataReplicate(obj ObjectOpts) bool { + if !obj.Replica { + return true + } + return obj.Replica && r.SourceSelectionCriteria.ReplicaModifications.Status == Enabled +} diff --git a/pkg/bucket/replication/rule_test.go b/pkg/bucket/replication/rule_test.go new file mode 100644 index 000000000..c390e1ed6 --- /dev/null +++ b/pkg/bucket/replication/rule_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package replication + +import ( + "bytes" + "fmt" + "testing" +) + +func TestMetadataReplicate(t *testing.T) { + testCases := []struct { + inputConfig string + opts ObjectOpts + expectedResult bool + }{ + // case 1 - rule with replica modification enabled; not a replica + {inputConfig: `arn:aws:iam::AcctID:role/role-nameEnabledDisabledDisabledkey-prefixarn:aws:s3:::destinationbucketEnabled`, + opts: ObjectOpts{Name: "c1test", DeleteMarker: false, OpType: ObjectReplicationType, Replica: false}, //1. Replica mod sync enabled; not a replica + expectedResult: true, + }, + // case 2 - rule with replica modification disabled; a replica + {inputConfig: `arn:aws:iam::AcctID:role/role-nameEnabledDisabledDisabledkey-prefixarn:aws:s3:::destinationbucketDisabled`, + opts: ObjectOpts{Name: "c2test", DeleteMarker: false, OpType: ObjectReplicationType, Replica: true}, //1. Replica mod sync enabled; a replica + expectedResult: false, + }, + // case 3 - rule with replica modification disabled; not a replica + {inputConfig: `arn:aws:iam::AcctID:role/role-nameEnabledDisabledDisabledkey-prefixarn:aws:s3:::destinationbucketDisabled`, + opts: ObjectOpts{Name: "c2test", DeleteMarker: false, OpType: ObjectReplicationType, Replica: false}, //1. Replica mod sync disabled; not a replica + expectedResult: true, + }, + + // case 4 - rule with replica modification enabled; a replica + {inputConfig: `arn:aws:iam::AcctID:role/role-nameEnabledDisabledDisabledkey-prefixarn:aws:s3:::destinationbucketEnabled`, + opts: ObjectOpts{Name: "c2test", DeleteMarker: false, OpType: MetadataReplicationType, Replica: true}, //1. Replica mod sync enabled; a replica + expectedResult: true, + }, + } + + for i, tc := range testCases { + tc := tc + t.Run(fmt.Sprintf("Test_%d", i+1), func(t *testing.T) { + cfg, err := ParseConfig(bytes.NewReader([]byte(tc.inputConfig))) + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + if got := cfg.Rules[0].MetadataReplicate(tc.opts); got != tc.expectedResult { + t.Fatalf("Expected result with recursive set to false: `%v`, got: `%v`", tc.expectedResult, got) + } + }) + + } +} diff --git a/pkg/bucket/replication/sourceselectioncriteria.go b/pkg/bucket/replication/sourceselectioncriteria.go new file mode 100644 index 000000000..19768f52b --- /dev/null +++ b/pkg/bucket/replication/sourceselectioncriteria.go @@ -0,0 +1,76 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package replication + +import ( + "encoding/xml" +) + +// ReplicaModifications specifies if replica modification sync is enabled +type ReplicaModifications struct { + Status Status `xml:"Status" json:"Status"` +} + +// SourceSelectionCriteria - specifies additional source selection criteria in ReplicationConfiguration. +type SourceSelectionCriteria struct { + ReplicaModifications ReplicaModifications `xml:"ReplicaModifications" json:"ReplicaModifications"` +} + +// IsValid - checks whether SourceSelectionCriteria is valid or not. +func (s SourceSelectionCriteria) IsValid() bool { + return s.ReplicaModifications.Status == Enabled || s.ReplicaModifications.Status == Disabled +} + +// Validate source selection criteria +func (s SourceSelectionCriteria) Validate() error { + if (s == SourceSelectionCriteria{}) { + return nil + } + if !s.IsValid() { + return errInvalidSourceSelectionCriteria + } + return nil +} + +// UnmarshalXML - decodes XML data. +func (s *SourceSelectionCriteria) UnmarshalXML(dec *xml.Decoder, start xml.StartElement) (err error) { + // Make subtype to avoid recursive UnmarshalXML(). + type sourceSelectionCriteria SourceSelectionCriteria + ssc := sourceSelectionCriteria{} + if err := dec.DecodeElement(&ssc, &start); err != nil { + return err + } + if len(ssc.ReplicaModifications.Status) == 0 { + ssc.ReplicaModifications.Status = Enabled + } + *s = SourceSelectionCriteria(ssc) + return nil +} + +// MarshalXML - encodes to XML data. +func (s SourceSelectionCriteria) MarshalXML(e *xml.Encoder, start xml.StartElement) error { + if err := e.EncodeToken(start); err != nil { + return err + } + if s.IsValid() { + if err := e.EncodeElement(s.ReplicaModifications, xml.StartElement{Name: xml.Name{Local: "ReplicaModifications"}}); err != nil { + return err + } + } + return e.EncodeToken(xml.EndElement{Name: start.Name}) +}