mirror of https://github.com/minio/minio.git
Compare commits
3 Commits
74db70ee30
...
afa74167c9
Author | SHA1 | Date |
---|---|---|
Harshavardhana | afa74167c9 | |
Klaus Post | ec49fff583 | |
Harshavardhana | 4005f2b5a9 |
|
@ -1217,9 +1217,9 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
|
|||
}
|
||||
|
||||
bucketStorageCache.InitOnce(10*time.Second,
|
||||
cachevalue.Opts{ReturnLastGood: true, NoWait: true},
|
||||
func() (DataUsageInfo, error) {
|
||||
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
cachevalue.Opts{ReturnLastGood: true},
|
||||
func(ctx context.Context) (DataUsageInfo, error) {
|
||||
ctx, done := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer done()
|
||||
|
||||
return loadDataUsageFromBackend(ctx, objectAPI)
|
||||
|
|
|
@ -49,8 +49,8 @@ var bucketStorageCache = cachevalue.New[DataUsageInfo]()
|
|||
func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
|
||||
bucketStorageCache.InitOnce(10*time.Second,
|
||||
cachevalue.Opts{ReturnLastGood: true, NoWait: true},
|
||||
func() (DataUsageInfo, error) {
|
||||
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
func(ctx context.Context) (DataUsageInfo, error) {
|
||||
ctx, done := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer done()
|
||||
|
||||
return loadDataUsageFromBackend(ctx, objAPI)
|
||||
|
@ -59,8 +59,8 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
|
|||
}
|
||||
|
||||
// GetBucketUsageInfo return bucket usage info for a given bucket
|
||||
func (sys *BucketQuotaSys) GetBucketUsageInfo(bucket string) (BucketUsageInfo, error) {
|
||||
dui, err := bucketStorageCache.Get()
|
||||
func (sys *BucketQuotaSys) GetBucketUsageInfo(ctx context.Context, bucket string) (BucketUsageInfo, error) {
|
||||
dui, err := bucketStorageCache.GetWithCtx(ctx)
|
||||
timedout := OperationTimedOut{}
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &timedout) {
|
||||
if len(dui.BucketsUsage) > 0 {
|
||||
|
@ -118,7 +118,7 @@ func (sys *BucketQuotaSys) enforceQuotaHard(ctx context.Context, bucket string,
|
|||
return BucketQuotaExceeded{Bucket: bucket}
|
||||
}
|
||||
|
||||
bui, err := sys.GetBucketUsageInfo(bucket)
|
||||
bui, err := sys.GetBucketUsageInfo(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -79,12 +79,12 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
|
|||
prefixUsageCache.InitOnce(30*time.Second,
|
||||
// No need to fail upon Update() error, fallback to old value.
|
||||
cachevalue.Opts{ReturnLastGood: true, NoWait: true},
|
||||
func() (map[string]uint64, error) {
|
||||
func(ctx context.Context) (map[string]uint64, error) {
|
||||
m := make(map[string]uint64)
|
||||
for _, pool := range z.serverPools {
|
||||
for _, er := range pool.sets {
|
||||
// Load bucket usage prefixes
|
||||
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
ctx, done := context.WithTimeout(ctx, 2*time.Second)
|
||||
ok := cache.load(ctx, er, bucket+slashSeparator+dataUsageCacheName) == nil
|
||||
done()
|
||||
if ok {
|
||||
|
@ -107,7 +107,7 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
|
|||
},
|
||||
)
|
||||
|
||||
return prefixUsageCache.Get()
|
||||
return prefixUsageCache.GetWithCtx(ctx)
|
||||
}
|
||||
|
||||
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
|
||||
|
|
|
@ -1231,7 +1231,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||
}
|
||||
|
||||
if opts.WantChecksum != nil {
|
||||
err := opts.WantChecksum.Matches(checksumCombined)
|
||||
err := opts.WantChecksum.Matches(checksumCombined, len(parts))
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
|
|
|
@ -1962,8 +1962,8 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
|
|||
if opts.Cached {
|
||||
listBucketsCache.InitOnce(time.Second,
|
||||
cachevalue.Opts{ReturnLastGood: true, NoWait: true},
|
||||
func() ([]BucketInfo, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
func(ctx context.Context) ([]BucketInfo, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
buckets, err = z.s3Peer.ListBuckets(ctx, opts)
|
||||
|
@ -1980,7 +1980,7 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
|
|||
},
|
||||
)
|
||||
|
||||
return listBucketsCache.Get()
|
||||
return listBucketsCache.GetWithCtx(ctx)
|
||||
}
|
||||
|
||||
buckets, err = z.s3Peer.ListBuckets(ctx, opts)
|
||||
|
|
|
@ -361,7 +361,7 @@ type MetricsGroupOpts struct {
|
|||
func (g *MetricsGroupV2) RegisterRead(read func(context.Context) []MetricV2) {
|
||||
g.metricsCache = cachevalue.NewFromFunc(g.cacheInterval,
|
||||
cachevalue.Opts{ReturnLastGood: true},
|
||||
func() ([]MetricV2, error) {
|
||||
func(ctx context.Context) ([]MetricV2, error) {
|
||||
if g.metricsGroupOpts.dependGlobalObjectAPI {
|
||||
objLayer := newObjectLayerFn()
|
||||
// Service not initialized yet
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -57,7 +58,7 @@ type nodesOnline struct {
|
|||
}
|
||||
|
||||
func newNodesUpDownCache() *cachevalue.Cache[nodesOnline] {
|
||||
loadNodesUpDown := func() (v nodesOnline, err error) {
|
||||
loadNodesUpDown := func(ctx context.Context) (v nodesOnline, err error) {
|
||||
v.Online, v.Offline = globalNotificationSys.GetPeerOnlineCount()
|
||||
return
|
||||
}
|
||||
|
@ -84,7 +85,7 @@ type storageMetrics struct {
|
|||
}
|
||||
|
||||
func newDataUsageInfoCache() *cachevalue.Cache[DataUsageInfo] {
|
||||
loadDataUsage := func() (u DataUsageInfo, err error) {
|
||||
loadDataUsage := func(ctx context.Context) (u DataUsageInfo, err error) {
|
||||
objLayer := newObjectLayerFn()
|
||||
if objLayer == nil {
|
||||
return
|
||||
|
@ -100,7 +101,7 @@ func newDataUsageInfoCache() *cachevalue.Cache[DataUsageInfo] {
|
|||
}
|
||||
|
||||
func newESetHealthResultCache() *cachevalue.Cache[HealthResult] {
|
||||
loadHealth := func() (r HealthResult, err error) {
|
||||
loadHealth := func(ctx context.Context) (r HealthResult, err error) {
|
||||
objLayer := newObjectLayerFn()
|
||||
if objLayer == nil {
|
||||
return
|
||||
|
@ -157,7 +158,7 @@ func newDriveMetricsCache() *cachevalue.Cache[storageMetrics] {
|
|||
prevDriveIOStatsRefreshedAt time.Time
|
||||
)
|
||||
|
||||
loadDriveMetrics := func() (v storageMetrics, err error) {
|
||||
loadDriveMetrics := func(ctx context.Context) (v storageMetrics, err error) {
|
||||
objLayer := newObjectLayerFn()
|
||||
if objLayer == nil {
|
||||
return
|
||||
|
@ -203,7 +204,7 @@ func newDriveMetricsCache() *cachevalue.Cache[storageMetrics] {
|
|||
}
|
||||
|
||||
func newCPUMetricsCache() *cachevalue.Cache[madmin.CPUMetrics] {
|
||||
loadCPUMetrics := func() (v madmin.CPUMetrics, err error) {
|
||||
loadCPUMetrics := func(ctx context.Context) (v madmin.CPUMetrics, err error) {
|
||||
var types madmin.MetricType = madmin.MetricsCPU
|
||||
|
||||
m := collectLocalMetrics(types, collectMetricsOpts{
|
||||
|
@ -228,7 +229,7 @@ func newCPUMetricsCache() *cachevalue.Cache[madmin.CPUMetrics] {
|
|||
}
|
||||
|
||||
func newMemoryMetricsCache() *cachevalue.Cache[madmin.MemInfo] {
|
||||
loadMemoryMetrics := func() (v madmin.MemInfo, err error) {
|
||||
loadMemoryMetrics := func(ctx context.Context) (v madmin.MemInfo, err error) {
|
||||
var types madmin.MetricType = madmin.MetricsMem
|
||||
|
||||
m := collectLocalMetrics(types, collectMetricsOpts{
|
||||
|
@ -253,7 +254,7 @@ func newMemoryMetricsCache() *cachevalue.Cache[madmin.MemInfo] {
|
|||
}
|
||||
|
||||
func newClusterStorageInfoCache() *cachevalue.Cache[storageMetrics] {
|
||||
loadStorageInfo := func() (v storageMetrics, err error) {
|
||||
loadStorageInfo := func(ctx context.Context) (v storageMetrics, err error) {
|
||||
objLayer := newObjectLayerFn()
|
||||
if objLayer == nil {
|
||||
return storageMetrics{}, nil
|
||||
|
|
|
@ -116,7 +116,7 @@ func (client *peerRESTClient) call(method string, values url.Values, body io.Rea
|
|||
// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
||||
// after verifying format.json
|
||||
func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||
if client == nil || !client.IsOnline() {
|
||||
if client == nil {
|
||||
return nil, errPeerNotReachable
|
||||
}
|
||||
|
||||
|
@ -129,6 +129,10 @@ func (client *peerRESTClient) callWithContext(ctx context.Context, method string
|
|||
return respBody, nil
|
||||
}
|
||||
|
||||
if xnet.IsNetworkOrHostDown(err, true) {
|
||||
return nil, errPeerNotReachable
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -139,7 +143,11 @@ func (client *peerRESTClient) String() string {
|
|||
|
||||
// IsOnline returns true if the peer client is online.
|
||||
func (client *peerRESTClient) IsOnline() bool {
|
||||
return client.restClient.IsOnline()
|
||||
conn := client.gridConn()
|
||||
if conn == nil {
|
||||
return false
|
||||
}
|
||||
return client.restClient.IsOnline() || conn.State() == grid.StateConnected
|
||||
}
|
||||
|
||||
// Close - marks the client as closed.
|
||||
|
|
|
@ -191,8 +191,13 @@ func (client *storageRESTClient) String() string {
|
|||
return client.endpoint.String()
|
||||
}
|
||||
|
||||
// IsOnline - returns whether RPC client failed to connect or not.
|
||||
// IsOnline - returns whether client failed to connect or not.
|
||||
func (client *storageRESTClient) IsOnline() bool {
|
||||
return client.restClient.IsOnline() || client.IsOnlineWS()
|
||||
}
|
||||
|
||||
// IsOnlineWS - returns whether websocket client failed to connect or not.
|
||||
func (client *storageRESTClient) IsOnlineWS() bool {
|
||||
return client.gridConn.State() == grid.StateConnected
|
||||
}
|
||||
|
||||
|
@ -254,7 +259,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
|
|||
}
|
||||
|
||||
func (client *storageRESTClient) GetDiskID() (string, error) {
|
||||
if !client.IsOnline() {
|
||||
if !client.IsOnlineWS() {
|
||||
// make sure to check if the disk is offline, since the underlying
|
||||
// value is cached we should attempt to invalidate it if such calls
|
||||
// were attempted. This can lead to false success under certain conditions
|
||||
|
@ -275,7 +280,7 @@ func (client *storageRESTClient) SetDiskID(id string) {
|
|||
}
|
||||
|
||||
func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOptions) (info DiskInfo, err error) {
|
||||
if !client.IsOnline() {
|
||||
if !client.IsOnlineWS() {
|
||||
// make sure to check if the disk is offline, since the underlying
|
||||
// value is cached we should attempt to invalidate it if such calls
|
||||
// were attempted. This can lead to false success under certain conditions
|
||||
|
@ -302,10 +307,9 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
|
|||
return info, nil
|
||||
} // In all other cases cache the value upto 1sec.
|
||||
|
||||
client.diskInfoCache.InitOnce(time.Second,
|
||||
cachevalue.Opts{CacheError: true},
|
||||
func() (info DiskInfo, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
client.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{},
|
||||
func(ctx context.Context) (info DiskInfo, err error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
nopts := DiskInfoOptions{DiskID: *client.diskID.Load(), Metrics: true}
|
||||
|
@ -321,7 +325,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
|
|||
},
|
||||
)
|
||||
|
||||
return client.diskInfoCache.Get()
|
||||
return client.diskInfoCache.GetWithCtx(ctx)
|
||||
}
|
||||
|
||||
// MakeVolBulk - create multiple volumes in a bulk operation.
|
||||
|
|
|
@ -156,7 +156,7 @@ func veeamSOSAPIGetObject(ctx context.Context, bucket, object string, rs *HTTPRa
|
|||
}
|
||||
|
||||
q, _ := globalBucketQuotaSys.Get(ctx, bucket)
|
||||
binfo, _ := globalBucketQuotaSys.GetBucketUsageInfo(bucket)
|
||||
binfo, _ := globalBucketQuotaSys.GetBucketUsageInfo(ctx, bucket)
|
||||
|
||||
ci := capacityInfo{
|
||||
Used: int64(binfo.Size),
|
||||
|
|
|
@ -99,7 +99,7 @@ type xlStorageDiskIDCheck struct {
|
|||
func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
|
||||
p.metricsCache.InitOnce(5*time.Second,
|
||||
cachevalue.Opts{},
|
||||
func() (DiskMetrics, error) {
|
||||
func(ctx context.Context) (DiskMetrics, error) {
|
||||
diskMetric := DiskMetrics{
|
||||
LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
|
||||
APICalls: make(map[string]uint64, len(p.apiCalls)),
|
||||
|
@ -114,7 +114,7 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
|
|||
},
|
||||
)
|
||||
|
||||
diskMetric, _ := p.metricsCache.Get()
|
||||
diskMetric, _ := p.metricsCache.GetWithCtx(context.Background())
|
||||
// Do not need this value to be cached.
|
||||
diskMetric.TotalErrorsTimeout = p.totalErrsTimeout.Load()
|
||||
diskMetric.TotalErrorsAvailability = p.totalErrsAvailability.Load()
|
||||
|
|
|
@ -331,7 +331,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
|
|||
|
||||
// Initialize DiskInfo cache
|
||||
s.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{},
|
||||
func() (DiskInfo, error) {
|
||||
func(ctx context.Context) (DiskInfo, error) {
|
||||
dcinfo := DiskInfo{}
|
||||
di, err := getDiskInfo(s.drivePath)
|
||||
if err != nil {
|
||||
|
@ -752,8 +752,8 @@ func (s *xlStorage) setWriteAttribute(writeCount uint64) error {
|
|||
|
||||
// DiskInfo provides current information about disk space usage,
|
||||
// total free inodes and underlying filesystem.
|
||||
func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
|
||||
info, err = s.diskInfoCache.Get()
|
||||
func (s *xlStorage) DiskInfo(ctx context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
|
||||
info, err = s.diskInfoCache.GetWithCtx(ctx)
|
||||
info.NRRequests = s.nrRequests
|
||||
info.Rotational = s.rotational
|
||||
info.MountPath = s.drivePath
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package cachevalue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -30,11 +31,6 @@ type Opts struct {
|
|||
// Returns the last good value AND the error.
|
||||
ReturnLastGood bool
|
||||
|
||||
// If CacheError is set, errors will be cached as well
|
||||
// and not continuously try to update.
|
||||
// Should not be combined with ReturnLastGood.
|
||||
CacheError bool
|
||||
|
||||
// If NoWait is set, Get() will return the last good value,
|
||||
// if TTL has expired but 2x TTL has not yet passed,
|
||||
// but will fetch a new value in the background.
|
||||
|
@ -50,7 +46,7 @@ type Cache[T any] struct {
|
|||
// Only one caller will call this function at any time, others will be blocking.
|
||||
// The returned value can no longer be modified once returned.
|
||||
// Should be set before calling Get().
|
||||
updateFn func() (T, error)
|
||||
updateFn func(ctx context.Context) (T, error)
|
||||
|
||||
// ttl for a cached value.
|
||||
ttl time.Duration
|
||||
|
@ -62,10 +58,7 @@ type Cache[T any] struct {
|
|||
Once sync.Once
|
||||
|
||||
// Managed values.
|
||||
valErr atomic.Pointer[struct {
|
||||
v T
|
||||
e error
|
||||
}]
|
||||
val atomic.Pointer[T]
|
||||
lastUpdateMs atomic.Int64
|
||||
updating sync.Mutex
|
||||
}
|
||||
|
@ -78,7 +71,7 @@ func New[T any]() *Cache[T] {
|
|||
|
||||
// NewFromFunc allocates a new cached value instance and initializes it with an
|
||||
// update function, making it ready for use.
|
||||
func NewFromFunc[T any](ttl time.Duration, opts Opts, update func() (T, error)) *Cache[T] {
|
||||
func NewFromFunc[T any](ttl time.Duration, opts Opts, update func(ctx context.Context) (T, error)) *Cache[T] {
|
||||
return &Cache[T]{
|
||||
ttl: ttl,
|
||||
updateFn: update,
|
||||
|
@ -88,7 +81,7 @@ func NewFromFunc[T any](ttl time.Duration, opts Opts, update func() (T, error))
|
|||
|
||||
// InitOnce initializes the cache with a TTL and an update function. It is
|
||||
// guaranteed to be called only once.
|
||||
func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func() (T, error)) {
|
||||
func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func(ctx context.Context) (T, error)) {
|
||||
t.Once.Do(func() {
|
||||
t.ttl = ttl
|
||||
t.updateFn = update
|
||||
|
@ -96,61 +89,68 @@ func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func() (T, erro
|
|||
})
|
||||
}
|
||||
|
||||
// Get will return a cached value or fetch a new one.
|
||||
// GetWithCtx will return a cached value or fetch a new one.
|
||||
// passes a caller context, if caller context cancels nothing
|
||||
// is cached.
|
||||
// Tf the Update function returns an error the value is forwarded as is and not cached.
|
||||
func (t *Cache[T]) Get() (T, error) {
|
||||
v := t.valErr.Load()
|
||||
func (t *Cache[T]) GetWithCtx(ctx context.Context) (T, error) {
|
||||
v := t.val.Load()
|
||||
ttl := t.ttl
|
||||
vTime := t.lastUpdateMs.Load()
|
||||
tNow := time.Now().UnixMilli()
|
||||
if v != nil && tNow-vTime < ttl.Milliseconds() {
|
||||
if v.e == nil {
|
||||
return v.v, nil
|
||||
}
|
||||
if v.e != nil && t.opts.CacheError || t.opts.ReturnLastGood {
|
||||
return v.v, v.e
|
||||
}
|
||||
return *v, nil
|
||||
}
|
||||
|
||||
// Fetch new value.
|
||||
if t.opts.NoWait && v != nil && tNow-vTime < ttl.Milliseconds()*2 && (v.e == nil || t.opts.CacheError) {
|
||||
// Fetch new value asynchronously, while we do not return an error
|
||||
// if v != nil value or
|
||||
if t.opts.NoWait && v != nil && tNow-vTime < ttl.Milliseconds()*2 {
|
||||
if t.updating.TryLock() {
|
||||
go func() {
|
||||
defer t.updating.Unlock()
|
||||
t.update()
|
||||
t.update(context.Background())
|
||||
}()
|
||||
}
|
||||
return v.v, v.e
|
||||
return *v, nil
|
||||
}
|
||||
|
||||
// Get lock. Either we get it or we wait for it.
|
||||
t.updating.Lock()
|
||||
defer t.updating.Unlock()
|
||||
|
||||
if time.Since(time.UnixMilli(t.lastUpdateMs.Load())) < ttl {
|
||||
// There is a new value, release lock and return it.
|
||||
v = t.valErr.Load()
|
||||
t.updating.Unlock()
|
||||
return v.v, v.e
|
||||
}
|
||||
t.update()
|
||||
v = t.valErr.Load()
|
||||
t.updating.Unlock()
|
||||
return v.v, v.e
|
||||
}
|
||||
|
||||
func (t *Cache[T]) update() {
|
||||
val, err := t.updateFn()
|
||||
if err != nil {
|
||||
if t.opts.ReturnLastGood {
|
||||
// Keep last good value.
|
||||
v := t.valErr.Load()
|
||||
if v != nil {
|
||||
val = v.v
|
||||
}
|
||||
if v = t.val.Load(); v != nil {
|
||||
return *v, nil
|
||||
}
|
||||
}
|
||||
t.valErr.Store(&struct {
|
||||
v T
|
||||
e error
|
||||
}{v: val, e: err})
|
||||
t.lastUpdateMs.Store(time.Now().UnixMilli())
|
||||
|
||||
if err := t.update(ctx); err != nil {
|
||||
var empty T
|
||||
return empty, err
|
||||
}
|
||||
|
||||
return *t.val.Load(), nil
|
||||
}
|
||||
|
||||
// Get will return a cached value or fetch a new one.
|
||||
// Tf the Update function returns an error the value is forwarded as is and not cached.
|
||||
func (t *Cache[T]) Get() (T, error) {
|
||||
return t.GetWithCtx(context.Background())
|
||||
}
|
||||
|
||||
func (t *Cache[T]) update(ctx context.Context) error {
|
||||
val, err := t.updateFn(ctx)
|
||||
if err != nil {
|
||||
if t.opts.ReturnLastGood && t.val.Load() != nil {
|
||||
// Keep last good value, so update
|
||||
// does not return an error.
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
t.val.Store(&val)
|
||||
t.lastUpdateMs.Store(time.Now().UnixMilli())
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -18,15 +18,76 @@
|
|||
package cachevalue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func slowCaller(ctx context.Context) error {
|
||||
sl := time.NewTimer(time.Second)
|
||||
defer sl.Stop()
|
||||
|
||||
select {
|
||||
case <-sl.C:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestCacheCtx(t *testing.T) {
|
||||
cache := New[time.Time]()
|
||||
t.Parallel()
|
||||
cache.InitOnce(2*time.Second, Opts{},
|
||||
func(ctx context.Context) (time.Time, error) {
|
||||
return time.Now(), slowCaller(ctx)
|
||||
},
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel() // cancel context to test.
|
||||
|
||||
_, err := cache.GetWithCtx(ctx)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
t.Fatalf("expected context.Canceled err, got %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
t1, err := cache.GetWithCtx(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("expected nil err, got %v", err)
|
||||
}
|
||||
|
||||
t2, err := cache.GetWithCtx(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("expected nil err, got %v", err)
|
||||
}
|
||||
|
||||
if !t1.Equal(t2) {
|
||||
t.Fatalf("expected time to be equal: %s != %s", t1, t2)
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
t3, err := cache.GetWithCtx(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("expected nil err, got %v", err)
|
||||
}
|
||||
|
||||
if t1.Equal(t3) {
|
||||
t.Fatalf("expected time to be un-equal: %s == %s", t1, t3)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
cache := New[time.Time]()
|
||||
t.Parallel()
|
||||
cache.InitOnce(2*time.Second, Opts{},
|
||||
func() (time.Time, error) {
|
||||
func(ctx context.Context) (time.Time, error) {
|
||||
return time.Now(), nil
|
||||
},
|
||||
)
|
||||
|
@ -50,7 +111,7 @@ func TestCache(t *testing.T) {
|
|||
func BenchmarkCache(b *testing.B) {
|
||||
cache := New[time.Time]()
|
||||
cache.InitOnce(1*time.Millisecond, Opts{},
|
||||
func() (time.Time, error) {
|
||||
func(ctx context.Context) (time.Time, error) {
|
||||
return time.Now(), nil
|
||||
},
|
||||
)
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"hash"
|
||||
"hash/crc32"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/minio/minio/internal/hash/sha256"
|
||||
|
@ -71,9 +72,10 @@ const (
|
|||
|
||||
// Checksum is a type and base 64 encoded value.
|
||||
type Checksum struct {
|
||||
Type ChecksumType
|
||||
Encoded string
|
||||
Raw []byte
|
||||
Type ChecksumType
|
||||
Encoded string
|
||||
Raw []byte
|
||||
WantParts int
|
||||
}
|
||||
|
||||
// Is returns if c is all of t.
|
||||
|
@ -260,13 +262,14 @@ func ReadPartCheckSums(b []byte) (res []map[string]string) {
|
|||
}
|
||||
// Skip main checksum
|
||||
b = b[length:]
|
||||
if !typ.Is(ChecksumIncludesMultipart) {
|
||||
continue
|
||||
}
|
||||
parts, n := binary.Uvarint(b)
|
||||
if n <= 0 {
|
||||
break
|
||||
}
|
||||
if !typ.Is(ChecksumIncludesMultipart) {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(res) == 0 {
|
||||
res = make([]map[string]string, parts)
|
||||
}
|
||||
|
@ -292,11 +295,25 @@ func NewChecksumWithType(alg ChecksumType, value string) *Checksum {
|
|||
if !alg.IsSet() {
|
||||
return nil
|
||||
}
|
||||
wantParts := 0
|
||||
if strings.ContainsRune(value, '-') {
|
||||
valSplit := strings.Split(value, "-")
|
||||
if len(valSplit) != 2 {
|
||||
return nil
|
||||
}
|
||||
value = valSplit[0]
|
||||
nParts, err := strconv.Atoi(valSplit[1])
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
alg |= ChecksumMultipart
|
||||
wantParts = nParts
|
||||
}
|
||||
bvalue, err := base64.StdEncoding.DecodeString(value)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
c := Checksum{Type: alg, Encoded: value, Raw: bvalue}
|
||||
c := Checksum{Type: alg, Encoded: value, Raw: bvalue, WantParts: wantParts}
|
||||
if !c.Valid() {
|
||||
return nil
|
||||
}
|
||||
|
@ -325,12 +342,15 @@ func (c *Checksum) AppendTo(b []byte, parts []byte) []byte {
|
|||
b = append(b, crc...)
|
||||
if c.Type.Is(ChecksumMultipart) {
|
||||
var checksums int
|
||||
if c.WantParts > 0 && !c.Type.Is(ChecksumIncludesMultipart) {
|
||||
checksums = c.WantParts
|
||||
}
|
||||
// Ensure we don't divide by 0:
|
||||
if c.Type.RawByteLen() == 0 || len(parts)%c.Type.RawByteLen() != 0 {
|
||||
hashLogIf(context.Background(), fmt.Errorf("internal error: Unexpected checksum length: %d, each checksum %d", len(parts), c.Type.RawByteLen()))
|
||||
checksums = 0
|
||||
parts = nil
|
||||
} else {
|
||||
} else if len(parts) > 0 {
|
||||
checksums = len(parts) / c.Type.RawByteLen()
|
||||
}
|
||||
if !c.Type.Is(ChecksumIncludesMultipart) {
|
||||
|
@ -358,7 +378,7 @@ func (c Checksum) Valid() bool {
|
|||
}
|
||||
|
||||
// Matches returns whether given content matches c.
|
||||
func (c Checksum) Matches(content []byte) error {
|
||||
func (c Checksum) Matches(content []byte, parts int) error {
|
||||
if len(c.Encoded) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -368,6 +388,13 @@ func (c Checksum) Matches(content []byte) error {
|
|||
return err
|
||||
}
|
||||
sum := hasher.Sum(nil)
|
||||
if c.WantParts > 0 && c.WantParts != parts {
|
||||
return ChecksumMismatch{
|
||||
Want: fmt.Sprintf("%s-%d", c.Encoded, c.WantParts),
|
||||
Got: fmt.Sprintf("%s-%d", base64.StdEncoding.EncodeToString(sum), parts),
|
||||
}
|
||||
}
|
||||
|
||||
if !bytes.Equal(sum, c.Raw) {
|
||||
return ChecksumMismatch{
|
||||
Want: c.Encoded,
|
||||
|
|
Loading…
Reference in New Issue