mirror of https://github.com/minio/minio.git
Merge branch 'master' into metrics-v3-cluster-scanner
This commit is contained in:
commit
22a4899c1a
|
@ -23,10 +23,9 @@ http {
|
|||
# include /etc/nginx/conf.d/*.conf;
|
||||
|
||||
upstream minio {
|
||||
server minio1:9000;
|
||||
server minio2:9000;
|
||||
server minio3:9000;
|
||||
server minio4:9000;
|
||||
server minio1:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio2:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio3:9000 max_fails=1 fail_timeout=10s;
|
||||
}
|
||||
|
||||
upstream console {
|
||||
|
|
|
@ -23,14 +23,14 @@ http {
|
|||
# include /etc/nginx/conf.d/*.conf;
|
||||
|
||||
upstream minio {
|
||||
server minio1:9000;
|
||||
server minio2:9000;
|
||||
server minio3:9000;
|
||||
server minio4:9000;
|
||||
server minio5:9000;
|
||||
server minio6:9000;
|
||||
server minio7:9000;
|
||||
server minio8:9000;
|
||||
server minio1:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio2:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio3:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio4:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio5:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio6:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio7:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio8:9000 max_fails=1 fail_timeout=10s;
|
||||
}
|
||||
|
||||
upstream console {
|
||||
|
|
|
@ -23,10 +23,10 @@ http {
|
|||
# include /etc/nginx/conf.d/*.conf;
|
||||
|
||||
upstream minio {
|
||||
server minio1:9000;
|
||||
server minio2:9000;
|
||||
server minio3:9000;
|
||||
server minio4:9000;
|
||||
server minio1:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio2:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio3:9000 max_fails=1 fail_timeout=10s;
|
||||
server minio4:9000 max_fails=1 fail_timeout=10s;
|
||||
}
|
||||
|
||||
upstream console {
|
||||
|
|
|
@ -899,6 +899,20 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
|||
if success {
|
||||
validResp++
|
||||
}
|
||||
|
||||
if totalResp >= minDisks && opts.FastGetObjInfo {
|
||||
rw.Lock()
|
||||
ok := countErrs(errs, errFileNotFound) >= minDisks || countErrs(errs, errFileVersionNotFound) >= minDisks
|
||||
rw.Unlock()
|
||||
if ok {
|
||||
err = errFileNotFound
|
||||
if opts.VersionID != "" {
|
||||
err = errFileVersionNotFound
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if totalResp < er.setDriveCount {
|
||||
if !opts.FastGetObjInfo {
|
||||
continue
|
||||
|
|
|
@ -105,8 +105,8 @@ func (h *metricsV3Server) listMetrics(path string) http.Handler {
|
|||
if collPath.isDescendantOf(path) {
|
||||
if v, ok := h.metricsData.mgMap[collPath]; ok {
|
||||
matchingMG[collPath] = v
|
||||
} else {
|
||||
matchingMG[collPath] = h.metricsData.bucketMGMap[collPath]
|
||||
} else if v, ok := h.metricsData.bucketMGMap[collPath]; ok {
|
||||
matchingMG[collPath] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -391,6 +391,9 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, origvolume, vol
|
|||
}
|
||||
|
||||
func (client *storageRESTClient) WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
|
||||
defer cancel()
|
||||
|
||||
_, err := storageWriteMetadataRPC.Call(ctx, client.gridConn, &MetadataHandlerParams{
|
||||
DiskID: *client.diskID.Load(),
|
||||
OrigVolume: origvolume,
|
||||
|
@ -402,6 +405,9 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, origvolume,
|
|||
}
|
||||
|
||||
func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
|
||||
defer cancel()
|
||||
|
||||
_, err := storageUpdateMetadataRPC.Call(ctx, client.gridConn, &MetadataHandlerParams{
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
|
@ -413,6 +419,9 @@ func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, pat
|
|||
}
|
||||
|
||||
func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
|
||||
defer cancel()
|
||||
|
||||
_, err = storageDeleteVersionRPC.Call(ctx, client.gridConn, &DeleteVersionHandlerParams{
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
|
@ -426,6 +435,9 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
|
|||
|
||||
// WriteAll - write all data to a file.
|
||||
func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, path string, b []byte) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
|
||||
defer cancel()
|
||||
|
||||
_, err := storageWriteAllRPC.Call(ctx, client.gridConn, &WriteAllHandlerParams{
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
|
@ -497,6 +509,9 @@ func readMsgpReaderPoolPut(r *msgp.Reader) {
|
|||
}
|
||||
|
||||
func (client *storageRESTClient) ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
|
||||
defer cancel()
|
||||
|
||||
// Use websocket when not reading data.
|
||||
if !opts.ReadData {
|
||||
resp, err := storageReadVersionRPC.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
|
||||
|
@ -537,6 +552,9 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, origvolume, vo
|
|||
|
||||
// ReadXL - reads all contents of xl.meta of a file.
|
||||
func (client *storageRESTClient) ReadXL(ctx context.Context, volume string, path string, readData bool) (rf RawFileInfo, err error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
|
||||
defer cancel()
|
||||
|
||||
// Use websocket when not reading data.
|
||||
if !readData {
|
||||
resp, err := storageReadXLRPC.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
|
||||
|
@ -570,6 +588,9 @@ func (client *storageRESTClient) ReadXL(ctx context.Context, volume string, path
|
|||
|
||||
// ReadAll - reads all contents of a file.
|
||||
func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, path string) ([]byte, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
|
||||
defer cancel()
|
||||
|
||||
gridBytes, err := storageReadAllRPC.Call(ctx, client.gridConn, &ReadAllHandlerParams{
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
|
|
|
@ -25,6 +25,10 @@ import (
|
|||
"github.com/minio/pkg/v2/env"
|
||||
)
|
||||
|
||||
const (
|
||||
envMaxDriveTimeout = "MINIO_DRIVE_MAX_TIMEOUT"
|
||||
)
|
||||
|
||||
// DefaultKVS - default KVS for drive
|
||||
var DefaultKVS = config.KVS{
|
||||
config.KV{
|
||||
|
@ -65,8 +69,9 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
|
|||
if err = config.CheckValidKeys(config.DriveSubSys, kvs, DefaultKVS); err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
// if not set. Get default value from environment
|
||||
d := kvs.GetWithDefault(MaxTimeout, DefaultKVS)
|
||||
d := env.Get(envMaxDriveTimeout, kvs.GetWithDefault(MaxTimeout, DefaultKVS))
|
||||
if d == "" {
|
||||
d = env.Get("_MINIO_DRIVE_MAX_TIMEOUT", "")
|
||||
if d == "" {
|
||||
|
|
|
@ -331,6 +331,7 @@ func (m *muxClient) handleOneWayStream(respHandler chan<- Response, respServer <
|
|||
if !ok {
|
||||
return
|
||||
}
|
||||
sendResp:
|
||||
select {
|
||||
case respHandler <- resp:
|
||||
m.respMu.Lock()
|
||||
|
@ -341,18 +342,49 @@ func (m *muxClient) handleOneWayStream(respHandler chan<- Response, respServer <
|
|||
case <-m.ctx.Done():
|
||||
// Client canceled. Don't block.
|
||||
// Next loop will catch it.
|
||||
case <-pingTimer:
|
||||
if !m.doPing(respHandler) {
|
||||
return
|
||||
}
|
||||
goto sendResp
|
||||
}
|
||||
case <-pingTimer:
|
||||
if time.Since(time.Unix(atomic.LoadInt64(&m.LastPong), 0)) > clientPingInterval*2 {
|
||||
m.addErrorNonBlockingClose(respHandler, ErrDisconnected)
|
||||
if !m.doPing(respHandler) {
|
||||
return
|
||||
}
|
||||
// Send new ping.
|
||||
gridLogIf(m.ctx, m.send(message{Op: OpPing, MuxID: m.MuxID}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// doPing checks last ping time and sends another ping.
|
||||
func (m *muxClient) doPing(respHandler chan<- Response) (ok bool) {
|
||||
m.respMu.Lock()
|
||||
if m.closed {
|
||||
m.respMu.Unlock()
|
||||
// Already closed. This is not an error state;
|
||||
// we may just be delivering the last responses.
|
||||
return true
|
||||
}
|
||||
|
||||
// Only check ping when not closed.
|
||||
if got := time.Since(time.Unix(atomic.LoadInt64(&m.LastPong), 0)); got > clientPingInterval*2 {
|
||||
m.respMu.Unlock()
|
||||
if debugPrint {
|
||||
fmt.Printf("Mux %d: last pong %v ago, disconnecting\n", m.MuxID, got)
|
||||
}
|
||||
m.addErrorNonBlockingClose(respHandler, ErrDisconnected)
|
||||
return false
|
||||
}
|
||||
|
||||
// Send new ping
|
||||
err := m.sendLocked(message{Op: OpPing, MuxID: m.MuxID})
|
||||
m.respMu.Unlock()
|
||||
if err != nil {
|
||||
m.addErrorNonBlockingClose(respHandler, err)
|
||||
}
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// responseCh is the channel to that goes to the requester.
|
||||
// internalResp is the channel that comes from the server.
|
||||
func (m *muxClient) handleTwowayResponses(responseCh chan<- Response, internalResp <-chan Response) {
|
||||
|
|
Loading…
Reference in New Issue