fix: for unexpected errors in reading versioning config panic (#14994)

We need to make sure if we cannot read bucket metadata
for some reason, and bucket metadata is not missing and
returning corrupted information we should panic such
handlers to disallow I/O to protect the overall state
on the system.

In-case of such corruption we have a mechanism now
to force recreate the metadata on the bucket, using
`x-minio-force-create` header with `PUT /bucket` API
call.

Additionally fix the versioning config updated state
to be set properly for the site replication healing
to trigger correctly.
This commit is contained in:
Harshavardhana 2022-05-31 02:57:57 -07:00 committed by GitHub
parent befbf48563
commit 52221db7ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 178 additions and 88 deletions

View File

@ -203,7 +203,7 @@ func TestObjectIsRemote(t *testing.T) {
if got := fi.IsRemote(); got != tc.remote {
t.Fatalf("Test %d.a: expected %v got %v", i+1, tc.remote, got)
}
oi := fi.ToObjectInfo("bucket", "object")
oi := fi.ToObjectInfo("bucket", "object", false)
if got := oi.IsRemote(); got != tc.remote {
t.Fatalf("Test %d.b: expected %v got %v", i+1, tc.remote, got)
}

View File

@ -131,6 +131,7 @@ func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configF
meta.ObjectLockConfigUpdatedAt = UTCNow()
case bucketVersioningConfig:
meta.VersioningConfigXML = configData
meta.VersioningConfigUpdatedAt = UTCNow()
case bucketReplicationConfig:
meta.ReplicationConfigXML = configData
meta.ReplicationConfigUpdatedAt = UTCNow()
@ -184,12 +185,15 @@ func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) {
// GetVersioningConfig returns configured versioning config
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Versioning, error) {
func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Versioning, time.Time, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
return &versioning.Versioning{}, err
if errors.Is(err, errConfigNotFound) {
return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, meta.Created, nil
}
return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, time.Time{}, err
}
return meta.versioningConfig, nil
return meta.versioningConfig, meta.VersioningConfigUpdatedAt, nil
}
// GetTaggingConfig returns configured tagging config
@ -306,26 +310,27 @@ func (sys *BucketMetadataSys) CreatedAt(bucket string) (time.Time, error) {
// GetPolicyConfig returns configured bucket policy
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetPolicyConfig(bucket string) (*policy.Policy, error) {
func (sys *BucketMetadataSys) GetPolicyConfig(bucket string) (*policy.Policy, time.Time, error) {
if globalIsGateway {
objAPI := newObjectLayerFn()
if objAPI == nil {
return nil, errServerNotInitialized
return nil, time.Time{}, errServerNotInitialized
}
return objAPI.GetBucketPolicy(GlobalContext, bucket)
p, err := objAPI.GetBucketPolicy(GlobalContext, bucket)
return p, UTCNow(), err
}
meta, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketPolicyNotFound{Bucket: bucket}
return nil, time.Time{}, BucketPolicyNotFound{Bucket: bucket}
}
return nil, err
return nil, time.Time{}, err
}
if meta.policyConfig == nil {
return nil, BucketPolicyNotFound{Bucket: bucket}
return nil, time.Time{}, BucketPolicyNotFound{Bucket: bucket}
}
return meta.policyConfig, nil
return meta.policyConfig, meta.PolicyConfigUpdatedAt, nil
}
// GetQuotaConfig returns configured bucket quota
@ -360,6 +365,9 @@ func (sys *BucketMetadataSys) GetReplicationConfig(ctx context.Context, bucket s
func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.BucketTargets, error) {
meta, err := sys.GetConfig(GlobalContext, bucket)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return nil, BucketRemoteTargetNotFound{Bucket: bucket}
}
return nil, err
}
if meta.bucketTargetConfig == nil {
@ -368,20 +376,6 @@ func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.Buc
return meta.bucketTargetConfig, nil
}
// GetBucketTarget returns the target for the bucket and arn.
func (sys *BucketMetadataSys) GetBucketTarget(bucket string, arn string) (madmin.BucketTarget, error) {
targets, err := sys.GetBucketTargetsConfig(bucket)
if err != nil {
return madmin.BucketTarget{}, err
}
for _, t := range targets.Targets {
if t.Arn == arn {
return t, nil
}
}
return madmin.BucketTarget{}, errConfigNotFound
}
// GetConfig returns a specific configuration from the bucket metadata.
// The returned object may not be modified.
func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (BucketMetadata, error) {

View File

@ -87,6 +87,7 @@ type BucketMetadata struct {
TaggingConfigUpdatedAt time.Time
QuotaConfigUpdatedAt time.Time
ReplicationConfigUpdatedAt time.Time
VersioningConfigUpdatedAt time.Time
// Unexported fields. Must be updated atomically.
policyConfig *policy.Policy
@ -364,6 +365,7 @@ func (b *BucketMetadata) defaultTimestamps() {
if b.PolicyConfigUpdatedAt.IsZero() {
b.PolicyConfigUpdatedAt = b.Created
}
if b.EncryptionConfigUpdatedAt.IsZero() {
b.EncryptionConfigUpdatedAt = b.Created
}
@ -383,6 +385,10 @@ func (b *BucketMetadata) defaultTimestamps() {
if b.ReplicationConfigUpdatedAt.IsZero() {
b.ReplicationConfigUpdatedAt = b.Created
}
if b.VersioningConfigUpdatedAt.IsZero() {
b.VersioningConfigUpdatedAt = b.Created
}
}
// Save config to supplied ObjectLayer api.

View File

@ -144,6 +144,12 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "ReplicationConfigUpdatedAt")
return
}
case "VersioningConfigUpdatedAt":
z.VersioningConfigUpdatedAt, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "VersioningConfigUpdatedAt")
return
}
default:
err = dc.Skip()
if err != nil {
@ -157,9 +163,9 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 20
// map header, size 21
// write "Name"
err = en.Append(0xde, 0x0, 0x14, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
err = en.Append(0xde, 0x0, 0x15, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
if err != nil {
return
}
@ -358,15 +364,25 @@ func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "ReplicationConfigUpdatedAt")
return
}
// write "VersioningConfigUpdatedAt"
err = en.Append(0xb9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
if err != nil {
return
}
err = en.WriteTime(z.VersioningConfigUpdatedAt)
if err != nil {
err = msgp.WrapError(err, "VersioningConfigUpdatedAt")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 20
// map header, size 21
// string "Name"
o = append(o, 0xde, 0x0, 0x14, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = append(o, 0xde, 0x0, 0x15, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, z.Name)
// string "Created"
o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64)
@ -425,6 +441,9 @@ func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) {
// string "ReplicationConfigUpdatedAt"
o = append(o, 0xba, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
o = msgp.AppendTime(o, z.ReplicationConfigUpdatedAt)
// string "VersioningConfigUpdatedAt"
o = append(o, 0xb9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
o = msgp.AppendTime(o, z.VersioningConfigUpdatedAt)
return
}
@ -566,6 +585,12 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "ReplicationConfigUpdatedAt")
return
}
case "VersioningConfigUpdatedAt":
z.VersioningConfigUpdatedAt, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "VersioningConfigUpdatedAt")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@ -580,6 +605,6 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketMetadata) Msgsize() (s int) {
s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize
s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + 26 + msgp.TimeSize
return
}

