turn-off Nlink readdir() optimization for NFS/CIFS (#19420)

fixes #19418
fixes #19416
This commit is contained in:
Harshavardhana 2024-04-05 08:17:08 -07:00 committed by GitHub
parent 96d226c0b1
commit a207bd6790
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 28 additions and 21 deletions

View File

@ -25,22 +25,19 @@ import (
)
// Returns true if no error and there is no object or prefix inside this directory
func isDirEmpty(dirname string) bool {
var stat syscall.Stat_t
if err := syscall.Stat(dirname, &stat); err != nil {
return false
}
if stat.Mode&syscall.S_IFMT == syscall.S_IFDIR && stat.Nlink == 2 {
return true
}
// On filesystems such as btrfs, nfs this is not true, so fallback
// to performing readdir() instead.
if stat.Mode&syscall.S_IFMT == syscall.S_IFDIR && stat.Nlink < 2 {
func isDirEmpty(dirname string, legacy bool) bool {
if legacy {
// On filesystems such as btrfs, nfs this is not true, so fallback
// to performing readdir() instead.
entries, err := readDirN(dirname, 1)
if err != nil {
return false
}
return len(entries) == 0
}
return false
var stat syscall.Stat_t
if err := syscall.Stat(dirname, &stat); err != nil {
return false
}
return stat.Mode&syscall.S_IFMT == syscall.S_IFDIR && stat.Nlink == 2
}

View File

@ -21,7 +21,7 @@
package cmd
// isDirEmpty - returns true if there is no error and no object and prefix inside this directory
func isDirEmpty(dirname string) bool {
func isDirEmpty(dirname string, _ bool) bool {
entries, err := readDirN(dirname, 1)
if err != nil {
return false

View File

@ -59,10 +59,22 @@ type WalkDirOptions struct {
DiskID string
}
// supported FS for Nlink optimization in readdir.
const (
xfs = "XFS"
ext4 = "EXT4"
)
// WalkDir will traverse a directory and return all entries found.
// On success a sorted meta cache stream will be returned.
// Metadata has data stripped, if any.
func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
legacyFS := !(s.fsType == xfs || s.fsType == ext4)
s.RLock()
legacy := s.formatLegacy
s.RUnlock()
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(opts.Bucket)
if err != nil {
@ -76,10 +88,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
}
}
s.RLock()
legacy := s.formatLegacy
s.RUnlock()
// Use a small block size to start sending quickly
w := newMetacacheWriter(wr, 16<<10)
w.reuseBlocks = true // We are not sharing results, so reuse buffers.
@ -353,7 +361,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// NOT an object, append to stack (with slash)
// If dirObject, but no metadata (which is unexpected) we skip it.
if !isDirObj {
if !isDirEmpty(pathJoinBuf(sb, volumeDir, meta.name)) {
if !isDirEmpty(pathJoinBuf(sb, volumeDir, meta.name), legacyFS) {
dirStack = append(dirStack, meta.name+slashSeparator)
}
}

View File

@ -117,6 +117,7 @@ type xlStorage struct {
nrRequests uint64
major, minor uint32
fsType string
immediatePurge chan string
@ -254,6 +255,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
}
s.major = info.Major
s.minor = info.Minor
s.fsType = info.FSType
if !globalIsCICD && !globalIsErasureSD {
var rootDrive bool

View File

@ -190,7 +190,7 @@ func TestXLStorageIsDirEmpty(t *testing.T) {
// Should give false on non-existent directory.
dir1 := slashpath.Join(tmp, "non-existent-directory")
if isDirEmpty(dir1) {
if isDirEmpty(dir1, true) {
t.Error("expected false for non-existent directory, got true")
}
@ -201,7 +201,7 @@ func TestXLStorageIsDirEmpty(t *testing.T) {
t.Fatal(err)
}
if isDirEmpty(dir2) {
if isDirEmpty(dir2, true) {
t.Error("expected false for a file, got true")
}
@ -212,7 +212,7 @@ func TestXLStorageIsDirEmpty(t *testing.T) {
t.Fatal(err)
}
if !isDirEmpty(dir3) {
if !isDirEmpty(dir3, true) {
t.Error("expected true for empty dir, got false")
}
}