Reduce parallelReader allocs (#19558)

Author: Klaus Post, 2024-04-19 09:44:59 -07:00, committed by GitHub
parent 5f774951b1
commit ec816f3840
6 changed files with 61 additions and 19 deletions

==== File 1 of 6 ====

@@ -38,6 +38,7 @@ type parallelReader struct {
     shardFileSize int64
     buf           [][]byte
     readerToBuf   []int
+    stashBuffer   []byte
 }
 
 // newParallelReader returns parallelReader.
@@ -46,6 +47,21 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int64) *parallelReader {
     for i := range r2b {
         r2b[i] = i
     }
+    bufs := make([][]byte, len(readers))
+    // Fill buffers
+    b := globalBytePoolCap.Load().Get()
+    shardSize := int(e.ShardSize())
+    if cap(b) < len(readers)*shardSize {
+        // We should always have enough capacity, but older objects may be bigger.
+        globalBytePoolCap.Load().Put(b)
+        b = nil
+    } else {
+        // Seed the buffers.
+        for i := range bufs {
+            bufs[i] = b[i*shardSize : (i+1)*shardSize]
+        }
+    }
+
     return &parallelReader{
         readers:       readers,
         orgReaders:    readers,
@@ -55,6 +71,15 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int64) *parallelReader {
         shardFileSize: e.ShardFileSize(totalLength),
         buf:           make([][]byte, len(readers)),
         readerToBuf:   r2b,
+        stashBuffer:   b,
     }
 }
 
+// Done will release any resources used by the parallelReader.
+func (p *parallelReader) Done() {
+    if p.stashBuffer != nil {
+        globalBytePoolCap.Load().Put(p.stashBuffer)
+        p.stashBuffer = nil
+    }
+}
@@ -224,6 +249,7 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt,
     if len(prefer) == len(readers) {
         reader.preferReaders(prefer)
     }
+    defer reader.Done()
 
     startBlock := offset / e.blockSize
     endBlock := (offset + length) / e.blockSize
@@ -294,6 +320,7 @@ func (e Erasure) Heal(ctx context.Context, writers []io.Writer, readers []io.ReaderAt,
     if len(readers) == len(prefer) {
         reader.preferReaders(prefer)
     }
+    defer reader.Done()
 
     startBlock := int64(0)
     endBlock := totalLength / e.blockSize
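Taken together, the hunks above are the heart of the change: instead of allocating (or pulling from the pool) one shard buffer per reader, newParallelReader does a single pool Get, slices that stash into per-reader shard buffers, and Done gives the stash back in one Put when the read finishes. A minimal, self-contained sketch of the pattern, with a toy channel-based pool standing in for globalBytePoolCap (all names and sizes here are illustrative, not MinIO's):

package main

import "fmt"

// bytePool is a stand-in for bpool.BytePoolCap: a buffered channel of
// reusable byte slices.
type bytePool struct{ c chan []byte }

func (p *bytePool) Get() []byte {
    select {
    case b := <-p.c:
        return b // reuse a pooled buffer
    default:
        return make([]byte, 0, 1<<20) // pool empty: allocate
    }
}

func (p *bytePool) Put(b []byte) {
    select {
    case p.c <- b:
    default: // pool full: drop the buffer
    }
}

func main() {
    pool := &bytePool{c: make(chan []byte, 4)}
    const readers, shardSize = 4, 1024

    // One Get for all readers instead of one allocation per shard.
    stash := pool.Get()
    if cap(stash) < readers*shardSize {
        // Stash too small (e.g. older, larger objects): fall back.
        pool.Put(stash)
        stash = nil
    }
    bufs := make([][]byte, readers)
    if stash != nil {
        stash = stash[:readers*shardSize]
        for i := range bufs {
            bufs[i] = stash[i*shardSize : (i+1)*shardSize]
        }
    }
    fmt.Printf("seeded %d shard buffers from one stash (cap %d)\n", len(bufs), cap(stash))

    // The equivalent of parallelReader.Done: one Put releases them all.
    if stash != nil {
        pool.Put(stash)
    }
}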

==== File 2 of 6 ====

@@ -679,12 +679,12 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string,
             // Account for padding and forced compression overhead and encryption.
             buffer = make([]byte, data.ActualSize()+256+32+32, data.ActualSize()*2+512)
         } else {
-            buffer = globalBytePoolCap.Get()
-            defer globalBytePoolCap.Put(buffer)
+            buffer = globalBytePoolCap.Load().Get()
+            defer globalBytePoolCap.Load().Put(buffer)
         }
     case size >= fi.Erasure.BlockSize:
-        buffer = globalBytePoolCap.Get()
-        defer globalBytePoolCap.Put(buffer)
+        buffer = globalBytePoolCap.Load().Get()
+        defer globalBytePoolCap.Load().Put(buffer)
     case size < fi.Erasure.BlockSize:
         // No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smaller.
         buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
@@ -705,10 +705,11 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string,
     if data.Size() > bigFileThreshold {
         // Add input readahead.
         // We use 2 buffers, so we always have a full buffer of input.
-        bufA := globalBytePoolCap.Get()
-        bufB := globalBytePoolCap.Get()
-        defer globalBytePoolCap.Put(bufA)
-        defer globalBytePoolCap.Put(bufB)
+        pool := globalBytePoolCap.Load()
+        bufA := pool.Get()
+        bufB := pool.Get()
+        defer pool.Put(bufA)
+        defer pool.Put(bufB)
         ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
         if err == nil {
             toEncode = ra
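Both hunks also hoist the pool lookup: globalBytePoolCap is now behind an atomic pointer (see the globals change below), and loading it once into a local guarantees the deferred Puts return buffers to the exact pool instance the Gets came from, even if the global is swapped mid-call. The same pattern repeats in putObject below. A rough sketch, again with a simplified stand-in pool:

package main

import (
    "fmt"
    "sync/atomic"
)

type bytePool struct{ c chan []byte }

func (p *bytePool) Get() []byte {
    select {
    case b := <-p.c:
        return b
    default:
        return make([]byte, 1024)
    }
}

func (p *bytePool) Put(b []byte) {
    select {
    case p.c <- b:
    default:
    }
}

// An atomic.Pointer global can be swapped at runtime without locks.
var globalPool atomic.Pointer[bytePool]

func encodePart() {
    // Load once: Get and the deferred Put use the same pool instance.
    pool := globalPool.Load()
    bufA := pool.Get()
    bufB := pool.Get()
    defer pool.Put(bufA)
    defer pool.Put(bufB)
    _, _ = bufA, bufB // ... feed a double-buffered readahead reader ...
}

func main() {
    globalPool.Store(&bytePool{c: make(chan []byte, 2)})
    encodePart()
    fmt.Println("pooled buffers available:", len(globalPool.Load().c))
}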

==== File 3 of 6 ====

@@ -1138,8 +1138,8 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader,
     case size == 0:
         buffer = make([]byte, 1) // Allocate at least a byte to reach EOF
     case size >= fi.Erasure.BlockSize:
-        buffer = globalBytePoolCap.Get()
-        defer globalBytePoolCap.Put(buffer)
+        buffer = globalBytePoolCap.Load().Get()
+        defer globalBytePoolCap.Load().Put(buffer)
     case size < fi.Erasure.BlockSize:
         // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
         buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
@@ -1388,8 +1388,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object string,
     case size == 0:
         buffer = make([]byte, 1) // Allocate at least a byte to reach EOF
     case size >= fi.Erasure.BlockSize || size == -1:
-        buffer = globalBytePoolCap.Get()
-        defer globalBytePoolCap.Put(buffer)
+        buffer = globalBytePoolCap.Load().Get()
+        defer globalBytePoolCap.Load().Put(buffer)
     case size < fi.Erasure.BlockSize:
         // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
         buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
@@ -1451,10 +1451,11 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object string,
     toEncode := io.Reader(data)
     if data.Size() >= bigFileThreshold {
         // We use 2 buffers, so we always have a full buffer of input.
-        bufA := globalBytePoolCap.Get()
-        bufB := globalBytePoolCap.Get()
-        defer globalBytePoolCap.Put(bufA)
-        defer globalBytePoolCap.Put(bufB)
+        pool := globalBytePoolCap.Load()
+        bufA := pool.Get()
+        bufB := pool.Get()
+        defer pool.Put(bufA)
+        defer pool.Put(bufB)
         ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
         if err == nil {
             toEncode = ra

==== File 4 of 6 ====

@@ -103,8 +103,9 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServerPools
     // Initialize byte pool once for all sets, bpool size is set to
     // setCount * setDriveCount with each memory upto blockSizeV2.
-    globalBytePoolCap = bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)
-    globalBytePoolCap.Populate()
+    buffers := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)
+    buffers.Populate()
+    globalBytePoolCap.Store(buffers)
 
     var localDrives []StorageAPI
     local := endpointServerPools.FirstLocal()
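The initialization is also reordered so the pool is fully constructed and pre-populated before it is published with Store; a concurrent reader therefore sees either nil (handled by the nil guards in the last file) or a completely ready pool, never a half-built one. A compressed sketch of that ordering, with a stand-in pool type rather than bpool's actual implementation:

package main

import "sync/atomic"

type bytePool struct {
    c       chan []byte
    w, wcap int
}

func newBytePoolCap(n, w, wcap int) *bytePool {
    return &bytePool{c: make(chan []byte, n), w: w, wcap: wcap}
}

// populate pre-fills the pool so early Gets never allocate.
func (p *bytePool) populate() {
    for i := 0; i < cap(p.c); i++ {
        p.c <- make([]byte, p.w, p.wcap)
    }
}

var globalPool atomic.Pointer[bytePool]

func main() {
    buffers := newBytePoolCap(8, 1<<20, 2<<20)
    buffers.populate()
    globalPool.Store(buffers) // publish only after populate finished
}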

==== File 5 of 6 ====

@@ -241,7 +241,7 @@
     globalBucketMonitor      *bandwidth.Monitor
     globalPolicySys          *PolicySys
     globalIAMSys             *IAMSys
-    globalBytePoolCap        *bpool.BytePoolCap
+    globalBytePoolCap        atomic.Pointer[bpool.BytePoolCap]
     globalLifecycleSys       *LifecycleSys
     globalBucketSSEConfigSys *BucketSSEConfigSys

==== File 6 of 6 ====

@@ -52,6 +52,9 @@ func (bp *BytePoolCap) Populate() {
 // Get gets a []byte from the BytePool, or creates a new one if none are
 // available in the pool.
 func (bp *BytePoolCap) Get() (b []byte) {
+    if bp == nil {
+        return nil
+    }
     select {
     case b = <-bp.c:
         // reuse existing buffer
@@ -68,6 +71,9 @@ func (bp *BytePoolCap) Get() (b []byte) {
 // Put returns the given Buffer to the BytePool.
 func (bp *BytePoolCap) Put(b []byte) {
+    if bp == nil {
+        return
+    }
     select {
     case bp.c <- b:
         // buffer went back into pool
@@ -78,10 +84,16 @@ func (bp *BytePoolCap) Put(b []byte) {
 // Width returns the width of the byte arrays in this pool.
 func (bp *BytePoolCap) Width() (n int) {
+    if bp == nil {
+        return 0
+    }
     return bp.w
 }
 
 // WidthCap returns the cap width of the byte arrays in this pool.
 func (bp *BytePoolCap) WidthCap() (n int) {
+    if bp == nil {
+        return 0
+    }
     return bp.wcap
 }
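These guards are needed because globalBytePoolCap.Load() yields nil until newErasureServerPools calls Store. Go permits invoking a method on a nil pointer receiver as long as the method never dereferences it, so the early returns turn that startup window into harmless no-ops. A tiny illustration:

package main

import "fmt"

type bytePool struct{ w int }

// Width is safe on a nil *bytePool: the receiver is just a parameter,
// and we return before touching bp.w when it is nil.
func (bp *bytePool) Width() int {
    if bp == nil {
        return 0
    }
    return bp.w
}

func main() {
    var bp *bytePool        // what Load() returns before any Store()
    fmt.Println(bp.Width()) // prints 0 instead of panicking
}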