Add abandoned folder scanning to metrics (#18076)

Include object and versions heal scan times when checking non-empty abandoned folders.

Furthermore don't add delay between healing versions, instead do one per object wait.
This commit is contained in:
Klaus Post 2023-09-24 22:15:31 -07:00 committed by GitHub
parent 8a672e70a7
commit 57f84a8b4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 18 deletions

View File

@ -66,10 +66,12 @@ const (
scannerMetricYield
scannerMetricCleanAbandoned
scannerMetricApplyNonCurrent
scannerMetricHealAbandonedVersion
// START Trace metrics:
scannerMetricStartTrace
scannerMetricScanObject // Scan object. All operations included.
scannerMetricHealAbandonedObject
// END realtime metrics:
scannerMetricLastRealtime

View File

@ -689,6 +689,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
reportNotFound: true,
minDisks: f.disksQuorum,
agreed: func(entry metaCacheEntry) {
f.updateCurrentPath(entry.name)
if f.dataUsageScannerDebug {
console.Debugf(healObjectsPrefix+" got agreement: %v\n", entry.name)
}
@ -702,6 +703,13 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
// this object might be dangling.
entry, _ = entries.firstFound()
}
// wait timer per object.
wait := scannerSleeper.Timer(ctx)
defer wait()
f.updateCurrentPath(entry.name)
stopFn := globalScannerMetrics.log(scannerMetricHealAbandonedObject, f.root, entry.name)
custom := make(map[string]string)
defer stopFn(custom)
if f.dataUsageScannerDebug {
console.Debugf(healObjectsPrefix+" resolved to: %v, dir: %v\n", entry.name, entry.isDir())
@ -711,13 +719,9 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
return
}
// wait on timer per object.
wait := scannerSleeper.Timer(ctx)
// We got an entry which we should be able to heal.
fiv, err := entry.fileInfoVersions(bucket)
if err != nil {
wait()
err := bgSeq.queueHealTask(healSource{
bucket: bucket,
object: entry.name,
@ -730,21 +734,28 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
return
}
custom["versions"] = fmt.Sprint(len(fiv.Versions))
var successVersions, failVersions int
for _, ver := range fiv.Versions {
// Sleep and reset.
wait()
wait = scannerSleeper.Timer(ctx)
stopFn := globalScannerMetrics.timeSize(scannerMetricHealAbandonedVersion)
err := bgSeq.queueHealTask(healSource{
bucket: bucket,
object: fiv.Name,
versionID: ver.VersionID,
}, madmin.HealItemObject)
stopFn(int(ver.Size))
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(ctx, err)
}
if err == nil {
successVersions++
} else {
failVersions++
}
foundObjs = foundObjs || err == nil
}
custom["success_versions"] = fmt.Sprint(successVersions)
custom["failed_versions"] = fmt.Sprint(failVersions)
},
// Too many disks failed.
finished: func(errs []error) {

View File

@ -20,19 +20,21 @@ func _() {
_ = x[scannerMetricYield-9]
_ = x[scannerMetricCleanAbandoned-10]
_ = x[scannerMetricApplyNonCurrent-11]
_ = x[scannerMetricStartTrace-12]
_ = x[scannerMetricScanObject-13]
_ = x[scannerMetricLastRealtime-14]
_ = x[scannerMetricScanFolder-15]
_ = x[scannerMetricScanCycle-16]
_ = x[scannerMetricScanBucketDrive-17]
_ = x[scannerMetricCompactFolder-18]
_ = x[scannerMetricLast-19]
_ = x[scannerMetricHealAbandonedVersion-12]
_ = x[scannerMetricStartTrace-13]
_ = x[scannerMetricScanObject-14]
_ = x[scannerMetricHealAbandonedObject-15]
_ = x[scannerMetricLastRealtime-16]
_ = x[scannerMetricScanFolder-17]
_ = x[scannerMetricScanCycle-18]
_ = x[scannerMetricScanBucketDrive-19]
_ = x[scannerMetricCompactFolder-20]
_ = x[scannerMetricLast-21]
}
const _scannerMetric_name = "ReadMetadataCheckMissingSaveUsageApplyAllApplyVersionTierObjSweepHealCheckILMCheckReplicationYieldCleanAbandonedApplyNonCurrentStartTraceScanObjectLastRealtimeScanFolderScanCycleScanBucketDriveCompactFolderLast"
const _scannerMetric_name = "ReadMetadataCheckMissingSaveUsageApplyAllApplyVersionTierObjSweepHealCheckILMCheckReplicationYieldCleanAbandonedApplyNonCurrentHealAbandonedVersionStartTraceScanObjectHealAbandonedObjectLastRealtimeScanFolderScanCycleScanBucketDriveCompactFolderLast"
var _scannerMetric_index = [...]uint8{0, 12, 24, 33, 41, 53, 65, 74, 77, 93, 98, 112, 127, 137, 147, 159, 169, 178, 193, 206, 210}
var _scannerMetric_index = [...]uint8{0, 12, 24, 33, 41, 53, 65, 74, 77, 93, 98, 112, 127, 147, 157, 167, 186, 198, 208, 217, 232, 245, 249}
func (i scannerMetric) String() string {
if i >= scannerMetric(len(_scannerMetric_index)-1) {