skip ILM expired objects from healing (#18569)

This commit is contained in:
Harshavardhana 2023-12-01 07:56:24 -08:00 committed by GitHub
parent 5f971fea6e
commit 109a9e3f35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 13 deletions

View File

@ -25,6 +25,7 @@ type lcEventSrc uint8
//revive:disable:var-naming Underscores is used here to indicate where common prefix ends and the enumeration name begins
const (
lcEventSrc_None lcEventSrc = iota
lcEventSrc_Heal
lcEventSrc_Scanner
lcEventSrc_Decom
lcEventSrc_Rebal

View File

@ -769,7 +769,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
evt := evalActionFromLifecycle(ctx, *lc, lr, rcfg, objInfo)
switch {
case evt.Action.DeleteRestored(): // if restored copy has expired,delete it synchronously
case evt.Action.DeleteRestored(): // if restored copy has expired, delete it synchronously
applyExpiryOnTransitionedObject(ctx, z, objInfo, evt, lcEventSrc_Decom)
return false
case evt.Action.Delete():

View File

@ -195,6 +195,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
tracker.setObject("")
tracker.setBucket(bucket)
// Heal current bucket again in case if it is failed
// in the beginning of erasure set healing
if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{
@ -204,6 +205,15 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
continue
}
vc, _ := globalBucketVersioningSys.Get(bucket)
// Check if the current bucket has a configured lifecycle policy
lc, _ := globalLifecycleSys.Get(bucket)
// Check if bucket is object locked.
lr, _ := globalBucketObjectLockSys.Get(bucket)
rcfg, _ := getReplicationConfig(ctx, bucket)
if serverDebugLog {
console.Debugf(color.Green("healDrive:")+" healing bucket %s content on %s erasure set\n",
bucket, humanize.Ordinal(er.setIndex+1))
@ -244,6 +254,26 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
}
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
if lc == nil {
return false
}
versioned := vc != nil && vc.Versioned(object)
objInfo := fi.ToObjectInfo(bucket, object, versioned)
evt := evalActionFromLifecycle(ctx, *lc, lr, rcfg, objInfo)
switch {
case evt.Action.DeleteRestored(): // if restored copy has expired,delete it synchronously
applyExpiryOnTransitionedObject(ctx, newObjectLayerFn(), objInfo, evt, lcEventSrc_Heal)
return false
case evt.Action.Delete():
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Heal)
return true
default:
return false
}
}
// Collect updates to tracker from concurrent healEntry calls
results := make(chan healEntryResult, 1000)
go func() {
@ -326,10 +356,17 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
var versionNotFound int
for _, version := range fivs.Versions {
// Ignore a version with a modtime newer than healing start
// Ignore a version with a modtime newer than healing start time.
if version.ModTime.After(tracker.Started) {
continue
}
// Apply lifecycle rules on the objects that are expired.
if filterLifecycle(bucket, version.Name, version) {
versionNotFound++
continue
}
if _, err := er.HealObject(ctx, bucket, encodedEntryName,
version.VersionID, madmin.HealOpts{
ScanMode: scanMode,

View File

@ -9,20 +9,21 @@ func _() {
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[lcEventSrc_None-0]
_ = x[lcEventSrc_Scanner-1]
_ = x[lcEventSrc_Decom-2]
_ = x[lcEventSrc_Rebal-3]
_ = x[lcEventSrc_s3HeadObject-4]
_ = x[lcEventSrc_s3GetObject-5]
_ = x[lcEventSrc_s3ListObjects-6]
_ = x[lcEventSrc_s3PutObject-7]
_ = x[lcEventSrc_s3CopyObject-8]
_ = x[lcEventSrc_s3CompleteMultipartUpload-9]
_ = x[lcEventSrc_Heal-1]
_ = x[lcEventSrc_Scanner-2]
_ = x[lcEventSrc_Decom-3]
_ = x[lcEventSrc_Rebal-4]
_ = x[lcEventSrc_s3HeadObject-5]
_ = x[lcEventSrc_s3GetObject-6]
_ = x[lcEventSrc_s3ListObjects-7]
_ = x[lcEventSrc_s3PutObject-8]
_ = x[lcEventSrc_s3CopyObject-9]
_ = x[lcEventSrc_s3CompleteMultipartUpload-10]
}
const _lcEventSrc_name = "NoneScannerDecomRebals3HeadObjects3GetObjects3ListObjectss3PutObjects3CopyObjects3CompleteMultipartUpload"
const _lcEventSrc_name = "NoneHealScannerDecomRebals3HeadObjects3GetObjects3ListObjectss3PutObjects3CopyObjects3CompleteMultipartUpload"
var _lcEventSrc_index = [...]uint8{0, 4, 11, 16, 21, 33, 44, 57, 68, 80, 105}
var _lcEventSrc_index = [...]uint8{0, 4, 8, 15, 20, 25, 37, 48, 61, 72, 84, 109}
func (i lcEventSrc) String() string {
if i >= lcEventSrc(len(_lcEventSrc_index)-1) {