list: Fix IsTruncated and NextMarker when encountering expired objects (#19290)

This commit is contained in:
Anis Eleuch 2024-03-19 21:23:12 +01:00 committed by GitHub
parent 4d7068931a
commit b5e074e54c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 104 additions and 90 deletions

View File

@ -1266,13 +1266,17 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
return z.serverPools[poolIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
}
func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return z.listObjectsGeneric(ctx, bucket, prefix, marker, delimiter, maxKeys, true)
}
func (z *erasureServerPools) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
marker := continuationToken
if marker == "" {
marker = startAfter
}
loi, err := z.ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
loi, err := z.listObjectsGeneric(ctx, bucket, prefix, marker, delimiter, maxKeys, false)
if err != nil {
return ListObjectsV2Info{}, err
}
@ -1381,9 +1385,10 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
return maxKeys
}
func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, v1 bool) (ListObjectsInfo, error) {
var loi ListObjectsInfo
opts := listPathOptions{
V1: v1,
Bucket: bucket,
Prefix: prefix,
Separator: delimiter,
@ -1554,8 +1559,23 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
}
if loi.IsTruncated {
last := objects[len(objects)-1]
loi.NextMarker = opts.encodeMarker(last.Name)
loi.NextMarker = last.Name
}
if merged.lastSkippedEntry != "" {
if merged.lastSkippedEntry > loi.NextMarker {
// An object hidden by ILM was found during listing. Since the number of entries
// fetched from drives is limited, set IsTruncated to true to ask the s3 client
// to continue listing if it wishes, in order to find out whether there are more objects.
loi.IsTruncated = true
loi.NextMarker = merged.lastSkippedEntry
}
}
if loi.NextMarker != "" {
loi.NextMarker = opts.encodeMarker(loi.NextMarker)
}
return loi, nil
}

View File

@ -473,6 +473,8 @@ type metaCacheEntriesSorted struct {
listID string
// Reuse buffers
reuse bool
// Contains the last skipped object after an ILM expiry evaluation
lastSkippedEntry string
}
// shallowClone will create a shallow clone of the array objects,

View File

