From ec816f384081804455c7b9f224f863ea2b8b4888 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 19 Apr 2024 09:44:59 -0700 Subject: [PATCH] Reduce parallelReader allocs (#19558) --- cmd/erasure-decode.go | 27 +++++++++++++++++++++++++++ cmd/erasure-multipart.go | 17 +++++++++-------- cmd/erasure-object.go | 17 +++++++++-------- cmd/erasure-server-pool.go | 5 +++-- cmd/globals.go | 2 +- internal/bpool/bpool.go | 12 ++++++++++++ 6 files changed, 61 insertions(+), 19 deletions(-) diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index 86a2ff1b5..7f352f0b2 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -38,6 +38,7 @@ type parallelReader struct { shardFileSize int64 buf [][]byte readerToBuf []int + stashBuffer []byte } // newParallelReader returns parallelReader. @@ -46,6 +47,21 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int for i := range r2b { r2b[i] = i } + bufs := make([][]byte, len(readers)) + // Fill buffers + b := globalBytePoolCap.Load().Get() + shardSize := int(e.ShardSize()) + if cap(b) < len(readers)*shardSize { + // We should always have enough capacity, but older objects may be bigger. + globalBytePoolCap.Load().Put(b) + b = nil + } else { + // Seed the buffers. + for i := range bufs { + bufs[i] = b[i*shardSize : (i+1)*shardSize] + } + } + return &parallelReader{ readers: readers, orgReaders: readers, @@ -55,6 +71,15 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int shardFileSize: e.ShardFileSize(totalLength), buf: make([][]byte, len(readers)), readerToBuf: r2b, + stashBuffer: b, + } +} + +// Done will release any resources used by the parallelReader. 
+func (p *parallelReader) Done() { + if p.stashBuffer != nil { + globalBytePoolCap.Load().Put(p.stashBuffer) + p.stashBuffer = nil } } @@ -224,6 +249,7 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read if len(prefer) == len(readers) { reader.preferReaders(prefer) } + defer reader.Done() startBlock := offset / e.blockSize endBlock := (offset + length) / e.blockSize @@ -294,6 +320,7 @@ func (e Erasure) Heal(ctx context.Context, writers []io.Writer, readers []io.Rea if len(readers) == len(prefer) { reader.preferReaders(prefer) } + defer reader.Done() startBlock := int64(0) endBlock := totalLength / e.blockSize diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 069b39476..1e4bffd8a 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -679,12 +679,12 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Account for padding and forced compression overhead and encryption. buffer = make([]byte, data.ActualSize()+256+32+32, data.ActualSize()*2+512) } else { - buffer = globalBytePoolCap.Get() - defer globalBytePoolCap.Put(buffer) + buffer = globalBytePoolCap.Load().Get() + defer globalBytePoolCap.Load().Put(buffer) } case size >= fi.Erasure.BlockSize: - buffer = globalBytePoolCap.Get() - defer globalBytePoolCap.Put(buffer) + buffer = globalBytePoolCap.Load().Get() + defer globalBytePoolCap.Load().Put(buffer) case size < fi.Erasure.BlockSize: // No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smaller. buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1)) @@ -705,10 +705,11 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo if data.Size() > bigFileThreshold { // Add input readahead. // We use 2 buffers, so we always have a full buffer of input. 
- bufA := globalBytePoolCap.Get() - bufB := globalBytePoolCap.Get() - defer globalBytePoolCap.Put(bufA) - defer globalBytePoolCap.Put(bufB) + pool := globalBytePoolCap.Load() + bufA := pool.Get() + bufB := pool.Get() + defer pool.Put(bufA) + defer pool.Put(bufB) ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]}) if err == nil { toEncode = ra diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index a8106471d..66d3bc924 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1138,8 +1138,8 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r * case size == 0: buffer = make([]byte, 1) // Allocate at least a byte to reach EOF case size >= fi.Erasure.BlockSize: - buffer = globalBytePoolCap.Get() - defer globalBytePoolCap.Put(buffer) + buffer = globalBytePoolCap.Load().Get() + defer globalBytePoolCap.Load().Put(buffer) case size < fi.Erasure.BlockSize: // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller. buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1)) @@ -1388,8 +1388,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st case size == 0: buffer = make([]byte, 1) // Allocate at least a byte to reach EOF case size >= fi.Erasure.BlockSize || size == -1: - buffer = globalBytePoolCap.Get() - defer globalBytePoolCap.Put(buffer) + buffer = globalBytePoolCap.Load().Get() + defer globalBytePoolCap.Load().Put(buffer) case size < fi.Erasure.BlockSize: // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller. buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1)) @@ -1451,10 +1451,11 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st toEncode := io.Reader(data) if data.Size() >= bigFileThreshold { // We use 2 buffers, so we always have a full buffer of input. 
- bufA := globalBytePoolCap.Get() - bufB := globalBytePoolCap.Get() - defer globalBytePoolCap.Put(bufA) - defer globalBytePoolCap.Put(bufB) + pool := globalBytePoolCap.Load() + bufA := pool.Get() + bufB := pool.Get() + defer pool.Put(bufA) + defer pool.Put(bufB) ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]}) if err == nil { toEncode = ra diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 4d230c964..982494e10 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -103,8 +103,9 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ // Initialize byte pool once for all sets, bpool size is set to // setCount * setDriveCount with each memory upto blockSizeV2. - globalBytePoolCap = bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2) - globalBytePoolCap.Populate() + buffers := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2) + buffers.Populate() + globalBytePoolCap.Store(buffers) var localDrives []StorageAPI local := endpointServerPools.FirstLocal() diff --git a/cmd/globals.go b/cmd/globals.go index 51ddd2417..58eea1125 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -241,7 +241,7 @@ var ( globalBucketMonitor *bandwidth.Monitor globalPolicySys *PolicySys globalIAMSys *IAMSys - globalBytePoolCap *bpool.BytePoolCap + globalBytePoolCap atomic.Pointer[bpool.BytePoolCap] globalLifecycleSys *LifecycleSys globalBucketSSEConfigSys *BucketSSEConfigSys diff --git a/internal/bpool/bpool.go b/internal/bpool/bpool.go index d73367346..2cbf88e8c 100644 --- a/internal/bpool/bpool.go +++ b/internal/bpool/bpool.go @@ -52,6 +52,9 @@ func (bp *BytePoolCap) Populate() { // Get gets a []byte from the BytePool, or creates a new one if none are // available in the pool. 
func (bp *BytePoolCap) Get() (b []byte) { + if bp == nil { + return nil + } select { case b = <-bp.c: // reuse existing buffer @@ -68,6 +71,9 @@ func (bp *BytePoolCap) Get() (b []byte) { // Put returns the given Buffer to the BytePool. func (bp *BytePoolCap) Put(b []byte) { + if bp == nil { + return + } select { case bp.c <- b: // buffer went back into pool @@ -78,10 +84,16 @@ func (bp *BytePoolCap) Put(b []byte) { // Width returns the width of the byte arrays in this pool. func (bp *BytePoolCap) Width() (n int) { + if bp == nil { + return 0 + } return bp.w } // WidthCap returns the cap width of the byte arrays in this pool. func (bp *BytePoolCap) WidthCap() (n int) { + if bp == nil { + return 0 + } return bp.wcap }