diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go
index 8abf74b42..975f7efa3 100644
--- a/cmd/bucket-replication-utils.go
+++ b/cmd/bucket-replication-utils.go
@@ -737,3 +737,32 @@ func extractReplicateDiffOpts(q url.Values) (opts madmin.ReplDiffOpts) {
 	opts.Prefix = q.Get("prefix")
 	return
 }
+
+const (
+	replicationMRFDir = bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + "mrf"
+	mrfMetaFormat     = 1
+	mrfMetaVersionV1  = 1
+	mrfMetaVersion    = mrfMetaVersionV1
+)
+
+// MRFReplicateEntry mrf entry to save to disk
+type MRFReplicateEntry struct {
+	Bucket    string `json:"bucket" msg:"b"`
+	Object    string `json:"object" msg:"o"`
+	versionID string `json:"-"`
+}
+
+// MRFReplicateEntries has the map of MRF entries to save to disk
+type MRFReplicateEntries struct {
+	Entries map[string]MRFReplicateEntry `json:"entries" msg:"e"`
+	Version int                          `json:"version" msg:"v"`
+}
+
+// ToMRFEntry returns the relevant info needed by MRF
+func (ri ReplicateObjectInfo) ToMRFEntry() MRFReplicateEntry {
+	return MRFReplicateEntry{
+		Bucket:    ri.Bucket,
+		Object:    ri.Name,
+		versionID: ri.VersionID,
+	}
+}
diff --git a/cmd/bucket-replication-utils_gen.go b/cmd/bucket-replication-utils_gen.go
index 7d2aa7dda..c8b22c572 100644
--- a/cmd/bucket-replication-utils_gen.go
+++ b/cmd/bucket-replication-utils_gen.go
@@ -260,6 +260,413 @@ func (z *BucketReplicationResyncStatus) Msgsize() (s int) {
 	return
 }
 
+// DecodeMsg implements msgp.Decodable
+func (z *MRFReplicateEntries) DecodeMsg(dc *msgp.Reader) (err error) {
+	var field []byte
+	_ = field
+	var zb0001 uint32
+	zb0001, err = dc.ReadMapHeader()
+	if err != nil {
+		err = msgp.WrapError(err)
+		return
+	}
+	for zb0001 > 0 {
+		zb0001--
+		field, err = dc.ReadMapKeyPtr()
+		if err != nil {
+			err = msgp.WrapError(err)
+			return
+		}
+		switch msgp.UnsafeString(field) {
+		case "e":
+			var zb0002 uint32
+			zb0002, err = dc.ReadMapHeader()
+			if err != nil {
+				err = msgp.WrapError(err, "Entries")
+				return
+			}
+			if z.Entries == nil {
+				z.Entries = make(map[string]MRFReplicateEntry, zb0002)
+			} else if len(z.Entries) > 0 {
+				for key := range z.Entries {
+					delete(z.Entries, key)
+				}
+			}
+			for zb0002 > 0 {
+				zb0002--
+				var za0001 string
+				var za0002 MRFReplicateEntry
+				za0001, err = dc.ReadString()
+				if err != nil {
+					err = msgp.WrapError(err, "Entries")
+					return
+				}
+				var zb0003 uint32
+				zb0003, err = dc.ReadMapHeader()
+				if err != nil {
+					err = msgp.WrapError(err, "Entries", za0001)
+					return
+				}
+				for zb0003 > 0 {
+					zb0003--
+					field, err = dc.ReadMapKeyPtr()
+					if err != nil {
+						err = msgp.WrapError(err, "Entries", za0001)
+						return
+					}
+					switch msgp.UnsafeString(field) {
+					case "b":
+						za0002.Bucket, err = dc.ReadString()
+						if err != nil {
+							err = msgp.WrapError(err, "Entries", za0001, "Bucket")
+							return
+						}
+					case "o":
+						za0002.Object, err = dc.ReadString()
+						if err != nil {
+							err = msgp.WrapError(err, "Entries", za0001, "Object")
+							return
+						}
+					default:
+						err = dc.Skip()
+						if err != nil {
+							err = msgp.WrapError(err, "Entries", za0001)
+							return
+						}
+					}
+				}
+				z.Entries[za0001] = za0002
+			}
+		case "v":
+			z.Version, err = dc.ReadInt()
+			if err != nil {
+				err = msgp.WrapError(err, "Version")
+				return
+			}
+		default:
+			err = dc.Skip()
+			if err != nil {
+				err = msgp.WrapError(err)
+				return
+			}
+		}
+	}
+	return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *MRFReplicateEntries) EncodeMsg(en *msgp.Writer) (err error) {
+	// map header, size 2
+	// write "e"
+	err = en.Append(0x82, 0xa1, 0x65)
+	if err != nil {
+		return
+	}
+	err = en.WriteMapHeader(uint32(len(z.Entries)))
+	if err != nil {
+		err = msgp.WrapError(err, "Entries")
+		return
+	}
+	for za0001, za0002 := range z.Entries {
+		err = en.WriteString(za0001)
+		if err != nil {
+			err = msgp.WrapError(err, "Entries")
+			return
+		}
+		// map header, size 2
+		// write "b"
+		err = en.Append(0x82, 0xa1, 0x62)
+		if err != nil {
+			return
+		}
+		err = en.WriteString(za0002.Bucket)
+		if err != nil {
+			err = msgp.WrapError(err, "Entries", za0001, "Bucket")
+			return
+		}
+		// write "o"
+		err = en.Append(0xa1, 0x6f)
+		if err != nil {
+			return
+		}
+		err = en.WriteString(za0002.Object)
+		if err != nil {
+			err = msgp.WrapError(err, "Entries", za0001, "Object")
+			return
+		}
+	}
+	// write "v"
+	err = en.Append(0xa1, 0x76)
+	if err != nil {
+		return
+	}
+	err = en.WriteInt(z.Version)
+	if err != nil {
+		err = msgp.WrapError(err, "Version")
+		return
+	}
+	return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *MRFReplicateEntries) MarshalMsg(b []byte) (o []byte, err error) {
+	o = msgp.Require(b, z.Msgsize())
+	// map header, size 2
+	// string "e"
+	o = append(o, 0x82, 0xa1, 0x65)
+	o = msgp.AppendMapHeader(o, uint32(len(z.Entries)))
+	for za0001, za0002 := range z.Entries {
+		o = msgp.AppendString(o, za0001)
+		// map header, size 2
+		// string "b"
+		o = append(o, 0x82, 0xa1, 0x62)
+		o = msgp.AppendString(o, za0002.Bucket)
+		// string "o"
+		o = append(o, 0xa1, 0x6f)
+		o = msgp.AppendString(o, za0002.Object)
+	}
+	// string "v"
+	o = append(o, 0xa1, 0x76)
+	o = msgp.AppendInt(o, z.Version)
+	return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *MRFReplicateEntries) UnmarshalMsg(bts []byte) (o []byte, err error) {
+	var field []byte
+	_ = field
+	var zb0001 uint32
+	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+	if err != nil {
+		err = msgp.WrapError(err)
+		return
+	}
+	for zb0001 > 0 {
+		zb0001--
+		field, bts, err = msgp.ReadMapKeyZC(bts)
+		if err != nil {
+			err = msgp.WrapError(err)
+			return
+		}
+		switch msgp.UnsafeString(field) {
+		case "e":
+			var zb0002 uint32
+			zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Entries")
+				return
+			}
+			if z.Entries == nil {
+				z.Entries = make(map[string]MRFReplicateEntry, zb0002)
+			} else if len(z.Entries) > 0 {
+				for key := range z.Entries {
+					delete(z.Entries, key)
+				}
+			}
+			for zb0002 > 0 {
+				var za0001 string
+				var za0002 MRFReplicateEntry
+				zb0002--
+				za0001, bts, err = msgp.ReadStringBytes(bts)
+				if err != nil {
+					err = msgp.WrapError(err, "Entries")
+					return
+				}
+				var zb0003 uint32
+				zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
+				if err != nil {
+					err = msgp.WrapError(err, "Entries", za0001)
+					return
+				}
+				for zb0003 > 0 {
+					zb0003--
+					field, bts, err = msgp.ReadMapKeyZC(bts)
+					if err != nil {
+						err = msgp.WrapError(err, "Entries", za0001)
+						return
+					}
+					switch msgp.UnsafeString(field) {
+					case "b":
+						za0002.Bucket, bts, err = msgp.ReadStringBytes(bts)
+						if err != nil {
+							err = msgp.WrapError(err, "Entries", za0001, "Bucket")
+							return
+						}
+					case "o":
+						za0002.Object, bts, err = msgp.ReadStringBytes(bts)
+						if err != nil {
+							err = msgp.WrapError(err, "Entries", za0001, "Object")
+							return
+						}
+					default:
+						bts, err = msgp.Skip(bts)
+						if err != nil {
+							err = msgp.WrapError(err, "Entries", za0001)
+							return
+						}
+					}
+				}
+				z.Entries[za0001] = za0002
+			}
+		case "v":
+			z.Version, bts, err = msgp.ReadIntBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Version")
+				return
+			}
+		default:
+			bts, err = msgp.Skip(bts)
+			if err != nil {
+				err = msgp.WrapError(err)
+				return
+			}
+		}
+	}
+	o = bts
+	return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *MRFReplicateEntries) Msgsize() (s int) {
+	s = 1 + 2 + msgp.MapHeaderSize
+	if z.Entries != nil {
+		for za0001, za0002 := range z.Entries {
+			_ = za0002
+			s += msgp.StringPrefixSize + len(za0001) + 1 + 2 + msgp.StringPrefixSize + len(za0002.Bucket) + 2 + msgp.StringPrefixSize + len(za0002.Object)
+		}
+	}
+	s += 2 + msgp.IntSize
+	return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *MRFReplicateEntry) DecodeMsg(dc *msgp.Reader) (err error) {
+	var field []byte
+	_ = field
+	var zb0001 uint32
+	zb0001, err = dc.ReadMapHeader()
+	if err != nil {
+		err = msgp.WrapError(err)
+		return
+	}
+	for zb0001 > 0 {
+		zb0001--
+		field, err = dc.ReadMapKeyPtr()
+		if err != nil {
+			err = msgp.WrapError(err)
+			return
+		}
+		switch msgp.UnsafeString(field) {
+		case "b":
+			z.Bucket, err = dc.ReadString()
+			if err != nil {
+				err = msgp.WrapError(err, "Bucket")
+				return
+			}
+		case "o":
+			z.Object, err = dc.ReadString()
+			if err != nil {
+				err = msgp.WrapError(err, "Object")
+				return
+			}
+		default:
+			err = dc.Skip()
+			if err != nil {
+				err = msgp.WrapError(err)
+				return
+			}
+		}
+	}
+	return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z MRFReplicateEntry) EncodeMsg(en *msgp.Writer) (err error) {
+	// map header, size 2
+	// write "b"
+	err = en.Append(0x82, 0xa1, 0x62)
+	if err != nil {
+		return
+	}
+	err = en.WriteString(z.Bucket)
+	if err != nil {
+		err = msgp.WrapError(err, "Bucket")
+		return
+	}
+	// write "o"
+	err = en.Append(0xa1, 0x6f)
+	if err != nil {
+		return
+	}
+	err = en.WriteString(z.Object)
+	if err != nil {
+		err = msgp.WrapError(err, "Object")
+		return
+	}
+	return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z MRFReplicateEntry) MarshalMsg(b []byte) (o []byte, err error) {
+	o = msgp.Require(b, z.Msgsize())
+	// map header, size 2
+	// string "b"
+	o = append(o, 0x82, 0xa1, 0x62)
+	o = msgp.AppendString(o, z.Bucket)
+	// string "o"
+	o = append(o, 0xa1, 0x6f)
+	o = msgp.AppendString(o, z.Object)
+	return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *MRFReplicateEntry) UnmarshalMsg(bts []byte) (o []byte, err error) {
+	var field []byte
+	_ = field
+	var zb0001 uint32
+	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+	if err != nil {
+		err = msgp.WrapError(err)
+		return
+	}
+	for zb0001 > 0 {
+		zb0001--
+		field, bts, err = msgp.ReadMapKeyZC(bts)
+		if err != nil {
+			err = msgp.WrapError(err)
+			return
+		}
+		switch msgp.UnsafeString(field) {
+		case "b":
+			z.Bucket, bts, err = msgp.ReadStringBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Bucket")
+				return
+			}
+		case "o":
+			z.Object, bts, err = msgp.ReadStringBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Object")
+				return
+			}
+		default:
+			bts, err = msgp.Skip(bts)
+			if err != nil {
+				err = msgp.WrapError(err)
+				return
+			}
+		}
+	}
+	o = bts
+	return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z MRFReplicateEntry) Msgsize() (s int) {
+	s = 1 + 2 + msgp.StringPrefixSize + len(z.Bucket) + 2 + msgp.StringPrefixSize + len(z.Object)
+	return
+}
+
 // DecodeMsg implements msgp.Decodable
 func (z *ReplicateDecision) DecodeMsg(dc *msgp.Reader) (err error) {
 	var field []byte
diff --git a/cmd/bucket-replication-utils_gen_test.go b/cmd/bucket-replication-utils_gen_test.go
index 7ed7f1fef..9a8fd1ea7 100644
--- a/cmd/bucket-replication-utils_gen_test.go
+++ b/cmd/bucket-replication-utils_gen_test.go
@@ -122,6 +122,232 @@ func BenchmarkDecodeBucketReplicationResyncStatus(b *testing.B) {
 	}
 }
 
+func TestMarshalUnmarshalMRFReplicateEntries(t *testing.T) {
+	v := MRFReplicateEntries{}
+	bts, err := v.MarshalMsg(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	left, err := v.UnmarshalMsg(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+	}
+
+	left, err = msgp.Skip(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+	}
+}
+
+func BenchmarkMarshalMsgMRFReplicateEntries(b *testing.B) {
+	v := MRFReplicateEntries{}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.MarshalMsg(nil)
+	}
+}
+
+func BenchmarkAppendMsgMRFReplicateEntries(b *testing.B) {
+	v := MRFReplicateEntries{}
+	bts := make([]byte, 0, v.Msgsize())
+	bts, _ = v.MarshalMsg(bts[0:0])
+	b.SetBytes(int64(len(bts)))
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		bts, _ = v.MarshalMsg(bts[0:0])
+	}
+}
+
+func BenchmarkUnmarshalMRFReplicateEntries(b *testing.B) {
+	v := MRFReplicateEntries{}
+	bts, _ := v.MarshalMsg(nil)
+	b.ReportAllocs()
+	b.SetBytes(int64(len(bts)))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_, err := v.UnmarshalMsg(bts)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func TestEncodeDecodeMRFReplicateEntries(t *testing.T) {
+	v := MRFReplicateEntries{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+
+	m := v.Msgsize()
+	if buf.Len() > m {
+		t.Log("WARNING: TestEncodeDecodeMRFReplicateEntries Msgsize() is inaccurate")
+	}
+
+	vn := MRFReplicateEntries{}
+	err := msgp.Decode(&buf, &vn)
+	if err != nil {
+		t.Error(err)
+	}
+
+	buf.Reset()
+	msgp.Encode(&buf, &v)
+	err = msgp.NewReader(&buf).Skip()
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+func BenchmarkEncodeMRFReplicateEntries(b *testing.B) {
+	v := MRFReplicateEntries{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	en := msgp.NewWriter(msgp.Nowhere)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.EncodeMsg(en)
+	}
+	en.Flush()
+}
+
+func BenchmarkDecodeMRFReplicateEntries(b *testing.B) {
+	v := MRFReplicateEntries{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	rd := msgp.NewEndlessReader(buf.Bytes(), b)
+	dc := msgp.NewReader(rd)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := v.DecodeMsg(dc)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func TestMarshalUnmarshalMRFReplicateEntry(t *testing.T) {
+	v := MRFReplicateEntry{}
+	bts, err := v.MarshalMsg(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	left, err := v.UnmarshalMsg(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+	}
+
+	left, err = msgp.Skip(bts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(left) > 0 {
+		t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+	}
+}
+
+func BenchmarkMarshalMsgMRFReplicateEntry(b *testing.B) {
+	v := MRFReplicateEntry{}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.MarshalMsg(nil)
+	}
+}
+
+func BenchmarkAppendMsgMRFReplicateEntry(b *testing.B) {
+	v := MRFReplicateEntry{}
+	bts := make([]byte, 0, v.Msgsize())
+	bts, _ = v.MarshalMsg(bts[0:0])
+	b.SetBytes(int64(len(bts)))
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		bts, _ = v.MarshalMsg(bts[0:0])
+	}
+}
+
+func BenchmarkUnmarshalMRFReplicateEntry(b *testing.B) {
+	v := MRFReplicateEntry{}
+	bts, _ := v.MarshalMsg(nil)
+	b.ReportAllocs()
+	b.SetBytes(int64(len(bts)))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_, err := v.UnmarshalMsg(bts)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func TestEncodeDecodeMRFReplicateEntry(t *testing.T) {
+	v := MRFReplicateEntry{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+
+	m := v.Msgsize()
+	if buf.Len() > m {
+		t.Log("WARNING: TestEncodeDecodeMRFReplicateEntry Msgsize() is inaccurate")
+	}
+
+	vn := MRFReplicateEntry{}
+	err := msgp.Decode(&buf, &vn)
+	if err != nil {
+		t.Error(err)
+	}
+
+	buf.Reset()
+	msgp.Encode(&buf, &v)
+	err = msgp.NewReader(&buf).Skip()
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+func BenchmarkEncodeMRFReplicateEntry(b *testing.B) {
+	v := MRFReplicateEntry{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	en := msgp.NewWriter(msgp.Nowhere)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		v.EncodeMsg(en)
+	}
+	en.Flush()
+}
+
+func BenchmarkDecodeMRFReplicateEntry(b *testing.B) {
+	v := MRFReplicateEntry{}
+	var buf bytes.Buffer
+	msgp.Encode(&buf, &v)
+	b.SetBytes(int64(buf.Len()))
+	rd := msgp.NewEndlessReader(buf.Bytes(), b)
+	dc := msgp.NewReader(rd)
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := v.DecodeMsg(dc)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
 func TestMarshalUnmarshalReplicateDecision(t *testing.T) {
 	v := ReplicateDecision{}
 	bts, err := v.MarshalMsg(nil)
diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index a0c756091..3fa62692f 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -403,6 +403,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
 	lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
 	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
+		globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
 		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn))
 		sendEvent(eventArgs{
 			BucketName: bucket,
@@ -477,6 +478,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
 	eventName := event.ObjectReplicationComplete
 	if replicationStatus == replication.Failed {
 		eventName = event.ObjectReplicationFailed
+		globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
 	}
 	drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID)
 	if replicationStatus != prevStatus {
@@ -910,6 +912,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
 			Object: objInfo,
 			Host:   "Internal: [Replication]",
 		})
+		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
 		logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn))
 		return
 	}
@@ -992,12 +995,12 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
 
 	// re-queue failures once more - keep a retry count to avoid flooding the queue if
 	// the target site is down. Leave it to scanner to catch up instead.
-	if rinfos.ReplicationStatus() != replication.Completed && ri.RetryCount < 1 {
+	if rinfos.ReplicationStatus() != replication.Completed {
 		ri.OpType = replication.HealReplicationType
 		ri.EventType = ReplicateMRF
 		ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
 		ri.RetryCount++
-		globalReplicationPool.queueReplicaFailedTask(ri)
+		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
 	}
 }
 
@@ -1304,6 +1307,19 @@ type DeletedObjectReplicationInfo struct {
 	TargetArn string
 }
 
+// ToMRFEntry returns the relevant info needed by MRF
+func (di DeletedObjectReplicationInfo) ToMRFEntry() MRFReplicateEntry {
+	versionID := di.DeleteMarkerVersionID
+	if versionID == "" {
+		versionID = di.VersionID
+	}
+	return MRFReplicateEntry{
+		Bucket:    di.Bucket,
+		Object:    di.ObjectName,
+		versionID: versionID,
+	}
+}
+
 // Replication specific APIName
 const (
 	ReplicateObjectAPI = "ReplicateObject"
@@ -1348,13 +1364,16 @@ type ReplicationPool struct {
 	mrfReplicaCh            chan ReplicateObjectInfo
 	existingReplicaCh       chan ReplicateObjectInfo
 	existingReplicaDeleteCh chan DeletedObjectReplicationInfo
-	workerSize              int
-	mrfWorkerSize           int
-	resyncState             replicationResyncState
-	workerWg                sync.WaitGroup
-	mrfWorkerWg             sync.WaitGroup
-	once                    sync.Once
-	mu                      sync.Mutex
+	mrfSaveCh               chan MRFReplicateEntry
+
+	workerSize    int
+	mrfWorkerSize int
+	resyncState   replicationResyncState
+	workerWg      sync.WaitGroup
+	mrfWorkerWg   sync.WaitGroup
+	once          sync.Once
+	mu            sync.Mutex
+	mrfMutex      sync.Mutex
 }
 
 // NewReplicationPool creates a pool of replication workers of specified size
@@ -1368,6 +1387,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 		existingReplicaCh:       make(chan ReplicateObjectInfo, 100000),
 		existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
 		resyncState:             replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)},
+		mrfSaveCh:               make(chan MRFReplicateEntry, 100000),
 		ctx:                     ctx,
 		objLayer:                o,
 	}
@@ -1376,6 +1396,8 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 	pool.ResizeFailedWorkers(opts.FailedWorkers)
 	go pool.AddExistingObjectReplicateWorker()
 	go pool.updateResyncStatus(ctx, o)
+	go pool.processMRF()
+	go pool.persistMRF()
 	return pool
 }
 
@@ -1481,33 +1503,17 @@ func (p *ReplicationPool) suggestedWorkers(failQueue bool) int {
 	return int(float64(p.workerSize) * ReplicationWorkerMultiplier)
 }
 
-func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
-	if p == nil {
-		return
-	}
-	select {
-	case <-GlobalContext.Done():
-		p.once.Do(func() {
-			close(p.replicaCh)
-			close(p.mrfReplicaCh)
-			close(p.existingReplicaCh)
-		})
-	case p.mrfReplicaCh <- ri:
-	default:
-		logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up retrying failed replication - we recommend increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), string(replicationSubsystem))
-	}
-}
-
 func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 	if p == nil {
 		return
 	}
-	var ch chan ReplicateObjectInfo
+	var ch, healCh chan ReplicateObjectInfo
 	switch ri.OpType {
 	case replication.ExistingObjectReplicationType:
 		ch = p.existingReplicaCh
 	case replication.HealReplicationType:
-		fallthrough
+		ch = p.mrfReplicaCh
+		healCh = p.replicaCh
 	default:
 		ch = p.replicaCh
 	}
@@ -1518,6 +1524,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 			close(p.mrfReplicaCh)
 			close(p.existingReplicaCh)
 		})
+	case healCh <- ri:
 	case ch <- ri:
 	default:
 		logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem))
@@ -2475,3 +2482,192 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
 	}
 	return
 }
+
+const mrfTimeInterval = 5 * time.Minute
+
+func (p *ReplicationPool) persistMRF() {
+	var mu sync.Mutex
+	entries := make(map[string]MRFReplicateEntry)
+	mTimer := time.NewTimer(mrfTimeInterval)
+	defer mTimer.Stop()
+	saveMRFToDisk := func(drain bool) {
+		mu.Lock()
+		defer mu.Unlock()
+		if len(entries) == 0 {
+			return
+		}
+		cctx := p.ctx
+		if drain {
+			cctx = context.Background()
+			// drain all mrf entries and save to disk
+			for e := range p.mrfSaveCh {
+				entries[e.versionID] = e
+			}
+		}
+		if err := p.saveMRFEntries(cctx, entries); err != nil {
+			logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk: %w", err), string(replicationSubsystem))
+		}
+		entries = make(map[string]MRFReplicateEntry)
+		return
+	}
+	for {
+		select {
+		case <-mTimer.C:
+			saveMRFToDisk(false)
+			mTimer.Reset(mrfTimeInterval)
+		case <-p.ctx.Done():
+			close(p.mrfSaveCh)
+			saveMRFToDisk(true)
+			return
+		case e, ok := <-p.mrfSaveCh:
+			if !ok {
+				return
+			}
+			var cnt int
+			mu.Lock()
+			entries[e.versionID] = e
+			cnt = len(entries)
+			mu.Unlock()
+			if cnt >= cap(p.mrfSaveCh) || len(p.mrfSaveCh) >= int(0.8*float32(cap(p.mrfSaveCh))) {
+				saveMRFToDisk(true)
+			}
+		}
+	}
+}
+
+func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
+	if p == nil {
+		return
+	}
+	select {
+	case <-GlobalContext.Done():
+		return
+	case p.mrfSaveCh <- entry:
+	}
+}
+
+// saveMRFEntries saves the MRF entries to a UUID-named .bin file on disk
+func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error {
+	if len(entries) == 0 {
+		return nil
+	}
+	v := MRFReplicateEntries{
+		Entries: entries,
+		Version: mrfMetaVersionV1,
+	}
+	data := make([]byte, 4, v.Msgsize()+4)
+
+	// Initialize the MRF meta header.
+	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
+	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)
+
+	buf, err := v.MarshalMsg(data)
+	if err != nil {
+		return err
+	}
+
+	configFile := path.Join(replicationMRFDir, mustGetUUID()+".bin")
+	err = saveConfig(ctx, p.objLayer, configFile, buf)
+	return err
+}
+
+// loadMRF loads MRF entries from disk
+func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e error) {
+	data, err := readConfig(p.ctx, p.objLayer, fileName)
+	if err != nil && err != errConfigNotFound {
+		return re, err
+	}
+	if len(data) == 0 {
+		// Seems to be empty.
+		return re, nil
+	}
+	if len(data) <= 4 {
+		return re, fmt.Errorf("replication mrf: no data")
+	}
+	// Read the MRF meta header.
+	switch binary.LittleEndian.Uint16(data[0:2]) {
+	case mrfMetaFormat:
+	default:
+		return re, fmt.Errorf("replication mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
+	}
+	switch binary.LittleEndian.Uint16(data[2:4]) {
+	case mrfMetaVersion:
+	default:
+		return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
+	}
+	// OK, parse data.
+	if _, err = re.UnmarshalMsg(data[4:]); err != nil {
+		return re, err
+	}
+
+	switch re.Version {
+	case mrfMetaVersionV1:
+	default:
+		return re, fmt.Errorf("unexpected mrf meta version: %d", re.Version)
+	}
+	return re, nil
+}
+
+func (p *ReplicationPool) processMRF() {
+	if p == nil || p.objLayer == nil {
+		return
+	}
+	pTimer := time.NewTimer(mrfTimeInterval)
+	defer pTimer.Stop()
+	for {
+		select {
+		case <-pTimer.C:
+			// skip healing if all targets are offline
+			var offlineCnt int
+			tgts := globalBucketTargetSys.ListTargets(p.ctx, "", "")
+			for _, tgt := range tgts {
+				if globalBucketTargetSys.isOffline(tgt.URL()) {
+					offlineCnt++
+				}
+			}
+			if len(tgts) == offlineCnt {
+				pTimer.Reset(mrfTimeInterval)
+				continue
+			}
+			objCh := make(chan ObjectInfo)
+			cctx, cancelFn := context.WithCancel(p.ctx)
+			if err := p.objLayer.Walk(cctx, minioMetaBucket, replicationMRFDir, objCh, ObjectOptions{}); err != nil {
+				pTimer.Reset(mrfTimeInterval)
+				cancelFn()
+				logger.LogIf(p.ctx, err)
+				continue
+			}
+			for item := range objCh {
+				if err := p.queueMRFHeal(item.Name); err == nil {
+					p.objLayer.DeleteObject(p.ctx, minioMetaBucket, item.Name, ObjectOptions{})
+				}
+			}
+			pTimer.Reset(mrfTimeInterval)
+			cancelFn()
+		case <-p.ctx.Done():
+			return
+		}
+	}
+}
+
+// queueMRFHeal loads the given MRF file and queues a replication heal attempt for each entry in it.
+func (p *ReplicationPool) queueMRFHeal(file string) error {
+	if p == nil || p.objLayer == nil {
+		return errServerNotInitialized
+	}
+
+	mrfRec, err := p.loadMRF(file)
+	if err != nil {
+		return err
+	}
+	for vID, e := range mrfRec.Entries {
+		oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{
+			VersionID: vID,
+		})
+		if err != nil {
+			continue
+		}
+		QueueReplicationHeal(p.ctx, e.Bucket, oi)
+	}
+	return nil
+}
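
Note on the on-disk format introduced by this patch: saveMRFEntries writes a 4-byte little-endian header (uint16 format, then uint16 version) followed by the msgp-encoded MRFReplicateEntries payload, and loadMRF validates that header before parsing. The standalone Go sketch below illustrates just this layout. The two constants are copied from the patch; the main() wrapper and the one-byte stand-in payload (0x80, an empty msgp map) are illustrative assumptions, not part of the change.

package main

import (
	"encoding/binary"
	"fmt"
)

// Copied from cmd/bucket-replication-utils.go above; both are currently 1.
const (
	mrfMetaFormat  = 1
	mrfMetaVersion = 1
)

func main() {
	// Writer side (saveMRFEntries): reserve 4 header bytes, then append
	// the msgp payload. Here a single 0x80 byte (an empty msgp map)
	// stands in for the real MRFReplicateEntries encoding.
	data := make([]byte, 4)
	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)
	data = append(data, 0x80)

	// Reader side (loadMRF): check length and header before parsing.
	if len(data) <= 4 {
		fmt.Println("replication mrf: no data")
		return
	}
	if f := binary.LittleEndian.Uint16(data[0:2]); f != mrfMetaFormat {
		fmt.Printf("replication mrf: unknown format: %d\n", f)
		return
	}
	if v := binary.LittleEndian.Uint16(data[2:4]); v != mrfMetaVersion {
		fmt.Printf("replication mrf: unknown version: %d\n", v)
		return
	}
	fmt.Printf("header ok, %d payload byte(s) follow\n", len(data)-4)
}

In the patch itself the payload bytes come from MRFReplicateEntries.MarshalMsg appending to the 4-byte header slice, and are parsed back with re.UnmarshalMsg(data[4:]) after the same two header checks.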