@ -291,14 +291,6 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
}
mu.Unlock()
// Do lifecycle filtering.
if o.Lifecycle != nil || o.Replication.Config != nil {
filterIn := make(chan metaCacheEntry, 10)
go applyBucketActions(ctx, o, filterIn, results)
// Replace results.
results = filterIn
}
// Gather results to a single channel.
// Quorum is one since we are merging across sets.
err := mergeEntryChannels(ctx, inputs, results, 1)
@ -339,84 +331,50 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
return nil
}
// applyBucketActions applies lifecycle and replication actions on the listing
// It will filter out objects if the most recent version should be deleted by lifecycle.
// Entries that failed replication will be queued if no lifecycle rules got applied.
// out will be closed when there are no more results.
// When 'in' is closed or the context is canceled the
// function closes 'out' and exits.
func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
defer xioutil.SafeClose(out)
// triggerExpiryAndRepl applies lifecycle and replication actions on the listing
// It returns true if the listing is non-versioned and the given object is expired.
func triggerExpiryAndRepl(ctx context.Context, o listPathOptions, obj metaCacheEntry) (skip bool) {
versioned := o.Versioning != nil && o.Versioning.Versioned(obj.name)
for {
var obj metaCacheEntry
var ok bool
select {
case <-ctx.Done():
return
case obj, ok = <-in:
if !ok {
return
}
}
var skip bool
versioned := o.Versioning != nil && o.Versioning.Versioned(obj.name)
// skip latest object from listing only for regular
// listObjects calls, versioned based listing cannot
// filter out between versions 'obj' cannot be truncated
// in such a manner, so look for skipping an object only
// for regular ListObjects() call only.
if !o.Versioned {
fi, err := obj.fileInfo(o.Bucket)
if err != nil {
continue
}
objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
act := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo).Action
skip = act.Delete()
if act.DeleteRestored() {
// do not skip DeleteRestored* actions
skip = false
}
}
}
// Skip entry only if needed via ILM, skipping is never true for versioned listing.
if !skip {
select {
case <-ctx.Done():
return
case out <- obj:
}
}
fiv, err := obj.fileInfoVersions(o.Bucket)
// skip latest object from listing only for regular
// listObjects calls, versioned based listing cannot
// filter out between versions 'obj' cannot be truncated
// in such a manner, so look for skipping an object only
// for regular ListObjects() call only.
if !o.Versioned && !o.V1 {
fi, err := obj.fileInfo(o.Bucket)
if err != nil {
continue
return
}
// Expire all versions if needed, if not attempt to queue for replication.
for _, version := range fiv.Versions {
objInfo := version.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
if !evt.Action.DeleteRestored() {
continue
} // queue version for replication upon expired restored copies if needed.
}
}
queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication, 0)
objInfo := fi.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
act := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo).Action
skip = act.Delete() && !act.DeleteRestored()
}
}
fiv, err := obj.fileInfoVersions(o.Bucket)
if err != nil {
return
}
// Expire all versions if needed, if not attempt to queue for replication.
for _, version := range fiv.Versions {
objInfo := version.ToObjectInfo(o.Bucket, obj.name, versioned)
if o.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *o.Lifecycle, o.Retention, o.Replication.Config, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
if !evt.Action.DeleteRestored() {
continue
} // queue version for replication upon expired restored copies if needed.
}
}
queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication, 0)
}
return
}
func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) {

View File

@ -98,6 +98,8 @@ type listPathOptions struct {
// Versioned is this a ListObjectVersions call.
Versioned bool
// V1 listing type
V1 bool
// Versioning config is used for if the path
// has versioning enabled.
@ -172,7 +174,8 @@ func (o *listPathOptions) debugln(data ...interface{}) {
}
}
// gatherResults will collect all results on the input channel and filter results according to the options.
// gatherResults will collect all results on the input channel and filter results according
// to the options or to the current bucket ILM expiry rules.
// Caller should close the channel when done.
// The returned function will return the results once there is enough or input is closed,
// or the context is canceled.
@ -214,6 +217,12 @@ func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCache
if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
continue
}
if o.Lifecycle != nil || o.Replication.Config != nil {
if skipped := triggerExpiryAndRepl(ctx, *o, entry); skipped == true {
results.lastSkippedEntry = entry.name
continue
}
}
if o.Limit > 0 && results.len() >= o.Limit {
// We have enough and we have more.
// Do not return io.EOF

View File

@ -114,6 +114,12 @@ func (z *listPathOptions) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Versioned")
return
}
case "V1":
z.V1, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "V1")
return
}
case "StopDiskAtLimit":
z.StopDiskAtLimit, err = dc.ReadBool()
if err != nil {
@ -145,9 +151,9 @@ func (z *listPathOptions) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *listPathOptions) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 18
// map header, size 19
// write "ID"
err = en.Append(0xde, 0x0, 0x12, 0xa2, 0x49, 0x44)
err = en.Append(0xde, 0x0, 0x13, 0xa2, 0x49, 0x44)
if err != nil {
return
}
@ -296,6 +302,16 @@ func (z *listPathOptions) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Versioned")
return
}
// write "V1"
err = en.Append(0xa2, 0x56, 0x31)
if err != nil {
return
}
err = en.WriteBool(z.V1)
if err != nil {
err = msgp.WrapError(err, "V1")
return
}
// write "StopDiskAtLimit"
err = en.Append(0xaf, 0x53, 0x74, 0x6f, 0x70, 0x44, 0x69, 0x73, 0x6b, 0x41, 0x74, 0x4c, 0x69, 0x6d, 0x69, 0x74)
if err != nil {
@ -332,9 +348,9 @@ func (z *listPathOptions) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *listPathOptions) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 18
// map header, size 19
// string "ID"
o = append(o, 0xde, 0x0, 0x12, 0xa2, 0x49, 0x44)
o = append(o, 0xde, 0x0, 0x13, 0xa2, 0x49, 0x44)
o = msgp.AppendString(o, z.ID)
// string "Bucket"
o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
@ -378,6 +394,9 @@ func (z *listPathOptions) MarshalMsg(b []byte) (o []byte, err error) {
// string "Versioned"
o = append(o, 0xa9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x65, 0x64)
o = msgp.AppendBool(o, z.Versioned)
// string "V1"
o = append(o, 0xa2, 0x56, 0x31)
o = msgp.AppendBool(o, z.V1)
// string "StopDiskAtLimit"
o = append(o, 0xaf, 0x53, 0x74, 0x6f, 0x70, 0x44, 0x69, 0x73, 0x6b, 0x41, 0x74, 0x4c, 0x69, 0x6d, 0x69, 0x74)
o = msgp.AppendBool(o, z.StopDiskAtLimit)
@ -498,6 +517,12 @@ func (z *listPathOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Versioned")
return
}
case "V1":
z.V1, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "V1")
return
}
case "StopDiskAtLimit":
z.StopDiskAtLimit, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
@ -530,6 +555,6 @@ func (z *listPathOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *listPathOptions) Msgsize() (s int) {
s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 8 + msgp.StringPrefixSize + len(z.BaseDir) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 13 + msgp.StringPrefixSize + len(z.FilterPrefix) + 7 + msgp.StringPrefixSize + len(z.Marker) + 6 + msgp.IntSize + 9 + msgp.StringPrefixSize + len(z.AskDisks) + 12 + msgp.BoolSize + 10 + msgp.BoolSize + 10 + msgp.StringPrefixSize + len(z.Separator) + 7 + msgp.BoolSize + 19 + msgp.BoolSize + 10 + msgp.BoolSize + 10 + msgp.BoolSize + 16 + msgp.BoolSize + 5 + msgp.IntSize + 4 + msgp.IntSize
s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 8 + msgp.StringPrefixSize + len(z.BaseDir) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 13 + msgp.StringPrefixSize + len(z.FilterPrefix) + 7 + msgp.StringPrefixSize + len(z.Marker) + 6 + msgp.IntSize + 9 + msgp.StringPrefixSize + len(z.AskDisks) + 12 + msgp.BoolSize + 10 + msgp.BoolSize + 10 + msgp.StringPrefixSize + len(z.Separator) + 7 + msgp.BoolSize + 19 + msgp.BoolSize + 10 + msgp.BoolSize + 10 + msgp.BoolSize + 3 + msgp.BoolSize + 16 + msgp.BoolSize + 5 + msgp.IntSize + 4 + msgp.IntSize
return
}