allow expiration of all versions during Listing() (#16757)

This commit is contained in:
Harshavardhana 2023-03-09 15:15:30 -08:00 committed by GitHub
parent 18f9cccfa7
commit b984bf8d1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 127 additions and 93 deletions

View File

@ -30,15 +30,15 @@ getdeps: ## fetch necessary dependencies
crosscompile: ## cross compile minio
@(env bash $(PWD)/buildscripts/cross-compile.sh)
verifiers: getdeps lint check-gen
verifiers: lint check-gen
check-gen: ## check for updated autogenerated files
@go generate ./... >/dev/null
@(! git diff --name-only | grep '_gen.go$$') || (echo "Non-committed changes in auto-generated code is detected, please commit them to proceed." && false)
lint: ## runs golangci-lint suite of linters
lint: getdeps ## runs golangci-lint suite of linters
@echo "Running $@ check"
@$(GOLANGCI) run --build-tags kqueue --timeout=10m --config ./.golangci.yml
@$(GOLANGCI) run --build-tags kqueue --timeout=10m --config ./.golangci.yml --fix
check: test
test: verifiers build ## builds minio, runs linters, tests

View File

@ -32,7 +32,6 @@ import (
"github.com/dustin/go-humanize"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console"
@ -735,17 +734,15 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
}
versioned := vc != nil && vc.Versioned(object)
objInfo := fi.ToObjectInfo(bucket, object, versioned)
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
switch evt.Action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
globalExpiryState.enqueueByDays(objInfo, false, evt.Action == lifecycle.DeleteVersionAction)
// Skip this entry.
return true
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
globalExpiryState.enqueueByDays(objInfo, true, evt.Action == lifecycle.DeleteRestoredVersionAction)
// Skip this entry.
return true
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt.Action.DeleteRestored(), evt.Action.DeleteVersioned())
if !evt.Action.DeleteRestored() {
return true
}
}
return false
}

View File

@ -32,7 +32,6 @@ import (
"github.com/lithammer/shortuuid/v4"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/env"
@ -463,17 +462,13 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
}
versioned := vc != nil && vc.Versioned(object)
objInfo := fi.ToObjectInfo(bucket, object, versioned)
event := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
switch action := event.Action; action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction)
// Skip this entry.
return true
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
// Skip this entry.
evt := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt.Action.DeleteRestored(), evt.Action.DeleteVersioned())
return true
}
return false
}

View File

@ -37,7 +37,6 @@ import (
"github.com/minio/minio-go/v7/pkg/s3utils"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
@ -1305,11 +1304,12 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
if err == nil {
if opts.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo)
switch evt.Action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
fallthrough
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
return loi, nil
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt.Action.DeleteRestored(), evt.Action.DeleteVersioned())
if !evt.Action.DeleteRestored() {
// Skip entry if ILM action was DeleteVersionAction or DeleteAction
return loi, nil
}
}
}
loi.Objects = append(loi.Objects, objInfo)

View File

@ -28,7 +28,6 @@ import (
"sync"
"time"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/logger"
)
@ -341,7 +340,6 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
defer close(out)
vcfg, _ := globalBucketVersioningSys.Get(o.Bucket)
for {
var obj metaCacheEntry
var ok bool
@ -354,37 +352,61 @@ func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCa
}
}
fi, err := obj.fileInfo(o.Bucket)
var skip bool
versioned := o.Versioning != nil && o.Versioning.Versioned(obj.name)
// skip latest object from listing only for regular
// listObjects calls, versioned based listing cannot
// filter out between versions 'obj' cannot be truncated
// in such a manner, so look for skipping an object only
// for regular ListObjects() call only.
if !o.Versioned {
fi, err := obj.fileInfo(o.Bucket)
if err != nil {
continue
}
objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
act := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, objInfo).Action
skip = act.Delete()
if act.DeleteRestored() {
// do not skip DeleteRestored* actions
skip = false
}
}
}
// Skip entry only if needed via ILM, skipping is never true for versioned listing.
if !skip {
select {
case <-ctx.Done():
return
case out <- obj:
}
}
fiv, err := obj.fileInfoVersions(o.Bucket)
if err != nil {
continue
}
versioned := vcfg != nil && vcfg.Versioned(obj.name)
// Expire all versions if needed, if not attempt to queue for replication.
for _, version := range fiv.Versions {
objInfo := version.ToObjectInfo(o.Bucket, obj.name, versioned)
objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, objInfo)
switch evt.Action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
globalExpiryState.enqueueByDays(objInfo, false, evt.Action == lifecycle.DeleteVersionAction)
// Skip this entry.
continue
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
globalExpiryState.enqueueByDays(objInfo, true, evt.Action == lifecycle.DeleteRestoredVersionAction)
// Skip this entry.
continue
}
}
select {
case <-ctx.Done():
return
case out <- obj:
if fiv, err := obj.fileInfoVersions(o.Bucket); err == nil {
for _, version := range fiv.Versions {
objInfo := version.ToObjectInfo(o.Bucket, obj.name, versioned)
queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication)
if o.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt.Action.DeleteRestored(), evt.Action.DeleteVersioned())
if !evt.Action.DeleteRestored() {
continue
} // queue version for replication upon expired restored copies if needed.
}
}
queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication)
}
}
}

