avoid repeated large allocations for large parts (#17968)

objects with 10,000 parts, when there are many of them,
can cause a large memory spike which can potentially
lead to OOM, since GC cannot keep up with the allocations.

with the previous PR (#17963) already reducing memory usage
significantly, this PR reduces it by a further ~80% under
repeated calls.

the Scanner sub-system has no use for the slice of Parts(),
so it is better left empty there.
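
The change threads an `allParts` flag down the ToFileInfo/ListVersions call chain, so callers that never look at the parts (the scanner and metacache paths in the diff below) skip building the per-part slice entirely, while the object-API callers keep passing `true`. A minimal sketch of the pattern, using simplified stand-in types rather than the actual MinIO structs:

```go
package main

import "fmt"

// Simplified stand-ins for ObjectPartInfo / FileInfo / xlMetaV2Object,
// used only to illustrate the allParts gating; not the real MinIO types.
type objectPartInfo struct {
	Number int
	Size   int64
}

type fileInfo struct {
	Size  int64
	Parts []objectPartInfo
}

type objectMeta struct {
	PartNumbers []int
	PartSizes   []int64
}

// toFileInfo materializes the Parts slice only when the caller asks for it;
// scanner-style callers pass allParts=false and avoid the allocation entirely.
func (m objectMeta) toFileInfo(allParts bool) fileInfo {
	var fi fileInfo
	for _, sz := range m.PartSizes {
		fi.Size += sz
	}
	if allParts {
		fi.Parts = make([]objectPartInfo, len(m.PartNumbers))
		for i := range fi.Parts {
			fi.Parts[i].Number = m.PartNumbers[i]
			fi.Parts[i].Size = m.PartSizes[i]
		}
	}
	return fi
}

func main() {
	meta := objectMeta{PartNumbers: []int{1, 2, 3}, PartSizes: []int64{5 << 20, 5 << 20, 1 << 20}}
	fmt.Println(len(meta.toFileInfo(true).Parts))  // 3: object-API callers still get the parts
	fmt.Println(len(meta.toFileInfo(false).Parts)) // 0: scanner-style callers skip the allocation
}
```

The real implementation applies the same guard inside xlMetaV2Object.ToFileInfo, as shown in the diff below.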

```
benchmark                            old ns/op     new ns/op     delta
BenchmarkToFileInfo/ToFileInfo-8     295658        188143        -36.36%

benchmark                            old allocs     new allocs     delta
BenchmarkToFileInfo/ToFileInfo-8     61             60             -1.64%

benchmark                            old bytes     new bytes     delta
BenchmarkToFileInfo/ToFileInfo-8     1097210       227255        -79.29%
```
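
As a rough cross-check of the bytes column (about 870 KB/op saved), the parts slice alone for a 10,000-part object accounts for most of it. A standalone back-of-the-envelope calculation with a hypothetical, simplified part layout (the real ObjectPartInfo carries more fields, so this is only an approximation):

```go
package main

import (
	"fmt"
	"unsafe"
)

// Hypothetical per-part layout, roughly mirroring the fields copied into
// FileInfo.Parts in the diff below; the real ObjectPartInfo is larger.
type partInfo struct {
	Number     int
	Size       int64
	ActualSize int64
	ETag       string
	Index      []byte
}

func main() {
	const parts = 10000
	perCall := uintptr(parts) * unsafe.Sizeof(partInfo{})
	// With this layout on 64-bit: 10000 * 64 bytes = 625 KiB allocated (and later
	// garbage-collected) on every call, which is skipped when allParts is false.
	fmt.Printf("parts slice alone: ~%d KiB per ToFileInfo call\n", perCall/1024)
}
```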
Author: Harshavardhana, 2023-09-02 07:49:24 -07:00 (committed by GitHub)
Commit: 3995355150
Parent: 8208bcb896
9 changed files with 92 additions and 51 deletions


```diff
@@ -93,7 +93,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
     if srcOpts.VersionID != "" {
         metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true)
     } else {
-        metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true, false)
+        metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true, false, true)
     }
 
     readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
@@ -533,7 +533,7 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
     return m, err
 }
 
-func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, readData, inclFreeVers bool) ([]FileInfo, []error) {
+func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, readData, inclFreeVers, allParts bool) ([]FileInfo, []error) {
     metadataArray := make([]*xlMetaV2, len(disks))
     metaFileInfos := make([]FileInfo, len(metadataArray))
     metadataShallowVersions := make([][]xlMetaV2ShallowVersion, len(disks))
@@ -601,7 +601,7 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
     readQuorum := (len(disks) + 1) / 2
     meta := &xlMetaV2{versions: mergeXLV2Versions(readQuorum, false, 1, metadataShallowVersions...)}
-    lfi, err := meta.ToFileInfo(bucket, object, "", inclFreeVers)
+    lfi, err := meta.ToFileInfo(bucket, object, "", inclFreeVers, allParts)
     if err != nil {
         for i := range errs {
             if errs[i] == nil {
@@ -631,7 +631,7 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
         // make sure to preserve this for diskmtime based healing bugfix.
         diskMTime := metaFileInfos[index].DiskMTime
-        metaFileInfos[index], errs[index] = metadataArray[index].ToFileInfo(bucket, object, versionID, inclFreeVers)
+        metaFileInfos[index], errs[index] = metadataArray[index].ToFileInfo(bucket, object, versionID, inclFreeVers, allParts)
         if errs[index] != nil {
             continue
         }
@@ -660,7 +660,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
     if opts.VersionID != "" {
         metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData)
     } else {
-        metaArr, errs = readAllXL(ctx, disks, bucket, object, readData, opts.InclFreeVersions)
+        metaArr, errs = readAllXL(ctx, disks, bucket, object, readData, opts.InclFreeVersions, true)
     }
 
     readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
@@ -1834,7 +1834,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
     if opts.VersionID != "" {
         metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
     } else {
-        metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false)
+        metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false, true)
     }
 
     readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
@@ -1907,7 +1907,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
     if opts.VersionID != "" {
         metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
     } else {
-        metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false)
+        metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false, true)
     }
 
     readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
```


```diff
@@ -248,9 +248,9 @@ func (e *metaCacheEntry) fileInfo(bucket string) (FileInfo, error) {
                 ModTime: timeSentinel1970,
             }, nil
         }
-        return e.cached.ToFileInfo(bucket, e.name, "", false)
+        return e.cached.ToFileInfo(bucket, e.name, "", false, false)
     }
-    return getFileInfo(e.metadata, bucket, e.name, "", false)
+    return getFileInfo(e.metadata, bucket, e.name, "", false, false)
 }
 
 // xlmeta returns the decoded metadata.
@@ -291,7 +291,7 @@ func (e *metaCacheEntry) fileInfoVersions(bucket string) (FileInfoVersions, erro
         }, nil
     }
     // Too small gains to reuse cache here.
-    return getFileInfoVersions(e.metadata, bucket, e.name)
+    return getFileInfoVersions(e.metadata, bucket, e.name, false)
 }
 
 // metaCacheEntries is a slice of metacache entries.
```


```diff
@@ -23,8 +23,8 @@ import (
     "github.com/zeebo/xxh3"
 )
 
-func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) {
-    fivs, err := getAllFileInfoVersions(xlMetaBuf, volume, path)
+func getFileInfoVersions(xlMetaBuf []byte, volume, path string, allParts bool) (FileInfoVersions, error) {
+    fivs, err := getAllFileInfoVersions(xlMetaBuf, volume, path, allParts)
     if err != nil {
         return fivs, err
     }
@@ -46,20 +46,20 @@ func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersion
     return fivs, nil
 }
 
-func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) {
+func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string, allParts bool) (FileInfoVersions, error) {
     var versions []FileInfo
     var err error
     if buf, _, e := isIndexedMetaV2(xlMetaBuf); e != nil {
         return FileInfoVersions{}, e
     } else if buf != nil {
-        versions, err = buf.ListVersions(volume, path)
+        versions, err = buf.ListVersions(volume, path, allParts)
     } else {
         var xlMeta xlMetaV2
         if err := xlMeta.LoadOrConvert(xlMetaBuf); err != nil {
             return FileInfoVersions{}, err
         }
-        versions, err = xlMeta.ListVersions(volume, path)
+        versions, err = xlMeta.ListVersions(volume, path, allParts)
     }
     if err == nil && len(versions) == 0 {
         // This special case is needed to handle len(xlMeta.versions) == 0
@@ -85,7 +85,7 @@ func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVers
     }, nil
 }
 
-func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (FileInfo, error) {
+func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data, allParts bool) (FileInfo, error) {
     var fi FileInfo
     var err error
     var inData xlMetaInlineData
@@ -93,7 +93,7 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (F
         return FileInfo{}, e
     } else if buf != nil {
         inData = data
-        fi, err = buf.ToFileInfo(volume, path, versionID)
+        fi, err = buf.ToFileInfo(volume, path, versionID, allParts)
         if len(buf) != 0 && errors.Is(err, errFileNotFound) {
             // This special case is needed to handle len(xlMeta.versions) == 0
             return FileInfo{
@@ -122,7 +122,7 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (F
         }, nil
     }
     inData = xlMeta.data
-    fi, err = xlMeta.ToFileInfo(volume, path, versionID, false)
+    fi, err = xlMeta.ToFileInfo(volume, path, versionID, false, allParts)
 }
 if !data || err != nil {
     return fi, err
```


```diff
@@ -175,7 +175,7 @@ func TestGetFileInfoVersions(t *testing.T) {
     if err != nil {
         t.Fatalf("Failed to serialize xlmeta %v", err)
     }
-    fivs, err := getFileInfoVersions(buf, basefi.Volume, basefi.Name)
+    fivs, err := getFileInfoVersions(buf, basefi.Volume, basefi.Name, true)
     if err != nil {
         t.Fatalf("getFileInfoVersions failed: %v", err)
     }
```


```diff
@@ -432,13 +432,13 @@ func (j xlMetaV2Version) getVersionID() [16]byte {
 }
 
 // ToFileInfo returns FileInfo of the underlying type.
-func (j *xlMetaV2Version) ToFileInfo(volume, path string) (fi FileInfo, err error) {
+func (j *xlMetaV2Version) ToFileInfo(volume, path string, allParts bool) (fi FileInfo, err error) {
     if j == nil {
         return fi, errFileNotFound
     }
     switch j.Type {
     case ObjectType:
-        fi, err = j.ObjectV2.ToFileInfo(volume, path)
+        fi, err = j.ObjectV2.ToFileInfo(volume, path, allParts)
     case DeleteType:
         fi, err = j.DeleteMarker.ToFileInfo(volume, path)
     case LegacyType:
@@ -585,7 +585,7 @@ func (j *xlMetaV2Object) Signature() [4]byte {
     return tmp
 }
 
-func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
+func (j xlMetaV2Object) ToFileInfo(volume, path string, allParts bool) (FileInfo, error) {
     versionID := ""
     var uv uuid.UUID
     // check if the version is not "null"
@@ -599,18 +599,21 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
         ModTime: time.Unix(0, j.ModTime).UTC(),
         VersionID: versionID,
     }
-    fi.Parts = make([]ObjectPartInfo, len(j.PartNumbers))
-    for i := range fi.Parts {
-        fi.Parts[i].Number = j.PartNumbers[i]
-        fi.Parts[i].Size = j.PartSizes[i]
-        if len(j.PartETags) == len(fi.Parts) {
-            fi.Parts[i].ETag = j.PartETags[i]
-        }
-        fi.Parts[i].ActualSize = j.PartActualSizes[i]
-        if len(j.PartIndices) == len(fi.Parts) {
-            fi.Parts[i].Index = j.PartIndices[i]
+    if allParts {
+        fi.Parts = make([]ObjectPartInfo, len(j.PartNumbers))
+        for i := range fi.Parts {
+            fi.Parts[i].Number = j.PartNumbers[i]
+            fi.Parts[i].Size = j.PartSizes[i]
+            if len(j.PartETags) == len(fi.Parts) {
+                fi.Parts[i].ETag = j.PartETags[i]
+            }
+            fi.Parts[i].ActualSize = j.PartActualSizes[i]
+            if len(j.PartIndices) == len(fi.Parts) {
+                fi.Parts[i].Index = j.PartIndices[i]
+            }
         }
     }
     // fi.Erasure.Checksums - is left empty since we do not have any
     // whole checksums for many years now, no need to allocate.
@@ -1719,7 +1722,7 @@ func (x *xlMetaV2) AddLegacy(m *xlMetaV1Object) error {
 // ToFileInfo converts xlMetaV2 into a common FileInfo datastructure
 // for consumption across callers.
-func (x xlMetaV2) ToFileInfo(volume, path, versionID string, inclFreeVers bool) (fi FileInfo, err error) {
+func (x xlMetaV2) ToFileInfo(volume, path, versionID string, inclFreeVers, allParts bool) (fi FileInfo, err error) {
     var uv uuid.UUID
     if versionID != "" && versionID != nullVersionID {
         uv, err = uuid.Parse(versionID)
@@ -1747,7 +1750,7 @@ func (x xlMetaV2) ToFileInfo(volume, path, versionID string, inclFreeVers bool)
         if inclFreeVers && !freeFound {
             // ignore unmarshalling errors, will return errFileNotFound in that case
             if _, err := freeVersion.unmarshalV(x.metaV, ver.meta); err == nil {
-                if freeFi, err = freeVersion.ToFileInfo(volume, path); err == nil {
+                if freeFi, err = freeVersion.ToFileInfo(volume, path, allParts); err == nil {
                     freeFi.IsLatest = true // when this is returned, it would be the latest free version remaining.
                     freeFound = true
                 }
@@ -1775,7 +1778,7 @@ func (x xlMetaV2) ToFileInfo(volume, path, versionID string, inclFreeVers bool)
         if _, err := version.unmarshalV(x.metaV, ver.meta); err != nil {
             return fi, err
         }
-        if fi, err = version.ToFileInfo(volume, path); err != nil {
+        if fi, err = version.ToFileInfo(volume, path, allParts); err != nil {
             return fi, err
         }
         fi.IsLatest = isLatest
@@ -1803,7 +1806,7 @@ func (x xlMetaV2) ToFileInfo(volume, path, versionID string, inclFreeVers bool)
 // versions returns error for unexpected entries.
 // showPendingDeletes is set to true if ListVersions needs to list objects marked deleted
 // but waiting to be replicated
-func (x xlMetaV2) ListVersions(volume, path string) ([]FileInfo, error) {
+func (x xlMetaV2) ListVersions(volume, path string, allParts bool) ([]FileInfo, error) {
     versions := make([]FileInfo, 0, len(x.versions))
     var err error
@@ -1813,7 +1816,7 @@ func (x xlMetaV2) ListVersions(volume, path string) ([]FileInfo, error) {
         if err != nil {
             return versions, err
         }
-        fi, err := dst.ToFileInfo(volume, path)
+        fi, err := dst.ToFileInfo(volume, path, allParts)
         if err != nil {
             return versions, err
         }
@@ -2024,7 +2027,7 @@ type xlMetaBuf []byte
 // ToFileInfo converts xlMetaV2 into a common FileInfo datastructure
 // for consumption across callers.
-func (x xlMetaBuf) ToFileInfo(volume, path, versionID string) (fi FileInfo, err error) {
+func (x xlMetaBuf) ToFileInfo(volume, path, versionID string, allParts bool) (fi FileInfo, err error) {
     var uv uuid.UUID
     if versionID != "" && versionID != nullVersionID {
         uv, err = uuid.Parse(versionID)
@@ -2071,7 +2074,7 @@ func (x xlMetaBuf) ToFileInfo(volume, path, versionID string) (fi FileInfo, err
         if _, err := version.unmarshalV(metaV, meta); err != nil {
             return err
         }
-        if fi, err = version.ToFileInfo(volume, path); err != nil {
+        if fi, err = version.ToFileInfo(volume, path, allParts); err != nil {
             return err
         }
         fi.IsLatest = isLatest
@@ -2095,7 +2098,7 @@ func (x xlMetaBuf) ToFileInfo(volume, path, versionID string) (fi FileInfo, err
 // versions returns error for unexpected entries.
 // showPendingDeletes is set to true if ListVersions needs to list objects marked deleted
 // but waiting to be replicated
-func (x xlMetaBuf) ListVersions(volume, path string) ([]FileInfo, error) {
+func (x xlMetaBuf) ListVersions(volume, path string, allParts bool) ([]FileInfo, error) {
     vers, _, metaV, buf, err := decodeXLHeaders(x)
     if err != nil {
         return nil, err
@@ -2111,7 +2114,7 @@ func (x xlMetaBuf) ListVersions(volume, path string) ([]FileInfo, error) {
         if !xl.Valid() {
             return errFileCorrupt
         }
-        fi, err := xl.ToFileInfo(volume, path)
+        fi, err := xl.ToFileInfo(volume, path, allParts)
         if err != nil {
             return err
         }
```


```diff
@@ -1049,7 +1049,7 @@ func TestXMinIOHealingSkip(t *testing.T) {
     failOnErr(xl.AddVersion(fi))
     var err error
-    fi, err = xl.ToFileInfo(fi.Volume, fi.Name, fi.VersionID, false)
+    fi, err = xl.ToFileInfo(fi.Volume, fi.Name, fi.VersionID, false, true)
     if err != nil {
         t.Fatalf("xl.ToFileInfo failed with %v", err)
     }
@@ -1058,3 +1058,41 @@ func TestXMinIOHealingSkip(t *testing.T) {
         t.Fatal("Expected fi.Healing to be false")
     }
 }
+
+func benchmarkManyPartsOptionally(b *testing.B, allParts bool) {
+    f, err := os.Open("testdata/xl-many-parts.meta")
+    if err != nil {
+        b.Fatal(err)
+    }
+    defer f.Close()
+
+    data, err := io.ReadAll(f)
+    if err != nil {
+        b.Fatal(err)
+    }
+
+    buf, _, _ := isIndexedMetaV2(data)
+    if buf == nil {
+        b.Fatal("buf == nil")
+    }
+
+    b.Run("ToFileInfo", func(b *testing.B) {
+        b.ResetTimer()
+        b.ReportAllocs()
+        for i := 0; i < b.N; i++ {
+            _, err = buf.ToFileInfo("volume", "path", "", allParts)
+            if err != nil {
+                b.Fatal(err)
+            }
+        }
+    })
+}
+
+func BenchmarkToFileInfoNoParts(b *testing.B) {
+    benchmarkManyPartsOptionally(b, false)
+}
+
+func BenchmarkToFileInfoWithParts(b *testing.B) {
+    benchmarkManyPartsOptionally(b, true)
+}
```


```diff
@@ -485,7 +485,7 @@ func BenchmarkXlMetaV2Shallow(b *testing.B) {
                 b.Fatal(err)
             }
             // List...
-            _, err = xl.ToFileInfo("volume", "path", ids[rng.Intn(size)], false)
+            _, err = xl.ToFileInfo("volume", "path", ids[rng.Intn(size)], false, true)
             if err != nil {
                 b.Fatal(err)
             }
@@ -503,7 +503,7 @@ func BenchmarkXlMetaV2Shallow(b *testing.B) {
                 b.Fatal(err)
             }
             // List...
-            _, err = xl.ListVersions("volume", "path")
+            _, err = xl.ListVersions("volume", "path", true)
             if err != nil {
                 b.Fatal(err)
             }
@@ -518,7 +518,7 @@ func BenchmarkXlMetaV2Shallow(b *testing.B) {
             if buf == nil {
                 b.Fatal("buf == nil")
             }
-            _, err = buf.ToFileInfo("volume", "path", ids[rng.Intn(size)])
+            _, err = buf.ToFileInfo("volume", "path", ids[rng.Intn(size)], true)
             if err != nil {
                 b.Fatal(err)
             }
@@ -533,7 +533,7 @@ func BenchmarkXlMetaV2Shallow(b *testing.B) {
             if buf == nil {
                 b.Fatal("buf == nil")
            }
-            _, err = buf.ListVersions("volume", "path")
+            _, err = buf.ListVersions("volume", "path", true)
             if err != nil {
                 b.Fatal(err)
             }
```


```diff
@@ -26,7 +26,7 @@ import (
 )
 
 func (x xlMetaV2) listFreeVersions(volume, path string) ([]FileInfo, error) {
-    fivs, err := x.ListVersions(volume, path)
+    fivs, err := x.ListVersions(volume, path, true)
     if err != nil {
         return nil, err
     }
@@ -179,7 +179,7 @@ func TestFreeVersion(t *testing.T) {
     }
 
     for _, ft := range freeVersionsTests {
-        fi, err := xl.ToFileInfo(ft.vol, ft.name, "", ft.inclFreeVers)
+        fi, err := xl.ToFileInfo(ft.vol, ft.name, "", ft.inclFreeVers, true)
         if err != nil && !errors.Is(err, ft.expectedErr) {
             t.Fatalf("ToFileInfo failed due to %v", err)
         }
```


```diff
@@ -528,7 +528,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
         // Remove filename which is the meta file.
         item.transformMetaDir()
-        fivs, err := getFileInfoVersions(buf, item.bucket, item.objectPath())
+        fivs, err := getFileInfoVersions(buf, item.bucket, item.objectPath(), false)
         metaDataPoolPut(buf)
         if err != nil {
             res["err"] = err.Error()
@@ -1437,7 +1437,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
         return fi, err
     }
 
-    fi, err = getFileInfo(buf, volume, path, versionID, readData)
+    fi, err = getFileInfo(buf, volume, path, versionID, readData, true)
     if err != nil {
         return fi, err
     }
@@ -2410,7 +2410,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
     }
     // Replace the data of null version or any other existing version-id
-    ofi, err := xlMeta.ToFileInfo(dstVolume, dstPath, reqVID, false)
+    ofi, err := xlMeta.ToFileInfo(dstVolume, dstPath, reqVID, false, false)
     if err == nil && !ofi.Deleted {
         if xlMeta.SharedDataDirCountStr(reqVID, ofi.DataDir) == 0 {
             // Purge the destination path as we are not preserving anything
```