heal: Avoid marking a bucket as done when remote drives are offline (#19587)

This commit is contained in:
Anis Eleuch 2024-04-26 07:32:14 +01:00 committed by GitHub
parent f4f1c42cba
commit 135874ebdc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 24 additions and 22 deletions

View File

@ -44,8 +44,8 @@ const (
healingMetricCheckAbandonedParts healingMetricCheckAbandonedParts
) )
func (er erasureObjects) listAndHeal(bucket, prefix string, scanMode madmin.HealScanMode, healEntry func(string, metaCacheEntry, madmin.HealScanMode) error) error { func (er erasureObjects) listAndHeal(ctx context.Context, bucket, prefix string, scanMode madmin.HealScanMode, healEntry func(string, metaCacheEntry, madmin.HealScanMode) error) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
disks, _ := er.getOnlineDisksWithHealing(false) disks, _ := er.getOnlineDisksWithHealing(false)

View File

@ -2270,7 +2270,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
go func(idx int, set *erasureObjects) { go func(idx int, set *erasureObjects) {
defer wg.Done() defer wg.Done()
errs[idx] = set.listAndHeal(bucket, prefix, opts.ScanMode, healEntry) errs[idx] = set.listAndHeal(ctx, bucket, prefix, opts.ScanMode, healEntry)
}(idx, set) }(idx, set)
} }
wg.Wait() wg.Wait()

View File

@ -272,11 +272,11 @@ func (er erasureObjects) LocalStorageInfo(ctx context.Context, metrics bool) Sto
} }
// getOnlineDisksWithHealingAndInfo - returns online disks and overall healing status. // getOnlineDisksWithHealingAndInfo - returns online disks and overall healing status.
// Disks are randomly ordered, but in the following groups: // Disks are ordered in the following groups:
// - Non-scanning disks // - Non-scanning disks
// - Non-healing disks // - Non-healing disks
// - Healing disks (if inclHealing is true) // - Healing disks (if inclHealing is true)
func (er erasureObjects) getOnlineDisksWithHealingAndInfo(inclHealing bool) (newDisks []StorageAPI, newInfos []DiskInfo, healing bool) { func (er erasureObjects) getOnlineDisksWithHealingAndInfo(inclHealing bool) (newDisks []StorageAPI, newInfos []DiskInfo, healing int) {
var wg sync.WaitGroup var wg sync.WaitGroup
disks := er.getDisks() disks := er.getDisks()
infos := make([]DiskInfo, len(disks)) infos := make([]DiskInfo, len(disks))
@ -315,7 +315,7 @@ func (er erasureObjects) getOnlineDisksWithHealingAndInfo(inclHealing bool) (new
continue continue
} }
if info.Healing { if info.Healing {
healing = true healing++
if inclHealing { if inclHealing {
healingDisks = append(healingDisks, disks[i]) healingDisks = append(healingDisks, disks[i])
healingInfos = append(healingInfos, infos[i]) healingInfos = append(healingInfos, infos[i])
@ -343,9 +343,9 @@ func (er erasureObjects) getOnlineDisksWithHealingAndInfo(inclHealing bool) (new
return newDisks, newInfos, healing return newDisks, newInfos, healing
} }
func (er erasureObjects) getOnlineDisksWithHealing(inclHealing bool) (newDisks []StorageAPI, healing bool) { func (er erasureObjects) getOnlineDisksWithHealing(inclHealing bool) ([]StorageAPI, bool) {
newDisks, _, healing = er.getOnlineDisksWithHealingAndInfo(inclHealing) newDisks, _, healing := er.getOnlineDisksWithHealingAndInfo(inclHealing)
return return newDisks, healing > 0
} }
// Clean-up previously deleted objects from .minio.sys/tmp/.trash/ // Clean-up previously deleted objects from .minio.sys/tmp/.trash/

View File

@ -259,12 +259,17 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
bucket, humanize.Ordinal(er.setIndex+1)) bucket, humanize.Ordinal(er.setIndex+1))
} }
disks, _ := er.getOnlineDisksWithHealing(false) disks, _, healing := er.getOnlineDisksWithHealingAndInfo(true)
if len(disks) == 0 { if len(disks) == healing {
// No object healing necessary // All drives in this erasure set were reformatted for some reason; abort healing and mark it as successful
tracker.bucketDone(bucket) healingLogIf(ctx, errors.New("all drives are in healing state, aborting.."))
healingLogIf(ctx, tracker.update(ctx)) return nil
continue }
disks = disks[:len(disks)-healing] // healing drives are always at the end of the list
if len(disks) < er.setDriveCount/2 {
return fmt.Errorf("not enough drives (found=%d, healing=%d, total=%d) are available to heal `%s`", len(disks), healing, er.setDriveCount, tracker.disk.String())
} }
rand.Shuffle(len(disks), func(i, j int) { rand.Shuffle(len(disks), func(i, j int) {
@ -465,27 +470,24 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
waitForLowHTTPReq() waitForLowHTTPReq()
} }
actualBucket, prefix := path2BucketObject(bucket)
// How to resolve partial results. // How to resolve partial results.
resolver := metadataResolutionParams{ resolver := metadataResolutionParams{
dirQuorum: 1, dirQuorum: 1,
objQuorum: 1, objQuorum: 1,
bucket: actualBucket, bucket: bucket,
} }
err = listPathRaw(ctx, listPathRawOptions{ err = listPathRaw(ctx, listPathRawOptions{
disks: disks, disks: disks,
fallbackDisks: fallbackDisks, fallbackDisks: fallbackDisks,
bucket: actualBucket, bucket: bucket,
path: prefix,
recursive: true, recursive: true,
forwardTo: forwardTo, forwardTo: forwardTo,
minDisks: 1, minDisks: 1,
reportNotFound: false, reportNotFound: false,
agreed: func(entry metaCacheEntry) { agreed: func(entry metaCacheEntry) {
jt.Take() jt.Take()
go healEntry(actualBucket, entry) go healEntry(bucket, entry)
}, },
partial: func(entries metaCacheEntries, _ []error) { partial: func(entries metaCacheEntries, _ []error) {
entry, ok := entries.resolve(&resolver) entry, ok := entries.resolve(&resolver)
@ -495,7 +497,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
entry, _ = entries.firstFound() entry, _ = entries.firstFound()
} }
jt.Take() jt.Take()
go healEntry(actualBucket, *entry) go healEntry(bucket, *entry)
}, },
finished: nil, finished: nil,
}) })