From b732a673dc5d2dec5aa97424668d5c7002f84bdc Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 8 Aug 2023 13:27:40 -0700 Subject: [PATCH] reduce logging in bucket replication in retry scenarios (#17820) --- cmd/bucket-replication-utils.go | 13 +++++++-- cmd/bucket-replication-utils_test.go | 3 ++- cmd/bucket-replication.go | 40 +++++++++------------------- 3 files changed, 25 insertions(+), 31 deletions(-) diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 392844ebb..f5a88eb41 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" + "context" "fmt" "net/http" "net/url" @@ -31,6 +32,7 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/bucket/replication" xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/logger" ) //go:generate msgp -file=$GOFILE @@ -180,6 +182,7 @@ type replicateTargetDecision struct { Synchronous bool // Synchronous replication configured. Arn string // ARN of replication target ID string + Tgt *TargetClient } func (t *replicateTargetDecision) String() string { @@ -288,7 +291,7 @@ var errInvalidReplicateDecisionFormat = fmt.Errorf("ReplicateDecision has invali // parse k-v pairs of target ARN to stringified ReplicateTargetDecision delimited by ',' into a // ReplicateDecision struct -func parseReplicateDecision(s string) (r ReplicateDecision, err error) { +func parseReplicateDecision(ctx context.Context, bucket, s string) (r ReplicateDecision, err error) { r = ReplicateDecision{ targetsMap: make(map[string]replicateTargetDecision), } @@ -317,7 +320,13 @@ func parseReplicateDecision(s string) (r ReplicateDecision, err error) { if err != nil { return r, err } - r.targetsMap[slc[0]] = replicateTargetDecision{Replicate: replicate, Synchronous: sync, Arn: tgt[2], ID: tgt[3]} + tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, slc[0]) + if tgtClnt == nil { + // Skip stale targets if any and log them to be missing atleast once. + logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, slc[0]), slc[0]) + // We save the targetDecision even when its not configured or stale. + } + r.targetsMap[slc[0]] = replicateTargetDecision{Replicate: replicate, Synchronous: sync, Arn: tgt[2], ID: tgt[3], Tgt: tgtClnt} } return } diff --git a/cmd/bucket-replication-utils_test.go b/cmd/bucket-replication-utils_test.go index beed63a70..43a1c3567 100644 --- a/cmd/bucket-replication-utils_test.go +++ b/cmd/bucket-replication-utils_test.go @@ -18,6 +18,7 @@ package cmd import ( + "context" "testing" "github.com/minio/minio/internal/bucket/replication" @@ -183,7 +184,7 @@ var parseReplicationDecisionTest = []struct { func TestParseReplicateDecision(t *testing.T) { for i, test := range parseReplicationDecisionTest { - dsc, err := parseReplicateDecision(test.expDsc.String()) + dsc, err := parseReplicateDecision(context.Background(), "bucket", test.expDsc.String()) if err != nil { if test.expErr != err { t.Errorf("Test%d (%s): Expected parse error got %t , want %t", i+1, test.name, err, test.expErr) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 75106d190..0f77219d5 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -427,7 +427,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj rcfg, err := getReplicationConfig(ctx, bucket) if err != nil || rcfg == nil { - logger.LogIf(ctx, err) + logger.LogOnceIf(ctx, fmt.Errorf("unable to obtain replication config for bucket: %s: err: %s", bucket, err), bucket) sendEvent(eventArgs{ BucketName: bucket, Object: ObjectInfo{ @@ -442,9 +442,10 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj }) return } - dsc, err := parseReplicateDecision(dobj.ReplicationState.ReplicateDecisionStr) + dsc, err := parseReplicateDecision(ctx, bucket, dobj.ReplicationState.ReplicateDecisionStr) if err != nil { - logger.LogIf(ctx, err) + logger.LogOnceIf(ctx, fmt.Errorf("unable to parse replication decision parameters for bucket: %s, err: %s, decision: %s", + bucket, err, dobj.ReplicationState.ReplicateDecisionStr), dobj.ReplicationState.ReplicateDecisionStr) sendEvent(eventArgs{ BucketName: bucket, Object: ObjectInfo{ @@ -466,7 +467,6 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { globalReplicationPool.queueMRFSave(dobj.ToMRFEntry()) - logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", dobj.ObjectName, bucket, dobj.TargetArn)) sendEvent(eventArgs{ BucketName: bucket, Object: ObjectInfo{ @@ -488,38 +488,23 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj var rinfos replicatedInfos rinfos.Targets = make([]replicatedTargetInfo, len(dsc.targetsMap)) idx := -1 - for tgtArn := range dsc.targetsMap { + for _, tgtEntry := range dsc.targetsMap { idx++ - tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn) - if tgt == nil { - logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn)) - sendEvent(eventArgs{ - BucketName: bucket, - Object: ObjectInfo{ - Bucket: bucket, - Name: dobj.ObjectName, - VersionID: versionID, - DeleteMarker: dobj.DeleteMarker, - }, - UserAgent: "Internal: [Replication]", - Host: globalLocalNodeName, - EventName: event.ObjectReplicationNotTracked, - }) + if tgtEntry.Tgt == nil { continue } - if tgt := dsc.targetsMap[tgtArn]; !tgt.Replicate { + if !tgtEntry.Replicate { continue } // if dobj.TargetArn is not empty string, this is a case of specific target being re-synced. - if dobj.TargetArn != "" && dobj.TargetArn != tgt.ARN { + if dobj.TargetArn != "" && dobj.TargetArn != tgtEntry.Arn { continue } wg.Add(1) go func(index int, tgt *TargetClient) { defer wg.Done() - rinfo := replicateDeleteToTarget(ctx, dobj, tgt) - rinfos.Targets[index] = rinfo - }(idx, tgt) + rinfos.Targets[index] = replicateDeleteToTarget(ctx, dobj, tgt) + }(idx, tgtEntry.Tgt) } wg.Wait() @@ -1005,7 +990,6 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje Host: globalLocalNodeName, }) globalReplicationPool.queueMRFSave(ri.ToMRFEntry()) - logger.LogIf(ctx, fmt.Errorf("failed to get lock for object: %s bucket:%s arn:%s", object, bucket, ri.TargetArn)) return } ctx = lkctx.Context() @@ -1017,7 +1001,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje for i, tgtArn := range tgtArns { tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, tgtArn) if tgt == nil { - logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn)) + logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn), tgtArn) sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, @@ -1156,7 +1140,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj UserAgent: "Internal: [Replication]", Host: globalLocalNodeName, }) - logger.LogIf(ctx, fmt.Errorf("unable to update replicate metadata for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err)) + logger.LogOnceIf(ctx, fmt.Errorf("unable to read source object %s/%s(%s): %w", bucket, object, objInfo.VersionID, err), object+":"+objInfo.VersionID) } return }