View File

@ -38,7 +38,8 @@ type PolicySys struct{}
// Get returns stored bucket policy
func (sys *PolicySys) Get(bucket string) (*policy.Policy, error) {
return globalBucketMetadataSys.GetPolicyConfig(bucket)
policy, _, err := globalBucketMetadataSys.GetPolicyConfig(bucket)
return policy, err
}
// IsAllowed - checks given policy args is allowed to continue the Rest API.

View File

@ -190,11 +190,17 @@ func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplica
return
}
// object layer not initialized we return with no decision.
if newObjectLayerFn() == nil {
return
}
// Disable server-side replication on object prefixes which are excluded
// from versioning via the MinIO bucket versioning extension.
if globalBucketVersioningSys.PrefixSuspended(bucket, object) {
return
}
replStatus := mopts.ReplicationStatus()
if replStatus == replication.Replica && !mopts.isMetadataReplication() {
return

View File

@ -17,16 +17,21 @@
package cmd
import "github.com/minio/minio/internal/bucket/versioning"
import (
"strings"
"github.com/minio/minio/internal/bucket/versioning"
"github.com/minio/minio/internal/logger"
)
// BucketVersioningSys - policy subsystem.
type BucketVersioningSys struct{}
// Enabled enabled versioning?
func (sys *BucketVersioningSys) Enabled(bucket string) bool {
vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket)
vc, err := sys.Get(bucket)
if err != nil {
return false
logger.CriticalIf(GlobalContext, err)
}
return vc.Enabled()
}
@ -35,18 +40,18 @@ func (sys *BucketVersioningSys) Enabled(bucket string) bool {
// the given prefix doesn't match any excluded prefixes pattern. This is
// part of a MinIO versioning configuration extension.
func (sys *BucketVersioningSys) PrefixEnabled(bucket, prefix string) bool {
vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket)
vc, err := sys.Get(bucket)
if err != nil {
return false
logger.CriticalIf(GlobalContext, err)
}
return vc.PrefixEnabled(prefix)
}
// Suspended suspended versioning?
func (sys *BucketVersioningSys) Suspended(bucket string) bool {
vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket)
vc, err := sys.Get(bucket)
if err != nil {
return false
logger.CriticalIf(GlobalContext, err)
}
return vc.Suspended()
}
@ -54,9 +59,9 @@ func (sys *BucketVersioningSys) Suspended(bucket string) bool {
// PrefixSuspended returns true if the given prefix matches an excluded prefix
// pattern. This is part of a MinIO versioning configuration extension.
func (sys *BucketVersioningSys) PrefixSuspended(bucket, prefix string) bool {
vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket)
vc, err := sys.Get(bucket)
if err != nil {
return false
logger.CriticalIf(GlobalContext, err)
}
return vc.PrefixSuspended(prefix)
@ -64,12 +69,17 @@ func (sys *BucketVersioningSys) PrefixSuspended(bucket, prefix string) bool {
// Get returns stored bucket policy
func (sys *BucketVersioningSys) Get(bucket string) (*versioning.Versioning, error) {
return globalBucketMetadataSys.GetVersioningConfig(bucket)
}
if globalIsGateway {
// Gateway does not implement versioning.
return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, nil
}
// Reset BucketVersioningSys to initial state.
func (sys *BucketVersioningSys) Reset() {
// There is currently no internal state.
if bucket == minioMetaBucket || strings.HasPrefix(bucket, minioMetaBucket) {
return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, nil
}
vcfg, _, err := globalBucketMetadataSys.GetVersioningConfig(bucket)
return vcfg, err
}
// NewBucketVersioningSys - creates new versioning system.

View File

@ -1057,9 +1057,13 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob
fivs = fivs[:lim+1]
rcfg, _ := globalBucketObjectLockSys.Get(i.bucket)
vcfg, _ := globalBucketVersioningSys.Get(i.bucket)
versioned := vcfg != nil && vcfg.Versioned(i.objectPath())
toDel := make([]ObjectToDelete, 0, len(overflowVersions))
for _, fi := range overflowVersions {
obj := fi.ToObjectInfo(i.bucket, i.objectPath())
obj := fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)
// skip versions with object locking enabled
if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
if i.debug {

View File

@ -104,10 +104,10 @@ func (fi FileInfo) IsValid() bool {
}
// ToObjectInfo - Converts metadata to object info.
func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
func (fi FileInfo) ToObjectInfo(bucket, object string, versioned bool) ObjectInfo {
object = decodeDirObject(object)
versionID := fi.VersionID
if (globalBucketVersioningSys.PrefixEnabled(bucket, object) || globalBucketVersioningSys.PrefixSuspended(bucket, object)) && versionID == "" {
if versioned && versionID == "" {
versionID = nullVersionID
}

View File

@ -1027,7 +1027,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
fi.IsLatest = true
// Success, return object info.
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// AbortMultipartUpload - aborts an ongoing multipart operation

View File

@ -117,7 +117,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
if srcOpts.VersionID == "" {
return oi, toObjectErr(errFileNotFound, srcBucket, srcObject)
}
return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
}
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
@ -178,7 +178,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
return oi, toObjectErr(err, srcBucket, srcObject)
}
return fi.ToObjectInfo(srcBucket, srcObject), nil
return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), nil
}
// GetObjectNInfo - returns object info and an object
@ -245,7 +245,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
}
}
objInfo := fi.ToObjectInfo(bucket, object)
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if objInfo.DeleteMarker {
if opts.VersionID == "" {
return &GetObjectReader{
@ -638,7 +638,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin
if err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
objInfo = fi.ToObjectInfo(bucket, object)
objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if fi.Deleted {
if opts.VersionID == "" || opts.DeleteMarker {
return objInfo, toObjectErr(errFileNotFound, bucket, object)
@ -662,7 +662,7 @@ func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, obj
wquorum++
}
objInfo = fi.ToObjectInfo(bucket, object)
objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" {
// Make sure to return object info to provide extra information.
return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object)
@ -854,7 +854,7 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
}
return fi.ToObjectInfo(minioMetaBucket, key), nil
return fi.ToObjectInfo(minioMetaBucket, key, opts.Versioned || opts.VersionSuspended), nil
}
// PutObject - creates an object upon reading from the input stream
@ -1159,7 +1159,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
// we are adding a new version to this object under the namespace lock, so this is the latest version.
fi.IsLatest = true
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
@ -1533,7 +1533,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
}
@ -1638,7 +1638,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
opts.VersionID = fi.VersionID
}
objInfo := fi.ToObjectInfo(bucket, object)
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if opts.EvalMetadataFn != nil {
if err := opts.EvalMetadataFn(objInfo); err != nil {
return ObjectInfo{}, err
@ -1654,7 +1654,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// PutObjectTags - replace or add tags to an existing object
@ -1718,7 +1718,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// updateObjectMeta will update the metadata of a file.
@ -1856,7 +1856,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
break
}
objInfo := fi.ToObjectInfo(bucket, object)
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
sendEvent(eventArgs{
EventName: eventName,
BucketName: bucket,
@ -1911,7 +1911,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
}
oi = actualfi.ToObjectInfo(bucket, object)
oi = actualfi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
if len(oi.Parts) == 1 {
var rs *HTTPRangeSpec

View File

@ -1756,6 +1756,8 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
defer cancel()
defer close(results)
versioned := opts.Versioned || opts.VersionSuspended
for _, erasureSet := range z.serverPools {
var wg sync.WaitGroup
for _, set := range erasureSet.sets {
@ -1783,12 +1785,12 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
if opts.WalkAscending {
for i := len(fivs.Versions) - 1; i >= 0; i-- {
version := fivs.Versions[i]
results <- version.ToObjectInfo(bucket, version.Name)
results <- version.ToObjectInfo(bucket, version.Name, versioned)
}
return
}
for _, version := range fivs.Versions {
results <- version.ToObjectInfo(bucket, version.Name)
results <- version.ToObjectInfo(bucket, version.Name, versioned)
}
}

View File

@ -401,7 +401,7 @@ func (es *erasureSingle) CopyObject(ctx context.Context, srcBucket, srcObject, d
if srcOpts.VersionID == "" {
return oi, toObjectErr(errFileNotFound, srcBucket, srcObject)
}
return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
}
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
@ -457,7 +457,7 @@ func (es *erasureSingle) CopyObject(ctx context.Context, srcBucket, srcObject, d
return oi, toObjectErr(err, srcBucket, srcObject)
}
return fi.ToObjectInfo(srcBucket, srcObject), nil
return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), nil
}
putOpts := ObjectOptions{
@ -516,7 +516,7 @@ func (es *erasureSingle) GetObjectNInfo(ctx context.Context, bucket, object stri
return nil, toObjectErr(err, bucket, object)
}
objInfo := fi.ToObjectInfo(bucket, object)
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if objInfo.DeleteMarker {
if opts.VersionID == "" {
return &GetObjectReader{
@ -716,7 +716,7 @@ func (es *erasureSingle) getObjectInfo(ctx context.Context, bucket, object strin
if err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
objInfo = fi.ToObjectInfo(bucket, object)
objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if fi.Deleted {
if opts.VersionID == "" || opts.DeleteMarker {
return objInfo, toObjectErr(errFileNotFound, bucket, object)
@ -740,7 +740,7 @@ func (es *erasureSingle) getObjectInfoAndQuorum(ctx context.Context, bucket, obj
wquorum++
}
objInfo = fi.ToObjectInfo(bucket, object)
objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" {
// Make sure to return object info to provide extra information.
return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object)
@ -889,7 +889,7 @@ func (es *erasureSingle) putMetacacheObject(ctx context.Context, key string, r *
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
}
return fi.ToObjectInfo(minioMetaBucket, key), nil
return fi.ToObjectInfo(minioMetaBucket, key, opts.Versioned || opts.VersionSuspended), nil
}
// PutObject - creates an object upon reading from the input stream
@ -1148,7 +1148,7 @@ func (es *erasureSingle) putObject(ctx context.Context, bucket string, object st
// we are adding a new version to this object under the namespace lock, so this is the latest version.
fi.IsLatest = true
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
func (es *erasureSingle) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
@ -1475,7 +1475,7 @@ func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string
if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
}
@ -1549,7 +1549,7 @@ func (es *erasureSingle) PutObjectMetadata(ctx context.Context, bucket, object s
opts.VersionID = fi.VersionID
}
objInfo := fi.ToObjectInfo(bucket, object)
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if opts.EvalMetadataFn != nil {
if err := opts.EvalMetadataFn(objInfo); err != nil {
return ObjectInfo{}, err
@ -1565,7 +1565,7 @@ func (es *erasureSingle) PutObjectMetadata(ctx context.Context, bucket, object s
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// PutObjectTags - replace or add tags to an existing object
@ -1623,7 +1623,7 @@ func (es *erasureSingle) PutObjectTags(ctx context.Context, bucket, object strin
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// updateObjectMeta will update the metadata of a file.
@ -1739,7 +1739,7 @@ func (es *erasureSingle) TransitionObject(ctx context.Context, bucket, object st
eventName = event.ObjectTransitionFailed
}
objInfo := fi.ToObjectInfo(bucket, object)
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
sendEvent(eventArgs{
EventName: eventName,
BucketName: bucket,
@ -1794,7 +1794,7 @@ func (es *erasureSingle) restoreTransitionedObject(ctx context.Context, bucket s
return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
}
oi = actualfi.ToObjectInfo(bucket, object)
oi = actualfi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
if len(oi.Parts) == 1 {
var rs *HTTPRangeSpec
@ -2722,7 +2722,7 @@ func (es *erasureSingle) CompleteMultipartUpload(ctx context.Context, bucket str
fi.IsLatest = true
// Success, return object info.
return fi.ToObjectInfo(bucket, object), nil
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// AbortMultipartUpload - aborts an ongoing multipart operation
@ -2922,6 +2922,8 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result
defer cancel()
defer close(results)
versioned := opts.Versioned || opts.VersionSuspended
var wg sync.WaitGroup
wg.Add(1)
go func() {
@ -2939,12 +2941,12 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result
if opts.WalkAscending {
for i := len(fivs.Versions) - 1; i >= 0; i-- {
version := fivs.Versions[i]
results <- version.ToObjectInfo(bucket, version.Name)
results <- version.ToObjectInfo(bucket, version.Name, versioned)
}
return
}
for _, version := range fivs.Versions {
results <- version.ToObjectInfo(bucket, version.Name)
results <- version.ToObjectInfo(bucket, version.Name, versioned)
}
}

View File

@ -438,6 +438,9 @@ func (m metaCacheEntriesSorted) shallowClone() metaCacheEntriesSorted {
func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, afterV string) (versions []ObjectInfo) {
versions = make([]ObjectInfo, 0, m.len())
prevPrefix := ""
vcfg, _ := globalBucketVersioningSys.Get(bucket)
versioned := vcfg != nil && vcfg.Versioned(prefix)
for _, entry := range m.o {
if entry.isObject() {
if delimiter != "" {
@ -473,7 +476,7 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft
}
for _, version := range fiVersions {
versions = append(versions, version.ToObjectInfo(bucket, entry.name))
versions = append(versions, version.ToObjectInfo(bucket, entry.name, versioned))
}
continue
@ -509,6 +512,10 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft
func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (objects []ObjectInfo) {
objects = make([]ObjectInfo, 0, m.len())
prevPrefix := ""
vcfg, _ := globalBucketVersioningSys.Get(bucket)
versioned := vcfg != nil && vcfg.Versioned(prefix)
for _, entry := range m.o {
if entry.isObject() {
if delimiter != "" {
@ -531,7 +538,7 @@ func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (ob
fi, err := entry.fileInfo(bucket)
if err == nil {
objects = append(objects, fi.ToObjectInfo(bucket, entry.name))
objects = append(objects, fi.ToObjectInfo(bucket, entry.name, versioned))
}
continue
}

View File

@ -642,6 +642,8 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
// function closes 'out' and exits.
func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle, lr lock.Retention, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
defer close(out)
vcfg, _ := globalBucketVersioningSys.Get(bucket)
for {
var obj metaCacheEntry
var ok bool
@ -658,7 +660,10 @@ func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle,
if err != nil {
continue
}
objInfo := fi.ToObjectInfo(bucket, obj.name)
versioned := vcfg != nil && vcfg.Versioned(obj.name)
objInfo := fi.ToObjectInfo(bucket, obj.name, versioned)
action := evalActionFromLifecycle(ctx, lc, lr, objInfo, false)
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:

View File

@ -277,8 +277,6 @@ func initAllSubsystems() {
// Create new bucket versioning subsystem
if globalBucketVersioningSys == nil {
globalBucketVersioningSys = NewBucketVersioningSys()
} else {
globalBucketVersioningSys.Reset()
}
// Create new bucket replication subsytem

View File

@ -2984,8 +2984,9 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI
CreatedAt: createdAt,
Location: globalSite.Region,
}
// Get bucket policy if present.
policy, err := globalPolicySys.Get(bucket)
policy, updatedAt, err := globalBucketMetadataSys.GetPolicyConfig(bucket)
found := true
if _, ok := err.(BucketPolicyNotFound); ok {
found = false
@ -2998,6 +2999,7 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI
return info, wrapSRErr(err)
}
bms.Policy = policyJSON
bms.PolicyUpdatedAt = updatedAt
}
// Get bucket tags if present.
@ -3018,6 +3020,23 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI
bms.TagConfigUpdatedAt = updatedAt
}
versioningCfg, updatedAt, err := globalBucketMetadataSys.GetVersioningConfig(bucket)
found = true
if versioningCfg != nil && versioningCfg.Status == "" {
found = false
} else if err != nil {
return info, errSRBackendIssue(err)
}
if found {
versionCfgData, err := xml.Marshal(versioningCfg)
if err != nil {
return info, wrapSRErr(err)
}
versioningCfgStr := base64.StdEncoding.EncodeToString(versionCfgData)
bms.Versioning = &versioningCfgStr
bms.VersioningConfigUpdatedAt = updatedAt
}
// Get object-lock config if present.
objLockCfg, updatedAt, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
found = true

View File

@ -443,6 +443,9 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
}
}
}
vcfg, _ := globalBucketVersioningSys.Get(cache.Info.Name)
// return initialized object layer
objAPI := newObjectLayerFn()
// object layer not initialized, return.
@ -494,9 +497,12 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
}
return sizeSummary{}, errSkipFile
}
versioned := vcfg != nil && vcfg.Versioned(item.objectPath())
for _, version := range fivs.Versions {
atomic.AddUint64(&globalScannerStats.accTotalVersions, 1)
oi := version.ToObjectInfo(item.bucket, item.objectPath())
oi := version.ToObjectInfo(item.bucket, item.objectPath(), versioned)
sz := item.applyActions(ctx, objAPI, oi, &sizeS)
if oi.VersionID != "" && sz == oi.Size {
sizeS.versions++
@ -521,7 +527,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
// apply tier sweep action on free versions
for _, freeVersion := range fivs.FreeVersions {
oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath())
oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned)
item.applyTierObjSweep(ctx, objAPI, oi)
}
return sizeS, nil

View File

@ -88,6 +88,11 @@ func (v Versioning) Enabled() bool {
return v.Status == Enabled
}
// Versioned returns if 'prefix' has versioning enabled or suspended.
func (v Versioning) Versioned(prefix string) bool {
return v.PrefixEnabled(prefix) || v.PrefixSuspended(prefix)
}
// PrefixEnabled - returns true if versioning is enabled at the bucket and given
// prefix, false otherwise.
func (v Versioning) PrefixEnabled(prefix string) bool {