replication: improve MRF healing. (#15556)

This PR improves replication failure healing by persisting the
most recent failures to disk and re-queuing them until replication
succeeds.

While this does not eliminate the need for healing during a full scan,
queuing MRF entries vastly improves the time to bring replicated buckets
back in sync, since it does not wait for a scanner visit to detect
unreplicated object versions.
Poorna committed on 2022-08-22 16:53:06 -07:00 (committed via GitHub)
parent 471467d310 · commit 4155c5b695
4 changed files with 886 additions and 28 deletions
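
Before the per-file diffs, a high-level sketch of the new flow may help: failed replications are reduced to MRF entries, batched in memory keyed by version ID, periodically persisted to disk, and later re-queued for healing. The snippet below is illustrative only — `mrfEntry` and the toy interval are stand-ins; the real types, channels, and 5-minute interval appear in the diffs that follow:

```go
package main

import (
	"fmt"
	"time"
)

// mrfEntry is a stand-in for MRFReplicateEntry: the minimal state
// needed to retry a failed replication later.
type mrfEntry struct {
	Bucket, Object, VersionID string
}

func main() {
	// Failed replications are queued on a buffered channel rather than
	// retried inline (queueMRFSave in the diff).
	saveCh := make(chan mrfEntry, 8)
	done := make(chan struct{})

	// A persister goroutine batches entries in memory and flushes them
	// on a timer or at shutdown (persistMRF in the diff).
	go func() {
		defer close(done)
		batch := map[string]mrfEntry{}
		flush := func() {
			if len(batch) == 0 {
				return
			}
			// The real code serializes the batch with msgp and writes it
			// to a <uuid>.bin object under replicationMRFDir.
			fmt.Printf("flushed %d entries\n", len(batch))
			batch = map[string]mrfEntry{}
		}
		t := time.NewTicker(50 * time.Millisecond) // 5 minutes in the diff
		defer t.Stop()
		for {
			select {
			case e, ok := <-saveCh:
				if !ok {
					flush() // drain remaining entries on shutdown
					return
				}
				batch[e.VersionID] = e // entries are keyed by version ID
			case <-t.C:
				flush()
			}
		}
	}()

	// processMRF (in the diff) later walks the persisted files and
	// re-queues each entry for replication healing.
	saveCh <- mrfEntry{"bucket", "obj", "v1"}
	saveCh <- mrfEntry{"bucket", "obj", "v1"} // same version: coalesced
	close(saveCh)
	<-done
}
```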

@@ -737,3 +737,32 @@ func extractReplicateDiffOpts(q url.Values) (opts madmin.ReplDiffOpts) {
opts.Prefix = q.Get("prefix")
return
}
const (
replicationMRFDir = bucketMetaPrefix + SlashSeparator + replicationDir + SlashSeparator + "mrf"
mrfMetaFormat = 1
mrfMetaVersionV1 = 1
mrfMetaVersion = mrfMetaVersionV1
)
// MRFReplicateEntry mrf entry to save to disk
type MRFReplicateEntry struct {
Bucket string `json:"bucket" msg:"b"`
Object string `json:"object" msg:"o"`
versionID string `json:"-"`
}
// MRFReplicateEntries has the map of MRF entries to save to disk
type MRFReplicateEntries struct {
Entries map[string]MRFReplicateEntry `json:"entries" msg:"e"`
Version int `json:"version" msg:"v"`
}
// ToMRFEntry returns the relevant info needed by MRF
func (ri ReplicateObjectInfo) ToMRFEntry() MRFReplicateEntry {
return MRFReplicateEntry{
Bucket: ri.Bucket,
Object: ri.Name,
versionID: ri.VersionID,
}
}
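
Note that `versionID` is deliberately unexported and untagged: it is not serialized per entry, but used as the map key in `MRFReplicateEntries.Entries` (and recovered from that key at heal time). On disk, each batch is written with a 4-byte header — two little-endian uint16s for the meta format and version — ahead of the msgp payload. A minimal sketch of that framing, assuming only the constants above:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	mrfMetaFormat  = 1
	mrfMetaVersion = 1
)

// writeHeader prepends the 4-byte MRF meta header to a msgp payload.
func writeHeader(payload []byte) []byte {
	data := make([]byte, 4, 4+len(payload))
	binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
	binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)
	return append(data, payload...)
}

// readHeader validates the header and returns the msgp payload,
// mirroring the checks in loadMRF below.
func readHeader(data []byte) ([]byte, error) {
	if len(data) <= 4 {
		return nil, fmt.Errorf("replication mrf: no data")
	}
	if f := binary.LittleEndian.Uint16(data[0:2]); f != mrfMetaFormat {
		return nil, fmt.Errorf("replication mrf: unknown format: %d", f)
	}
	if v := binary.LittleEndian.Uint16(data[2:4]); v != mrfMetaVersion {
		return nil, fmt.Errorf("replication mrf: unknown version: %d", v)
	}
	return data[4:], nil
}

func main() {
	buf := writeHeader([]byte("msgp-bytes-here"))
	payload, err := readHeader(buf)
	fmt.Println(string(payload), err)
}
```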

@@ -260,6 +260,413 @@ func (z *BucketReplicationResyncStatus) Msgsize() (s int) {
return
}
// DecodeMsg implements msgp.Decodable
func (z *MRFReplicateEntries) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "e":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
if z.Entries == nil {
z.Entries = make(map[string]MRFReplicateEntry, zb0002)
} else if len(z.Entries) > 0 {
for key := range z.Entries {
delete(z.Entries, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 MRFReplicateEntry
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
for zb0003 > 0 {
zb0003--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
switch msgp.UnsafeString(field) {
case "b":
za0002.Bucket, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Bucket")
return
}
case "o":
za0002.Object, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Object")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
}
}
z.Entries[za0001] = za0002
}
case "v":
z.Version, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *MRFReplicateEntries) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "e"
err = en.Append(0x82, 0xa1, 0x65)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.Entries)))
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
for za0001, za0002 := range z.Entries {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
// map header, size 2
// write "b"
err = en.Append(0x82, 0xa1, 0x62)
if err != nil {
return
}
err = en.WriteString(za0002.Bucket)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Bucket")
return
}
// write "o"
err = en.Append(0xa1, 0x6f)
if err != nil {
return
}
err = en.WriteString(za0002.Object)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Object")
return
}
}
// write "v"
err = en.Append(0xa1, 0x76)
if err != nil {
return
}
err = en.WriteInt(z.Version)
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *MRFReplicateEntries) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "e"
o = append(o, 0x82, 0xa1, 0x65)
o = msgp.AppendMapHeader(o, uint32(len(z.Entries)))
for za0001, za0002 := range z.Entries {
o = msgp.AppendString(o, za0001)
// map header, size 2
// string "b"
o = append(o, 0x82, 0xa1, 0x62)
o = msgp.AppendString(o, za0002.Bucket)
// string "o"
o = append(o, 0xa1, 0x6f)
o = msgp.AppendString(o, za0002.Object)
}
// string "v"
o = append(o, 0xa1, 0x76)
o = msgp.AppendInt(o, z.Version)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MRFReplicateEntries) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "e":
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
if z.Entries == nil {
z.Entries = make(map[string]MRFReplicateEntry, zb0002)
} else if len(z.Entries) > 0 {
for key := range z.Entries {
delete(z.Entries, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 MRFReplicateEntry
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
for zb0003 > 0 {
zb0003--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
switch msgp.UnsafeString(field) {
case "b":
za0002.Bucket, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Bucket")
return
}
case "o":
za0002.Object, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Object")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
}
}
z.Entries[za0001] = za0002
}
case "v":
z.Version, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *MRFReplicateEntries) Msgsize() (s int) {
s = 1 + 2 + msgp.MapHeaderSize
if z.Entries != nil {
for za0001, za0002 := range z.Entries {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + 1 + 2 + msgp.StringPrefixSize + len(za0002.Bucket) + 2 + msgp.StringPrefixSize + len(za0002.Object)
}
}
s += 2 + msgp.IntSize
return
}
// DecodeMsg implements msgp.Decodable
func (z *MRFReplicateEntry) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "b":
z.Bucket, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
case "o":
z.Object, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z MRFReplicateEntry) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "b"
err = en.Append(0x82, 0xa1, 0x62)
if err != nil {
return
}
err = en.WriteString(z.Bucket)
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
// write "o"
err = en.Append(0xa1, 0x6f)
if err != nil {
return
}
err = en.WriteString(z.Object)
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z MRFReplicateEntry) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "b"
o = append(o, 0x82, 0xa1, 0x62)
o = msgp.AppendString(o, z.Bucket)
// string "o"
o = append(o, 0xa1, 0x6f)
o = msgp.AppendString(o, z.Object)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MRFReplicateEntry) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "b":
z.Bucket, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
case "o":
z.Object, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z MRFReplicateEntry) Msgsize() (s int) {
s = 1 + 2 + msgp.StringPrefixSize + len(z.Bucket) + 2 + msgp.StringPrefixSize + len(z.Object)
return
}
// DecodeMsg implements msgp.Decodable
func (z *ReplicateDecision) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte

@@ -122,6 +122,232 @@ func BenchmarkDecodeBucketReplicationResyncStatus(b *testing.B) {
}
}
func TestMarshalUnmarshalMRFReplicateEntries(t *testing.T) {
v := MRFReplicateEntries{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgMRFReplicateEntries(b *testing.B) {
v := MRFReplicateEntries{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgMRFReplicateEntries(b *testing.B) {
v := MRFReplicateEntries{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalMRFReplicateEntries(b *testing.B) {
v := MRFReplicateEntries{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeMRFReplicateEntries(t *testing.T) {
v := MRFReplicateEntries{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeMRFReplicateEntries Msgsize() is inaccurate")
}
vn := MRFReplicateEntries{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeMRFReplicateEntries(b *testing.B) {
v := MRFReplicateEntries{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeMRFReplicateEntries(b *testing.B) {
v := MRFReplicateEntries{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalMRFReplicateEntry(t *testing.T) {
v := MRFReplicateEntry{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgMRFReplicateEntry(b *testing.B) {
v := MRFReplicateEntry{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgMRFReplicateEntry(b *testing.B) {
v := MRFReplicateEntry{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalMRFReplicateEntry(b *testing.B) {
v := MRFReplicateEntry{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeMRFReplicateEntry(t *testing.T) {
v := MRFReplicateEntry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeMRFReplicateEntry Msgsize() is inaccurate")
}
vn := MRFReplicateEntry{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeMRFReplicateEntry(b *testing.B) {
v := MRFReplicateEntry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeMRFReplicateEntry(b *testing.B) {
v := MRFReplicateEntry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalReplicateDecision(t *testing.T) {
v := ReplicateDecision{}
bts, err := v.MarshalMsg(nil)

@@ -403,6 +403,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, rcfg.RoleArn))
sendEvent(eventArgs{
BucketName: bucket,
@@ -477,6 +478,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
eventName := event.ObjectReplicationComplete
if replicationStatus == replication.Failed {
eventName = event.ObjectReplicationFailed
globalReplicationPool.queueMRFSave(dobj.ToMRFEntry())
}
drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID)
if replicationStatus != prevStatus {
@@ -910,6 +912,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
Object: objInfo,
Host: "Internal: [Replication]",
})
globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, cfg.RoleArn))
return
}
@@ -992,12 +995,12 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
// re-queue failures once more - keep a retry count to avoid flooding the queue if
// the target site is down. Leave it to scanner to catch up instead.
- if rinfos.ReplicationStatus() != replication.Completed && ri.RetryCount < 1 {
+ if rinfos.ReplicationStatus() != replication.Completed {
ri.OpType = replication.HealReplicationType
ri.EventType = ReplicateMRF
ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
ri.RetryCount++
- globalReplicationPool.queueReplicaFailedTask(ri)
+ globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
}
}
@@ -1304,6 +1307,19 @@ type DeletedObjectReplicationInfo struct {
TargetArn string
}
// ToMRFEntry returns the relevant info needed by MRF
func (di DeletedObjectReplicationInfo) ToMRFEntry() MRFReplicateEntry {
versionID := di.DeleteMarkerVersionID
if versionID == "" {
versionID = di.VersionID
}
return MRFReplicateEntry{
Bucket: di.Bucket,
Object: di.ObjectName,
versionID: versionID,
}
}
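A delete-marker replication failure keys its MRF entry by the delete marker's own version ID when present, falling back to the version of the object being deleted. A tiny illustration of that fallback, with a stand-in struct:

```go
package main

import "fmt"

// deletedInfo stands in for the two version fields on
// DeletedObjectReplicationInfo used by ToMRFEntry above.
type deletedInfo struct {
	DeleteMarkerVersionID string
	VersionID             string
}

// mrfVersionID mirrors the fallback: prefer the delete marker's version.
func mrfVersionID(di deletedInfo) string {
	if di.DeleteMarkerVersionID != "" {
		return di.DeleteMarkerVersionID
	}
	return di.VersionID
}

func main() {
	fmt.Println(mrfVersionID(deletedInfo{DeleteMarkerVersionID: "dm-1", VersionID: "v-1"})) // dm-1
	fmt.Println(mrfVersionID(deletedInfo{VersionID: "v-1"}))                                // v-1
}
```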
// Replication specific APIName
const (
ReplicateObjectAPI = "ReplicateObject"
@@ -1348,13 +1364,16 @@ type ReplicationPool struct {
mrfReplicaCh chan ReplicateObjectInfo
existingReplicaCh chan ReplicateObjectInfo
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
- workerSize int
- mrfWorkerSize int
- resyncState replicationResyncState
- workerWg sync.WaitGroup
- mrfWorkerWg sync.WaitGroup
- once sync.Once
- mu sync.Mutex
+ mrfSaveCh chan MRFReplicateEntry
+ workerSize int
+ mrfWorkerSize int
+ resyncState replicationResyncState
+ workerWg sync.WaitGroup
+ mrfWorkerWg sync.WaitGroup
+ once sync.Once
+ mu sync.Mutex
+ mrfMutex sync.Mutex
}
// NewReplicationPool creates a pool of replication workers of specified size
@@ -1368,6 +1387,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
existingReplicaCh: make(chan ReplicateObjectInfo, 100000),
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
resyncState: replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)},
mrfSaveCh: make(chan MRFReplicateEntry, 100000),
ctx: ctx,
objLayer: o,
}
@@ -1376,6 +1396,8 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
pool.ResizeFailedWorkers(opts.FailedWorkers)
go pool.AddExistingObjectReplicateWorker()
go pool.updateResyncStatus(ctx, o)
go pool.processMRF()
go pool.persistMRF()
return pool
}
@@ -1481,33 +1503,17 @@ func (p *ReplicationPool) suggestedWorkers(failQueue bool) int {
return int(float64(p.workerSize) * ReplicationWorkerMultiplier)
}
- func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
- if p == nil {
- return
- }
- select {
- case <-GlobalContext.Done():
- p.once.Do(func() {
- close(p.replicaCh)
- close(p.mrfReplicaCh)
- close(p.existingReplicaCh)
- })
- case p.mrfReplicaCh <- ri:
- default:
- logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up retrying failed replication - we recommend increasing number of replication failed workers with `mc admin config set api replication_failed_workers=%d`", p.suggestedWorkers(true)), string(replicationSubsystem))
- }
- }
func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
if p == nil {
return
}
- var ch chan ReplicateObjectInfo
+ var ch, healCh chan ReplicateObjectInfo
switch ri.OpType {
case replication.ExistingObjectReplicationType:
ch = p.existingReplicaCh
case replication.HealReplicationType:
- fallthrough
+ ch = p.mrfReplicaCh
+ healCh = p.replicaCh
default:
ch = p.replicaCh
}
@@ -1518,6 +1524,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
close(p.mrfReplicaCh)
close(p.existingReplicaCh)
})
case healCh <- ri:
case ch <- ri:
default:
logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing number of replicate object workers with `mc admin config set api replication_workers=%d`", p.suggestedWorkers(false)), string(replicationSubsystem))
@@ -2475,3 +2482,192 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf
}
return
}
const mrfTimeInterval = 5 * time.Minute
func (p *ReplicationPool) persistMRF() {
var mu sync.Mutex
entries := make(map[string]MRFReplicateEntry)
mTimer := time.NewTimer(mrfTimeInterval)
defer mTimer.Stop()
saveMRFToDisk := func(drain bool) {
mu.Lock()
defer mu.Unlock()
if len(entries) == 0 {
return
}
cctx := p.ctx
if drain {
cctx = context.Background()
// drain all mrf entries and save to disk
for e := range p.mrfSaveCh {
entries[e.versionID] = e
}
}
if err := p.saveMRFEntries(cctx, entries); err != nil {
logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
}
entries = make(map[string]MRFReplicateEntry)
return
}
for {
select {
case <-mTimer.C:
saveMRFToDisk(false)
mTimer.Reset(mrfTimeInterval)
case <-p.ctx.Done():
close(p.mrfSaveCh)
saveMRFToDisk(true)
return
case e, ok := <-p.mrfSaveCh:
if !ok {
return
}
var cnt int
mu.Lock()
entries[e.versionID] = e
cnt = len(entries)
mu.Unlock()
if cnt >= cap(p.mrfSaveCh) || len(p.mrfSaveCh) >= int(0.8*float32(cap(p.mrfSaveCh))) {
saveMRFToDisk(true)
}
}
}
}
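Two conditions force a save ahead of the 5-minute timer: the in-memory batch has grown to the channel's capacity, or the channel backlog has reached roughly 80% of capacity. With the 100000-entry mrfSaveCh created in NewReplicationPool, that means a forced flush once about 80000 entries are pending. A small sketch of that check, with assumed names:

```go
package main

import "fmt"

// shouldFlush reports whether batched MRF entries should be saved to disk
// immediately instead of waiting for the periodic timer: either the
// in-memory batch has reached channel capacity, or the channel backlog is
// at or past roughly 80% of capacity.
func shouldFlush(batched, pending, capacity int) bool {
	return batched >= capacity || pending >= int(0.8*float32(capacity))
}

func main() {
	const capacity = 100000 // mrfSaveCh buffer size in NewReplicationPool
	fmt.Println(shouldFlush(10, 79999, capacity))   // false: keep batching
	fmt.Println(shouldFlush(10, 80000, capacity))   // true: backlog at ~80%
	fmt.Println(shouldFlush(capacity, 0, capacity)) // true: batch at capacity
}
```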
func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
if p == nil {
return
}
select {
case <-GlobalContext.Done():
return
case p.mrfSaveCh <- entry:
}
}
// save mrf entries to mrf_<uuid>.bin
func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error {
if len(entries) == 0 {
return nil
}
v := MRFReplicateEntries{
Entries: entries,
Version: mrfMetaVersionV1,
}
data := make([]byte, 4, v.Msgsize()+4)
// Initialize the MRF meta header.
binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)
buf, err := v.MarshalMsg(data)
if err != nil {
return err
}
configFile := path.Join(replicationMRFDir, mustGetUUID()+".bin")
err = saveConfig(ctx, p.objLayer, configFile, buf)
return err
}
// load mrf entries from disk
func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e error) {
data, err := readConfig(p.ctx, p.objLayer, fileName)
if err != nil && err != errConfigNotFound {
return re, err
}
if len(data) == 0 {
// Seems to be empty.
return re, nil
}
if len(data) <= 4 {
return re, fmt.Errorf("replication mrf: no data")
}
// Read MRF meta header
switch binary.LittleEndian.Uint16(data[0:2]) {
case mrfMetaFormat:
default:
return re, fmt.Errorf("replication mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case mrfMetaVersion:
default:
return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
// OK, parse data.
if _, err = re.UnmarshalMsg(data[4:]); err != nil {
return re, err
}
switch re.Version {
case mrfMetaVersionV1:
default:
return re, fmt.Errorf("unexpected mrf meta version: %d", re.Version)
}
return re, nil
}
func (p *ReplicationPool) processMRF() {
if p == nil || p.objLayer == nil {
return
}
pTimer := time.NewTimer(mrfTimeInterval)
defer pTimer.Stop()
for {
select {
case <-pTimer.C:
// skip healing if all targets are offline
var offlineCnt int
tgts := globalBucketTargetSys.ListTargets(p.ctx, "", "")
for _, tgt := range tgts {
if globalBucketTargetSys.isOffline(tgt.URL()) {
offlineCnt++
}
}
if len(tgts) == offlineCnt {
pTimer.Reset(mrfTimeInterval)
continue
}
objCh := make(chan ObjectInfo)
cctx, cancelFn := context.WithCancel(p.ctx)
if err := p.objLayer.Walk(cctx, minioMetaBucket, replicationMRFDir, objCh, ObjectOptions{}); err != nil {
pTimer.Reset(mrfTimeInterval)
cancelFn()
logger.LogIf(p.ctx, err)
continue
}
for item := range objCh {
if err := p.queueMRFHeal(item.Name); err == nil {
p.objLayer.DeleteObject(p.ctx, minioMetaBucket, item.Name, ObjectOptions{})
}
}
pTimer.Reset(mrfTimeInterval)
cancelFn()
case <-p.ctx.Done():
return
}
}
}
// queueMRFHeal loads a persisted MRF file and re-queues its entries for replication healing.
func (p *ReplicationPool) queueMRFHeal(file string) error {
if p == nil || p.objLayer == nil {
return errServerNotInitialized
}
mrfRec, err := p.loadMRF(file)
if err != nil {
return err
}
for vID, e := range mrfRec.Entries {
oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{
VersionID: vID,
})
if err != nil {
continue
}
QueueReplicationHeal(p.ctx, e.Bucket, oi)
}
return nil
}