View File

@ -34,6 +34,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/versioning"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
@ -95,6 +96,10 @@ type listPathOptions struct {
// Versioned is this a ListObjectVersions call.
Versioned bool
// Versioning config is used for if the path
// has versioning enabled.
Versioning *versioning.Versioning
// Lifecycle performs filtering based on lifecycle.
// This will filter out objects if the most recent version should be deleted by lifecycle.
// Is not transferred across request calls.
@ -119,12 +124,14 @@ func init() {
func (o *listPathOptions) setBucketMeta(ctx context.Context) {
lc, _ := globalLifecycleSys.Get(o.Bucket)
vc, _ := globalBucketVersioningSys.Get(o.Bucket)
// Check if bucket is object locked.
rcfg, _ := globalBucketObjectLockSys.Get(o.Bucket)
replCfg, _, _ := globalBucketMetadataSys.GetReplicationConfig(ctx, o.Bucket)
tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, o.Bucket)
o.Lifecycle = lc
o.Versioning = vc
o.Replication = replicationConfig{
Config: replCfg,
remotes: tgts,

View File

@ -472,22 +472,18 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
}
if !proxy.Proxy { // apply lifecycle rules only for local requests
// Automatically remove the object/version is an expiry lifecycle rule can be applied
// Automatically remove the object/version if an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
evt := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo)
var success bool
switch evt.Action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
success = applyExpiryRule(objInfo, false, evt.Action == lifecycle.DeleteVersionAction)
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
// Restored object delete would be still allowed to proceed as success
// since transition behavior is slightly different.
applyExpiryRule(objInfo, true, evt.Action == lifecycle.DeleteRestoredVersionAction)
}
if success {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
act := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo).Action
if act.Delete() {
// apply whatever the expiry rule is.
applyExpiryRule(objInfo, act.DeleteRestored(), act.DeleteVersioned())
if !act.DeleteRestored() {
// If the ILM action is not on restored object return error.
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
}
}
}
@ -729,22 +725,18 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
}
if !proxy.Proxy { // apply lifecycle rules only locally not for proxied requests
// Automatically remove the object/version is an expiry lifecycle rule can be applied
// Automatically remove the object/version if an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
evt := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo)
var success bool
switch evt.Action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
success = applyExpiryRule(objInfo, false, evt.Action == lifecycle.DeleteVersionAction)
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
// Restored object delete would be still allowed to proceed as success
// since transition behavior is slightly different.
applyExpiryRule(objInfo, true, evt.Action == lifecycle.DeleteRestoredVersionAction)
}
if success {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
act := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo).Action
if act.Delete() {
// apply whatever the expiry rule is.
applyExpiryRule(objInfo, act.DeleteRestored(), act.DeleteVersioned())
if !act.DeleteRestored() {
// If the ILM action is not on restored object return error.
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
}
}
}
QueueReplicationHeal(ctx, bucket, objInfo)

View File

@ -1285,11 +1285,9 @@ func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) {
// Renaming xl.json to xl.meta should be fully synced to disk.
defer func() {
if err == nil {
if s.globalSync {
// Sync to disk only upon success.
globalSync()
}
if err == nil && s.globalSync {
// Sync to disk only upon success.
globalSync()
}
}()
@ -2140,17 +2138,22 @@ func skipAccessChecks(volume string) (ok bool) {
// RenameData - rename source path to destination path atomically, metadata and data directory.
func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) {
defer func() {
if err != nil && !contextCanceled(ctx) && !errors.Is(err, errFileNotFound) {
ignoredErrs := []error{
errFileNotFound,
errVolumeNotFound,
errFileVersionNotFound,
errDiskNotFound,
errUnformattedDisk,
}
if err != nil && !IsErr(err, ignoredErrs...) && !contextCanceled(ctx) {
// Only log these errors if context is not yet canceled.
logger.LogIf(ctx, fmt.Errorf("srcVolume: %s, srcPath: %s, dstVolume: %s:, dstPath: %s - error %v",
srcVolume, srcPath,
dstVolume, dstPath,
err))
}
if err == nil {
if s.globalSync {
globalSync()
}
if err == nil && s.globalSync {
globalSync()
}
}()

View File

@ -70,6 +70,24 @@ const (
ActionCount
)
// DeleteRestored - Returns true if action demands delete on restored objects
func (a Action) DeleteRestored() bool {
return a == DeleteRestoredAction || a == DeleteRestoredVersionAction
}
// DeleteVersioned - Returns true if action demands delete on a versioned object
func (a Action) DeleteVersioned() bool {
return a == DeleteVersionAction || a == DeleteRestoredVersionAction
}
// Delete - Returns true if action demands delete on all objects (including restored)
func (a Action) Delete() bool {
if a.DeleteRestored() {
return true
}
return a == DeleteVersionAction || a == DeleteAction
}
// Lifecycle - Configuration for bucket lifecycle.
type Lifecycle struct {
XMLName xml.Name `xml:"LifecycleConfiguration"`