From a50ea92c64c517a2659ba32aa569f1d4ce7e6975 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 29 Dec 2023 15:52:41 -0800 Subject: [PATCH] feat: introduce list_quorum="auto" to prefer quorum drives (#18084) NOTE: This feature is not retro-active; it will not cater to previous transactions on existing setups. To enable this feature, please set ` _MINIO_DRIVE_QUORUM=on` environment variable as part of systemd service or k8s configmap. Once this has been enabled, you need to also set `list_quorum`. ``` ~ mc admin config set alias/ api list_quorum=auto` ``` A new debugging tool is available to check for any missing counters. --- .gitignore | 1 + cmd/background-newdisks-heal-ops.go | 1 - cmd/erasure-healing.go | 9 +- cmd/erasure-object.go | 10 +- cmd/erasure-server-pool.go | 14 +- cmd/erasure-sets.go | 26 +- cmd/erasure.go | 35 +- cmd/format-erasure.go | 17 +- cmd/format-erasure_test.go | 2 +- cmd/metacache-set.go | 94 +++- cmd/metrics-v2.go | 1 + cmd/naughty-disk_test.go | 12 +- cmd/storage-datatypes.go | 36 +- cmd/storage-datatypes_gen.go | 642 +++++++++++++++++++++++----- cmd/storage-datatypes_gen_test.go | 226 ++++++++++ cmd/storage-interface.go | 19 +- cmd/storage-rest-client.go | 58 +-- cmd/storage-rest-common.go | 2 +- cmd/storage-rest-server.go | 8 +- cmd/xl-storage-disk-id-check.go | 96 ++++- cmd/xl-storage.go | 70 ++- cmd/xl-storage_test.go | 6 +- docs/debugging/pprofgoparser/go.mod | 2 +- docs/debugging/s3-verify/go.mod | 2 +- docs/debugging/xattr/go.mod | 14 + docs/debugging/xattr/go.sum | 13 + docs/debugging/xattr/main.go | 117 +++++ go.mod | 2 +- internal/config/api/api.go | 2 +- internal/config/api/help.go | 2 +- 30 files changed, 1288 insertions(+), 251 deletions(-) create mode 100644 docs/debugging/xattr/go.mod create mode 100644 docs/debugging/xattr/go.sum create mode 100644 docs/debugging/xattr/main.go diff --git a/.gitignore b/.gitignore index b226c380b..ebbdebcef 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,4 @@ docs/debugging/inspect/inspect docs/debugging/pprofgoparser/pprofgoparser docs/debugging/reorder-disks/reorder-disks docs/debugging/populate-hard-links/populate-hardlinks +docs/debugging/xattr/xattr \ No newline at end of file diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index b34180f9b..249dc467c 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -347,7 +347,6 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { if env.Get("_MINIO_AUTO_DRIVE_HEALING", config.EnableOn) == config.EnableOn || env.Get("_MINIO_AUTO_DISK_HEALING", config.EnableOn) == config.EnableOn { globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...) - go monitorLocalDisksAndHeal(ctx, z) } } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 26c075b40..f60b1f7c7 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -370,10 +370,8 @@ func (fi *FileInfo) SetHealing() { // Healing returns true if object is being healed (i.e fi is being passed down // from healObject) func (fi FileInfo) Healing() bool { - if _, ok := fi.Metadata[xMinIOHealing]; ok { - return true - } - return false + _, ok := fi.Metadata[xMinIOHealing] + return ok } // Heals an object by re-writing corrupt/missing erasure blocks. @@ -760,7 +758,8 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object // Attempt a rename now from healed data to final location. partsMetadata[i].SetHealing() - if _, err = disk.RenameData(ctx, minioMetaTmpBucket, tmpID, partsMetadata[i], bucket, object); err != nil { + + if _, err = disk.RenameData(ctx, minioMetaTmpBucket, tmpID, partsMetadata[i], bucket, object, RenameOptions{}); err != nil { return result, err } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index a69c1357b..bc62b5544 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -546,7 +546,7 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st if disks[index] == nil { return errDiskNotFound } - return disks[index].DeleteVersion(ctx, bucket, object, fi, false) + return disks[index].DeleteVersion(ctx, bucket, object, fi, false, DeleteOptions{}) }, index) } @@ -1032,7 +1032,7 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str if !fi.IsValid() { return errFileCorrupt } - sign, err := disks[index].RenameData(ctx, srcBucket, srcEntry, fi, dstBucket, dstEntry) + sign, err := disks[index].RenameData(ctx, srcBucket, srcEntry, fi, dstBucket, dstEntry, RenameOptions{}) if err != nil { return err } @@ -1059,7 +1059,7 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str // caller this dangling object will be now scheduled to be removed // via active healing. dg.Go(func() error { - return disks[index].DeleteVersion(context.Background(), dstBucket, dstEntry, metadata[index], false) + return disks[index].DeleteVersion(context.Background(), dstBucket, dstEntry, metadata[index], false, DeleteOptions{UndoWrite: true}) }, index) } dg.Wait() @@ -1610,7 +1610,7 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object if disks[index] == nil { return errDiskNotFound } - return disks[index].DeleteVersion(ctx, bucket, object, fi, forceDelMarker) + return disks[index].DeleteVersion(ctx, bucket, object, fi, forceDelMarker, DeleteOptions{}) }, index) } // return errors if any during deletion @@ -1723,7 +1723,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec } return } - errs := disk.DeleteVersions(ctx, bucket, dedupVersions) + errs := disk.DeleteVersions(ctx, bucket, dedupVersions, DeleteOptions{}) for i, err := range errs { if err == nil { continue diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index c256ca9a2..92095e1b2 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1985,7 +1985,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re go func() { defer wg.Done() - disks, _ := set.getOnlineDisksWithHealing(true) + disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true) if len(disks) == 0 { cancel() return @@ -2002,7 +2002,17 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re askDisks := getListQuorum(opts.AskDisks, set.setDriveCount) if askDisks == -1 { - askDisks = getListQuorum("strict", set.setDriveCount) + newDisks := getQuorumDisks(disks, infos, (len(disks)+1)/2) + if newDisks != nil { + // If we found disks signature in quorum, we proceed to list + // from a single drive, shuffling of the drives is subsequently. + disks = newDisks + askDisks = 1 + } else { + // If we did not find suitable disks, perform strict quorum listing + // as no disk agrees on quorum anymore. + askDisks = getListQuorum("strict", set.setDriveCount) + } } // Special case: ask all disks if the drive count is 4 diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 663fed580..c9fd80432 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -1110,7 +1110,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H formatOpID := mustGetUUID() // Initialize a new set of set formats which will be written to disk. - newFormatSets := newHealFormatSets(refFormat, s.setCount, s.setDriveCount, formats, sErrs) + newFormatSets, currentDisksInfo := newHealFormatSets(refFormat, s.setCount, s.setDriveCount, formats, sErrs) if !dryRun { tmpNewFormats := make([]*formatErasureV3, s.setCount*s.setDriveCount) @@ -1153,9 +1153,27 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H s.erasureDisks[m][n].Close() } - if storageDisks[index] != nil { - storageDisks[index].SetDiskLoc(s.poolIndex, m, n) - s.erasureDisks[m][n] = storageDisks[index] + if disk := storageDisks[index]; disk != nil { + disk.SetDiskLoc(s.poolIndex, m, n) + + if disk.IsLocal() && driveQuorum { + commonWrites, commonDeletes := calcCommonWritesDeletes(currentDisksInfo[m], (s.setDriveCount+1)/2) + xldisk, ok := disk.(*xlStorageDiskIDCheck) + if ok { + xldisk.totalWrites.Add(commonWrites) + xldisk.totalDeletes.Add(commonDeletes) + xldisk.storage.setWriteAttribute(commonWrites) + xldisk.storage.setDeleteAttribute(commonDeletes) + } + } + + s.erasureDisks[m][n] = disk + + if disk.IsLocal() && globalIsDistErasure { + globalLocalDrivesMu.Lock() + globalLocalSetDrives[s.poolIndex][m][n] = disk + globalLocalDrivesMu.Unlock() + } } } diff --git a/cmd/erasure.go b/cmd/erasure.go index a1071a4ce..a5b820572 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -266,12 +266,12 @@ func (er erasureObjects) LocalStorageInfo(ctx context.Context, metrics bool) Sto return getStorageInfo(localDisks, localEndpoints, metrics) } -// getOnlineDisksWithHealing - returns online disks and overall healing status. +// getOnlineDisksWithHealingAndInfo - returns online disks and overall healing status. // Disks are randomly ordered, but in the following groups: // - Non-scanning disks // - Non-healing disks // - Healing disks (if inclHealing is true) -func (er erasureObjects) getOnlineDisksWithHealing(inclHealing bool) (newDisks []StorageAPI, healing bool) { +func (er erasureObjects) getOnlineDisksWithHealingAndInfo(inclHealing bool) (newDisks []StorageAPI, newInfos []DiskInfo, healing bool) { var wg sync.WaitGroup disks := er.getDisks() infos := make([]DiskInfo, len(disks)) @@ -284,32 +284,24 @@ func (er erasureObjects) getOnlineDisksWithHealing(inclHealing bool) (newDisks [ disk := disks[i] if disk == nil { - infos[i].Error = "offline drive" + infos[i].Error = errDiskNotFound.Error() return } di, err := disk.DiskInfo(context.Background(), false) + infos[i] = di if err != nil { // - Do not consume disks which are not reachable // unformatted or simply not accessible for some reason. - // - // - // - Future: skip busy disks - if err != nil { - infos[i].Error = err.Error() - } - return + infos[i].Error = err.Error() } - if !inclHealing && di.Healing { - return - } - - infos[i] = di }() } wg.Wait() var scanningDisks, healingDisks []StorageAPI + var scanningInfos, healingInfos []DiskInfo + for i, info := range infos { // Check if one of the drives in the set is being healed. // this information is used by scanner to skip healing @@ -321,23 +313,34 @@ func (er erasureObjects) getOnlineDisksWithHealing(inclHealing bool) (newDisks [ healing = true if inclHealing { healingDisks = append(healingDisks, disks[i]) + healingInfos = append(healingInfos, infos[i]) } continue } if !info.Scanning { newDisks = append(newDisks, disks[i]) + newInfos = append(newInfos, infos[i]) } else { scanningDisks = append(scanningDisks, disks[i]) + scanningInfos = append(scanningInfos, infos[i]) } } // Prefer non-scanning disks over disks which are currently being scanned. newDisks = append(newDisks, scanningDisks...) + newInfos = append(newInfos, scanningInfos...) + /// Then add healing disks. newDisks = append(newDisks, healingDisks...) + newInfos = append(newInfos, healingInfos...) - return newDisks, healing + return newDisks, newInfos, healing +} + +func (er erasureObjects) getOnlineDisksWithHealing(inclHealing bool) (newDisks []StorageAPI, healing bool) { + newDisks, _, healing = er.getOnlineDisksWithHealingAndInfo(inclHealing) + return } // Clean-up previously deleted objects. from .minio.sys/tmp/.trash/ diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go index b9324e980..5c9de2a30 100644 --- a/cmd/format-erasure.go +++ b/cmd/format-erasure.go @@ -123,6 +123,7 @@ type formatErasureV3 struct { // to pick the right set index for an object. DistributionAlgo string `json:"distributionAlgo"` } `json:"xl"` + Info DiskInfo `json:"-"` } func (f *formatErasureV3) Drives() (drives int) { @@ -328,6 +329,11 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur if err != nil { return err } + info, err := storageDisks[index].DiskInfo(context.Background(), false) + if err != nil { + return err + } + format.Info = info formats[index] = format if !heal { // If no healing required, make the disks valid and @@ -824,11 +830,15 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error { } // Initialize a new set of set formats which will be written to all disks. -func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 { +func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int, formats []*formatErasureV3, errs []error) ([][]*formatErasureV3, [][]DiskInfo) { newFormats := make([][]*formatErasureV3, setCount) for i := range refFormat.Erasure.Sets { newFormats[i] = make([]*formatErasureV3, setDriveCount) } + currentDisksInfo := make([][]DiskInfo, setCount) + for i := range refFormat.Erasure.Sets { + currentDisksInfo[i] = make([]DiskInfo, setDriveCount) + } for i := range refFormat.Erasure.Sets { for j := range refFormat.Erasure.Sets[i] { if errors.Is(errs[i*setDriveCount+j], errUnformattedDisk) { @@ -841,7 +851,10 @@ func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int, newFormats[i][j].Erasure.Version = refFormat.Erasure.Version newFormats[i][j].Erasure.DistributionAlgo = refFormat.Erasure.DistributionAlgo } + if format := formats[i*setDriveCount+j]; format != nil && (errs[i*setDriveCount+j] == nil) { + currentDisksInfo[i][j] = format.Info + } } } - return newFormats + return newFormats, currentDisksInfo } diff --git a/cmd/format-erasure_test.go b/cmd/format-erasure_test.go index 2fe317045..9a92751dd 100644 --- a/cmd/format-erasure_test.go +++ b/cmd/format-erasure_test.go @@ -513,7 +513,7 @@ func TestNewFormatSets(t *testing.T) { // 16th disk is unformatted. errs[15] = errUnformattedDisk - newFormats := newHealFormatSets(quorumFormat, setCount, setDriveCount, formats, errs) + newFormats, _ := newHealFormatSets(quorumFormat, setCount, setDriveCount, formats, errs) if newFormats == nil { t.Fatal("Unexpected failure") } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 6340cc82c..e6ae83b15 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -590,25 +590,115 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt func getListQuorum(quorum string, driveCount int) int { switch quorum { case "disk": - // smallest possible value, generally meant for testing. return 1 case "reduced": return 2 case "optimal": return (driveCount + 1) / 2 + case "auto": + return -1 } // defaults to 'strict' return driveCount } +func calcCommonWritesDeletes(infos []DiskInfo, readQuorum int) (commonWrite, commonDelete uint64) { + deletes := make([]uint64, len(infos)) + writes := make([]uint64, len(infos)) + for index, di := range infos { + deletes[index] = di.Metrics.TotalDeletes + writes[index] = di.Metrics.TotalWrites + } + + filter := func(list []uint64) (commonCount uint64) { + max := 0 + signatureMap := map[uint64]int{} + for _, v := range list { + signatureMap[v]++ + } + for ops, count := range signatureMap { + if max < count && commonCount < ops { + max = count + commonCount = ops + } + } + if max < readQuorum { + return 0 + } + return commonCount + } + + commonWrite = filter(writes) + commonDelete = filter(deletes) + return +} + +func calcCommonCounter(infos []DiskInfo, readQuorum int) (commonCount uint64) { + filter := func() (commonCount uint64) { + max := 0 + signatureMap := map[uint64]int{} + for _, info := range infos { + if info.Error != "" { + continue + } + mutations := info.Metrics.TotalDeletes + info.Metrics.TotalWrites + signatureMap[mutations]++ + } + for ops, count := range signatureMap { + if max < count && commonCount < ops { + max = count + commonCount = ops + } + } + if max < readQuorum { + return 0 + } + return commonCount + } + + return filter() +} + +func getQuorumDiskInfos(disks []StorageAPI, infos []DiskInfo, readQuorum int) (newDisks []StorageAPI, newInfos []DiskInfo) { + commonMutations := calcCommonCounter(infos, readQuorum) + for i, info := range infos { + mutations := info.Metrics.TotalDeletes + info.Metrics.TotalWrites + if mutations >= commonMutations { + newDisks = append(newDisks, disks[i]) + newInfos = append(newInfos, infos[i]) + } + } + + return newDisks, newInfos +} + +func getQuorumDisks(disks []StorageAPI, infos []DiskInfo, readQuorum int) (newDisks []StorageAPI) { + newDisks, _ = getQuorumDiskInfos(disks, infos, readQuorum) + return newDisks +} + // Will return io.EOF if continuing would not yield more results. func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, results chan<- metaCacheEntry) (err error) { defer close(results) o.debugf(color.Green("listPath:")+" with options: %#v", o) // get prioritized non-healing disks for listing - disks, _ := er.getOnlineDisksWithHealing(true) + disks, infos, _ := er.getOnlineDisksWithHealingAndInfo(true) askDisks := getListQuorum(o.AskDisks, er.setDriveCount) + if askDisks == -1 { + newDisks := getQuorumDisks(disks, infos, (len(disks)+1)/2) + if newDisks != nil { + // If we found disks signature in quorum, we proceed to list + // from a single drive, shuffling of the drives is subsequently. + disks = newDisks + askDisks = 1 + } else { + // If we did not find suitable disks, perform strict quorum listing + // as no disk agrees on quorum anymore. + askDisks = getListQuorum("strict", er.setDriveCount) + } + } + var fallbackDisks []StorageAPI // Special case: ask all disks if the drive count is 4 diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index f9ce4793c..630c0c885 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -94,6 +94,7 @@ func init() { getNotificationMetrics(), getDistLockMetrics(), getIAMNodeMetrics(), + getLocalStorageMetrics(), } bucketMetricsGroups := []*MetricsGroup{ diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 9523aeda4..701632f3a 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -200,11 +200,11 @@ func (d *naughtyDisk) AppendFile(ctx context.Context, volume string, path string return d.disk.AppendFile(ctx, volume, path, buf) } -func (d *naughtyDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error) { +func (d *naughtyDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (uint64, error) { if err := d.calcError(); err != nil { return 0, err } - return d.disk.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath) + return d.disk.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath, opts) } func (d *naughtyDisk) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error { @@ -228,7 +228,7 @@ func (d *naughtyDisk) Delete(ctx context.Context, volume string, path string, de return d.disk.Delete(ctx, volume, path, deleteOpts) } -func (d *naughtyDisk) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) []error { +func (d *naughtyDisk) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) []error { if err := d.calcError(); err != nil { errs := make([]error, len(versions)) for i := range errs { @@ -236,7 +236,7 @@ func (d *naughtyDisk) DeleteVersions(ctx context.Context, volume string, version } return errs } - return d.disk.DeleteVersions(ctx, volume, versions) + return d.disk.DeleteVersions(ctx, volume, versions, opts) } func (d *naughtyDisk) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { @@ -253,11 +253,11 @@ func (d *naughtyDisk) UpdateMetadata(ctx context.Context, volume, path string, f return d.disk.UpdateMetadata(ctx, volume, path, fi, opts) } -func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) { +func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) { if err := d.calcError(); err != nil { return err } - return d.disk.DeleteVersion(ctx, volume, path, fi, forceDelMarker) + return d.disk.DeleteVersion(ctx, volume, path, fi, forceDelMarker, opts) } func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 681c0a0e1..0b8f1b4a8 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -23,8 +23,18 @@ import ( // DeleteOptions represents the disk level delete options available for the APIs type DeleteOptions struct { + BaseOptions Recursive bool `msg:"r"` Immediate bool `msg:"i"` + UndoWrite bool `msg:"u"` +} + +// BaseOptions represents common options for all Storage API calls +type BaseOptions struct{} + +// RenameOptions represents rename API options, currently its same as BaseOptions +type RenameOptions struct { + BaseOptions } //go:generate msgp -file=$GOFILE @@ -65,6 +75,8 @@ type DiskMetrics struct { APICalls map[string]uint64 `json:"apiCalls,omitempty"` TotalErrorsAvailability uint64 `json:"totalErrsAvailability"` TotalErrorsTimeout uint64 `json:"totalErrsTimeout"` + TotalWrites uint64 `json:"totalWrites"` + TotalDeletes uint64 `json:"totalDeletes"` } // VolsInfo is a collection of volume(bucket) information @@ -360,11 +372,12 @@ type ReadMultipleResp struct { // DeleteVersionHandlerParams are parameters for DeleteVersionHandler type DeleteVersionHandlerParams struct { - DiskID string `msg:"id"` - Volume string `msg:"v"` - FilePath string `msg:"fp"` - ForceDelMarker bool `msg:"fdm"` - FI FileInfo `msg:"fi"` + DiskID string `msg:"id"` + Volume string `msg:"v"` + FilePath string `msg:"fp"` + ForceDelMarker bool `msg:"fdm"` + Opts DeleteOptions `msg:"do"` + FI FileInfo `msg:"fi"` } // MetadataHandlerParams is request info for UpdateMetadataHandle and WriteMetadataHandler. @@ -399,12 +412,13 @@ type DeleteFileHandlerParams struct { // RenameDataHandlerParams are parameters for RenameDataHandler. type RenameDataHandlerParams struct { - DiskID string `msg:"id"` - SrcVolume string `msg:"sv"` - SrcPath string `msg:"sp"` - DstVolume string `msg:"dv"` - DstPath string `msg:"dp"` - FI FileInfo `msg:"fi"` + DiskID string `msg:"id"` + SrcVolume string `msg:"sv"` + SrcPath string `msg:"sp"` + DstVolume string `msg:"dv"` + DstPath string `msg:"dp"` + FI FileInfo `msg:"fi"` + Opts RenameOptions `msg:"ro"` } // RenameDataResp - RenameData()'s response. diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index e72e29f74..b36786b98 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -6,6 +6,89 @@ import ( "github.com/tinylib/msgp/msgp" ) +// DecodeMsg implements msgp.Decodable +func (z *BaseOptions) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z BaseOptions) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 0 + err = en.Append(0x80) + if err != nil { + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z BaseOptions) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 0 + o = append(o, 0x80) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BaseOptions) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z BaseOptions) Msgsize() (s int) { + s = 1 + return +} + // DecodeMsg implements msgp.Decodable func (z *CheckPartsHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte @@ -225,40 +308,11 @@ func (z *DeleteFileHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { return } case "do": - var zb0002 uint32 - zb0002, err = dc.ReadMapHeader() + err = z.Opts.DecodeMsg(dc) if err != nil { err = msgp.WrapError(err, "Opts") return } - for zb0002 > 0 { - zb0002-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err, "Opts") - return - } - switch msgp.UnsafeString(field) { - case "r": - z.Opts.Recursive, err = dc.ReadBool() - if err != nil { - err = msgp.WrapError(err, "Opts", "Recursive") - return - } - case "i": - z.Opts.Immediate, err = dc.ReadBool() - if err != nil { - err = msgp.WrapError(err, "Opts", "Immediate") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err, "Opts") - return - } - } - } default: err = dc.Skip() if err != nil { @@ -308,25 +362,9 @@ func (z *DeleteFileHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - // map header, size 2 - // write "r" - err = en.Append(0x82, 0xa1, 0x72) + err = z.Opts.EncodeMsg(en) if err != nil { - return - } - err = en.WriteBool(z.Opts.Recursive) - if err != nil { - err = msgp.WrapError(err, "Opts", "Recursive") - return - } - // write "i" - err = en.Append(0xa1, 0x69) - if err != nil { - return - } - err = en.WriteBool(z.Opts.Immediate) - if err != nil { - err = msgp.WrapError(err, "Opts", "Immediate") + err = msgp.WrapError(err, "Opts") return } return @@ -347,13 +385,11 @@ func (z *DeleteFileHandlerParams) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendString(o, z.FilePath) // string "do" o = append(o, 0xa2, 0x64, 0x6f) - // map header, size 2 - // string "r" - o = append(o, 0x82, 0xa1, 0x72) - o = msgp.AppendBool(o, z.Opts.Recursive) - // string "i" - o = append(o, 0xa1, 0x69) - o = msgp.AppendBool(o, z.Opts.Immediate) + o, err = z.Opts.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } return } @@ -394,40 +430,11 @@ func (z *DeleteFileHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) return } case "do": - var zb0002 uint32 - zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + bts, err = z.Opts.UnmarshalMsg(bts) if err != nil { err = msgp.WrapError(err, "Opts") return } - for zb0002 > 0 { - zb0002-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err, "Opts") - return - } - switch msgp.UnsafeString(field) { - case "r": - z.Opts.Recursive, bts, err = msgp.ReadBoolBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Opts", "Recursive") - return - } - case "i": - z.Opts.Immediate, bts, err = msgp.ReadBoolBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Opts", "Immediate") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err, "Opts") - return - } - } - } default: bts, err = msgp.Skip(bts) if err != nil { @@ -442,7 +449,7 @@ func (z *DeleteFileHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *DeleteFileHandlerParams) Msgsize() (s int) { - s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 2 + msgp.StringPrefixSize + len(z.Volume) + 3 + msgp.StringPrefixSize + len(z.FilePath) + 3 + 1 + 2 + msgp.BoolSize + 2 + msgp.BoolSize + s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 2 + msgp.StringPrefixSize + len(z.Volume) + 3 + msgp.StringPrefixSize + len(z.FilePath) + 3 + z.Opts.Msgsize() return } @@ -464,6 +471,29 @@ func (z *DeleteOptions) DecodeMsg(dc *msgp.Reader) (err error) { return } switch msgp.UnsafeString(field) { + case "BaseOptions": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + for zb0002 > 0 { + zb0002-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + switch msgp.UnsafeString(field) { + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + } + } case "r": z.Recursive, err = dc.ReadBool() if err != nil { @@ -476,6 +506,12 @@ func (z *DeleteOptions) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Immediate") return } + case "u": + z.UndoWrite, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "UndoWrite") + return + } default: err = dc.Skip() if err != nil { @@ -488,10 +524,20 @@ func (z *DeleteOptions) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z DeleteOptions) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 +func (z *DeleteOptions) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "BaseOptions" + err = en.Append(0x84, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + // map header, size 0 + err = en.Append(0x80) + if err != nil { + return + } // write "r" - err = en.Append(0x82, 0xa1, 0x72) + err = en.Append(0xa1, 0x72) if err != nil { return } @@ -510,19 +556,36 @@ func (z DeleteOptions) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Immediate") return } + // write "u" + err = en.Append(0xa1, 0x75) + if err != nil { + return + } + err = en.WriteBool(z.UndoWrite) + if err != nil { + err = msgp.WrapError(err, "UndoWrite") + return + } return } // MarshalMsg implements msgp.Marshaler -func (z DeleteOptions) MarshalMsg(b []byte) (o []byte, err error) { +func (z *DeleteOptions) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 2 + // map header, size 4 + // string "BaseOptions" + o = append(o, 0x84, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73) + // map header, size 0 + o = append(o, 0x80) // string "r" - o = append(o, 0x82, 0xa1, 0x72) + o = append(o, 0xa1, 0x72) o = msgp.AppendBool(o, z.Recursive) // string "i" o = append(o, 0xa1, 0x69) o = msgp.AppendBool(o, z.Immediate) + // string "u" + o = append(o, 0xa1, 0x75) + o = msgp.AppendBool(o, z.UndoWrite) return } @@ -544,6 +607,29 @@ func (z *DeleteOptions) UnmarshalMsg(bts []byte) (o []byte, err error) { return } switch msgp.UnsafeString(field) { + case "BaseOptions": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + for zb0002 > 0 { + zb0002-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + switch msgp.UnsafeString(field) { + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + } + } case "r": z.Recursive, bts, err = msgp.ReadBoolBytes(bts) if err != nil { @@ -556,6 +642,12 @@ func (z *DeleteOptions) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Immediate") return } + case "u": + z.UndoWrite, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "UndoWrite") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -569,8 +661,8 @@ func (z *DeleteOptions) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z DeleteOptions) Msgsize() (s int) { - s = 1 + 2 + msgp.BoolSize + 2 + msgp.BoolSize +func (z *DeleteOptions) Msgsize() (s int) { + s = 1 + 12 + 1 + 2 + msgp.BoolSize + 2 + msgp.BoolSize + 2 + msgp.BoolSize return } @@ -616,6 +708,12 @@ func (z *DeleteVersionHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ForceDelMarker") return } + case "do": + err = z.Opts.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } case "fi": err = z.FI.DecodeMsg(dc) if err != nil { @@ -635,9 +733,9 @@ func (z *DeleteVersionHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *DeleteVersionHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 5 + // map header, size 6 // write "id" - err = en.Append(0x85, 0xa2, 0x69, 0x64) + err = en.Append(0x86, 0xa2, 0x69, 0x64) if err != nil { return } @@ -676,6 +774,16 @@ func (z *DeleteVersionHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ForceDelMarker") return } + // write "do" + err = en.Append(0xa2, 0x64, 0x6f) + if err != nil { + return + } + err = z.Opts.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } // write "fi" err = en.Append(0xa2, 0x66, 0x69) if err != nil { @@ -692,9 +800,9 @@ func (z *DeleteVersionHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *DeleteVersionHandlerParams) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 5 + // map header, size 6 // string "id" - o = append(o, 0x85, 0xa2, 0x69, 0x64) + o = append(o, 0x86, 0xa2, 0x69, 0x64) o = msgp.AppendString(o, z.DiskID) // string "v" o = append(o, 0xa1, 0x76) @@ -705,6 +813,13 @@ func (z *DeleteVersionHandlerParams) MarshalMsg(b []byte) (o []byte, err error) // string "fdm" o = append(o, 0xa3, 0x66, 0x64, 0x6d) o = msgp.AppendBool(o, z.ForceDelMarker) + // string "do" + o = append(o, 0xa2, 0x64, 0x6f) + o, err = z.Opts.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } // string "fi" o = append(o, 0xa2, 0x66, 0x69) o, err = z.FI.MarshalMsg(o) @@ -757,6 +872,12 @@ func (z *DeleteVersionHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err err err = msgp.WrapError(err, "ForceDelMarker") return } + case "do": + bts, err = z.Opts.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } case "fi": bts, err = z.FI.UnmarshalMsg(bts) if err != nil { @@ -777,7 +898,7 @@ func (z *DeleteVersionHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err err // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *DeleteVersionHandlerParams) Msgsize() (s int) { - s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 2 + msgp.StringPrefixSize + len(z.Volume) + 3 + msgp.StringPrefixSize + len(z.FilePath) + 4 + msgp.BoolSize + 3 + z.FI.Msgsize() + s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 2 + msgp.StringPrefixSize + len(z.Volume) + 3 + msgp.StringPrefixSize + len(z.FilePath) + 4 + msgp.BoolSize + 3 + z.Opts.Msgsize() + 3 + z.FI.Msgsize() return } @@ -1218,6 +1339,18 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "TotalErrorsTimeout") return } + case "TotalWrites": + z.TotalWrites, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "TotalWrites") + return + } + case "TotalDeletes": + z.TotalDeletes, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "TotalDeletes") + return + } default: err = dc.Skip() if err != nil { @@ -1231,9 +1364,9 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 4 + // map header, size 6 // write "LastMinute" - err = en.Append(0x84, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65) + err = en.Append(0x86, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65) if err != nil { return } @@ -1296,15 +1429,35 @@ func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "TotalErrorsTimeout") return } + // write "TotalWrites" + err = en.Append(0xab, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.TotalWrites) + if err != nil { + err = msgp.WrapError(err, "TotalWrites") + return + } + // write "TotalDeletes" + err = en.Append(0xac, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.TotalDeletes) + if err != nil { + err = msgp.WrapError(err, "TotalDeletes") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *DiskMetrics) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 4 + // map header, size 6 // string "LastMinute" - o = append(o, 0x84, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65) + o = append(o, 0x86, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65) o = msgp.AppendMapHeader(o, uint32(len(z.LastMinute))) for za0001, za0002 := range z.LastMinute { o = msgp.AppendString(o, za0001) @@ -1327,6 +1480,12 @@ func (z *DiskMetrics) MarshalMsg(b []byte) (o []byte, err error) { // string "TotalErrorsTimeout" o = append(o, 0xb2, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74) o = msgp.AppendUint64(o, z.TotalErrorsTimeout) + // string "TotalWrites" + o = append(o, 0xab, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x73) + o = msgp.AppendUint64(o, z.TotalWrites) + // string "TotalDeletes" + o = append(o, 0xac, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x73) + o = msgp.AppendUint64(o, z.TotalDeletes) return } @@ -1420,6 +1579,18 @@ func (z *DiskMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "TotalErrorsTimeout") return } + case "TotalWrites": + z.TotalWrites, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "TotalWrites") + return + } + case "TotalDeletes": + z.TotalDeletes, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "TotalDeletes") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -1448,7 +1619,7 @@ func (z *DiskMetrics) Msgsize() (s int) { s += msgp.StringPrefixSize + len(za0003) + msgp.Uint64Size } } - s += 24 + msgp.Uint64Size + 19 + msgp.Uint64Size + s += 24 + msgp.Uint64Size + 19 + msgp.Uint64Size + 12 + msgp.Uint64Size + 13 + msgp.Uint64Size return } @@ -3553,6 +3724,52 @@ func (z *RenameDataHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "FI") return } + case "ro": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } + for zb0002 > 0 { + zb0002-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } + switch msgp.UnsafeString(field) { + case "BaseOptions": + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Opts", "BaseOptions") + return + } + for zb0003 > 0 { + zb0003-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Opts", "BaseOptions") + return + } + switch msgp.UnsafeString(field) { + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Opts", "BaseOptions") + return + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } + } + } default: err = dc.Skip() if err != nil { @@ -3566,9 +3783,9 @@ func (z *RenameDataHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *RenameDataHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 7 // write "id" - err = en.Append(0x86, 0xa2, 0x69, 0x64) + err = en.Append(0x87, 0xa2, 0x69, 0x64) if err != nil { return } @@ -3627,15 +3844,31 @@ func (z *RenameDataHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "FI") return } + // write "ro" + err = en.Append(0xa2, 0x72, 0x6f) + if err != nil { + return + } + // map header, size 1 + // write "BaseOptions" + err = en.Append(0x81, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + // map header, size 0 + err = en.Append(0x80) + if err != nil { + return + } return } // MarshalMsg implements msgp.Marshaler func (z *RenameDataHandlerParams) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 7 // string "id" - o = append(o, 0x86, 0xa2, 0x69, 0x64) + o = append(o, 0x87, 0xa2, 0x69, 0x64) o = msgp.AppendString(o, z.DiskID) // string "sv" o = append(o, 0xa2, 0x73, 0x76) @@ -3656,6 +3889,13 @@ func (z *RenameDataHandlerParams) MarshalMsg(b []byte) (o []byte, err error) { err = msgp.WrapError(err, "FI") return } + // string "ro" + o = append(o, 0xa2, 0x72, 0x6f) + // map header, size 1 + // string "BaseOptions" + o = append(o, 0x81, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73) + // map header, size 0 + o = append(o, 0x80) return } @@ -3713,6 +3953,52 @@ func (z *RenameDataHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) err = msgp.WrapError(err, "FI") return } + case "ro": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } + for zb0002 > 0 { + zb0002-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } + switch msgp.UnsafeString(field) { + case "BaseOptions": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Opts", "BaseOptions") + return + } + for zb0003 > 0 { + zb0003-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "Opts", "BaseOptions") + return + } + switch msgp.UnsafeString(field) { + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "Opts", "BaseOptions") + return + } + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "Opts") + return + } + } + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -3727,7 +4013,7 @@ func (z *RenameDataHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *RenameDataHandlerParams) Msgsize() (s int) { - s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 3 + msgp.StringPrefixSize + len(z.SrcVolume) + 3 + msgp.StringPrefixSize + len(z.SrcPath) + 3 + msgp.StringPrefixSize + len(z.DstVolume) + 3 + msgp.StringPrefixSize + len(z.DstPath) + 3 + z.FI.Msgsize() + s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 3 + msgp.StringPrefixSize + len(z.SrcVolume) + 3 + msgp.StringPrefixSize + len(z.SrcPath) + 3 + msgp.StringPrefixSize + len(z.DstVolume) + 3 + msgp.StringPrefixSize + len(z.DstPath) + 3 + z.FI.Msgsize() + 3 + 1 + 12 + 1 return } @@ -3834,6 +4120,144 @@ func (z RenameDataResp) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *RenameOptions) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "BaseOptions": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + for zb0002 > 0 { + zb0002-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + switch msgp.UnsafeString(field) { + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *RenameOptions) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "BaseOptions" + err = en.Append(0x81, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + // map header, size 0 + err = en.Append(0x80) + if err != nil { + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *RenameOptions) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "BaseOptions" + o = append(o, 0x81, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73) + // map header, size 0 + o = append(o, 0x80) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *RenameOptions) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "BaseOptions": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + for zb0002 > 0 { + zb0002-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + switch msgp.UnsafeString(field) { + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "BaseOptions") + return + } + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *RenameOptions) Msgsize() (s int) { + s = 1 + 12 + 1 + return +} + // DecodeMsg implements msgp.Decodable func (z *UpdateMetadataOpts) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cmd/storage-datatypes_gen_test.go b/cmd/storage-datatypes_gen_test.go index 6a4a5c554..b41874a04 100644 --- a/cmd/storage-datatypes_gen_test.go +++ b/cmd/storage-datatypes_gen_test.go @@ -9,6 +9,119 @@ import ( "github.com/tinylib/msgp/msgp" ) +func TestMarshalUnmarshalBaseOptions(t *testing.T) { + v := BaseOptions{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBaseOptions(b *testing.B) { + v := BaseOptions{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBaseOptions(b *testing.B) { + v := BaseOptions{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBaseOptions(b *testing.B) { + v := BaseOptions{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBaseOptions(t *testing.T) { + v := BaseOptions{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBaseOptions Msgsize() is inaccurate") + } + + vn := BaseOptions{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBaseOptions(b *testing.B) { + v := BaseOptions{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBaseOptions(b *testing.B) { + v := BaseOptions{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalCheckPartsHandlerParams(t *testing.T) { v := CheckPartsHandlerParams{} bts, err := v.MarshalMsg(nil) @@ -1704,6 +1817,119 @@ func BenchmarkDecodeRenameDataResp(b *testing.B) { } } +func TestMarshalUnmarshalRenameOptions(t *testing.T) { + v := RenameOptions{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgRenameOptions(b *testing.B) { + v := RenameOptions{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgRenameOptions(b *testing.B) { + v := RenameOptions{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalRenameOptions(b *testing.B) { + v := RenameOptions{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeRenameOptions(t *testing.T) { + v := RenameOptions{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeRenameOptions Msgsize() is inaccurate") + } + + vn := RenameOptions{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRenameOptions(b *testing.B) { + v := RenameOptions{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRenameOptions(b *testing.B) { + v := RenameOptions{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalUpdateMetadataOpts(t *testing.T) { v := UpdateMetadataOpts{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 36bd539a6..ee9d27809 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -79,13 +79,13 @@ type StorageAPI interface { WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error // Metadata operations - DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error - DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) []error + DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) error + DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (FileInfo, error) ReadXL(ctx context.Context, volume, path string, readData bool) (RawFileInfo, error) - RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error) + RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (uint64, error) // File operations. ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) @@ -95,7 +95,7 @@ type StorageAPI interface { ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error CheckParts(ctx context.Context, volume string, path string, fi FileInfo) error - Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) + Delete(ctx context.Context, volume string, path string, opts DeleteOptions) (err error) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error @@ -217,7 +217,7 @@ func (p *unrecognizedDisk) RenameFile(ctx context.Context, srcVolume, srcPath, d return errDiskNotFound } -func (p *unrecognizedDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error) { +func (p *unrecognizedDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (uint64, error) { return 0, errDiskNotFound } @@ -225,13 +225,12 @@ func (p *unrecognizedDisk) CheckParts(ctx context.Context, volume string, path s return errDiskNotFound } -func (p *unrecognizedDisk) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) { +func (p *unrecognizedDisk) Delete(ctx context.Context, volume string, path string, opts DeleteOptions) (err error) { return errDiskNotFound } -// DeleteVersions deletes slice of versions, it can be same object -// or multiple objects. -func (p *unrecognizedDisk) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) (errs []error) { +// DeleteVersions deletes slice of versions, it can be same object or multiple objects. +func (p *unrecognizedDisk) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) (errs []error) { errs = make([]error, len(versions)) for i := range errs { @@ -248,7 +247,7 @@ func (p *unrecognizedDisk) WriteAll(ctx context.Context, volume string, path str return errDiskNotFound } -func (p *unrecognizedDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) { +func (p *unrecognizedDisk) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) { return errDiskNotFound } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 70d8194c2..6d7232787 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -139,6 +139,7 @@ func toStorageErr(err error) error { // Abstracts a remote disk. type storageRESTClient struct { + // Indicate of NSScanner is in progress in this disk scanning int32 endpoint Endpoint @@ -148,9 +149,6 @@ type storageRESTClient struct { // Indexes, will be -1 until assigned a set. poolIndex, setIndex, diskIndex int - - diskInfoCache timedValue - diskInfoCacheMetrics timedValue } // Retrieve location indexes. @@ -272,36 +270,23 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, metrics bool) (in // transport is already down. return info, errDiskNotFound } - fetchDI := func(di *timedValue, metrics bool) { - di.TTL = time.Second - di.Update = func() (interface{}, error) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - info, err := storageDiskInfoHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{ - storageRESTDiskID: client.diskID, - // Always request metrics, since we are caching the result. - storageRESTMetrics: strconv.FormatBool(metrics), - })) - if err != nil { - return info, err - } - if info.Error != "" { - return info, toStorageErr(errors.New(info.Error)) - } - return info, nil - } + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + infop, err := storageDiskInfoHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{ + storageRESTDiskID: client.diskID, + // Always request metrics, since we are caching the result. + storageRESTMetrics: strconv.FormatBool(metrics), + })) + if err != nil { + return info, err } - // Fetch disk info from appropriate cache. - dic := &client.diskInfoCache - if metrics { - dic = &client.diskInfoCacheMetrics + info = *infop + if info.Error != "" { + return info, toStorageErr(errors.New(info.Error)) } - dic.Once.Do(func() { fetchDI(dic, metrics) }) - val, err := dic.Get() - if di, ok := val.(*DiskInfo); di != nil && ok { - info = *di - } - return info, err + info.Scanning = atomic.LoadInt32(&client.scanning) == 1 + return info, nil } // MakeVolBulk - create multiple volumes in a bulk operation. @@ -406,13 +391,14 @@ func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, pat return toStorageErr(err) } -func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error { - _, err := storageDeleteVersionHandler.Call(ctx, client.gridConn, &DeleteVersionHandlerParams{ +func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) { + _, err = storageDeleteVersionHandler.Call(ctx, client.gridConn, &DeleteVersionHandlerParams{ DiskID: client.diskID, Volume: volume, FilePath: path, ForceDelMarker: forceDelMarker, FI: fi, + Opts: opts, }) return toStorageErr(err) } @@ -439,10 +425,11 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string, } // RenameData - rename source path to destination path atomically, metadata and data file. -func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) { +func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) { // Set a very long timeout for rename data. ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() + resp, err := storageRenameDataHandler.Call(ctx, client.gridConn, &RenameDataHandlerParams{ DiskID: client.diskID, SrcVolume: srcVolume, @@ -450,6 +437,7 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP DstPath: dstPath, DstVolume: dstVolume, FI: fi, + Opts: opts, }) if err != nil { return 0, toStorageErr(err) @@ -627,7 +615,7 @@ func (client *storageRESTClient) Delete(ctx context.Context, volume string, path } // DeleteVersions - deletes list of specified versions if present -func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) (errs []error) { +func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) (errs []error) { if len(versions) == 0 { return errs } diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 25047478a..e7875eb2b 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -20,7 +20,7 @@ package cmd //go:generate msgp -file $GOFILE -unexported const ( - storageRESTVersion = "v51" // Added ReadVersions readOptions + storageRESTVersion = "v52" // Added DiskInfo drive signature storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index d3a8b52e2..34b20dd11 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -370,7 +370,8 @@ func (s *storageRESTServer) DeleteVersionHandler(p *DeleteVersionHandlerParams) filePath := p.FilePath forceDelMarker := p.ForceDelMarker - err := s.getStorage().DeleteVersion(context.Background(), volume, filePath, p.FI, forceDelMarker) + opts := DeleteOptions{} + err := s.getStorage().DeleteVersion(context.Background(), volume, filePath, p.FI, forceDelMarker, opts) return np, grid.NewRemoteErr(err) } @@ -726,7 +727,8 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http encoder := gob.NewEncoder(w) done := keepHTTPResponseAlive(w) - errs := s.getStorage().DeleteVersions(r.Context(), volume, versions) + opts := DeleteOptions{} + errs := s.getStorage().DeleteVersions(r.Context(), volume, versions, opts) done(nil) for idx := range versions { if errs[idx] != nil { @@ -748,7 +750,7 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena return nil, grid.NewRemoteErr(errDiskNotFound) } - sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath) + sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath, p.Opts) resp := &RenameDataResp{ Signature: sign, } diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index bd0a272dd..904b4e285 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -78,8 +78,10 @@ const ( // Detects change in underlying disk. type xlStorageDiskIDCheck struct { - totalErrsTimeout atomic.Uint64 // Captures all timeout only errors + totalWrites atomic.Uint64 + totalDeletes atomic.Uint64 totalErrsAvailability atomic.Uint64 // Captures all data availability errors such as permission denied, faulty disk and timeout errors. + totalErrsTimeout atomic.Uint64 // Captures all timeout only errors // apiCalls should be placed first so alignment is guaranteed for atomic operations. apiCalls [storageMetricLast]uint64 @@ -224,6 +226,12 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis diskMaxConcurrent: diskMaxConcurrent, diskStartChecking: diskStartChecking, } + + if driveQuorum { + xl.totalWrites.Add(xl.storage.getWriteAttribute()) + xl.totalDeletes.Add(xl.storage.getDeleteAttribute()) + } + xl.diskCtx, xl.cancel = context.WithCancel(context.TODO()) for i := range xl.apiLatencies[:] { xl.apiLatencies[i] = &lockedLastMinuteLatency{} @@ -330,6 +338,12 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, metrics bool) (info if metrics { info.Metrics = p.getMetrics() } + if driveQuorum { + info.Metrics.TotalWrites = p.totalWrites.Load() + info.Metrics.TotalDeletes = p.totalDeletes.Load() + } + info.Metrics.TotalErrorsTimeout = p.totalErrsTimeout.Load() + info.Metrics.TotalErrorsAvailability = p.totalErrsAvailability.Load() }() if p.health.isFaulty() { @@ -347,7 +361,6 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, metrics bool) (info if p.diskID != "" && p.diskID != info.ID { return info, errDiskNotFound } - return info, nil } @@ -494,17 +507,22 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat return w.Run(func() error { return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath) }) } -func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) { +func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameData, srcPath, fi.DataDir, dstVolume, dstPath) if err != nil { return 0, err } - defer done(&err) + defer func() { + if driveQuorum && err == nil && !skipAccessChecks(dstVolume) { + p.storage.setWriteAttribute(p.totalWrites.Add(1)) + } + done(&err) + }() w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()) err = w.Run(func() error { var ierr error - sign, ierr = p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath) + sign, ierr = p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath, opts) return ierr }) return sign, err @@ -534,7 +552,7 @@ func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path s // DeleteVersions deletes slice of versions, it can be same object // or multiple objects. -func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) (errs []error) { +func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) (errs []error) { // Merely for tracing storage path := "" if len(versions) > 0 { @@ -548,9 +566,35 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string } return errs } - defer done(&err) + defer func() { + if driveQuorum && !skipAccessChecks(volume) { + var permanentDeletes uint64 + var deleteMarkers uint64 - errs = p.storage.DeleteVersions(ctx, volume, versions) + for i, nerr := range errs { + if nerr != nil { + continue + } + for _, fi := range versions[i].Versions { + if fi.Deleted { + // Delete markers are a write operation not a permanent delete. + deleteMarkers++ + continue + } + permanentDeletes++ + } + } + if deleteMarkers > 0 { + p.storage.setWriteAttribute(p.totalWrites.Add(deleteMarkers)) + } + if permanentDeletes > 0 { + p.storage.setDeleteAttribute(p.totalDeletes.Add(permanentDeletes)) + } + } + done(&err) + }() + + errs = p.storage.DeleteVersions(ctx, volume, versions, opts) for i := range errs { if errs[i] != nil { err = errs[i] @@ -582,15 +626,32 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path return w.Run(func() error { return p.storage.WriteAll(ctx, volume, path, b) }) } -func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) { +func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteVersion, volume, path) if err != nil { return err } - defer done(&err) + defer func() { + defer done(&err) + + if driveQuorum && err == nil && !skipAccessChecks(volume) { + if opts.UndoWrite { + p.storage.setWriteAttribute(p.totalWrites.Add(^uint64(0))) + return + } + + if fi.Deleted { + // Delete markers are a write operation not a permanent delete. + p.storage.setWriteAttribute(p.totalWrites.Add(1)) + return + } + + p.storage.setDeleteAttribute(p.totalDeletes.Add(1)) + } + }() w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()) - return w.Run(func() error { return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) }) + return w.Run(func() error { return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker, opts) }) } func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) (err error) { @@ -707,7 +768,7 @@ func (p *xlStorageDiskIDCheck) CleanAbandonedData(ctx context.Context, volume st return w.Run(func() error { return p.storage.CleanAbandonedData(ctx, volume, path) }) } -func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string, err string) madmin.TraceInfo { +func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string, err string, custom map[string]string) madmin.TraceInfo { return madmin.TraceInfo{ TraceType: madmin.TraceStorage, Time: startTime, @@ -758,15 +819,19 @@ func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...st p.totalErrsTimeout.Add(1) } } + p.apiLatencies[s].add(duration) if trace { + custom := make(map[string]string) paths = append([]string{p.String()}, paths...) var errStr string if err != nil { errStr = err.Error() } - globalTrace.Publish(storageTrace(s, startTime, duration, strings.Join(paths, " "), errStr)) + custom["total-errs-timeout"] = strconv.FormatUint(p.totalErrsTimeout.Load(), 10) + custom["total-errs-availability"] = strconv.FormatUint(p.totalErrsAvailability.Load(), 10) + globalTrace.Publish(storageTrace(s, startTime, duration, strings.Join(paths, " "), errStr, custom)) } } } @@ -779,9 +844,14 @@ const ( // diskActiveMonitoring indicates if we have enabled "active" disk monitoring var diskActiveMonitoring = true +// Indicates if users want to enable drive_quorum feature +var driveQuorum bool + func init() { diskActiveMonitoring = (env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn) || (env.Get("_MINIO_DISK_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn) + + driveQuorum = env.Get("_MINIO_DRIVE_QUORUM", config.EnableOff) == config.EnableOn } type diskHealthTracker struct { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 6da7e6091..3aa940709 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -21,6 +21,7 @@ import ( "bytes" "context" "crypto/rand" + "encoding/binary" "errors" "fmt" "io" @@ -46,6 +47,7 @@ import ( "github.com/minio/minio/internal/disk" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" + "github.com/pkg/xattr" "github.com/zeebo/xxh3" ) @@ -91,6 +93,9 @@ func isValidVolname(volname string) bool { // xlStorage - implements StorageAPI interface. type xlStorage struct { + // Indicate of NSScanner is in progress in this disk + scanning int32 + drivePath string endpoint Endpoint @@ -103,10 +108,8 @@ type xlStorage struct { // Indexes, will be -1 until assigned a set. poolIndex, setIndex, diskIndex int - // Indicate of NSScanner is in progress in this disk - scanning int32 - formatFileInfo os.FileInfo + formatFile string formatLegacy bool formatLastCheck time.Time @@ -274,6 +277,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) { } s.formatData = formatData s.formatFileInfo = formatFi + s.formatFile = pathJoin(s.drivePath, minioMetaBucket, formatConfigFile) if len(s.formatData) == 0 { // Create all necessary bucket folders if possible. @@ -668,12 +672,46 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates return dataUsageInfo, nil } +func (s *xlStorage) getDeleteAttribute() uint64 { + attr := "user.total_deletes" + buf, err := xattr.LGet(s.formatFile, attr) + if err != nil { + // We start off with '0' if we can read the attributes + return 0 + } + return binary.LittleEndian.Uint64(buf[:8]) +} + +func (s *xlStorage) getWriteAttribute() uint64 { + attr := "user.total_writes" + buf, err := xattr.LGet(s.formatFile, attr) + if err != nil { + // We start off with '0' if we can read the attributes + return 0 + } + + return binary.LittleEndian.Uint64(buf[:8]) +} + +func (s *xlStorage) setDeleteAttribute(deleteCount uint64) error { + attr := "user.total_deletes" + + data := make([]byte, 8) + binary.LittleEndian.PutUint64(data, deleteCount) + return xattr.LSet(s.formatFile, attr, data) +} + +func (s *xlStorage) setWriteAttribute(writeCount uint64) error { + attr := "user.total_writes" + + data := make([]byte, 8) + binary.LittleEndian.PutUint64(data, writeCount) + return xattr.LSet(s.formatFile, attr, data) +} + // DiskInfo provides current information about disk space usage, // total free inodes and underlying filesystem. func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err error) { - // Do not cache results from atomic variables - scanning := atomic.LoadInt32(&s.scanning) == 1 - s.diskInfoCache.Once.Do(func() { s.diskInfoCache.TTL = time.Second s.diskInfoCache.Update = func() (interface{}, error) { @@ -710,7 +748,7 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err erro if v != nil { info = v.(DiskInfo) } - info.Scanning = scanning + info.Scanning = atomic.LoadInt32(&s.scanning) == 1 return info, err } @@ -727,8 +765,7 @@ func (s *xlStorage) getVolDir(volume string) (string, error) { } func (s *xlStorage) checkFormatJSON() (os.FileInfo, error) { - formatFile := pathJoin(s.drivePath, minioMetaBucket, formatConfigFile) - fi, err := Lstat(formatFile) + fi, err := Lstat(s.formatFile) if err != nil { // If the disk is still not initialized. if osIsNotExist(err) { @@ -779,8 +816,7 @@ func (s *xlStorage) GetDiskID() (string, error) { return diskID, nil } - formatFile := pathJoin(s.drivePath, minioMetaBucket, formatConfigFile) - b, err := os.ReadFile(formatFile) + b, err := os.ReadFile(s.formatFile) if err != nil { // If the disk is still not initialized. if osIsNotExist(err) { @@ -1093,7 +1129,7 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis // DeleteVersions deletes slice of versions, it can be same object // or multiple objects. -func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) []error { +func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) []error { errs := make([]error, len(versions)) for i, fiv := range versions { @@ -1145,7 +1181,7 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) // DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. forceDelMarker // will force creating a new `xl.meta` to create a new delete marker -func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error { +func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) { if HasSuffix(path, SlashSeparator) { return s.Delete(ctx, volume, path, DeleteOptions{ Recursive: false, @@ -1253,7 +1289,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F } // Updates only metadata for a given version. -func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error { +func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) (err error) { if len(fi.Metadata) == 0 { return errInvalidArgument } @@ -1292,7 +1328,7 @@ func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi } // WriteMetadata - writes FileInfo metadata for path at `xl.meta` -func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error { +func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { if fi.Fresh { var xlMeta xlMetaV2 if err := xlMeta.AddVersion(fi); err != nil { @@ -2266,7 +2302,7 @@ func skipAccessChecks(volume string) (ok bool) { } // RenameData - rename source path to destination path atomically, metadata and data directory. -func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) { +func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) { defer func() { ignoredErrs := []error{ errFileNotFound, @@ -2285,7 +2321,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f dstVolume, dstPath, err), "xl-storage-rename-data-"+dstVolume) } - if err == nil && s.globalSync { + if s.globalSync { globalSync() } }() diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go index 1de574b46..cfdff623f 100644 --- a/cmd/xl-storage_test.go +++ b/cmd/xl-storage_test.go @@ -1681,7 +1681,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { // Delete version 0... checkVerExist(t) - err = xl.DeleteVersion(ctx, volume, object, FileInfo{Name: object, Volume: volume, VersionID: versions[0]}, false) + err = xl.DeleteVersion(ctx, volume, object, FileInfo{Name: object, Volume: volume, VersionID: versions[0]}, false, DeleteOptions{}) if err != nil { t.Fatal(err) } @@ -1694,7 +1694,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { fis[0].Versions = append(fis[0].Versions, FileInfo{Name: object, Volume: volume, VersionID: versions[i]}) deleted[i] = true } - errs := xl.DeleteVersions(ctx, volume, fis) + errs := xl.DeleteVersions(ctx, volume, fis, DeleteOptions{}) if errs[0] != nil { t.Fatalf("expected nil error, got %v", errs[0]) } @@ -1706,7 +1706,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { fis[0].Versions = append(fis[0].Versions, FileInfo{Name: object, Volume: volume, VersionID: versions[i]}) deleted[i] = true } - errs = xl.DeleteVersions(ctx, volume, fis) + errs = xl.DeleteVersions(ctx, volume, fis, DeleteOptions{}) if errs[0] != nil { t.Fatalf("expected nil error, got %v", errs[0]) } diff --git a/docs/debugging/pprofgoparser/go.mod b/docs/debugging/pprofgoparser/go.mod index 0813c0b6f..5ddb69a01 100644 --- a/docs/debugging/pprofgoparser/go.mod +++ b/docs/debugging/pprofgoparser/go.mod @@ -1,3 +1,3 @@ module github.com/minio/minio/docs/debugging/pprofgoparser -go 1.21.3 +go 1.19 diff --git a/docs/debugging/s3-verify/go.mod b/docs/debugging/s3-verify/go.mod index 160fdabe8..4febf7570 100644 --- a/docs/debugging/s3-verify/go.mod +++ b/docs/debugging/s3-verify/go.mod @@ -1,6 +1,6 @@ module github.com/minio/minio/docs/debugging/s3-verify -go 1.21.3 +go 1.19 require github.com/minio/minio-go/v7 v7.0.66 diff --git a/docs/debugging/xattr/go.mod b/docs/debugging/xattr/go.mod new file mode 100644 index 000000000..a53fe331d --- /dev/null +++ b/docs/debugging/xattr/go.mod @@ -0,0 +1,14 @@ +module github.com/minio/minio/docs/debugging/xattr + +go 1.19 + +require ( + github.com/olekukonko/tablewriter v0.0.5 + github.com/pkg/xattr v0.4.9 +) + +require ( + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/rivo/uniseg v0.4.4 // indirect + golang.org/x/sys v0.15.0 // indirect +) diff --git a/docs/debugging/xattr/go.sum b/docs/debugging/xattr/go.sum new file mode 100644 index 000000000..e57bd98f2 --- /dev/null +++ b/docs/debugging/xattr/go.sum @@ -0,0 +1,13 @@ +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= +github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= +github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/docs/debugging/xattr/main.go b/docs/debugging/xattr/main.go new file mode 100644 index 000000000..31954671f --- /dev/null +++ b/docs/debugging/xattr/main.go @@ -0,0 +1,117 @@ +// Copyright (c) 2015-2023 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "encoding/binary" + "errors" + "flag" + "fmt" + "log" + "os" + + "github.com/olekukonko/tablewriter" + "github.com/pkg/xattr" +) + +var ( + path, name string + value uint64 + set, list bool +) + +func getxattr(path, name string) (uint64, error) { + buf, err := xattr.LGet(path, name) + if err != nil { + return 0, err + } + + return binary.LittleEndian.Uint64(buf[:8]), nil +} + +func listxattr(path string) ([]string, error) { + return xattr.LList(path) +} + +func setxattr(path, name string, value uint64) error { + data := make([]byte, 8) + binary.LittleEndian.PutUint64(data, value) + return xattr.LSet(path, name, data) +} + +func main() { + flag.StringVar(&path, "path", "", "path name where the attribute shall be applied") + flag.StringVar(&name, "name", "", "attribute name or it can be a wildcard if '.' is specified") + flag.Uint64Var(&value, "value", 0, "attribute value expects the value to be uint64") + flag.BoolVar(&set, "set", false, "this is a set attribute operation") + + flag.Parse() + + if set && value == 0 { + log.Fatalln("setting an attribute requires a non-zero value") + } + + if !set && value > 0 { + log.Fatalln("to set a value please specify --set along with --value") + } + + table := tablewriter.NewWriter(os.Stdout) + table.SetHeader([]string{"Name", "Value"}) + table.SetAutoWrapText(false) + table.SetAutoFormatHeaders(true) + table.SetHeaderAlignment(tablewriter.ALIGN_LEFT) + table.SetAlignment(tablewriter.ALIGN_LEFT) + table.SetCenterSeparator("") + table.SetColumnSeparator("") + table.SetRowSeparator("") + table.SetHeaderLine(false) + // table.EnableBorder(false) + table.SetTablePadding("\t") // pad with tabs + table.SetNoWhiteSpace(true) + + if set { + if err := setxattr(path, name, value); err != nil { + log.Fatalln(fmt.Errorf("setting attribute %s failed with: %v", name, err)) + } + } else { + if name == "" { + log.Fatalln("you must specify an attribute name for reading") + } + var names []string + if name == "." { + attrs, err := listxattr(path) + if err != nil { + log.Fatalln(fmt.Errorf("listing attributes failed with: %v", err)) + } + names = append(names, attrs...) + } else { + names = append(names, name) + } + var data [][]string + for _, attr := range names { + value, err := getxattr(path, attr) + if err != nil { + data = append(data, []string{attr, errors.Unwrap(err).Error()}) + } else { + data = append(data, []string{attr, fmt.Sprintf("%d", value)}) + } + } + table.AppendBulk(data) // Add Bulk Data + table.Render() + } +} diff --git a/go.mod b/go.mod index eccdc4399..22bde3307 100644 --- a/go.mod +++ b/go.mod @@ -71,6 +71,7 @@ require ( github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 github.com/pkg/sftp v1.13.6 + github.com/pkg/xattr v0.4.9 github.com/prometheus/client_golang v1.17.0 github.com/prometheus/client_model v0.5.0 github.com/prometheus/common v0.45.0 @@ -213,7 +214,6 @@ require ( github.com/oklog/ulid v1.3.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pierrec/lz4/v4 v4.1.19 // indirect - github.com/pkg/xattr v0.4.9 // indirect github.com/posener/complete v1.2.3 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/pquerna/cachecontrol v0.2.0 // indirect diff --git a/internal/config/api/api.go b/internal/config/api/api.go index 03776c3d1..8e665ab23 100644 --- a/internal/config/api/api.go +++ b/internal/config/api/api.go @@ -253,7 +253,7 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { listQuorum := env.Get(EnvAPIListQuorum, kvs.GetWithDefault(apiListQuorum, DefaultKVS)) switch listQuorum { - case "strict", "optimal", "reduced", "disk": + case "strict", "optimal", "reduced", "disk", "auto": default: return cfg, fmt.Errorf("invalid value %v for list_quorum: will default to 'strict'", listQuorum) } diff --git a/internal/config/api/help.go b/internal/config/api/help.go index 44840bb1a..98a5f18dc 100644 --- a/internal/config/api/help.go +++ b/internal/config/api/help.go @@ -58,7 +58,7 @@ var ( }, config.HelpKV{ Key: apiListQuorum, - Description: `set the acceptable quorum expected for list operations e.g. "optimal", "reduced", "disk", "strict"` + defaultHelpPostfix(apiListQuorum), + Description: `set the acceptable quorum expected for list operations e.g. "optimal", "reduced", "disk", "strict", "auto"` + defaultHelpPostfix(apiListQuorum), Optional: true, Type: "string", },