diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 598149bd5..8033ae28d 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -2034,178 +2034,185 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
 }
 
 // Walk a bucket, optionally prefix recursively, until we have returned
-// all the content to objectInfo channel, it is callers responsibility
-// to allocate a receive channel for ObjectInfo, upon any unhandled
-// error walker returns error. Optionally if context.Done() is received
-// then Walk() stops the walker.
+// all the contents of the provided bucket+prefix.
+// TODO: Note that most errors will result in a truncated listing.
 func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error {
 	if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil {
-		// Upon error close the channel.
 		xioutil.SafeClose(results)
 		return err
 	}
 
-	vcfg, _ := globalBucketVersioningSys.Get(bucket)
 	ctx, cancel := context.WithCancel(ctx)
+
+	var entries []chan metaCacheEntry
+
+	for poolIdx, erasureSet := range z.serverPools {
+		for setIdx, set := range erasureSet.sets {
+			set := set
+			listOut := make(chan metaCacheEntry, 1)
+			entries = append(entries, listOut)
+			disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true)
+			if len(disks) == 0 {
+				xioutil.SafeClose(results)
+				cancel()
+				return fmt.Errorf("Walk: no online disks found in pool %d, set %d", poolIdx, setIdx)
+			}
+			go func() {
+				defer xioutil.SafeClose(listOut)
+				send := func(e metaCacheEntry) {
+					if e.isDir() {
+						// Ignore directories.
+						return
+					}
+					select {
+					case listOut <- e:
+					case <-ctx.Done():
+					}
+				}
+
+				askDisks := getListQuorum(opts.AskDisks, set.setDriveCount)
+				if askDisks == -1 {
+					newDisks := getQuorumDisks(disks, infos, (len(disks)+1)/2)
+					if newDisks != nil {
+						// If we found a quorum of disks with a matching signature,
+						// we can list from a single drive; the drives are shuffled below.
+						disks = newDisks
+						askDisks = 1
+					} else {
+						// If we did not find suitable disks, perform strict quorum listing
+						// as no disk agrees on quorum anymore.
+						askDisks = getListQuorum("strict", set.setDriveCount)
+					}
+				}
+
+				// Special case: ask all disks if the drive count is 4
+				if set.setDriveCount == 4 || askDisks > len(disks) {
+					askDisks = len(disks) // use all available drives
+				}
+
+				var fallbackDisks []StorageAPI
+				if askDisks > 0 && len(disks) > askDisks {
+					rand.Shuffle(len(disks), func(i, j int) {
+						disks[i], disks[j] = disks[j], disks[i]
+					})
+					fallbackDisks = disks[askDisks:]
+					disks = disks[:askDisks]
+				}
+
+				requestedVersions := 0
+				if opts.LatestOnly {
+					requestedVersions = 1
+				}
+
+				// However many we ask, versions must exist on ~50%
+				listingQuorum := (askDisks + 1) / 2
+
+				// How to resolve partial results.
+				resolver := metadataResolutionParams{
+					dirQuorum:         listingQuorum,
+					objQuorum:         listingQuorum,
+					bucket:            bucket,
+					requestedVersions: requestedVersions,
+				}
+
+				path := baseDirFromPrefix(prefix)
+				filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
+				if path == prefix {
+					filterPrefix = ""
+				}
+
+				lopts := listPathRawOptions{
+					disks:          disks,
+					fallbackDisks:  fallbackDisks,
+					bucket:         bucket,
+					path:           path,
+					filterPrefix:   filterPrefix,
+					recursive:      true,
+					forwardTo:      opts.Marker,
+					minDisks:       listingQuorum,
+					reportNotFound: false,
+					agreed:         send,
+					partial: func(entries metaCacheEntries, _ []error) {
+						entry, ok := entries.resolve(&resolver)
+						if ok {
+							send(*entry)
+						}
+					},
+					finished: nil,
+				}
+
+				if err := listPathRaw(ctx, lopts); err != nil {
+					storageLogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
+					cancel()
+					return
+				}
+			}()
+		}
+	}
+
+	// Convert and filter merged entries.
+	merged := make(chan metaCacheEntry, 100)
+	vcfg, _ := globalBucketVersioningSys.Get(bucket)
 	go func() {
 		defer cancel()
 		defer xioutil.SafeClose(results)
-
-		for _, erasureSet := range z.serverPools {
-			var wg sync.WaitGroup
-			for _, set := range erasureSet.sets {
-				set := set
-				wg.Add(1)
-				go func() {
-					defer wg.Done()
-
-					disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true)
-					if len(disks) == 0 {
-						cancel()
-						return
-					}
-
-					send := func(objInfo ObjectInfo) bool {
-						select {
-						case <-ctx.Done():
-							return false
-						case results <- objInfo:
-							return true
-						}
-					}
-
-					askDisks := getListQuorum(opts.AskDisks, set.setDriveCount)
-					if askDisks == -1 {
-						newDisks := getQuorumDisks(disks, infos, (len(disks)+1)/2)
-						if newDisks != nil {
-							// If we found disks signature in quorum, we proceed to list
-							// from a single drive, shuffling of the drives is subsequently.
-							disks = newDisks
-							askDisks = 1
-						} else {
-							// If we did not find suitable disks, perform strict quorum listing
-							// as no disk agrees on quorum anymore.
-							askDisks = getListQuorum("strict", set.setDriveCount)
-						}
-					}
-
-					// Special case: ask all disks if the drive count is 4
-					if set.setDriveCount == 4 || askDisks > len(disks) {
-						askDisks = len(disks) // use all available drives
-					}
-
-					var fallbackDisks []StorageAPI
-					if askDisks > 0 && len(disks) > askDisks {
-						rand.Shuffle(len(disks), func(i, j int) {
-							disks[i], disks[j] = disks[j], disks[i]
-						})
-						fallbackDisks = disks[askDisks:]
-						disks = disks[:askDisks]
-					}
-
-					requestedVersions := 0
-					if opts.LatestOnly {
-						requestedVersions = 1
-					}
-					loadEntry := func(entry metaCacheEntry) {
-						if entry.isDir() {
+		send := func(oi ObjectInfo) bool {
+			select {
+			case results <- oi:
+				return true
+			case <-ctx.Done():
+				return false
+			}
+		}
+		for entry := range merged {
+			if opts.LatestOnly {
+				fi, err := entry.fileInfo(bucket)
+				if err != nil {
+					return
+				}
+				if opts.Filter != nil {
+					if opts.Filter(fi) {
+						if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
 							return
 						}
-
-						if opts.LatestOnly {
-							fi, err := entry.fileInfo(bucket)
-							if err != nil {
-								cancel()
-								return
-							}
-							if opts.Filter != nil {
-								if opts.Filter(fi) {
-									if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
-										return
-									}
-								}
-							} else {
-								if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
-									return
-								}
-							}
-
-						} else {
-							fivs, err := entry.fileInfoVersions(bucket)
-							if err != nil {
-								cancel()
-								return
-							}
-
-							// Note: entry.fileInfoVersions returns versions sorted in reverse chronological order based on ModTime
-							if opts.VersionsSort == WalkVersionsSortAsc {
-								versionsSorter(fivs.Versions).reverse()
-							}
-
-							for _, version := range fivs.Versions {
-								if opts.Filter != nil {
-									if opts.Filter(version) {
-										if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
-											return
-										}
-									}
-								} else {
-									if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
-										return
-									}
-								}
-							}
-						}
 					}
-
-					// However many we ask, versions must exist on ~50%
-					listingQuorum := (askDisks + 1) / 2
-
-					// How to resolve partial results.
-					resolver := metadataResolutionParams{
-						dirQuorum:         listingQuorum,
-						objQuorum:         listingQuorum,
-						bucket:            bucket,
-						requestedVersions: requestedVersions,
-					}
-
-					path := baseDirFromPrefix(prefix)
-					filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
-					if path == prefix {
-						filterPrefix = ""
-					}
-
-					lopts := listPathRawOptions{
-						disks:          disks,
-						fallbackDisks:  fallbackDisks,
-						bucket:         bucket,
-						path:           path,
-						filterPrefix:   filterPrefix,
-						recursive:      true,
-						forwardTo:      opts.Marker,
-						minDisks:       1,
-						reportNotFound: false,
-						agreed:         loadEntry,
-						partial: func(entries metaCacheEntries, _ []error) {
-							entry, ok := entries.resolve(&resolver)
-							if ok {
-								loadEntry(*entry)
-							}
-						},
-						finished: nil,
-					}
-
-					if err := listPathRaw(ctx, lopts); err != nil {
-						storageLogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
-						cancel()
+				} else {
+					if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
 						return
 					}
-				}()
+				}
+				continue
+			}
+			fivs, err := entry.fileInfoVersions(bucket)
+			if err != nil {
+				return
+			}
+
+			// Note: entry.fileInfoVersions returns versions sorted in reverse chronological order based on ModTime
+			if opts.VersionsSort == WalkVersionsSortAsc {
+				versionsSorter(fivs.Versions).reverse()
+			}
+
+			for _, version := range fivs.Versions {
+				if opts.Filter != nil {
+					if opts.Filter(version) {
+						if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
+							return
+						}
+					}
+				} else {
+					if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
+						return
+					}
+				}
 			}
-			wg.Wait()
 		}
 	}()
+	go func() {
+		// Merge all entries from all disks.
+		// We leave quorum at 1, since entries are already resolved to have the desired quorum.
+		// mergeEntryChannels will close 'merged' channel upon completion or cancellation.
+		storageLogIf(ctx, mergeEntryChannels(ctx, entries, merged, 1))
+	}()
 	return nil
 }
diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go
index 52e463890..c57e3ce1e 100644
--- a/cmd/test-utils_test.go
+++ b/cmd/test-utils_test.go
@@ -217,6 +217,27 @@ func prepareErasure(ctx context.Context, nDisks int) (ObjectLayer, []string, err
 		return nil, nil, err
 	}
 
+	// Wait up to 10 seconds for disks to come online.
+	pools := obj.(*erasureServerPools)
+	t := time.Now()
+	for _, pool := range pools.serverPools {
+		for _, sets := range pool.erasureDisks {
+			for _, s := range sets {
+				if !s.IsLocal() {
+					for {
+						if s.IsOnline() {
+							break
+						}
+						time.Sleep(100 * time.Millisecond)
+						if time.Since(t) > 10*time.Second {
+							return nil, nil, errors.New("timeout waiting for disk to come online")
+						}
+					}
+				}
+			}
+		}
+	}
+
 	return obj, fsDirs, nil
 }
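For context on how the streaming Walk above is consumed, here is a minimal caller-side sketch. It is not part of the patch: the objAPI variable and the bucket/prefix literals are illustrative. The caller-facing contract is unchanged by this change: the caller allocates the results channel, Walk starts the per-set listers and returns, and the walker closes the channel once listing completes, fails, or the context is cancelled.

	// Hypothetical usage sketch, assuming an ObjectLayer value with the
	// Walk signature shown in the diff above.
	results := make(chan ObjectInfo)
	if err := objAPI.Walk(ctx, "mybucket", "prefix/", results, WalkOptions{LatestOnly: true}); err != nil {
		return err
	}
	for oi := range results {
		// Entries stream in as each erasure set lists them; the walker
		// closes 'results' when it is done.
		fmt.Println(oi.Name, oi.Size)
	}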