batch replication now supports arbitrary S3 targets (#17113)

This commit is contained in:
Poorna 2023-05-02 22:52:35 -07:00 committed by GitHub
parent b53376a3a4
commit ec84bad882
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 73 additions and 10 deletions

View File

@ -174,6 +174,7 @@ type BatchJobReplicateResourceType string
func (t BatchJobReplicateResourceType) Validate() error {
switch t {
case BatchJobReplicateResourceMinIO:
case BatchJobReplicateResourceS3:
default:
return errInvalidArgument
}
@ -183,6 +184,8 @@ func (t BatchJobReplicateResourceType) Validate() error {
// Different types of batch jobs..
const (
BatchJobReplicateResourceMinIO BatchJobReplicateResourceType = "minio"
BatchJobReplicateResourceS3 BatchJobReplicateResourceType = "s3"
// add future targets
)
@ -297,10 +300,13 @@ func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api Objec
versioned := globalBucketVersioningSys.PrefixEnabled(tgtBucket, tgtObject)
versionSuspended := globalBucketVersioningSys.PrefixSuspended(tgtBucket, tgtObject)
versionID := srcObjInfo.VersionID
if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 {
versionID = ""
}
if srcObjInfo.DeleteMarker {
_, err := api.DeleteObject(ctx, tgtBucket, tgtObject, ObjectOptions{
VersionID: srcObjInfo.VersionID,
VersionID: versionID,
VersionSuspended: versionSuspended,
Versioned: versioned,
MTime: srcObjInfo.ModTime,
@ -318,6 +324,9 @@ func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api Objec
PreserveETag: srcObjInfo.ETag,
UserDefined: srcObjInfo.UserDefined,
}
if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 {
opts.VersionID = ""
}
if crypto.S3.IsEncrypted(srcObjInfo.UserDefined) {
opts.ServerSideEncryption = encrypt.NewSSE()
}
@ -358,7 +367,9 @@ func (r *BatchJobReplicateV1) copyWithMultipartfromSource(ctx context.Context, a
if r.Target.Prefix != "" {
tgtObject = path.Join(r.Target.Prefix, srcObjInfo.Name)
}
if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 {
opts.VersionID = ""
}
var uploadedParts []CompletePart
res, err := api.NewMultipartUpload(context.Background(), tgtBucket, tgtObject, opts)
if err != nil {
@ -539,19 +550,47 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
retry := false
for attempts := 1; attempts <= retryAttempts; attempts++ {
attempts := attempts
// one of source/target is s3, skip delete marker and all versions under the same object name.
s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3
minioSrc := r.Source.Type == BatchJobReplicateResourceMinIO
ctx, cancel := context.WithCancel(ctx)
objInfoCh := c.ListObjects(ctx, r.Source.Bucket, minio.ListObjectsOptions{
Prefix: r.Source.Prefix,
WithVersions: true,
WithVersions: minioSrc,
Recursive: true,
WithMetadata: true,
})
prevObj := ""
skipReplicate := false
for obj := range objInfoCh {
oi := toObjectInfo(r.Source.Bucket, obj.Key, obj)
if !minioSrc {
oi2, err := c.StatObject(ctx, r.Source.Bucket, obj.Key, minio.StatObjectOptions{})
if err == nil {
oi = toObjectInfo(r.Source.Bucket, obj.Key, oi2)
} else {
if isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) ||
isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) {
continue
}
logger.LogIf(ctx, err)
cancel()
return err
}
}
if skip(oi) {
continue
}
if obj.Key != prevObj {
prevObj = obj.Key
// skip replication of delete marker and all versions under the same object name if one of source or target is s3.
skipReplicate = obj.IsDeleteMarker && s3Type
}
if skipReplicate {
continue
}
wk.Take()
go func() {
defer wk.Give()
@ -646,9 +685,10 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
tgtBucket := r.Target.Bucket
tgtPrefix := r.Target.Prefix
srcObject := srcObjInfo.Name
s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3
if srcObjInfo.DeleteMarker || !srcObjInfo.VersionPurgeStatus.Empty() {
if retry {
if retry && !s3Type {
if _, err := c.StatObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.StatObjectOptions{
VersionID: srcObjInfo.VersionID,
Internal: miniogo.AdvancedGetOptions{
@ -664,7 +704,10 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
if srcObjInfo.VersionPurgeStatus.Empty() {
dmVersionID = srcObjInfo.VersionID
}
if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 {
dmVersionID = ""
versionID = ""
}
return c.RemoveObject(ctx, tgtBucket, pathJoin(tgtPrefix, srcObject), miniogo.RemoveObjectOptions{
VersionID: versionID,
Internal: miniogo.AdvancedRemoveOptions{
@ -676,7 +719,7 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
})
}
if retry { // when we are retrying avoid copying if necessary.
if retry && !s3Type { // when we are retrying avoid copying if necessary.
gopts := miniogo.GetObjectOptions{}
if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
return err
@ -710,7 +753,9 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
if err != nil {
return err
}
if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 {
putOpts.Internal = miniogo.AdvancedPutOptions{}
}
if objInfo.isMultipart() {
if err := replicateObjectWithMultipart(ctx, c, tgtBucket, pathJoin(tgtPrefix, objInfo.Name), rd, objInfo, putOpts); err != nil {
return err
@ -1021,6 +1066,10 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
// None of the provided metadata filters match skip the object.
return false
}
// if one of source or target is non MinIO, just replicate the top most version like `mc mirror`
if (r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3) && !info.IsLatest {
return false
}
return true
}
@ -1059,6 +1108,8 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
attempts := attempts
ctx, cancel := context.WithCancel(ctx)
// one of source/target is s3, skip delete marker and all versions under the same object name.
s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3
results := make(chan ObjectInfo, 100)
if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, ObjectOptions{
@ -1070,8 +1121,18 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
return err
}
prevObj := ""
skipReplicate := false
for result := range results {
result := result
if result.Name != prevObj {
prevObj = result.Name
skipReplicate = result.DeleteMarker && s3Type
}
if skipReplicate {
continue
}
wk.Take()
go func() {
defer wk.Give()
@ -1265,8 +1326,10 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
}
return err
}
// if both source and target are minio instances
minioType := r.Target.Type == BatchJobReplicateResourceMinIO && r.Source.Type == BatchJobReplicateResourceMinIO
// If source has versioning enabled, target must have versioning enabled
if (info.Versioning && !vcfg.Enabled() && !isRemoteToLocal) || (!info.Versioning && vcfg.Enabled() && isRemoteToLocal) {
if minioType && ((info.Versioning && !vcfg.Enabled() && !isRemoteToLocal) || (!info.Versioning && vcfg.Enabled() && isRemoteToLocal)) {
return batchReplicationJobError{
Code: "InvalidBucketState",
Description: fmt.Sprintf("The source '%s' has versioning enabled, target '%s' must have versioning enabled",