diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 6da177a50..73d8596ef 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -49,6 +49,7 @@ import (
 	"github.com/minio/minio/internal/hash"
 	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/logger"
+	"github.com/tinylib/msgp/msgp"
 	"github.com/zeebo/xxh3"
 )
 
@@ -3051,7 +3052,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
 const (
 	mrfSaveInterval  = 5 * time.Minute
-	mrfQueueInterval = 6 * time.Minute
+	mrfQueueInterval = mrfSaveInterval + time.Minute // A minute higher than save interval
 
 	mrfRetryLimit = 3 // max number of retries before letting scanner catch up on this object version
 	mrfMaxEntries = 1000000
@@ -3066,46 +3067,37 @@ func (p *ReplicationPool) persistMRF() {
 	mTimer := time.NewTimer(mrfSaveInterval)
 	defer mTimer.Stop()
 
-	saveMRFToDisk := func(drain bool) {
+	saveMRFToDisk := func() {
 		if len(entries) == 0 {
 			return
 		}
-		cctx := p.ctx
-		if drain {
-			cctx = context.Background()
-			// drain all mrf entries and save to disk
-			for e := range p.mrfSaveCh {
-				entries[e.versionID] = e
-			}
-		}
 		// queue all entries for healing before overwriting the node mrf file
 		if !contextCanceled(p.ctx) {
 			p.queueMRFHeal()
 		}
-		if err := p.saveMRFEntries(cctx, entries); err != nil {
-			logger.LogOnceIf(p.ctx, fmt.Errorf("unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
-		}
+		p.saveMRFEntries(p.ctx, entries)
 		entries = make(map[string]MRFReplicateEntry)
 	}
 	for {
 		select {
 		case <-mTimer.C:
-			saveMRFToDisk(false)
+			saveMRFToDisk()
 			mTimer.Reset(mrfSaveInterval)
 		case <-p.ctx.Done():
 			p.mrfStopCh <- struct{}{}
 			close(p.mrfSaveCh)
-			saveMRFToDisk(true)
+			// We try to save if possible, but we don't care beyond that.
+			saveMRFToDisk()
 			return
 		case e, ok := <-p.mrfSaveCh:
			if !ok {
				return
			}
			if len(entries) >= mrfMaxEntries {
-				saveMRFToDisk(true)
+				saveMRFToDisk()
			}
			entries[e.versionID] = e
		}
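The persistToDrive added in the next hunk replaces WriteAll's single marshaled buffer with an io.Pipe: a goroutine writes the 4-byte header and then msgp-encodes the entries into the write end, while CreateFile consumes the read end with length -1 (size unknown). A minimal, self-contained sketch of that producer/consumer pattern; newPayloadReader and its payloads are hypothetical stand-ins, not MinIO code:

package main

import (
	"fmt"
	"io"
	"os"
)

// newPayloadReader streams a fixed header followed by a body through an
// io.Pipe, so the consumer can start reading before encoding finishes.
func newPayloadReader(header, body []byte) io.ReadCloser {
	r, w := io.Pipe()
	go func() {
		if _, err := w.Write(header); err != nil {
			w.CloseWithError(err) // reader sees this error on its next Read
			return
		}
		_, err := w.Write(body)
		w.CloseWithError(err) // a nil error closes the pipe cleanly (io.EOF)
	}()
	return r
}

func main() {
	rc := newPayloadReader([]byte{0x01, 0x00, 0x02, 0x00}, []byte("entries..."))
	defer rc.Close()
	if _, err := io.Copy(os.Stdout, rc); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

CloseWithError(nil) ends the stream with io.EOF, while a non-nil error from the encode surfaces on the consumer's next Read, failing the file write instead of persisting a truncated payload.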
@@ -3135,14 +3127,45 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
 	}
 }
 
+func (p *ReplicationPool) persistToDrive(ctx context.Context, v MRFReplicateEntries, data []byte) {
+	newReader := func() io.ReadCloser {
+		r, w := io.Pipe()
+		go func() {
+			mw := msgp.NewWriter(w)
+			n, err := mw.Write(data)
+			if err != nil {
+				w.CloseWithError(err)
+				return
+			}
+			if n != len(data) {
+				w.CloseWithError(io.ErrShortWrite)
+				return
+			}
+			err = v.EncodeMsg(mw)
+			mw.Flush()
+			w.CloseWithError(err)
+		}()
+		return r
+	}
+
+	for _, localDrive := range globalLocalDrives {
+		r := newReader()
+		err := localDrive.CreateFile(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), -1, r)
+		r.Close()
+		if err == nil {
+			break
+		}
+	}
+}
+
 // save mrf entries to nodenamehex.bin
-func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error {
+func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) {
 	if !p.initialized() {
-		return nil
+		return
 	}
 	atomic.StoreUint64(&globalReplicationStats.mrfStats.LastFailedCount, uint64(len(entries)))
 	if len(entries) == 0 {
-		return nil
+		return
 	}
 
 	v := MRFReplicateEntries{
@@ -3155,53 +3178,60 @@
 	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
 	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)
 
-	buf, err := v.MarshalMsg(data)
-	if err != nil {
-		return err
-	}
-
-	for _, localDrive := range globalLocalDrives {
-		if err := localDrive.WriteAll(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), buf); err == nil {
-			break
-		}
-	}
-	return nil
+	p.persistToDrive(ctx, v, data)
 }
 
 // load mrf entries from disk
-func (p *ReplicationPool) loadMRF(data []byte) (re MRFReplicateEntries, e error) {
-	if !p.initialized() {
+func (p *ReplicationPool) loadMRF() (mrfRec MRFReplicateEntries, err error) {
+	loadMRF := func(rc io.ReadCloser) (re MRFReplicateEntries, err error) {
+		defer rc.Close()
+
+		if !p.initialized() {
+			return re, nil
+		}
+		data := make([]byte, 4)
+		n, err := rc.Read(data)
+		if err != nil {
+			return re, err
+		}
+		if n != len(data) {
+			return re, errors.New("replication mrf: no data")
+		}
+		// Read resync meta header
+		switch binary.LittleEndian.Uint16(data[0:2]) {
+		case mrfMetaFormat:
+		default:
+			return re, fmt.Errorf("replication mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
+		}
+		switch binary.LittleEndian.Uint16(data[2:4]) {
+		case mrfMetaVersion:
+		default:
+			return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
+		}
+
+		// OK, parse data.
+		// Ignore any parsing errors; this file will be generated again anyway.
+		re.DecodeMsg(msgp.NewReader(rc))
+		return re, nil
 	}
-	if len(data) == 0 {
-		// Seems to be empty.
-		return re, nil
-	}
-	if len(data) <= 4 {
-		return re, fmt.Errorf("replication mrf: no data")
-	}
-	// Read resync meta header
-	switch binary.LittleEndian.Uint16(data[0:2]) {
-	case mrfMetaFormat:
-	default:
-		return re, fmt.Errorf("replication mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
-	}
-	switch binary.LittleEndian.Uint16(data[2:4]) {
-	case mrfMetaVersion:
-	default:
-		return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
-	}
-	// OK, parse data.
-	if _, err := re.UnmarshalMsg(data[4:]); err != nil {
-		return re, err
+	for _, localDrive := range globalLocalDrives {
+		rc, err := localDrive.ReadFileStream(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), 0, -1)
+		if err != nil {
+			continue
+		}
+
+		mrfRec, err = loadMRF(rc)
+		if err != nil {
+			continue
+		}
+
+		// finally delete the file after processing mrf entries
+		localDrive.Delete(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), DeleteOptions{})
+		break
 	}
-	switch re.Version {
-	case mrfMetaVersionV1:
-	default:
-		return re, fmt.Errorf("unexpected mrf meta version: %d", re.Version)
-	}
-	return re, nil
+	return mrfRec, nil
 }
 
 func (p *ReplicationPool) processMRF() {
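In the rewritten loadMRF above, the header handshake reads two little-endian uint16 values (format, then version) before handing the rest of the stream to the msgp decoder. A sketch of that check in isolation, with metaFormat/metaVersion as stand-in constants; io.ReadFull is used here because a bare Read may legally return fewer than 4 bytes:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// Stand-ins for mrfMetaFormat and mrfMetaVersion.
const (
	metaFormat  uint16 = 1
	metaVersion uint16 = 1
)

// readHeader validates the 4-byte little-endian {format, version} prefix
// and leaves r positioned at the start of the encoded entries.
func readHeader(r io.Reader) error {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return fmt.Errorf("mrf: no data: %w", err)
	}
	if f := binary.LittleEndian.Uint16(hdr[0:2]); f != metaFormat {
		return fmt.Errorf("mrf: unknown format: %d", f)
	}
	if v := binary.LittleEndian.Uint16(hdr[2:4]); v != metaVersion {
		return fmt.Errorf("mrf: unknown version: %d", v)
	}
	return nil
}

func main() {
	r := bytes.NewReader([]byte{1, 0, 1, 0, 'b', 'o', 'd', 'y'})
	fmt.Println(readHeader(r)) // <nil>; "body" remains unread for the decoder
}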
@@ -3244,39 +3274,28 @@ func (p *ReplicationPool) queueMRFHeal() error {
 		return errServerNotInitialized
 	}
 
-	for _, localDrive := range globalLocalDrives {
-		buf, err := localDrive.ReadAll(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"))
-		if err != nil {
-			continue
-		}
-
-		mrfRec, err := p.loadMRF(buf)
-		if err != nil {
-			continue
-		}
-
-		// finally delete the file after processing mrf entries
-		localDrive.Delete(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), DeleteOptions{})
-
-		// queue replication heal in a goroutine to avoid holding up mrf save routine
-		go func(mrfRec MRFReplicateEntries) {
-			for vID, e := range mrfRec.Entries {
-				ctx, cancel := context.WithTimeout(p.ctx, time.Second) // Do not waste more than a second on this.
-
-				oi, err := p.objLayer.GetObjectInfo(ctx, e.Bucket, e.Object, ObjectOptions{
-					VersionID: vID,
-				})
-				cancel()
-				if err != nil {
-					continue
-				}
-
-				QueueReplicationHeal(p.ctx, e.Bucket, oi, e.RetryCount)
-			}
-		}(mrfRec)
-		break
+	mrfRec, err := p.loadMRF()
+	if err != nil {
+		return err
 	}
+	// queue replication heal in a goroutine to avoid holding up mrf save routine
+	go func() {
+		for vID, e := range mrfRec.Entries {
+			ctx, cancel := context.WithTimeout(p.ctx, time.Second) // Do not waste more than a second on this.
+
+			oi, err := p.objLayer.GetObjectInfo(ctx, e.Bucket, e.Object, ObjectOptions{
+				VersionID: vID,
+			})
+			cancel()
+			if err != nil {
+				continue
+			}
+
+			QueueReplicationHeal(p.ctx, e.Bucket, oi, e.RetryCount)
+		}
+	}()
+
 	return nil
 }
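queueMRFHeal now loads the entries once and processes them in a goroutine so the MRF save loop is never held up, bounding each GetObjectInfo call with a one-second context. A runnable sketch of that per-entry timeout pattern, with a hypothetical lookup standing in for p.objLayer.GetObjectInfo:

package main

import (
	"context"
	"fmt"
	"time"
)

// lookup stands in for any per-entry call we refuse to wait on for
// more than the context's deadline.
func lookup(ctx context.Context, key string) (string, error) {
	select {
	case <-time.After(10 * time.Millisecond): // pretend work
		return "info:" + key, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	parent := context.Background()
	entries := []string{"a", "b", "c"}

	done := make(chan struct{})
	// Process in a goroutine so the caller (like the MRF save loop) is not blocked.
	go func() {
		defer close(done)
		for _, e := range entries {
			ctx, cancel := context.WithTimeout(parent, time.Second)
			info, err := lookup(ctx, e)
			cancel() // release the timer promptly; do not defer inside a loop
			if err != nil {
				continue // skip entries that time out or fail
			}
			fmt.Println(info)
		}
	}()
	<-done
}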
@@ -3286,33 +3305,28 @@ func (p *ReplicationPool) initialized() bool {
 
 // getMRF returns MRF entries for this node.
 func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan madmin.ReplicationMRF, err error) {
+	mrfRec, err := p.loadMRF()
+	if err != nil {
+		return nil, err
+	}
+
 	mrfCh := make(chan madmin.ReplicationMRF, 100)
 	go func() {
 		defer close(mrfCh)
-		for _, localDrive := range globalLocalDrives {
-			buf, err := localDrive.ReadAll(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"))
-			if err != nil {
+		for vID, e := range mrfRec.Entries {
+			if e.Bucket != bucket && bucket != "" {
 				continue
 			}
-			mrfRec, err := p.loadMRF(buf)
-			if err != nil {
-				continue
-			}
-			for vID, e := range mrfRec.Entries {
-				if e.Bucket != bucket && bucket != "" {
-					continue
-				}
-				select {
-				case mrfCh <- madmin.ReplicationMRF{
-					NodeName:   globalLocalNodeName,
-					Object:     e.Object,
-					VersionID:  vID,
-					Bucket:     e.Bucket,
-					RetryCount: e.RetryCount,
-				}:
-				case <-ctx.Done():
-					return
-				}
+			select {
+			case mrfCh <- madmin.ReplicationMRF{
+				NodeName:   globalLocalNodeName,
+				Object:     e.Object,
+				VersionID:  vID,
+				Bucket:     e.Bucket,
+				RetryCount: e.RetryCount,
+			}:
+			case <-ctx.Done():
+				return
			}
		}
	}()
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 6b49b9486..da9c4d804 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -1790,7 +1790,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
 		return nil, err
 	}
 
-	odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect
+	odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect && length >= 0
 
 	var file *os.File
 	if odirectEnabled {
@@ -1822,6 +1822,10 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
 		}
 	}
 
+	if length < 0 {
+		return file, nil
+	}
+
 	st, err := file.Stat()
 	if err != nil {
 		file.Close()
@@ -1889,10 +1893,6 @@ func (c closeWrapper) Close() error {
 
 // CreateFile - creates the file.
 func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSize int64, r io.Reader) (err error) {
-	if fileSize < -1 {
-		return errInvalidArgument
-	}
-
 	volumeDir, err := s.getVolDir(volume)
 	if err != nil {
 		return err
@@ -1929,7 +1929,7 @@ func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSiz
 		return osErrToFileErr(err)
 	}
 
-	odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect
+	odirectEnabled := globalAPIConfig.odirectEnabled() && s.oDirect && fileSize > 0
 
 	var w *os.File
 	if odirectEnabled {
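The xl-storage changes make a negative length/fileSize mean "size unknown, stream to EOF", and both the read and write paths skip O_DIRECT in that case, since direct I/O wants aligned, fixed-size transfers. A sketch of the same gating on the write side; the sized branch is a plain stand-in, not the real O_DIRECT plumbing:

package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// writeFile mirrors the size-based choice in writeAllDirect: only a known,
// positive size takes the (stand-in) direct path, which copies exactly
// fileSize bytes; an unknown size (-1) streams until EOF.
func writeFile(path string, fileSize int64, r io.Reader) error {
	useDirect := fileSize > 0 // unknown (-1) and empty files skip O_DIRECT

	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	if useDirect {
		// The real code copies through aligned buffers opened with O_DIRECT.
		_, err = io.CopyN(f, r, fileSize)
		return err
	}
	// Size unknown: plain buffered copy to EOF, as the MRF writer now does.
	_, err = io.Copy(f, r)
	return err
}

func main() {
	path := filepath.Join(os.TempDir(), "mrf-example.bin")
	fmt.Println(writeFile(path, -1, strings.NewReader("mrf entries")))
}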