Compare commits

...

3 Commits

Author SHA1 Message Date
Bala FA 11aaf7bbf2
Merge d8ef595a24 into ec49fff583 2024-05-08 23:07:22 +05:30
Klaus Post ec49fff583
Accept multipart checksums with part count (#19680)
Accept multipart uploads where the combined checksum provides the expected part count.

It seems this was added by AWS to make the API more consistent, even if the 
data is entirely superfluous on multiple levels.

Improves AWS S3 compatibility.
2024-05-08 09:18:34 -07:00
Bala.FA d8ef595a24 Add cluster ILM metrics in metrics-v3
Signed-off-by: Bala.FA <bala@minio.io>
2024-05-07 08:40:12 +05:30
6 changed files with 113 additions and 13 deletions

View File

@ -1231,7 +1231,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
if opts.WantChecksum != nil {
err := opts.WantChecksum.Matches(checksumCombined)
err := opts.WantChecksum.Matches(checksumCombined, len(parts))
if err != nil {
return oi, err
}

View File

@ -273,13 +273,10 @@ const (
vmemory = "virtual_memory_bytes"
cpu = "cpu_total_seconds"
expiryPendingTasks MetricName = "expiry_pending_tasks"
expiryMissedTasks MetricName = "expiry_missed_tasks"
expiryMissedFreeVersions MetricName = "expiry_missed_freeversions"
expiryMissedTierJournalTasks MetricName = "expiry_missed_tierjournal_tasks"
expiryNumWorkers MetricName = "expiry_num_workers"
transitionPendingTasks MetricName = "transition_pending_tasks"
transitionActiveTasks MetricName = "transition_active_tasks"
transitionMissedTasks MetricName = "transition_missed_immediate_tasks"
transitionedBytes MetricName = "transitioned_bytes"

View File

@ -0,0 +1,53 @@
// Copyright (c) 2024 MinIO, Inc.
//
// # This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
)
const (
expiryPendingTasks = "expiry_pending_tasks"
transitionActiveTasks = "transition_active_tasks"
transitionMissedImmediateTasks = "transition_missed_immediate_tasks"
transitionPendingTasks = "transition_pending_tasks"
versionsScanned = "versions_scanned"
)
var (
ilmExpiryPendingTasksMD = NewCounterMD(expiryPendingTasks, "Number of pending ILM expiry tasks in the queue")
ilmTransitionActiveTasksMD = NewCounterMD(transitionActiveTasks, "Number of active ILM transition tasks")
ilmTransitionMissedImmediateTasksMD = NewCounterMD(transitionMissedImmediateTasks, "Number of missed immediate ILM transition tasks")
ilmTransitionPendingTasksMD = NewCounterMD(transitionPendingTasks, "Number of pending ILM transition tasks in the queue")
ilmVersionsScannedMD = NewCounterMD(versionsScanned, "Total number of object versions scanned since server start")
)
// loadClusterILMMetrics - `MetricsLoaderFn` for cluster ILM metrics.
func loadClusterILMMetrics(_ context.Context, m MetricValues, _ *metricsCache) error {
if globalExpiryState != nil {
m.Set(expiryPendingTasks, float64(globalExpiryState.PendingTasks()))
}
if globalTransitionState != nil {
m.Set(transitionActiveTasks, float64(globalTransitionState.ActiveTasks()))
m.Set(transitionMissedImmediateTasks, float64(globalTransitionState.MissedImmediateTasks()))
m.Set(transitionPendingTasks, float64(globalTransitionState.PendingTasks()))
}
m.Set(versionsScanned, float64(globalScannerMetrics.lifetime(scannerMetricApplyVersion)))
return nil
}

View File

@ -51,6 +51,7 @@ const (
clusterAuditCollectorPath collectorPath = "/cluster/audit"
clusterNotificationCollectorPath collectorPath = "/cluster/notification"
clusterIAMCollectorPath collectorPath = "/cluster/iam"
clusterILMCollectorPath collectorPath = "/cluster/ilm"
)
const (
@ -299,6 +300,17 @@ func newMetricGroups(r *prometheus.Registry) *metricsV3Collection {
loadClusterIAMMetrics,
)
clusterILMMG := NewMetricsGroup(clusterILMCollectorPath,
[]MetricDescriptor{
ilmExpiryPendingTasksMD,
ilmTransitionActiveTasksMD,
ilmTransitionMissedImmediateTasksMD,
ilmTransitionPendingTasksMD,
ilmVersionsScannedMD,
},
loadClusterILMMetrics,
)
allMetricGroups := []*MetricsGroup{
apiRequestsMG,
apiBucketMG,
@ -316,6 +328,7 @@ func newMetricGroups(r *prometheus.Registry) *metricsV3Collection {
clusterAuditMG,
clusterNotificationMG,
clusterIAMMG,
clusterILMMG,
}
// Bucket metrics are special, they always include the bucket label. These

View File

@ -268,3 +268,13 @@ The standard metrics group for GoCollector is not shown below.
| `minio_cluster_iam_since_last_sync_millis` | `counter` | Time (in milliseconds) since last successful IAM data sync | |
| `minio_cluster_iam_sync_failures` | `counter` | Number of failed IAM data syncs since server start | |
| `minio_cluster_iam_sync_successes` | `counter` | Number of successful IAM data syncs since server start | |
### `/cluster/ilm`
| Name | Type | Help | Labels |
|-------------------------------------------------------|-----------|------------------------------------------------------------|--------|
| `minio_cluster_ilm_expiry_pending_tasks` | `counter` | Number of pending ILM expiry tasks in the queue | |
| `minio_cluster_ilm_transition_active_tasks` | `counter` | Number of active ILM transition tasks | |
| `minio_cluster_ilm_transition_missed_immediate_tasks` | `counter` | Number of missed immediate ILM transition tasks | |
| `minio_cluster_ilm_transition_pending_tasks` | `counter` | Number of pending ILM transition tasks in the queue | |
| `minio_cluster_ilm_versions_scanned` | `counter` | Total number of object versions scanned since server start | |

View File

@ -27,6 +27,7 @@ import (
"hash"
"hash/crc32"
"net/http"
"strconv"
"strings"
"github.com/minio/minio/internal/hash/sha256"
@ -71,9 +72,10 @@ const (
// Checksum is a type and base 64 encoded value.
type Checksum struct {
Type ChecksumType
Encoded string
Raw []byte
Type ChecksumType
Encoded string
Raw []byte
WantParts int
}
// Is returns if c is all of t.
@ -260,13 +262,14 @@ func ReadPartCheckSums(b []byte) (res []map[string]string) {
}
// Skip main checksum
b = b[length:]
if !typ.Is(ChecksumIncludesMultipart) {
continue
}
parts, n := binary.Uvarint(b)
if n <= 0 {
break
}
if !typ.Is(ChecksumIncludesMultipart) {
continue
}
if len(res) == 0 {
res = make([]map[string]string, parts)
}
@ -292,11 +295,25 @@ func NewChecksumWithType(alg ChecksumType, value string) *Checksum {
if !alg.IsSet() {
return nil
}
wantParts := 0
if strings.ContainsRune(value, '-') {
valSplit := strings.Split(value, "-")
if len(valSplit) != 2 {
return nil
}
value = valSplit[0]
nParts, err := strconv.Atoi(valSplit[1])
if err != nil {
return nil
}
alg |= ChecksumMultipart
wantParts = nParts
}
bvalue, err := base64.StdEncoding.DecodeString(value)
if err != nil {
return nil
}
c := Checksum{Type: alg, Encoded: value, Raw: bvalue}
c := Checksum{Type: alg, Encoded: value, Raw: bvalue, WantParts: wantParts}
if !c.Valid() {
return nil
}
@ -325,12 +342,15 @@ func (c *Checksum) AppendTo(b []byte, parts []byte) []byte {
b = append(b, crc...)
if c.Type.Is(ChecksumMultipart) {
var checksums int
if c.WantParts > 0 && !c.Type.Is(ChecksumIncludesMultipart) {
checksums = c.WantParts
}
// Ensure we don't divide by 0:
if c.Type.RawByteLen() == 0 || len(parts)%c.Type.RawByteLen() != 0 {
hashLogIf(context.Background(), fmt.Errorf("internal error: Unexpected checksum length: %d, each checksum %d", len(parts), c.Type.RawByteLen()))
checksums = 0
parts = nil
} else {
} else if len(parts) > 0 {
checksums = len(parts) / c.Type.RawByteLen()
}
if !c.Type.Is(ChecksumIncludesMultipart) {
@ -358,7 +378,7 @@ func (c Checksum) Valid() bool {
}
// Matches returns whether given content matches c.
func (c Checksum) Matches(content []byte) error {
func (c Checksum) Matches(content []byte, parts int) error {
if len(c.Encoded) == 0 {
return nil
}
@ -368,6 +388,13 @@ func (c Checksum) Matches(content []byte) error {
return err
}
sum := hasher.Sum(nil)
if c.WantParts > 0 && c.WantParts != parts {
return ChecksumMismatch{
Want: fmt.Sprintf("%s-%d", c.Encoded, c.WantParts),
Got: fmt.Sprintf("%s-%d", base64.StdEncoding.EncodeToString(sum), parts),
}
}
if !bytes.Equal(sum, c.Raw) {
return ChecksumMismatch{
Want: c.Encoded,