feat: Add support for site level resync (#15753)

Poorna 2022-11-14 07:16:40 -08:00 committed by GitHub
parent 7ac64ad24a
commit d6bc141bd1
18 changed files with 1442 additions and 120 deletions

View File

@ -515,3 +515,45 @@ func (a adminAPIHandlers) SRPeerRemove(w http.ResponseWriter, r *http.Request) {
return
}
}
// SiteReplicationResyncOp - PUT /minio/admin/v3/site-replication/resync/op
func (a adminAPIHandlers) SiteReplicationResyncOp(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "SiteReplicationResyncOp")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationResyncAction)
if objectAPI == nil {
return
}
var peerSite madmin.PeerInfo
if err := parseJSONBody(ctx, r.Body, &peerSite, ""); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
vars := mux.Vars(r)
op := madmin.SiteResyncOp(vars["operation"])
var (
status madmin.SRResyncOpStatus
err error
)
switch op {
case madmin.SiteResyncStart:
status, err = globalSiteReplicationSys.startResync(ctx, objectAPI, peerSite)
case madmin.SiteResyncCancel:
status, err = globalSiteReplicationSys.cancelResync(ctx, objectAPI, peerSite)
default:
err = errSRInvalidRequest(errInvalidArgument)
}
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
body, err := json.Marshal(status)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
writeSuccessResponseJSON(w, body)
}
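For context, here is a hedged sketch of what a call to this new endpoint could look like. It is not the official client path (mc admin / madmin-go build and sign these requests); it only illustrates the request shape the handler expects: a PUT to /minio/admin/v3/site-replication/resync/op with an "operation" query parameter of start or cancel and a JSON body describing the peer. The JSON field names below are assumptions, not the confirmed madmin.PeerInfo wire format.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Illustrative peer descriptor; field names are assumptions, the real
	// body is whatever madmin.PeerInfo marshals to.
	peer := map[string]string{
		"endpoint":     "https://site2.example.com:9000",
		"name":         "site2",
		"deploymentID": "11111111-2222-3333-4444-555555555555",
	}
	body, _ := json.Marshal(peer)

	// "operation" is matched against madmin.SiteResyncStart / SiteResyncCancel
	// in the switch inside SiteReplicationResyncOp above.
	endpoint := "http://127.0.0.1:9000/minio/admin/v3/site-replication/resync/op?operation=start"
	req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(body))
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	// NOTE: a real admin API call must be signed with admin credentials
	// (AWS Signature V4); this unsigned request would be rejected.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}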

View File

@ -428,7 +428,7 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request)
}
}
}
dID := r.Form.Get("by-depID")
done := ctx.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
@ -441,15 +441,16 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request)
hosts: hostMap,
disks: diskMap,
jobID: jobID,
depID: dID,
})
m.Merge(&mLocal)
// Allow half the interval for collecting remote...
cctx, cancel := context.WithTimeout(ctx, interval/2)
mRemote := collectRemoteMetrics(cctx, types, collectMetricsOpts{
hosts: hostMap,
disks: diskMap,
jobID: jobID,
depID: dID,
})
cancel()
m.Merge(&mRemote)

View File

@ -258,6 +258,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationEdit)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerEdit)))
adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/remove").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerRemove)))
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/resync/op").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationResyncOp))).Queries("operation", "{operation:.*}")
if globalIsDistErasure {
// Top locks

View File

@ -321,7 +321,12 @@ func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseW
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
}
}
if err := startReplicationResync(ctx, bucket, arn, resetID, resetBeforeDate, objectAPI); err != nil {
if err := globalReplicationPool.resyncer.start(ctx, objectAPI, resyncOpts{
bucket: bucket,
arn: arn,
resyncID: resetID,
resyncBefore: resetBeforeDate,
}); err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
Bucket: bucket,
Err: err,
@ -370,10 +375,13 @@ func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.Response
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
globalReplicationPool.resyncState.RLock()
brs, ok := globalReplicationPool.resyncState.statusMap[bucket]
globalReplicationPool.resyncState.RUnlock()
var tgtStats map[string]TargetReplicationResyncStatus
globalReplicationPool.resyncer.RLock()
brs, ok := globalReplicationPool.resyncer.statusMap[bucket]
if ok {
tgtStats = brs.cloneTgtStats()
}
globalReplicationPool.resyncer.RUnlock()
if !ok {
brs, err = loadBucketResyncMetadata(ctx, bucket, objectAPI)
if err != nil {
@ -383,10 +391,11 @@ func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.Response
}), r.URL)
return
}
tgtStats = brs.cloneTgtStats()
}
var rinfo ResyncTargetsInfo
for tarn, st := range brs.TargetsMap {
for tarn, st := range tgtStats {
if arn != "" && tarn != arn {
continue
}
@ -394,7 +403,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.Response
Arn: tarn,
ResetID: st.ResyncID,
StartTime: st.StartTime,
EndTime: st.EndTime,
EndTime: st.LastUpdate,
ResyncStatus: st.ResyncStatus.String(),
ReplicatedSize: st.ReplicatedSize,
ReplicatedCount: st.ReplicatedCount,

View File

@ -628,9 +628,12 @@ func (v VersionPurgeStatusType) Pending() bool {
return v == Pending || v == Failed
}
type replicationResyncState struct {
type replicationResyncer struct {
// map of bucket to their resync status
statusMap map[string]BucketReplicationResyncStatus
statusMap map[string]BucketReplicationResyncStatus
workerSize int
resyncCancelCh chan struct{}
workerCh chan struct{}
sync.RWMutex
}
@ -642,12 +645,23 @@ const (
resyncMetaVersion = resyncMetaVersionV1
)
type resyncOpts struct {
bucket string
arn string
resyncID string
resyncBefore time.Time
}
// ResyncStatusType status of resync operation
type ResyncStatusType int
const (
// NoResync - no resync in progress
NoResync ResyncStatusType = iota
// ResyncPending - resync pending
ResyncPending
// ResyncCanceled - resync canceled
ResyncCanceled
// ResyncStarted - resync in progress
ResyncStarted
// ResyncCompleted - resync finished
@ -656,6 +670,10 @@ const (
ResyncFailed
)
func (rt ResyncStatusType) isValid() bool {
return rt != NoResync
}
func (rt ResyncStatusType) String() string {
switch rt {
case ResyncStarted:
@ -664,6 +682,10 @@ func (rt ResyncStatusType) String() string {
return "Completed"
case ResyncFailed:
return "Failed"
case ResyncPending:
return "Pending"
case ResyncCanceled:
return "Canceled"
default:
return ""
}
@ -671,8 +693,8 @@ func (rt ResyncStatusType) String() string {
// TargetReplicationResyncStatus status of resync of bucket for a specific target
type TargetReplicationResyncStatus struct {
StartTime time.Time `json:"startTime" msg:"st"`
EndTime time.Time `json:"endTime" msg:"et"`
StartTime time.Time `json:"startTime" msg:"st"`
LastUpdate time.Time `json:"lastUpdated" msg:"lst"`
// Resync ID assigned to this reset
ResyncID string `json:"resyncID" msg:"id"`
// ResyncBeforeDate - resync all objects created prior to this date
@ -701,6 +723,14 @@ type BucketReplicationResyncStatus struct {
LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
}
func (rs *BucketReplicationResyncStatus) cloneTgtStats() (m map[string]TargetReplicationResyncStatus) {
m = make(map[string]TargetReplicationResyncStatus)
for arn, st := range rs.TargetsMap {
m[arn] = st
}
return
}
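cloneTgtStats exists so callers can take a snapshot of the per-target map while holding only the read lock and then iterate the copy with the lock released, which is what ResetBucketReplicationStatusHandler does above. A minimal standalone sketch of the same copy-under-RLock pattern, with illustrative names rather than the MinIO types:

package main

import (
	"fmt"
	"sync"
)

type status struct{ State string }

type tracker struct {
	sync.RWMutex
	targets map[string]status
}

// clone copies the map while holding only the read lock.
func (t *tracker) clone() map[string]status {
	t.RLock()
	defer t.RUnlock()
	m := make(map[string]status, len(t.targets))
	for k, v := range t.targets {
		m[k] = v
	}
	return m
}

func main() {
	t := &tracker{targets: map[string]status{"arn:a": {State: "Started"}}}
	// Iterate the snapshot without holding the lock, so long-running work
	// here cannot block writers updating the live map.
	for arn, st := range t.clone() {
		fmt.Println(arn, st.State)
	}
}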
func newBucketResyncStatus(bucket string) BucketReplicationResyncStatus {
return BucketReplicationResyncStatus{
TargetsMap: make(map[string]TargetReplicationResyncStatus),

View File

@ -2117,10 +2117,10 @@ func (z *TargetReplicationResyncStatus) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "StartTime")
return
}
case "et":
z.EndTime, err = dc.ReadTime()
case "lst":
z.LastUpdate, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "EndTime")
err = msgp.WrapError(err, "LastUpdate")
return
}
case "id":
@ -2205,14 +2205,14 @@ func (z *TargetReplicationResyncStatus) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "StartTime")
return
}
// write "et"
err = en.Append(0xa2, 0x65, 0x74)
// write "lst"
err = en.Append(0xa3, 0x6c, 0x73, 0x74)
if err != nil {
return
}
err = en.WriteTime(z.EndTime)
err = en.WriteTime(z.LastUpdate)
if err != nil {
err = msgp.WrapError(err, "EndTime")
err = msgp.WrapError(err, "LastUpdate")
return
}
// write "id"
@ -2315,9 +2315,9 @@ func (z *TargetReplicationResyncStatus) MarshalMsg(b []byte) (o []byte, err erro
// string "st"
o = append(o, 0x8b, 0xa2, 0x73, 0x74)
o = msgp.AppendTime(o, z.StartTime)
// string "et"
o = append(o, 0xa2, 0x65, 0x74)
o = msgp.AppendTime(o, z.EndTime)
// string "lst"
o = append(o, 0xa3, 0x6c, 0x73, 0x74)
o = msgp.AppendTime(o, z.LastUpdate)
// string "id"
o = append(o, 0xa2, 0x69, 0x64)
o = msgp.AppendString(o, z.ResyncID)
@ -2372,10 +2372,10 @@ func (z *TargetReplicationResyncStatus) UnmarshalMsg(bts []byte) (o []byte, err
err = msgp.WrapError(err, "StartTime")
return
}
case "et":
z.EndTime, bts, err = msgp.ReadTimeBytes(bts)
case "lst":
z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "EndTime")
err = msgp.WrapError(err, "LastUpdate")
return
}
case "id":
@ -2450,7 +2450,7 @@ func (z *TargetReplicationResyncStatus) UnmarshalMsg(bts []byte) (o []byte, err
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *TargetReplicationResyncStatus) Msgsize() (s int) {
s = 1 + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.StringPrefixSize + len(z.ResyncID) + 4 + msgp.TimeSize + 4 + msgp.IntSize + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object)
s = 1 + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 3 + msgp.StringPrefixSize + len(z.ResyncID) + 4 + msgp.TimeSize + 4 + msgp.IntSize + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object)
return
}

View File

@ -24,6 +24,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net/http"
"path"
"reflect"
@ -1519,17 +1520,16 @@ type ReplicationPool struct {
existingReplicaDeleteCh chan DeletedObjectReplicationInfo
mrfSaveCh chan MRFReplicateEntry
saveStateCh chan struct{}
workerSize int
mrfWorkerSize int
activeWorkers int32
activeMRFWorkers int32
priority string
resyncState replicationResyncState
workerWg sync.WaitGroup
mrfWorkerWg sync.WaitGroup
once sync.Once
mu sync.RWMutex
workerSize int
mrfWorkerSize int
activeWorkers int32
activeMRFWorkers int32
priority string
resyncer *replicationResyncer
workerWg sync.WaitGroup
mrfWorkerWg sync.WaitGroup
once sync.Once
mu sync.RWMutex
}
const (
@ -1578,7 +1578,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
mrfWorkerKillCh: make(chan struct{}, failedWorkers),
existingReplicaCh: make(chan ReplicateObjectInfo, 100000),
existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000),
resyncState: replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)},
resyncer: newresyncer(),
mrfSaveCh: make(chan MRFReplicateEntry, 100000),
saveStateCh: make(chan struct{}, 1),
ctx: ctx,
@ -1589,7 +1589,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
pool.ResizeWorkers(workers)
pool.ResizeFailedWorkers(failedWorkers)
go pool.AddExistingObjectReplicateWorker()
go pool.updateResyncStatus(ctx, o)
go pool.resyncer.PersistToDisk(ctx, o)
go pool.processMRF()
go pool.persistMRF()
go pool.saveStatsToDisk()
@ -2147,8 +2147,8 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim
const resyncTimeInterval = time.Minute * 1
// updateResyncStatus persists in-memory resync metadata stats to disk at periodic intervals
func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI ObjectLayer) {
// PersistToDisk persists in-memory resync metadata stats to disk at periodic intervals
func (s *replicationResyncer) PersistToDisk(ctx context.Context, objectAPI ObjectLayer) {
resyncTimer := time.NewTimer(resyncTimeInterval)
defer resyncTimer.Stop()
@ -2159,12 +2159,12 @@ func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI Obje
for {
select {
case <-resyncTimer.C:
p.resyncState.RLock()
for bucket, brs := range p.resyncState.statusMap {
s.RLock()
for bucket, brs := range s.statusMap {
var updt bool
// Save the replication status if one resync to any bucket target is still not finished
for _, st := range brs.TargetsMap {
if st.EndTime.Equal(timeSentinel) {
if st.LastUpdate.Equal(timeSentinel) {
updt = true
break
}
@ -2181,7 +2181,7 @@ func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI Obje
}
}
}
p.resyncState.RUnlock()
s.RUnlock()
resyncTimer.Reset(resyncTimeInterval)
case <-ctx.Done():
@ -2192,31 +2192,54 @@ func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI Obje
}
}
const resyncWorkerCnt = 50 // limit of number of bucket resyncs in progress at any given time
func newresyncer() *replicationResyncer {
rs := replicationResyncer{
statusMap: make(map[string]BucketReplicationResyncStatus),
workerSize: resyncWorkerCnt,
resyncCancelCh: make(chan struct{}, resyncWorkerCnt),
workerCh: make(chan struct{}, resyncWorkerCnt),
}
for i := 0; i < rs.workerSize; i++ {
rs.workerCh <- struct{}{}
}
return &rs
}
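workerCh is pre-filled with resyncWorkerCnt tokens and acts as a counting semaphore: resyncBucket takes a token before starting (<-s.workerCh) and returns it in its defer, so at most 50 bucket resyncs run concurrently. A small sketch of that token pattern in isolation (illustrative names, not the MinIO code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 3 // resyncWorkerCnt is 50 in the commit above

	// Buffered channel used as a counting semaphore, pre-filled with tokens.
	tokens := make(chan struct{}, workers)
	for i := 0; i < workers; i++ {
		tokens <- struct{}{}
	}

	var wg sync.WaitGroup
	for job := 0; job < 10; job++ {
		wg.Add(1)
		go func(job int) {
			defer wg.Done()
			<-tokens                                // block until a worker slot frees up
			defer func() { tokens <- struct{}{} }() // return the token when done
			fmt.Println("resyncing bucket", job)
		}(job)
	}
	wg.Wait()
}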
// resyncBucket resyncs all qualifying objects as per replication rules for the target
// ARN
func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI ObjectLayer) {
func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
select {
case <-s.workerCh: // block till a worker is available
case <-ctx.Done():
return
}
resyncStatus := ResyncFailed
defer func() {
globalReplicationPool.resyncState.Lock()
m := globalReplicationPool.resyncState.statusMap[bucket]
st := m.TargetsMap[arn]
st.EndTime = UTCNow()
s.Lock()
m := s.statusMap[opts.bucket]
st := m.TargetsMap[opts.arn]
st.LastUpdate = UTCNow()
st.ResyncStatus = resyncStatus
m.TargetsMap[arn] = st
m.TargetsMap[opts.arn] = st
m.LastUpdate = UTCNow()
globalReplicationPool.resyncState.statusMap[bucket] = m
globalReplicationPool.resyncState.Unlock()
s.statusMap[opts.bucket] = m
s.Unlock()
globalSiteResyncMetrics.incBucket(opts, resyncStatus)
s.workerCh <- struct{}{}
}()
// Allocate new results channel to receive ObjectInfo.
objInfoCh := make(chan ObjectInfo)
cfg, err := getReplicationConfig(ctx, bucket)
cfg, err := getReplicationConfig(ctx, opts.bucket)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", bucket, arn, err))
logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
return
}
tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, opts.bucket)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", bucket, arn, err))
logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", opts.bucket, opts.arn, err))
return
}
rcfg := replicationConfig{
@ -2226,34 +2249,50 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
tgtArns := cfg.FilterTargetArns(
replication.ObjectOpts{
OpType: replication.ResyncReplicationType,
TargetArn: arn,
TargetArn: opts.arn,
})
if len(tgtArns) != 1 {
logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", bucket, arn))
logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
return
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arn)
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, opts.arn)
if tgt == nil {
logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", bucket, arn))
logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
return
}
// mark resync status as resync started
if !heal {
s.Lock()
m := s.statusMap[opts.bucket]
st := m.TargetsMap[opts.arn]
st.ResyncStatus = ResyncStarted
m.TargetsMap[opts.arn] = st
m.LastUpdate = UTCNow()
s.statusMap[opts.bucket] = m
s.Unlock()
}
// Walk through all object versions - Walk() always lists in ascending order, which is needed to ensure
// a delete marker is replicated to the target only after the object version is first created.
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{}); err != nil {
if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil {
logger.LogIf(ctx, err)
return
}
globalReplicationPool.resyncState.RLock()
m := globalReplicationPool.resyncState.statusMap[bucket]
st := m.TargetsMap[arn]
globalReplicationPool.resyncState.RUnlock()
s.RLock()
m := s.statusMap[opts.bucket]
st := m.TargetsMap[opts.arn]
s.RUnlock()
var lastCheckpoint string
if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
lastCheckpoint = st.Object
}
for obj := range objInfoCh {
select {
case <-s.resyncCancelCh:
resyncStatus = ResyncCanceled
return
default:
}
if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name {
continue
}
@ -2263,7 +2302,7 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
if !roi.ExistingObjResync.mustResync() {
continue
}
traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
versionID := ""
dmVersionID := ""
@ -2298,96 +2337,134 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
ReplicationProxyRequest: "false",
},
})
globalReplicationPool.resyncState.Lock()
m = globalReplicationPool.resyncState.statusMap[bucket]
st = m.TargetsMap[arn]
s.Lock()
m = s.statusMap[opts.bucket]
st = m.TargetsMap[opts.arn]
st.Object = roi.Name
success := true
if err != nil {
if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, bucket, roi.Name)) {
if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
st.ReplicatedCount++
} else {
st.FailedCount++
success = false
}
} else {
st.ReplicatedCount++
st.ReplicatedSize += roi.Size
}
m.TargetsMap[arn] = st
m.TargetsMap[opts.arn] = st
m.LastUpdate = UTCNow()
globalReplicationPool.resyncState.statusMap[bucket] = m
globalReplicationPool.resyncState.Unlock()
s.statusMap[opts.bucket] = m
s.Unlock()
traceFn(err)
globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
}
resyncStatus = ResyncCompleted
}
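Cancellation is cooperative: the object loop above polls resyncCancelCh with a non-blocking select on every iteration, and when a cancel token arrives (presumably fed by the cancel path) the run is marked ResyncCanceled by the deferred status update, with ResyncFailed as the default and ResyncCompleted set only at the end. A minimal sketch of that shape with hypothetical names:

package main

import (
	"fmt"
	"time"
)

func main() {
	cancelCh := make(chan struct{}, 1)

	objects := make(chan string, 5)
	for _, o := range []string{"a.txt", "b.txt", "c.txt", "d.txt", "e.txt"} {
		objects <- o
	}
	close(objects)

	// Simulate a cancel request arriving mid-run.
	go func() {
		time.Sleep(12 * time.Millisecond)
		cancelCh <- struct{}{}
	}()

	// Mirror resyncBucket: status defaults to "Failed" and the deferred
	// function records whatever the loop decided.
	status := "Failed"
	defer func() { fmt.Println("final status:", status) }()

	for obj := range objects {
		select {
		case <-cancelCh:
			status = "Canceled"
			return
		default: // no cancel pending, keep replicating
		}
		fmt.Println("replicating", obj)
		time.Sleep(5 * time.Millisecond)
	}
	status = "Completed"
}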
// start replication resync for the remote target ARN specified
func startReplicationResync(ctx context.Context, bucket, arn, resyncID string, resyncBeforeDate time.Time, objAPI ObjectLayer) error {
if bucket == "" {
func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opts resyncOpts) error {
if opts.bucket == "" {
return fmt.Errorf("bucket name is empty")
}
if arn == "" {
if opts.arn == "" {
return fmt.Errorf("target ARN specified for resync is empty")
}
// Ensure the bucket has a replication configuration before starting the resync
cfg, err := getReplicationConfig(ctx, bucket)
cfg, err := getReplicationConfig(ctx, opts.bucket)
if err != nil {
return err
}
tgtArns := cfg.FilterTargetArns(
replication.ObjectOpts{
OpType: replication.ResyncReplicationType,
TargetArn: arn,
TargetArn: opts.arn,
})
if len(tgtArns) == 0 {
return fmt.Errorf("arn %s specified for resync not found in replication config", arn)
return fmt.Errorf("arn %s specified for resync not found in replication config", opts.arn)
}
data, err := loadBucketResyncMetadata(ctx, bucket, objAPI)
if err != nil {
return err
globalReplicationPool.resyncer.RLock()
data, ok := globalReplicationPool.resyncer.statusMap[opts.bucket]
globalReplicationPool.resyncer.RUnlock()
if !ok {
data, err = loadBucketResyncMetadata(ctx, opts.bucket, objAPI)
if err != nil {
return err
}
}
// check whether a resync is already in progress for this arn
for tArn, st := range data.TargetsMap {
if arn == tArn && st.ResyncStatus == ResyncStarted {
return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", bucket, arn)
if opts.arn == tArn && (st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncPending) {
return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", opts.bucket, opts.arn)
}
}
status := TargetReplicationResyncStatus{
ResyncID: resyncID,
ResyncBeforeDate: resyncBeforeDate,
ResyncID: opts.resyncID,
ResyncBeforeDate: opts.resyncBefore,
StartTime: UTCNow(),
ResyncStatus: ResyncStarted,
Bucket: bucket,
ResyncStatus: ResyncPending,
Bucket: opts.bucket,
}
data.TargetsMap[arn] = status
if err = saveResyncStatus(ctx, bucket, data, objAPI); err != nil {
data.TargetsMap[opts.arn] = status
if err = saveResyncStatus(ctx, opts.bucket, data, objAPI); err != nil {
return err
}
globalReplicationPool.resyncState.Lock()
defer globalReplicationPool.resyncState.Unlock()
brs, ok := globalReplicationPool.resyncState.statusMap[bucket]
globalReplicationPool.resyncer.Lock()
defer globalReplicationPool.resyncer.Unlock()
brs, ok := globalReplicationPool.resyncer.statusMap[opts.bucket]
if !ok {
brs = BucketReplicationResyncStatus{
Version: resyncMetaVersion,
TargetsMap: make(map[string]TargetReplicationResyncStatus),
}
}
brs.TargetsMap[arn] = status
globalReplicationPool.resyncState.statusMap[bucket] = brs
go resyncBucket(GlobalContext, bucket, arn, false, objAPI)
brs.TargetsMap[opts.arn] = status
globalReplicationPool.resyncer.statusMap[opts.bucket] = brs
go globalReplicationPool.resyncer.resyncBucket(GlobalContext, objAPI, false, opts)
return nil
}
func (s *replicationResyncer) trace(resyncID string, path string) func(err error) {
startTime := time.Now()
return func(err error) {
duration := time.Since(startTime)
if globalTrace.NumSubscribers(madmin.TraceReplicationResync) > 0 {
globalTrace.Publish(replicationResyncTrace(resyncID, startTime, duration, path, err))
}
}
}
func replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo {
var errStr string
if err != nil {
errStr = err.Error()
}
funcName := fmt.Sprintf("replication.(resyncID=%s)", resyncID)
return madmin.TraceInfo{
TraceType: madmin.TraceReplicationResync,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: funcName,
Duration: duration,
Path: path,
Error: errStr,
}
}
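trace returns a closure that captures the start time at the call site and is invoked later with the final error, so the measured duration covers the whole per-object replication attempt; the real handler only publishes a madmin.TraceInfo when someone is subscribed to replication-resync traces. A small sketch of that deferred-trace-closure shape (illustrative, not the madmin types):

package main

import (
	"errors"
	"fmt"
	"time"
)

// trace captures the start time now and reports when the returned func runs.
func trace(resyncID, path string) func(err error) {
	start := time.Now()
	return func(err error) {
		// The commit above publishes a trace event only if there are
		// subscribers; here we just print.
		fmt.Printf("resync=%s path=%s took=%s err=%v\n",
			resyncID, path, time.Since(start).Round(time.Millisecond), err)
	}
}

func main() {
	done := trace("resync-123", "bucket/object.txt")
	time.Sleep(20 * time.Millisecond) // stand-in for the replication call
	done(errors.New("simulated failure"))

	done = trace("resync-123", "bucket/other.txt")
	time.Sleep(5 * time.Millisecond)
	done(nil)
}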
// delete resync metadata from replication resync state in memory
func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket string) {
if p == nil {
return
}
p.resyncState.Lock()
delete(p.resyncState.statusMap, bucket)
defer p.resyncState.Unlock()
p.resyncer.Lock()
delete(p.resyncer.statusMap, bucket)
defer p.resyncer.Unlock()
globalSiteResyncMetrics.deleteBucket(bucket)
}
// initResync - initializes bucket replication resync for all buckets.
@ -2396,12 +2473,44 @@ func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo,
return errServerNotInitialized
}
// Start the resync routine in the background
go p.loadResync(ctx, buckets, objAPI)
go p.startResyncRoutine(ctx, buckets, objAPI)
return nil
}
func (p *ReplicationPool) startResyncRoutine(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Run the replication resync in a loop
for {
if err := p.loadResync(ctx, buckets, objAPI); err == nil {
<-ctx.Done()
return
}
duration := time.Duration(r.Float64() * float64(time.Minute))
if duration < time.Second {
// Make sure to sleep at least a second to avoid high CPU ticks.
duration = time.Second
}
time.Sleep(duration)
}
}
var replicationResyncLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{
timeout: 30 * time.Second,
minimum: 10 * time.Second,
retryInterval: time.Second,
})
// Loads bucket replication resync statuses into memory.
func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
// Make sure only one node runs resync on the cluster at any given time.
locker := objAPI.NewNSLock(minioMetaBucket, "replication/resync.lock")
lkctx, err := locker.GetLock(ctx, replicationResyncLockTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lkctx.Cancel()
// No unlock for "leader" lock.
for index := range buckets {
meta, err := loadBucketResyncMetadata(ctx, buckets[index].Name, objAPI)
if err != nil {
@ -2410,30 +2519,38 @@ func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo,
}
continue
}
p.resyncState.Lock()
p.resyncState.statusMap[buckets[index].Name] = meta
p.resyncState.Unlock()
p.resyncer.Lock()
p.resyncer.statusMap[buckets[index].Name] = meta
p.resyncer.Unlock()
}
for index := range buckets {
bucket := buckets[index].Name
p.resyncState.RLock()
m, ok := p.resyncState.statusMap[bucket]
p.resyncState.RUnlock()
var tgts map[string]TargetReplicationResyncStatus
p.resyncer.RLock()
m, ok := p.resyncer.statusMap[bucket]
if ok {
for arn, st := range m.TargetsMap {
if st.ResyncStatus == ResyncFailed || st.ResyncStatus == ResyncStarted {
go resyncBucket(ctx, bucket, arn, true, objAPI)
}
tgts = m.cloneTgtStats()
}
p.resyncer.RUnlock()
for arn, st := range tgts {
switch st.ResyncStatus {
case ResyncFailed, ResyncStarted, ResyncPending:
go p.resyncer.resyncBucket(ctx, objAPI, true, resyncOpts{
bucket: bucket,
arn: arn,
resyncID: st.ResyncID,
resyncBefore: st.ResyncBeforeDate,
})
}
}
}
return nil
}
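loadResync now runs under a cluster-wide namespace lock on .minio.sys ("replication/resync.lock") that is deliberately never unlocked, so exactly one node owns resync resumption for the lifetime of its context; nodes that fail to get the lock back off with random jitter in startResyncRoutine and retry. A sketch of that leader-style pattern against a hypothetical locker (the NSLock API itself is MinIO-internal):

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// leaderLock is a hypothetical stand-in for objAPI.NewNSLock(...).GetLock():
// only one caller ever obtains the single token, and it is never returned,
// mirroring the "leader" lock that is deliberately not unlocked above.
type leaderLock struct{ token chan struct{} }

func newLeaderLock() *leaderLock {
	l := &leaderLock{token: make(chan struct{}, 1)}
	l.token <- struct{}{}
	return l
}

func (l *leaderLock) GetLock(ctx context.Context, timeout time.Duration) error {
	select {
	case <-l.token:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("lock timeout")
	case <-ctx.Done():
		return ctx.Err()
	}
}

func runResyncLoop(ctx context.Context, node string, lk *leaderLock) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	for {
		if err := lk.GetLock(ctx, 50*time.Millisecond); err == nil {
			fmt.Println(node, "is the resync leader")
			<-ctx.Done()
			return
		}
		// Back off with jitter before retrying, like startResyncRoutine.
		d := time.Duration(r.Float64() * float64(100*time.Millisecond))
		if d < 10*time.Millisecond {
			d = 10 * time.Millisecond
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(d):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	lk := newLeaderLock()
	go runResyncLoop(ctx, "node-1", lk)
	go runResyncLoop(ctx, "node-2", lk)
	<-ctx.Done()
}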
// load bucket resync metadata from disk
func loadBucketResyncMetadata(ctx context.Context, bucket string, objAPI ObjectLayer) (brs BucketReplicationResyncStatus, e error) {
brs = newBucketResyncStatus(bucket)
resyncDirPath := path.Join(bucketMetaPrefix, bucket, replicationDir)
data, err := readConfig(GlobalContext, objAPI, pathJoin(resyncDirPath, resyncFileName))
if err != nil && err != errConfigNotFound {

View File

@ -487,6 +487,22 @@ func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTar
return generateARN(target)
}
// getRemoteARNForPeer returns the ARN of the remote target configured for a peer site in site replication
func (sys *BucketTargetSys) getRemoteARNForPeer(bucket string, peer madmin.PeerInfo) string {
tgts := sys.targetsMap[bucket]
for _, target := range tgts {
ep, _ := url.Parse(peer.Endpoint)
if target.SourceBucket == bucket &&
target.TargetBucket == bucket &&
target.Endpoint == ep.Host &&
target.Secure == (ep.Scheme == "https") &&
target.Type == madmin.ReplicationService {
return target.Arn
}
}
return ""
}
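getRemoteARNForPeer matches an existing bucket target to the peer by comparing the parsed endpoint host and whether the scheme is https, rather than comparing raw endpoint strings. A tiny standalone illustration of that comparison, with hypothetical values:

package main

import (
	"fmt"
	"net/url"
)

type bucketTarget struct {
	Endpoint string // host[:port], as stored on the target
	Secure   bool
}

func matchesPeer(t bucketTarget, peerEndpoint string) bool {
	ep, err := url.Parse(peerEndpoint)
	if err != nil {
		return false
	}
	// Same comparison shape as above: host matches and TLS-ness matches.
	return t.Endpoint == ep.Host && t.Secure == (ep.Scheme == "https")
}

func main() {
	tgt := bucketTarget{Endpoint: "site2.example.com:9000", Secure: true}
	fmt.Println(matchesPeer(tgt, "https://site2.example.com:9000")) // true
	fmt.Println(matchesPeer(tgt, "http://site2.example.com:9000"))  // false: scheme mismatch
}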
// generate ARN that is unique to this target type
func generateARN(t *madmin.BucketTarget) string {
arn := madmin.ARN{

View File

@ -281,6 +281,9 @@ var (
// Cluster replication manager.
globalSiteReplicationSys SiteReplicationSys
// Cluster replication resync metrics
globalSiteResyncMetrics *siteResyncMetrics
// Is set to true when Bucket federation is requested
// and is 'true' when etcdConfig.PathPrefix is empty
globalBucketFederation bool

View File

@ -29,6 +29,7 @@ type collectMetricsOpts struct {
hosts map[string]struct{}
disks map[string]struct{}
jobID string
depID string
}
func collectLocalMetrics(types madmin.MetricType, opts collectMetricsOpts) (m madmin.RealtimeMetrics) {
@ -65,7 +66,9 @@ func collectLocalMetrics(types madmin.MetricType, opts collectMetricsOpts) (m ma
if types.Contains(madmin.MetricsBatchJobs) {
m.Aggregated.BatchJobs = globalBatchJobsMetrics.report(opts.jobID)
}
if types.Contains(madmin.MetricsSiteResync) {
m.Aggregated.SiteResync = globalSiteResyncMetrics.report(opts.depID)
}
// Add types...
// ByHost is a shallow reference, so be careful about sharing.

View File

@ -202,6 +202,8 @@ func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricTyp
values.Set(peerRESTDisk, disk)
}
values.Set(peerRESTJobID, opts.jobID)
values.Set(peerRESTDepID, opts.depID)
respBody, err := client.callWithContext(ctx, peerRESTMethodMetrics, values, nil, -1)
if err != nil {
return

View File

@ -97,6 +97,7 @@ const (
peerRESTMetricsTypes = "types"
peerRESTDisk = "disk"
peerRESTJobID = "job-id"
peerRESTDepID = "depID"
peerRESTStartRebalance = "start-rebalance"
peerRESTListenBucket = "bucket"

View File

@ -437,6 +437,7 @@ func (s *peerRESTServer) GetMetricsHandler(w http.ResponseWriter, r *http.Reques
diskMap[disk] = struct{}{}
}
jobID := r.Form.Get(peerRESTJobID)
depID := r.Form.Get(peerRESTDepID)
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
@ -444,8 +445,8 @@ func (s *peerRESTServer) GetMetricsHandler(w http.ResponseWriter, r *http.Reques
info := collectLocalMetrics(types, collectMetricsOpts{
disks: diskMap,
jobID: jobID,
depID: depID,
})
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}

View File

@ -330,6 +330,8 @@ func initAllSubsystems(ctx context.Context) {
// Create new ILM tier configuration subsystem
globalTierConfigMgr = NewTierConfigMgr()
globalSiteResyncMetrics = newSiteResyncMetrics(GlobalContext)
}
func configRetriableErrors(err error) bool {

View File

@ -0,0 +1,334 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"math/rand"
"sync"
"time"
"github.com/minio/madmin-go"
)
//go:generate msgp -file=$GOFILE
// SiteResyncStatus captures current replication resync status for a target site
type SiteResyncStatus struct {
Version int `json:"version" msg:"v"`
// Overall site status
Status ResyncStatusType `json:"st" msg:"ss"`
DeplID string `json:"dId" msg:"did"`
BucketStatuses map[string]ResyncStatusType `json:"buckets" msg:"bkts"`
TotBuckets int `json:"totbuckets" msg:"tb"`
TargetReplicationResyncStatus `json:"currSt" msg:"cst"`
}
func (s *SiteResyncStatus) clone() SiteResyncStatus {
if s == nil {
return SiteResyncStatus{}
}
o := *s
o.BucketStatuses = make(map[string]ResyncStatusType, len(s.BucketStatuses))
for b, st := range s.BucketStatuses {
o.BucketStatuses[b] = st
}
return o
}
const (
siteResyncPrefix = bucketMetaPrefix + "/site-replication/resync"
)
type resyncState struct {
resyncID string
LastSaved time.Time
}
//msgp:ignore siteResyncMetrics
type siteResyncMetrics struct {
sync.RWMutex
// resyncStatus maps resync ID to resync status for peer
resyncStatus map[string]SiteResyncStatus
// map peer deployment ID to resync ID
peerResyncMap map[string]resyncState
}
func newSiteResyncMetrics(ctx context.Context) *siteResyncMetrics {
s := siteResyncMetrics{
resyncStatus: make(map[string]SiteResyncStatus),
peerResyncMap: make(map[string]resyncState),
}
go s.save(ctx)
go s.init(ctx)
return &s
}
// init site resync metrics
func (sm *siteResyncMetrics) init(ctx context.Context) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Run the site resync metrics load in a loop
for {
if err := sm.load(ctx, newObjectLayerFn()); err == nil {
<-ctx.Done()
return
}
duration := time.Duration(r.Float64() * float64(time.Second*10))
if duration < time.Second {
// Make sure to sleep at least a second to avoid high CPU ticks.
duration = time.Second
}
time.Sleep(duration)
}
}
// load resync metrics saved on disk into memory
func (sm *siteResyncMetrics) load(ctx context.Context, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
info, err := globalSiteReplicationSys.GetClusterInfo(ctx)
if err != nil {
return err
}
if !info.Enabled {
return nil
}
for _, peer := range info.Sites {
if peer.DeploymentID == globalDeploymentID {
continue
}
rs, err := loadSiteResyncMetadata(ctx, objAPI, peer.DeploymentID)
if err != nil {
return err
}
sm.Lock()
defer sm.Unlock()
if _, ok := sm.peerResyncMap[peer.DeploymentID]; !ok {
sm.peerResyncMap[peer.DeploymentID] = resyncState{resyncID: rs.ResyncID, LastSaved: time.Time{}}
sm.resyncStatus[rs.ResyncID] = rs
}
}
return nil
}
func (sm *siteResyncMetrics) report(dID string) *madmin.SiteResyncMetrics {
sm.RLock()
defer sm.RUnlock()
rst, ok := sm.peerResyncMap[dID]
if !ok {
return nil
}
rs, ok := sm.resyncStatus[rst.resyncID]
if !ok {
return nil
}
m := madmin.SiteResyncMetrics{
CollectedAt: rs.LastUpdate,
StartTime: rs.StartTime,
LastUpdate: rs.LastUpdate,
ResyncStatus: rs.Status.String(),
ResyncID: rst.resyncID,
DeplID: rs.DeplID,
ReplicatedSize: rs.ReplicatedSize,
ReplicatedCount: rs.ReplicatedCount,
FailedSize: rs.FailedSize,
FailedCount: rs.FailedCount,
Bucket: rs.Bucket,
Object: rs.Object,
NumBuckets: int64(rs.TotBuckets),
}
for b, st := range rs.BucketStatuses {
if st == ResyncFailed {
m.FailedBuckets = append(m.FailedBuckets, b)
}
}
return &m
}
// save in-memory stats to disk
func (sm *siteResyncMetrics) save(ctx context.Context) {
sTimer := time.NewTimer(siteResyncSaveInterval)
defer sTimer.Stop()
for {
select {
case <-sTimer.C:
if globalSiteReplicationSys.isEnabled() {
sm.Lock()
for dID, rs := range sm.peerResyncMap {
st, ok := sm.resyncStatus[rs.resyncID]
if ok {
updt := st.Status.isValid() && st.LastUpdate.After(rs.LastSaved)
if !updt {
continue
}
rs.LastSaved = UTCNow()
sm.peerResyncMap[dID] = rs
go saveSiteResyncMetadata(ctx, st, newObjectLayerFn())
}
}
sm.Unlock()
}
sTimer.Reset(siteResyncSaveInterval)
case <-ctx.Done():
return
}
}
}
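save follows the same persist-on-a-timer shape as PersistToDisk: wake on a time.Timer, write out only the entries whose LastUpdate is newer than their LastSaved watermark (and whose status is valid), then Reset the timer; ctx.Done ends the loop. A compact sketch of that dirty-tracking timer loop with stand-in types:

package main

import (
	"context"
	"fmt"
	"time"
)

type entry struct {
	LastUpdate time.Time
	LastSaved  time.Time
}

func saveLoop(ctx context.Context, interval time.Duration, entries map[string]*entry) {
	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			for id, e := range entries {
				// Persist only if something changed since the last save,
				// mirroring the LastUpdate.After(LastSaved) check above.
				if !e.LastUpdate.After(e.LastSaved) {
					continue
				}
				fmt.Println("persisting", id)
				e.LastSaved = time.Now().UTC()
			}
			t.Reset(interval)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()
	entries := map[string]*entry{
		"depl-a": {LastUpdate: time.Now()}, // dirty: will be persisted once
		"depl-b": {LastSaved: time.Now()},  // clean: skipped
	}
	saveLoop(ctx, 40*time.Millisecond, entries)
}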
// update overall site resync state
func (sm *siteResyncMetrics) updateState(s SiteResyncStatus) {
if !globalSiteReplicationSys.isEnabled() {
return
}
sm.Lock()
defer sm.Unlock()
switch s.Status {
case ResyncStarted:
sm.peerResyncMap[s.DeplID] = resyncState{resyncID: s.ResyncID, LastSaved: time.Time{}}
sm.resyncStatus[s.ResyncID] = s
case ResyncCompleted, ResyncCanceled, ResyncFailed:
st, ok := sm.resyncStatus[s.ResyncID]
if ok {
st.LastUpdate = s.LastUpdate
st.Status = s.Status
}
sm.resyncStatus[s.ResyncID] = st
}
}
// incBucket updates the resync status for a completed/failed bucket and recomputes the overall site resync status
func (sm *siteResyncMetrics) incBucket(o resyncOpts, bktStatus ResyncStatusType) {
if !globalSiteReplicationSys.isEnabled() {
return
}
sm.Lock()
defer sm.Unlock()
st, ok := sm.resyncStatus[o.resyncID]
if ok {
switch bktStatus {
case ResyncCompleted:
st.BucketStatuses[o.bucket] = ResyncCompleted
st.Status = siteResyncStatus(st.Status, st.BucketStatuses)
st.LastUpdate = UTCNow()
sm.resyncStatus[o.resyncID] = st
case ResyncFailed:
st.BucketStatuses[o.bucket] = ResyncFailed
st.Status = siteResyncStatus(st.Status, st.BucketStatuses)
st.LastUpdate = UTCNow()
sm.resyncStatus[o.resyncID] = st
}
}
}
// remove deleted bucket from active resync tracking
func (sm *siteResyncMetrics) deleteBucket(b string) {
if !globalSiteReplicationSys.isEnabled() {
return
}
sm.Lock()
defer sm.Unlock()
for _, rs := range sm.peerResyncMap {
st, ok := sm.resyncStatus[rs.resyncID]
if !ok {
return
}
switch st.Status {
case ResyncCompleted, ResyncFailed:
return
default:
delete(st.BucketStatuses, b)
}
}
}
// returns overall resync status from individual bucket resync status map
func siteResyncStatus(currSt ResyncStatusType, m map[string]ResyncStatusType) ResyncStatusType {
// avoid overwriting canceled resync status
if currSt != ResyncStarted {
return currSt
}
totBuckets := len(m)
var cmpCount, failCount int
for _, st := range m {
switch st {
case ResyncCompleted:
cmpCount++
case ResyncFailed:
failCount++
}
}
if cmpCount == totBuckets {
return ResyncCompleted
}
if cmpCount+failCount == totBuckets {
return ResyncFailed
}
return ResyncStarted
}
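The aggregation rule: once the overall status has moved off ResyncStarted (for example, canceled) it is left alone; otherwise it stays Started until every bucket has a terminal status, becoming Completed only if all buckets completed and Failed if any bucket failed. A hedged standalone restatement with a few worked cases:

package main

import "fmt"

type status int

const (
	started status = iota
	completed
	failed
	canceled
)

// overall restates the decision in siteResyncStatus above, for illustration only.
func overall(curr status, buckets map[string]status) status {
	if curr != started { // e.g. canceled: never overwrite
		return curr
	}
	var done, bad int
	for _, st := range buckets {
		switch st {
		case completed:
			done++
		case failed:
			bad++
		}
	}
	switch {
	case done == len(buckets):
		return completed
	case done+bad == len(buckets):
		return failed
	default:
		return started
	}
}

func main() {
	fmt.Println(overall(started, map[string]status{"a": completed, "b": started}))   // 0: still Started
	fmt.Println(overall(started, map[string]status{"a": completed, "b": failed}))    // 2: Failed
	fmt.Println(overall(started, map[string]status{"a": completed, "b": completed})) // 1: Completed
	fmt.Println(overall(canceled, map[string]status{"a": completed}))                // 3: stays Canceled
}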
// update resync metrics per object
func (sm *siteResyncMetrics) updateMetric(roi ReplicateObjectInfo, success bool, resyncID string) {
if !globalSiteReplicationSys.isEnabled() {
return
}
sm.Lock()
defer sm.Unlock()
s := sm.resyncStatus[resyncID]
if success {
s.ReplicatedCount++
s.ReplicatedSize += roi.Size
} else {
s.FailedCount++
s.FailedSize += roi.Size
}
s.Bucket = roi.Bucket
s.Object = roi.Name
s.LastUpdate = UTCNow()
sm.resyncStatus[resyncID] = s
}
// status returns the current in-memory resync status for this deployment
func (sm *siteResyncMetrics) status(dID string) (rs SiteResyncStatus, err error) {
sm.RLock()
defer sm.RUnlock()
if rst, ok1 := sm.peerResyncMap[dID]; ok1 {
if st, ok2 := sm.resyncStatus[rst.resyncID]; ok2 {
return st.clone(), nil
}
}
return rs, errSRNoResync
}
// siteStatus returns the latest resync status for this deployment, checking in-memory state first and falling back to on-disk metadata
func (sm *siteResyncMetrics) siteStatus(ctx context.Context, objAPI ObjectLayer, dID string) (rs SiteResyncStatus, err error) {
if !globalSiteReplicationSys.isEnabled() {
return rs, errSRNotEnabled
}
// check in-memory status
rs, err = sm.status(dID)
if err == nil {
return rs, nil
}
// check disk resync status
rs, err = loadSiteResyncMetadata(ctx, objAPI, dID)
if err != nil && err == errConfigNotFound {
return rs, nil
}
return rs, err
}

View File

@ -0,0 +1,318 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *SiteResyncStatus) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "v":
z.Version, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
case "ss":
err = z.Status.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "Status")
return
}
case "did":
z.DeplID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "DeplID")
return
}
case "bkts":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "BucketStatuses")
return
}
if z.BucketStatuses == nil {
z.BucketStatuses = make(map[string]ResyncStatusType, zb0002)
} else if len(z.BucketStatuses) > 0 {
for key := range z.BucketStatuses {
delete(z.BucketStatuses, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 ResyncStatusType
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "BucketStatuses")
return
}
err = za0002.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "BucketStatuses", za0001)
return
}
z.BucketStatuses[za0001] = za0002
}
case "tb":
z.TotBuckets, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "TotBuckets")
return
}
case "cst":
err = z.TargetReplicationResyncStatus.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "TargetReplicationResyncStatus")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *SiteResyncStatus) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 6
// write "v"
err = en.Append(0x86, 0xa1, 0x76)
if err != nil {
return
}
err = en.WriteInt(z.Version)
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
// write "ss"
err = en.Append(0xa2, 0x73, 0x73)
if err != nil {
return
}
err = z.Status.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "Status")
return
}
// write "did"
err = en.Append(0xa3, 0x64, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteString(z.DeplID)
if err != nil {
err = msgp.WrapError(err, "DeplID")
return
}
// write "bkts"
err = en.Append(0xa4, 0x62, 0x6b, 0x74, 0x73)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.BucketStatuses)))
if err != nil {
err = msgp.WrapError(err, "BucketStatuses")
return
}
for za0001, za0002 := range z.BucketStatuses {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "BucketStatuses")
return
}
err = za0002.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "BucketStatuses", za0001)
return
}
}
// write "tb"
err = en.Append(0xa2, 0x74, 0x62)
if err != nil {
return
}
err = en.WriteInt(z.TotBuckets)
if err != nil {
err = msgp.WrapError(err, "TotBuckets")
return
}
// write "cst"
err = en.Append(0xa3, 0x63, 0x73, 0x74)
if err != nil {
return
}
err = z.TargetReplicationResyncStatus.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "TargetReplicationResyncStatus")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *SiteResyncStatus) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 6
// string "v"
o = append(o, 0x86, 0xa1, 0x76)
o = msgp.AppendInt(o, z.Version)
// string "ss"
o = append(o, 0xa2, 0x73, 0x73)
o, err = z.Status.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "Status")
return
}
// string "did"
o = append(o, 0xa3, 0x64, 0x69, 0x64)
o = msgp.AppendString(o, z.DeplID)
// string "bkts"
o = append(o, 0xa4, 0x62, 0x6b, 0x74, 0x73)
o = msgp.AppendMapHeader(o, uint32(len(z.BucketStatuses)))
for za0001, za0002 := range z.BucketStatuses {
o = msgp.AppendString(o, za0001)
o, err = za0002.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "BucketStatuses", za0001)
return
}
}
// string "tb"
o = append(o, 0xa2, 0x74, 0x62)
o = msgp.AppendInt(o, z.TotBuckets)
// string "cst"
o = append(o, 0xa3, 0x63, 0x73, 0x74)
o, err = z.TargetReplicationResyncStatus.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "TargetReplicationResyncStatus")
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *SiteResyncStatus) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "v":
z.Version, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Version")
return
}
case "ss":
bts, err = z.Status.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "Status")
return
}
case "did":
z.DeplID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "DeplID")
return
}
case "bkts":
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "BucketStatuses")
return
}
if z.BucketStatuses == nil {
z.BucketStatuses = make(map[string]ResyncStatusType, zb0002)
} else if len(z.BucketStatuses) > 0 {
for key := range z.BucketStatuses {
delete(z.BucketStatuses, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 ResyncStatusType
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "BucketStatuses")
return
}
bts, err = za0002.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "BucketStatuses", za0001)
return
}
z.BucketStatuses[za0001] = za0002
}
case "tb":
z.TotBuckets, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "TotBuckets")
return
}
case "cst":
bts, err = z.TargetReplicationResyncStatus.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "TargetReplicationResyncStatus")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *SiteResyncStatus) Msgsize() (s int) {
s = 1 + 2 + msgp.IntSize + 3 + z.Status.Msgsize() + 4 + msgp.StringPrefixSize + len(z.DeplID) + 5 + msgp.MapHeaderSize
if z.BucketStatuses != nil {
for za0001, za0002 := range z.BucketStatuses {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize()
}
}
s += 3 + msgp.IntSize + 4 + z.TargetReplicationResyncStatus.Msgsize()
return
}

View File

@ -0,0 +1,123 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalSiteResyncStatus(t *testing.T) {
v := SiteResyncStatus{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgSiteResyncStatus(b *testing.B) {
v := SiteResyncStatus{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgSiteResyncStatus(b *testing.B) {
v := SiteResyncStatus{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalSiteResyncStatus(b *testing.B) {
v := SiteResyncStatus{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeSiteResyncStatus(t *testing.T) {
v := SiteResyncStatus{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeSiteResyncStatus Msgsize() is inaccurate")
}
vn := SiteResyncStatus{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeSiteResyncStatus(b *testing.B) {
v := SiteResyncStatus{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeSiteResyncStatus(b *testing.B) {
v := SiteResyncStatus{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -21,6 +21,7 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"encoding/json"
"encoding/xml"
"errors"
@ -76,6 +77,22 @@ var (
Cause: errors.New("site replication is not enabled"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRResyncStarted = SRError{
Cause: errors.New("site replication resync is already in progress"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRResyncCanceled = SRError{
Cause: errors.New("site replication resync is already canceled"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRNoResync = SRError{
Cause: errors.New("no resync in progress"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRResyncToSelf = SRError{
Cause: errors.New("invalid peer specified - cannot resync to self"),
Code: ErrSiteReplicationInvalidRequest,
}
)
func errSRInvalidRequest(err error) SRError {
@ -4858,3 +4875,305 @@ func (c *SiteReplicationSys) getPeerForUpload(deplID string) (pi srPeerInfo, loc
}
return pi, true
}
// startResync initiates resync of data to the specified peer site. The overall site resync status
// is maintained in .minio.sys/buckets/site-replication/resync/<deployment-id>.meta, while the
// individual bucket resync status is collected in .minio.sys/buckets/<bucket-name>/replication/resync.bin
func (c *SiteReplicationSys) startResync(ctx context.Context, objAPI ObjectLayer, peer madmin.PeerInfo) (res madmin.SRResyncOpStatus, err error) {
if !c.isEnabled() {
return res, errSRNotEnabled
}
if objAPI == nil {
return res, errSRObjectLayerNotReady
}
if peer.DeploymentID == globalDeploymentID {
return res, errSRResyncToSelf
}
if _, ok := c.state.Peers[peer.DeploymentID]; !ok {
return res, errSRPeerNotFound
}
rs, err := globalSiteResyncMetrics.siteStatus(ctx, objAPI, peer.DeploymentID)
if err != nil {
return res, err
}
if rs.Status == ResyncStarted {
return res, errSRResyncStarted
}
var buckets []BucketInfo
buckets, err = objAPI.ListBuckets(ctx, BucketOptions{})
if err != nil {
return res, err
}
rs = newSiteResyncStatus(peer.DeploymentID, buckets)
defer func() {
if err != nil {
rs.Status = ResyncFailed
saveSiteResyncMetadata(ctx, rs, objAPI)
globalSiteResyncMetrics.updateState(rs)
}
}()
globalSiteResyncMetrics.updateState(rs)
if err := saveSiteResyncMetadata(ctx, rs, objAPI); err != nil {
return res, err
}
for _, bi := range buckets {
bucket := bi.Name
if _, err := getReplicationConfig(ctx, bucket); err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
Status: ResyncFailed.String(),
})
continue
}
// mark remote target for this deployment with the new reset id
tgtArn := globalBucketTargetSys.getRemoteARNForPeer(bucket, peer)
if tgtArn == "" {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: fmt.Sprintf("no valid remote target found for this peer %s (%s)", peer.Name, peer.DeploymentID),
Bucket: bucket,
})
continue
}
target := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArn)
target.ResetBeforeDate = UTCNow()
target.ResetID = rs.ResyncID
if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, true); err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
tgtBytes, err := json.Marshal(&targets)
if err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
if err := globalReplicationPool.resyncer.start(ctx, objAPI, resyncOpts{
bucket: bucket,
arn: tgtArn,
resyncID: rs.ResyncID,
}); err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
}
res = madmin.SRResyncOpStatus{
Status: ResyncStarted.String(),
OpType: "start",
ResyncID: rs.ResyncID,
}
if len(res.Buckets) > 0 {
res.ErrDetail = "partial failure in starting site resync"
}
return res, nil
}
// cancelResync stops an ongoing site level resync for the peer specified.
func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLayer, peer madmin.PeerInfo) (res madmin.SRResyncOpStatus, err error) {
if !c.isEnabled() {
return res, errSRNotEnabled
}
if objAPI == nil {
return res, errSRObjectLayerNotReady
}
if peer.DeploymentID == globalDeploymentID {
return res, errSRResyncToSelf
}
if _, ok := c.state.Peers[peer.DeploymentID]; !ok {
return res, errSRPeerNotFound
}
rs, err := globalSiteResyncMetrics.siteStatus(ctx, objAPI, peer.DeploymentID)
if err != nil {
return res, err
}
switch rs.Status {
case ResyncCanceled:
return res, errSRResyncCanceled
case ResyncCompleted, NoResync:
return res, errSRNoResync
}
res = madmin.SRResyncOpStatus{
Status: rs.Status.String(),
OpType: "cancel",
ResyncID: rs.ResyncID,
}
targets := globalBucketTargetSys.ListTargets(ctx, "", string(madmin.ReplicationService))
// clear the remote target resetID set while initiating resync to stop replication
for _, t := range targets {
if t.ResetID == rs.ResyncID {
// get tgt with credentials
tgt := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, t.SourceBucket, t.Arn)
tgt.ResetID = ""
bucket := t.SourceBucket
if err = globalBucketTargetSys.SetTarget(ctx, bucket, &tgt, true); err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
tgtBytes, err := json.Marshal(&targets)
if err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{
ErrDetail: err.Error(),
Bucket: bucket,
})
continue
}
// update resync state for the bucket
globalReplicationPool.resyncer.Lock()
m, ok := globalReplicationPool.resyncer.statusMap[bucket]
if !ok {
m = newBucketResyncStatus(bucket)
}
if st, ok := m.TargetsMap[t.Arn]; ok {
st.LastUpdate = UTCNow()
st.ResyncStatus = ResyncCanceled
m.TargetsMap[t.Arn] = st
m.LastUpdate = UTCNow()
}
globalReplicationPool.resyncer.statusMap[bucket] = m
globalReplicationPool.resyncer.Unlock()
}
}
rs.Status = ResyncCanceled
rs.LastUpdate = UTCNow()
if err := saveSiteResyncMetadata(ctx, rs, objAPI); err != nil {
return res, err
}
globalSiteResyncMetrics.updateState(rs)
res.Status = rs.Status.String()
return res, nil
}
const (
siteResyncMetaFormat = 1
siteResyncMetaVersionV1 = 1
siteResyncMetaVersion = siteResyncMetaVersionV1
siteResyncSaveInterval = 10 * time.Second
)
func newSiteResyncStatus(dID string, buckets []BucketInfo) SiteResyncStatus {
now := UTCNow()
s := SiteResyncStatus{
Version: siteResyncMetaVersion,
Status: ResyncStarted,
DeplID: dID,
TotBuckets: len(buckets),
BucketStatuses: make(map[string]ResyncStatusType),
}
for _, bi := range buckets {
s.BucketStatuses[bi.Name] = ResyncPending
}
s.ResyncID = mustGetUUID()
s.StartTime = now
s.LastUpdate = now
return s
}
// load site resync metadata from disk
func loadSiteResyncMetadata(ctx context.Context, objAPI ObjectLayer, dID string) (rs SiteResyncStatus, e error) {
data, err := readConfig(GlobalContext, objAPI, getSRResyncFilePath(dID))
if err != nil {
return rs, err
}
if len(data) == 0 {
// Seems to be empty.
return rs, nil
}
if len(data) <= 4 {
return rs, fmt.Errorf("site resync: no data")
}
// Read resync meta header
switch binary.LittleEndian.Uint16(data[0:2]) {
case siteResyncMetaFormat:
default:
return rs, fmt.Errorf("resyncMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case siteResyncMetaVersion:
default:
return rs, fmt.Errorf("resyncMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
// OK, parse data.
if _, err = rs.UnmarshalMsg(data[4:]); err != nil {
return rs, err
}
switch rs.Version {
case siteResyncMetaVersionV1:
default:
return rs, fmt.Errorf("unexpected resync meta version: %d", rs.Version)
}
return rs, nil
}
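The on-disk format is a 4-byte header followed by the msgp payload: bytes 0-1 hold the little-endian format number and bytes 2-3 the little-endian version, both currently 1, matching what saveSiteResyncMetadata writes below. A minimal header round-trip for illustration:

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	metaFormat  = 1 // siteResyncMetaFormat
	metaVersion = 1 // siteResyncMetaVersionV1
)

func main() {
	// Writer side: 4-byte header, then the msgp-encoded payload is appended.
	data := make([]byte, 4)
	binary.LittleEndian.PutUint16(data[0:2], metaFormat)
	binary.LittleEndian.PutUint16(data[2:4], metaVersion)
	payload := []byte("placeholder for msgp bytes")
	data = append(data, payload...)

	// Reader side: validate the header before handing data[4:] to UnmarshalMsg.
	if len(data) <= 4 {
		fmt.Println("site resync: no data")
		return
	}
	fmt.Println("format:", binary.LittleEndian.Uint16(data[0:2]))
	fmt.Println("version:", binary.LittleEndian.Uint16(data[2:4]))
	fmt.Println("payload bytes:", len(data)-4)
}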
// save resync status of peer to resync/depl-id.meta
func saveSiteResyncMetadata(ctx context.Context, ss SiteResyncStatus, objectAPI ObjectLayer) error {
data := make([]byte, 4, ss.Msgsize()+4)
// Initialize the resync meta header.
binary.LittleEndian.PutUint16(data[0:2], siteResyncMetaFormat)
binary.LittleEndian.PutUint16(data[2:4], siteResyncMetaVersion)
buf, err := ss.MarshalMsg(data)
if err != nil {
return err
}
return saveConfig(ctx, objectAPI, getSRResyncFilePath(ss.DeplID), buf)
}
func getSRResyncFilePath(dID string) string {
return pathJoin(siteResyncPrefix, dID+".meta")
}