Compare commits
6 Commits
RELEASE.20 ... RELEASE.20

| Author | SHA1 | Date |
|---|---|---|
| | a9dc061d84 | |
| | 01a8c09920 | |
| | 4c8562bcec | |
| | f13c04629b | |
| | 80ff907d08 | |
| | 673df6d517 | |
@@ -761,6 +761,15 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
         return nil
     }

+    countOKDrives := func(drives []madmin.HealDriveInfo) (count int) {
+        for _, drive := range drives {
+            if drive.State == madmin.DriveStateOk {
+                count++
+            }
+        }
+        return count
+    }
+
     // task queued, now wait for the response.
     select {
     case res := <-task.respCh:
@@ -781,6 +790,11 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
         if res.err != nil {
             res.result.Detail = res.err.Error()
         }
+        if res.result.ParityBlocks > 0 && res.result.DataBlocks > 0 && res.result.DataBlocks > res.result.ParityBlocks {
+            if got := countOKDrives(res.result.After.Drives); got < res.result.ParityBlocks {
+                res.result.Detail = fmt.Sprintf("quorum loss - expected %d minimum, got drive states in OK %d", res.result.ParityBlocks, got)
+            }
+        }
         return h.pushHealResultItem(res.result)
     case <-h.ctx.Done():
         return nil
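The two hunks above introduce a post-heal quorum sanity check: after a heal task responds, the drives reported in `After.Drives` are counted and, when fewer than `ParityBlocks` of them are in OK state, the result detail is overwritten with a quorum-loss message. A minimal, self-contained sketch of that logic, using a local stand-in for `madmin.HealDriveInfo` and an illustrative drive layout:

```go
package main

import "fmt"

// HealDriveInfo is a local stand-in for madmin.HealDriveInfo.
type HealDriveInfo struct{ State string }

const driveStateOk = "ok" // stand-in for madmin.DriveStateOk

func countOKDrives(drives []HealDriveInfo) (count int) {
	for _, drive := range drives {
		if drive.State == driveStateOk {
			count++
		}
	}
	return count
}

func main() {
	after := []HealDriveInfo{{driveStateOk}, {driveStateOk}, {"offline"}, {driveStateOk}}
	dataBlocks, parityBlocks := 3, 1
	// Mirrors the guard in the hunk: the check only fires for erasure
	// configs where data shards outnumber parity shards.
	if parityBlocks > 0 && dataBlocks > 0 && dataBlocks > parityBlocks {
		if got := countOKDrives(after); got < parityBlocks {
			fmt.Printf("quorum loss - expected %d minimum, got drive states in OK %d\n", parityBlocks, got)
		} else {
			fmt.Printf("post-heal check passed: %d drives OK\n", got)
		}
	}
}
```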
@@ -103,10 +103,9 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
     return fi, partsMetadata, err
 }

-// Removes part.meta given by partName belonging to a multipart upload from minioMetaBucket
-func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string, partNumber int) {
-    uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
-    curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
+// cleanupMultipartPath removes all extraneous files and parts from the multipart folder, this is used per CompleteMultipart.
+// do not use this function outside of completeMultipartUpload()
+func (er erasureObjects) cleanupMultipartPath(ctx context.Context, paths ...string) {
     storageDisks := er.getDisks()

     g := errgroup.WithNErrs(len(storageDisks))
@@ -116,42 +115,7 @@ func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string
         }
         index := index
         g.Go(func() error {
-            _ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
-                Recursive: false,
-                Immediate: false,
-            })
-
-            return nil
-        }, index)
-    }
-    g.Wait()
-}
-
-// Removes part given by partName belonging to a multipart upload from minioMetaBucket
-func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) {
-    uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
-    curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
-    storageDisks := er.getDisks()
-
-    g := errgroup.WithNErrs(len(storageDisks))
-    for index, disk := range storageDisks {
-        if disk == nil {
-            continue
-        }
-        index := index
-        g.Go(func() error {
-            // Ignoring failure to remove parts that weren't present in CompleteMultipartUpload
-            // requests. xl.meta is the authoritative source of truth on which parts constitute
-            // the object. The presence of parts that don't belong in the object doesn't affect correctness.
-            _ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath, DeleteOptions{
-                Recursive: false,
-                Immediate: false,
-            })
-            _ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
-                Recursive: false,
-                Immediate: false,
-            })
-
+            _ = storageDisks[index].DeleteBulk(ctx, minioMetaMultipartBucket, paths...)
             return nil
         }, index)
     }
@@ -159,7 +123,7 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
 }

 // Clean-up the old multipart uploads. Should be run in a Go routine.
-func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) {
+func (er erasureObjects) cleanupStaleUploads(ctx context.Context) {
     // run multiple cleanup's local to this server.
     var wg sync.WaitGroup
     for _, disk := range er.getLocalDisks() {
@@ -167,7 +131,7 @@ func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Du
         wg.Add(1)
         go func(disk StorageAPI) {
             defer wg.Done()
-            er.cleanupStaleUploadsOnDisk(ctx, disk, expiry)
+            er.cleanupStaleUploadsOnDisk(ctx, disk)
         }(disk)
     }
 }
@@ -193,7 +157,7 @@ func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
 }

 // Remove the old multipart uploads on the given disk.
-func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) {
+func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI) {
     drivePath := disk.Endpoint().Path

     readDirFn(pathJoin(drivePath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
@@ -219,7 +183,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
             modTime = fi.ModTime
             wait()
         }
-        if time.Since(modTime) < expiry {
+        if time.Since(modTime) < globalAPIConfig.getStaleUploadsExpiry() {
             return nil
         }
         w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
@@ -238,7 +202,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
             return nil
         }
         // Modtime is returned in the Created field. See (*xlStorage).StatVol
-        if time.Since(vi.Created) < expiry {
+        if time.Since(vi.Created) < globalAPIConfig.getStaleUploadsExpiry() {
             return nil
         }
         w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
@@ -267,7 +231,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
         w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
         return w.Run(func() error {
             wait := deleteMultipartCleanupSleeper.Timer(ctx)
-            if time.Since(vi.Created) > expiry {
+            if time.Since(vi.Created) > globalAPIConfig.getStaleUploadsExpiry() {
                 pathUUID := mustGetUUID()
                 targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)

@@ -1359,10 +1323,10 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
         }
     }

+    paths := make([]string, 0, len(currentFI.Parts))
     // Remove parts that weren't present in CompleteMultipartUpload request.
     for _, curpart := range currentFI.Parts {
         // Remove part.meta which is not needed anymore.
-        er.removePartMeta(bucket, object, uploadID, currentFI.DataDir, curpart.Number)
+        paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d.meta", curpart.Number)))

         if objectPartIndex(fi.Parts, curpart.Number) == -1 {
             // Delete the missing part files. e.g,
@@ -1371,10 +1335,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
             // Request 3: PutObjectPart 2
             // Request 4: CompleteMultipartUpload --part 2
             // N.B. 1st part is not present. This part should be removed from the storage.
-            er.removeObjectPart(bucket, object, uploadID, currentFI.DataDir, curpart.Number)
+            paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d", curpart.Number)))
         }
     }

+    er.cleanupMultipartPath(ctx, paths...) // cleanup all part.N.meta, and skipped part.N's before final rename().
+
     defer func() {
         if err == nil {
             er.deleteAll(context.Background(), minioMetaMultipartBucket, uploadIDPath)

@@ -525,7 +525,7 @@ func (s *erasureSets) cleanupStaleUploads(ctx context.Context) {
             if set == nil {
                 return
             }
-            set.cleanupStaleUploads(ctx, globalAPIConfig.getStaleUploadsExpiry())
+            set.cleanupStaleUploads(ctx)
         }(set)
     }
     wg.Wait()
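The common theme of the multipart hunks above: instead of deleting `part.N` and `part.N.meta` one at a time per disk, `CompleteMultipartUpload` now collects every path to remove into one slice and hands the whole batch to `cleanupMultipartPath`, which issues a single `DeleteBulk` per disk. A sketch of the path-collection step, with hypothetical part numbers and directory names standing in for the real upload state:

```go
package main

import (
	"fmt"
	"path"
)

func main() {
	uploadIDPath := "bucket/object/upload-id" // hypothetical
	dataDir := "data-uuid"                    // hypothetical
	currentParts := []int{1, 2, 3}            // parts recorded in xl.meta
	completed := map[int]bool{2: true, 3: true} // parts named in the complete request

	paths := make([]string, 0, 2*len(currentParts))
	for _, n := range currentParts {
		// part.N.meta is never needed once the upload completes.
		paths = append(paths, path.Join(uploadIDPath, dataDir, fmt.Sprintf("part.%d.meta", n)))
		if !completed[n] {
			// part.N itself is dropped only when the request skipped it.
			paths = append(paths, path.Join(uploadIDPath, dataDir, fmt.Sprintf("part.%d", n)))
		}
	}
	fmt.Println(paths) // one DeleteBulk call per disk takes this whole slice
}
```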
cmd/fmt-gen.go (new file, 120 lines)
@@ -0,0 +1,120 @@
+// Copyright (c) 2015-2024 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+//go:build fmtgen
+
+package cmd
+
+import (
+    "encoding/json"
+    "log"
+    "os"
+    "path/filepath"
+
+    "github.com/klauspost/compress/zip"
+    "github.com/minio/cli"
+)
+
+var fmtGenFlags = []cli.Flag{
+    cli.IntFlag{
+        Name:  "parity",
+        Usage: "specify erasure code parity",
+    },
+    cli.StringFlag{
+        Name:  "deployment-id",
+        Usage: "deployment-id of the MinIO cluster for which format.json is needed",
+    },
+    cli.StringFlag{
+        Name:   "address",
+        Value:  ":" + GlobalMinioDefaultPort,
+        Usage:  "bind to a specific ADDRESS:PORT, ADDRESS can be an IP or hostname",
+        EnvVar: "MINIO_ADDRESS",
+    },
+}
+
+var fmtGenCmd = cli.Command{
+    Name:               "fmt-gen",
+    Usage:              "Generate format.json files for an erasure server pool",
+    Flags:              append(fmtGenFlags, GlobalFlags...),
+    Action:             fmtGenMain,
+    CustomHelpTemplate: `NAME:
+  {{.HelpName}} - {{.Usage}}
+
+USAGE:
+  {{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR1 [DIR2..]
+  {{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64}
+  {{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64} DIR{65...128}
+
+DIR:
+  DIR points to a directory on a filesystem. When you want to combine
+  multiple drives into a single large system, pass one directory per
+  filesystem separated by space. You may also use a '...' convention
+  to abbreviate the directory arguments. Remote directories in a
+  distributed setup are encoded as HTTP(s) URIs.
+{{if .VisibleFlags}}
+FLAGS:
+  {{range .VisibleFlags}}{{.}}
+  {{end}}{{end}}
+EXAMPLES:
+  1. Generate format.json.zip containing format.json files for all drives in a distributed MinIO server pool of 32 nodes with 32 drives each.
+     {{.Prompt}} {{.HelpName}} http://node{1...32}.example.com/mnt/export{1...32}
+
+`,
+}
+
+func fmtGenMain(ctxt *cli.Context) {
+    deploymentID := ctxt.String("deployment-id")
+    err := buildServerCtxt(ctxt, &globalServerCtxt)
+    if err != nil {
+        log.Fatalln(err)
+    }
+    handleCommonArgs(globalServerCtxt)
+    pools, _, err := createServerEndpoints(globalMinioAddr, globalServerCtxt.Layout.pools, globalServerCtxt.Layout.legacy)
+    if err != nil {
+        log.Fatalln(err)
+    }
+
+    zipFile, err := os.Create("format.json.zip")
+    if err != nil {
+        log.Fatalf("failed to create format.json.zip: %v", err)
+    }
+    defer zipFile.Close()
+    fmtZipW := zip.NewWriter(zipFile)
+    defer fmtZipW.Close()
+    for _, pool := range pools { // for each pool
+        setCount, setDriveCount := pool.SetCount, pool.DrivesPerSet
+        format := newFormatErasureV3(setCount, setDriveCount)
+        format.ID = deploymentID
+        for i := 0; i < setCount; i++ { // for each erasure set
+            for j := 0; j < setDriveCount; j++ {
+                newFormat := format.Clone()
+                newFormat.Erasure.This = format.Erasure.Sets[i][j]
+                if deploymentID != "" {
+                    newFormat.ID = deploymentID
+                }
+                drive := pool.Endpoints[i*setDriveCount+j]
+                fmtBytes, err := json.Marshal(newFormat)
+                if err != nil {
+                    //nolint:gocritic
+                    log.Fatalf("failed to marshal format.json for %s: %v", drive.String(), err)
+                }
+                fmtJSON := filepath.Join(drive.Host, drive.Path, minioMetaBucket, "format.json")
+                embedFileInZip(fmtZipW, fmtJSON, fmtBytes, 0o600)
+            }
+        }
+    }
+}
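The nested loops in `fmtGenMain` above derive each drive's `format.json` from the pool layout: endpoints are laid out set-major, so set `i`, drive `j` maps to endpoint index `i*setDriveCount+j`. A small sketch of that indexing with an illustrative 2×4 pool (the endpoint URLs are made up):

```go
package main

import "fmt"

func main() {
	setCount, setDriveCount := 2, 4 // illustrative pool layout
	endpoints := make([]string, setCount*setDriveCount)
	for k := range endpoints {
		endpoints[k] = fmt.Sprintf("http://node%d/mnt/export%d", k/setDriveCount+1, k%setDriveCount+1)
	}
	for i := 0; i < setCount; i++ {
		for j := 0; j < setDriveCount; j++ {
			// Same set-major index math as pool.Endpoints[i*setDriveCount+j].
			fmt.Printf("set %d drive %d -> %s\n", i, j, endpoints[i*setDriveCount+j])
		}
	}
}
```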
cmd/fmt-gen_notsupported.go (new file, 24 lines)
@@ -0,0 +1,24 @@
+// Copyright (c) 2015-2024 MinIO, Inc.
+//
+// # This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+//go:build !fmtgen
+
+package cmd
+
+import "github.com/minio/cli"
+
+var fmtGenCmd cli.Command
@@ -150,6 +150,11 @@ type healEntryResult struct

 // healErasureSet lists and heals all objects in a specific erasure set
 func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
+    bgSeq, found := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
+    if !found {
+        return errors.New("no local healing sequence initialized, unable to heal the drive")
+    }
+
     scanMode := madmin.HealNormalScan

     // Make sure to copy since `buckets slice`
@@ -163,11 +168,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
     }

     for _, bucket := range healBuckets {
-        _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
-            Recreate: true,
-            ScanMode: scanMode,
-        })
-        if err != nil {
+        if err := bgSeq.healBucket(objAPI, bucket, true); err != nil {
             // Log bucket healing error if any, we shall retry again.
             healingLogIf(ctx, err)
         }
@@ -264,10 +265,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
         tracker.setBucket(bucket)
         // Heal current bucket again in case if it is failed
         // in the beginning of erasure set healing
-        if _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
-            Recreate: true,
-            ScanMode: scanMode,
-        }); err != nil {
+        if err := bgSeq.healBucket(objAPI, bucket, true); err != nil {
             // Set this such that when we return this function
             // we let the caller retry this disk again for the
             // buckets that failed healing.
@@ -366,6 +364,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
             }
             return false
         case results <- result:
+            bgSeq.countScanned(madmin.HealItemObject)
             return true
         }
     }
@@ -416,8 +415,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
                 return
             }
             result = healEntryFailure(0)
+            bgSeq.countFailed(madmin.HealItemObject)
             healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, entry.name, err))
         } else {
+            bgSeq.countHealed(madmin.HealItemObject)
             result = healEntrySuccess(uint64(res.ObjectSize))
         }

@@ -463,8 +464,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
         }

         if versionHealed {
+            bgSeq.countHealed(madmin.HealItemObject)
             result = healEntrySuccess(uint64(version.Size))
         } else {
+            bgSeq.countFailed(madmin.HealItemObject)
             result = healEntryFailure(uint64(version.Size))
             if version.VersionID != "" {
                 healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s-v(%s): %w", bucket, version.Name, version.VersionID, err))
@@ -27,10 +27,6 @@ import (
     "github.com/prometheus/client_golang/prometheus"
 )

-const (
-    apiGetObject = "GetObject"
-)
-
 // connStats - Network statistics
 // Count total input/output transferred bytes during
 // the server's life.
@@ -132,7 +128,7 @@ func (bh *bucketHTTPStats) updateHTTPStats(bucket, api string, w *xhttp.Response
         return
     }

-    if w != nil && api == apiGetObject {
+    if w != nil {
         // Increment the prometheus http request response histogram with API, Bucket
         bucketHTTPRequestsDuration.With(prometheus.Labels{
             "api": api,
@@ -437,9 +433,7 @@ func (st *HTTPStats) updateStats(api string, w *xhttp.ResponseRecorder) {
     st.totalS3Requests.Inc(api)

     // Increment the prometheus http request response histogram with appropriate label
-    if api == apiGetObject {
-        httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(w.TimeToFirstByte.Seconds())
-    }
+    httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(w.TimeToFirstByte.Seconds())

     code := w.StatusCode
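The stats hunks above stop gating TTFB recording on `apiGetObject`: the histogram is now observed for every API at request time, and the GetObject-only view is reintroduced later as a label filter at metrics-listing time (see the `labelsFilter` hunks below). A hedged sketch of the recording side using the Prometheus Go client; the metric name here is illustrative, not MinIO's:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var httpRequestsDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "s3_ttfb_seconds", // illustrative name
		Help:    "Time to first byte for S3 requests",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10},
	},
	[]string{"api"},
)

func recordTTFB(api string, ttfb time.Duration) {
	// No per-API gate any more; every API's TTFB is recorded.
	httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(ttfb.Seconds())
}

func main() {
	prometheus.MustRegister(httpRequestsDuration)
	recordTTFB("GetObject", 80*time.Millisecond)
	recordTTFB("PutObject", 120*time.Millisecond)
	fmt.Println("recorded TTFB for two APIs")
}
```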
@@ -107,6 +107,11 @@ func newApp(name string) *cli.App {

     // registerCommand registers a cli command.
     registerCommand := func(command cli.Command) {
+        // avoid registering commands which are not being built (via
+        // go:build tags)
+        if command.Name == "" {
+            return
+        }
         commands = append(commands, command)
         commandsTree.Insert(command.Name)
     }
@@ -134,6 +139,7 @@ func newApp(name string) *cli.App {

     // Register all commands.
     registerCommand(serverCmd)
+    registerCommand(fmtGenCmd)

     // Set up app.
     cli.HelpFlag = cli.BoolFlag{
@@ -19,6 +19,8 @@ package cmd

 import (
     "context"
+
+    "github.com/minio/minio-go/v7/pkg/set"
 )

 const (
@@ -127,7 +129,8 @@ func loadAPIRequestsHTTPMetrics(ctx context.Context, m MetricValues, _ *metricsC
 // This is a `MetricsLoaderFn`.
 func loadAPIRequestsTTFBMetrics(ctx context.Context, m MetricValues, _ *metricsCache) error {
     renameLabels := map[string]string{"api": "name"}
-    m.SetHistogram(apiRequestsTTFBSecondsDistribution, httpRequestsDuration, renameLabels, nil,
+    labelsFilter := map[string]set.StringSet{"api": set.CreateStringSet("GetObject")}
+    m.SetHistogram(apiRequestsTTFBSecondsDistribution, httpRequestsDuration, labelsFilter, renameLabels, nil,
         "type", "s3")
     return nil
 }
@@ -214,7 +217,8 @@ func loadBucketAPIHTTPMetrics(ctx context.Context, m MetricValues, _ *metricsCac
 // This is a `MetricsLoaderFn`.
 func loadBucketAPITTFBMetrics(ctx context.Context, m MetricValues, _ *metricsCache, buckets []string) error {
     renameLabels := map[string]string{"api": "name"}
-    m.SetHistogram(apiRequestsTTFBSecondsDistribution, bucketHTTPRequestsDuration, renameLabels,
+    labelsFilter := map[string]set.StringSet{"api": set.CreateStringSet("GetObject")}
+    m.SetHistogram(apiRequestsTTFBSecondsDistribution, bucketHTTPRequestsDuration, labelsFilter, renameLabels,
         buckets, "type", "s3")
     return nil
 }
@@ -23,6 +23,7 @@ import (
     "strings"
     "sync"

+    "github.com/minio/minio-go/v7/pkg/set"
     "github.com/minio/minio/internal/logger"
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
@@ -247,28 +248,38 @@ func (m *MetricValues) Set(name MetricName, value float64, labels ...string) {
 // SetHistogram - sets values for the given MetricName using the provided
 // histogram.
 //
+// `filterByLabels` is a map of label names to list of allowed label values to
+// filter by. Note that this filtering happens before any renaming of labels.
+//
 // `renameLabels` is a map of label names to rename. The keys are the original
 // label names and the values are the new label names.
 //
-// TODO: bucketFilter doc
+// `bucketFilter` is a list of bucket values to filter. If this is non-empty,
+// only metrics for the given buckets are added.
+//
 // `extraLabels` are additional labels to add to each metric. They are ordered
 // label name and value pairs.
 func (m *MetricValues) SetHistogram(name MetricName, hist *prometheus.HistogramVec,
-    renameLabels map[string]string, bucketFilter []string, extraLabels ...string,
+    filterByLabels map[string]set.StringSet, renameLabels map[string]string, bucketFilter []string,
+    extraLabels ...string,
 ) {
     if _, ok := m.descriptors[name]; !ok {
         panic(fmt.Sprintf("metric has no description: %s", name))
     }
     dummyDesc := MetricDescription{}
     metricsV2 := getHistogramMetrics(hist, dummyDesc, false)
+mainLoop:
     for _, metric := range metricsV2 {
+        for label, allowedValues := range filterByLabels {
+            if !allowedValues.Contains(metric.VariableLabels[label]) {
+                continue mainLoop
+            }
+        }
+
         // If a bucket filter is provided, only add metrics for the given
         // buckets.
-        if len(bucketFilter) > 0 {
-            if !slices.Contains(bucketFilter, metric.VariableLabels["bucket"]) {
-                continue
-            }
+        if len(bucketFilter) > 0 && !slices.Contains(bucketFilter, metric.VariableLabels["bucket"]) {
+            continue
         }

         labels := make([]string, 0, len(metric.VariableLabels)*2)
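The `filterByLabels` parameter added to `SetHistogram` above drops histogram samples whose label values fall outside an allowed set, before any label renaming. A minimal sketch of that check using the same minio-go `set` package and a stand-in metric type:

```go
package main

import (
	"fmt"

	"github.com/minio/minio-go/v7/pkg/set"
)

type metric struct{ VariableLabels map[string]string }

func main() {
	// Same shape as the labelsFilter built in the metrics loaders above.
	filterByLabels := map[string]set.StringSet{"api": set.CreateStringSet("GetObject")}
	metrics := []metric{
		{VariableLabels: map[string]string{"api": "GetObject", "bucket": "b1"}},
		{VariableLabels: map[string]string{"api": "PutObject", "bucket": "b1"}},
	}
mainLoop:
	for _, m := range metrics {
		for label, allowed := range filterByLabels {
			if !allowed.Contains(m.VariableLabels[label]) {
				continue mainLoop // PutObject is filtered out here
			}
		}
		fmt.Println("kept:", m.VariableLabels)
	}
}
```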
@@ -222,6 +222,13 @@ func (d *naughtyDisk) CheckParts(ctx context.Context, volume string, path string
     return d.disk.CheckParts(ctx, volume, path, fi)
 }

+func (d *naughtyDisk) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
+    if err := d.calcError(); err != nil {
+        return err
+    }
+    return d.disk.DeleteBulk(ctx, volume, paths...)
+}
+
 func (d *naughtyDisk) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
     if err := d.calcError(); err != nil {
         return err
@@ -392,24 +392,24 @@ func newFileInfo(object string, dataBlocks, parityBlocks int) (fi FileInfo) {

 // ReadMultipleReq contains information of multiple files to read from disk.
 type ReadMultipleReq struct {
-    Bucket       string   // Bucket. Can be empty if multiple buckets.
-    Prefix       string   // Shared prefix of all files. Can be empty. Will be joined to filename without modification.
-    Files        []string // Individual files to read.
-    MaxSize      int64    // Return error if size is exceed.
-    MetadataOnly bool     // Read as XL meta and truncate data.
-    AbortOn404   bool     // Stop reading after first file not found.
-    MaxResults   int      // Stop after this many successful results. <= 0 means all.
+    Bucket       string   `msg:"bk"`           // Bucket. Can be empty if multiple buckets.
+    Prefix       string   `msg:"pr,omitempty"` // Shared prefix of all files. Can be empty. Will be joined to filename without modification.
+    Files        []string `msg:"fl"`           // Individual files to read.
+    MaxSize      int64    `msg:"ms"`           // Return error if size is exceed.
+    MetadataOnly bool     `msg:"mo"`           // Read as XL meta and truncate data.
+    AbortOn404   bool     `msg:"ab"`           // Stop reading after first file not found.
+    MaxResults   int      `msg:"mr"`           // Stop after this many successful results. <= 0 means all.
 }

 // ReadMultipleResp contains a single response from a ReadMultipleReq.
 type ReadMultipleResp struct {
-    Bucket  string    // Bucket as given by request.
-    Prefix  string    // Prefix as given by request.
-    File    string    // File name as given in request.
-    Exists  bool      // Returns whether the file existed on disk.
-    Error   string    // Returns any error when reading.
-    Data    []byte    // Contains all data of file.
-    Modtime time.Time // Modtime of file on disk.
+    Bucket  string    `msg:"bk"`           // Bucket as given by request.
+    Prefix  string    `msg:"pr,omitempty"` // Prefix as given by request.
+    File    string    `msg:"fl"`           // File name as given in request.
+    Exists  bool      `msg:"ex"`           // Returns whether the file existed on disk.
+    Error   string    `msg:"er,omitempty"` // Returns any error when reading.
+    Data    []byte    `msg:"d"`            // Contains all data of file.
+    Modtime time.Time `msg:"m"`            // Modtime of file on disk.
 }

 // DeleteVersionHandlerParams are parameters for DeleteVersionHandler
@@ -516,8 +516,8 @@ type WriteAllHandlerParams struct {
 // only after as a 2-phase call, allowing the older dataDir to
 // hang-around in-case we need some form of recovery.
 type RenameDataResp struct {
-    Sign       []byte
-    OldDataDir string // contains '<uuid>', it is designed to be passed as value to Delete(bucket, pathJoin(object, dataDir))
+    Sign       []byte `msg:"s"`
+    OldDataDir string `msg:"od"` // contains '<uuid>', it is designed to be passed as value to Delete(bucket, pathJoin(object, dataDir))
 }

 const (
@@ -534,15 +534,26 @@ const (

 // CheckPartsResp is a response of the storage CheckParts and VerifyFile APIs
 type CheckPartsResp struct {
-    Results []int
+    Results []int `msg:"r"`
 }

 // LocalDiskIDs - GetLocalIDs response.
 type LocalDiskIDs struct {
-    IDs []string
+    IDs []string `msg:"i"`
 }

 // ListDirResult - ListDir()'s response.
 type ListDirResult struct {
     Entries []string `msg:"e"`
 }

+// DeleteBulkReq - send multiple paths in same delete request.
+type DeleteBulkReq struct {
+    Paths []string `msg:"p"`
+}
+
+// DeleteVersionsErrsResp - collection of delete errors
+// for bulk version deletes
+type DeleteVersionsErrsResp struct {
+    Errs []string `msg:"e"`
+}

(File diff suppressed because it is too large)
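The struct changes above add explicit short `msg:` tags, so the MessagePack wire keys become stable one- or two-byte names (with `,omitempty` eliding empty fields) instead of defaulting to the full Go field names. A sketch of the convention; the `MarshalMsg`/`UnmarshalMsg` methods are generated by the msgp tool (github.com/tinylib/msgp) from these definitions, and `ReadMultipleReqLite` is an illustrative subset, not a real type:

```go
package cmd

//go:generate msgp -file $GOFILE

// DeleteBulkReq mirrors the new request type introduced above: the wire
// encoding becomes a map with the single key "p".
type DeleteBulkReq struct {
	Paths []string `msg:"p"`
}

// ReadMultipleReqLite is an illustrative subset of ReadMultipleReq.
type ReadMultipleReqLite struct {
	Bucket string   `msg:"bk"`
	Prefix string   `msg:"pr,omitempty"` // dropped from the encoding when empty
	Files  []string `msg:"fl"`
}
```

Short keys keep every internode message a few bytes smaller per field, which matters for the high-volume storage RPCs these types travel over.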
@@ -348,6 +348,119 @@ func BenchmarkDecodeCheckPartsResp(b *testing.B) {
     }
 }

+func TestMarshalUnmarshalDeleteBulkReq(t *testing.T) {
+    v := DeleteBulkReq{}
+    bts, err := v.MarshalMsg(nil)
+    if err != nil {
+        t.Fatal(err)
+    }
+    left, err := v.UnmarshalMsg(bts)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if len(left) > 0 {
+        t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+    }
+
+    left, err = msgp.Skip(bts)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if len(left) > 0 {
+        t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+    }
+}
+
+func BenchmarkMarshalMsgDeleteBulkReq(b *testing.B) {
+    v := DeleteBulkReq{}
+    b.ReportAllocs()
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        v.MarshalMsg(nil)
+    }
+}
+
+func BenchmarkAppendMsgDeleteBulkReq(b *testing.B) {
+    v := DeleteBulkReq{}
+    bts := make([]byte, 0, v.Msgsize())
+    bts, _ = v.MarshalMsg(bts[0:0])
+    b.SetBytes(int64(len(bts)))
+    b.ReportAllocs()
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        bts, _ = v.MarshalMsg(bts[0:0])
+    }
+}
+
+func BenchmarkUnmarshalDeleteBulkReq(b *testing.B) {
+    v := DeleteBulkReq{}
+    bts, _ := v.MarshalMsg(nil)
+    b.ReportAllocs()
+    b.SetBytes(int64(len(bts)))
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        _, err := v.UnmarshalMsg(bts)
+        if err != nil {
+            b.Fatal(err)
+        }
+    }
+}
+
+func TestEncodeDecodeDeleteBulkReq(t *testing.T) {
+    v := DeleteBulkReq{}
+    var buf bytes.Buffer
+    msgp.Encode(&buf, &v)
+
+    m := v.Msgsize()
+    if buf.Len() > m {
+        t.Log("WARNING: TestEncodeDecodeDeleteBulkReq Msgsize() is inaccurate")
+    }
+
+    vn := DeleteBulkReq{}
+    err := msgp.Decode(&buf, &vn)
+    if err != nil {
+        t.Error(err)
+    }
+
+    buf.Reset()
+    msgp.Encode(&buf, &v)
+    err = msgp.NewReader(&buf).Skip()
+    if err != nil {
+        t.Error(err)
+    }
+}
+
+func BenchmarkEncodeDeleteBulkReq(b *testing.B) {
+    v := DeleteBulkReq{}
+    var buf bytes.Buffer
+    msgp.Encode(&buf, &v)
+    b.SetBytes(int64(buf.Len()))
+    en := msgp.NewWriter(msgp.Nowhere)
+    b.ReportAllocs()
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        v.EncodeMsg(en)
+    }
+    en.Flush()
+}
+
+func BenchmarkDecodeDeleteBulkReq(b *testing.B) {
+    v := DeleteBulkReq{}
+    var buf bytes.Buffer
+    msgp.Encode(&buf, &v)
+    b.SetBytes(int64(buf.Len()))
+    rd := msgp.NewEndlessReader(buf.Bytes(), b)
+    dc := msgp.NewReader(rd)
+    b.ReportAllocs()
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        err := v.DecodeMsg(dc)
+        if err != nil {
+            b.Fatal(err)
+        }
+    }
+}
+
 func TestMarshalUnmarshalDeleteFileHandlerParams(t *testing.T) {
     v := DeleteFileHandlerParams{}
     bts, err := v.MarshalMsg(nil)
@@ -687,6 +800,119 @@ func BenchmarkDecodeDeleteVersionHandlerParams(b *testing.B) {
     }
 }

+func TestMarshalUnmarshalDeleteVersionsErrsResp(t *testing.T) {
+    v := DeleteVersionsErrsResp{}
+    bts, err := v.MarshalMsg(nil)
+    if err != nil {
+        t.Fatal(err)
+    }
+    left, err := v.UnmarshalMsg(bts)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if len(left) > 0 {
+        t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+    }
+
+    left, err = msgp.Skip(bts)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if len(left) > 0 {
+        t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+    }
+}
+
+func BenchmarkMarshalMsgDeleteVersionsErrsResp(b *testing.B) {
+    v := DeleteVersionsErrsResp{}
+    b.ReportAllocs()
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        v.MarshalMsg(nil)
+    }
+}
+
+func BenchmarkAppendMsgDeleteVersionsErrsResp(b *testing.B) {
+    v := DeleteVersionsErrsResp{}
+    bts := make([]byte, 0, v.Msgsize())
+    bts, _ = v.MarshalMsg(bts[0:0])
+    b.SetBytes(int64(len(bts)))
+    b.ReportAllocs()
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        bts, _ = v.MarshalMsg(bts[0:0])
+    }
+}
+
+func BenchmarkUnmarshalDeleteVersionsErrsResp(b *testing.B) {
+    v := DeleteVersionsErrsResp{}
+    bts, _ := v.MarshalMsg(nil)
+    b.ReportAllocs()
+    b.SetBytes(int64(len(bts)))
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        _, err := v.UnmarshalMsg(bts)
+        if err != nil {
+            b.Fatal(err)
+        }
+    }
+}
+
+func TestEncodeDecodeDeleteVersionsErrsResp(t *testing.T) {
+    v := DeleteVersionsErrsResp{}
+    var buf bytes.Buffer
+    msgp.Encode(&buf, &v)
+
+    m := v.Msgsize()
+    if buf.Len() > m {
+        t.Log("WARNING: TestEncodeDecodeDeleteVersionsErrsResp Msgsize() is inaccurate")
+    }
+
+    vn := DeleteVersionsErrsResp{}
+    err := msgp.Decode(&buf, &vn)
+    if err != nil {
+        t.Error(err)
+    }
+
+    buf.Reset()
+    msgp.Encode(&buf, &v)
+    err = msgp.NewReader(&buf).Skip()
+    if err != nil {
+        t.Error(err)
+    }
+}
+
+func BenchmarkEncodeDeleteVersionsErrsResp(b *testing.B) {
+    v := DeleteVersionsErrsResp{}
+    var buf bytes.Buffer
+    msgp.Encode(&buf, &v)
+    b.SetBytes(int64(buf.Len()))
+    en := msgp.NewWriter(msgp.Nowhere)
+    b.ReportAllocs()
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        v.EncodeMsg(en)
+    }
+    en.Flush()
+}
+
+func BenchmarkDecodeDeleteVersionsErrsResp(b *testing.B) {
+    v := DeleteVersionsErrsResp{}
+    var buf bytes.Buffer
+    msgp.Encode(&buf, &v)
+    b.SetBytes(int64(buf.Len()))
+    rd := msgp.NewEndlessReader(buf.Bytes(), b)
+    dc := msgp.NewReader(rd)
+    b.ReportAllocs()
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        err := v.DecodeMsg(dc)
+        if err != nil {
+            b.Fatal(err)
+        }
+    }
+}
+
 func TestMarshalUnmarshalDiskInfo(t *testing.T) {
     v := DiskInfo{}
     bts, err := v.MarshalMsg(nil)
@@ -81,6 +81,7 @@ type StorageAPI interface {
     // Metadata operations
     DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) error
     DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) []error
+    DeleteBulk(ctx context.Context, volume string, paths ...string) error
     WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) error
     UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error
     ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (FileInfo, error)
@@ -737,7 +737,9 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
     }

     dErrResp := &DeleteVersionsErrsResp{}
-    if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil {
+    decoder := msgpNewReader(reader)
+    defer readMsgpReaderPoolPut(decoder)
+    if err = dErrResp.DecodeMsg(decoder); err != nil {
         for i := range errs {
             errs[i] = toStorageErr(err)
         }
@@ -745,7 +747,11 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
     }

     for i, dErr := range dErrResp.Errs {
-        errs[i] = toStorageErr(dErr)
+        if dErr != "" {
+            errs[i] = toStorageErr(errors.New(dErr))
+        } else {
+            errs[i] = nil
+        }
     }

     return errs
@@ -795,6 +801,26 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
     return verifyResp, nil
 }

+func (client *storageRESTClient) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
+    values := make(url.Values)
+    values.Set(storageRESTVolume, volume)
+
+    req := &DeleteBulkReq{Paths: paths}
+    body, err := req.MarshalMsg(nil)
+    if err != nil {
+        return err
+    }
+
+    respBody, err := client.call(ctx, storageRESTMethodDeleteBulk, values, bytes.NewReader(body), int64(len(body)))
+    if err != nil {
+        return err
+    }
+    defer xhttp.DrainBody(respBody)
+
+    _, err = waitForHTTPResponse(respBody)
+    return toStorageErr(err)
+}
+
 func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
     values := make(url.Values)
     values.Set(storageRESTVolume, volume)
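The new client method above follows the internal storage REST pattern: msgp-encode the request body, POST it to the versioned endpoint with the volume as a query parameter, then drain and wait on the keep-alive response (the real code goes through `client.call` and `waitForHTTPResponse`). A rough stand-in sketch of that flow using plain net/http, with an in-process test server in place of a remote drive:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// deleteBulk POSTs a pre-encoded body and drains the response, mirroring
// the call + DrainBody + wait flow of the storage REST client above.
func deleteBulk(url string, body []byte) error {
	resp, err := http.Post(url, "application/octet-stream", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	_, err = io.Copy(io.Discard, resp.Body) // drain, like xhttp.DrainBody
	return err
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.Copy(io.Discard, r.Body) // a real server would DecodeMsg here
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()
	fmt.Println(deleteBulk(srv.URL+"/dblk?volume=v", []byte("msgp-bytes")))
}
```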
@@ -20,7 +20,7 @@ package cmd
 //go:generate msgp -file $GOFILE -unexported

 const (
-    storageRESTVersion       = "v61" // Move all Read* calls to http.MethodGet, compact handlers and query params fields
+    storageRESTVersion       = "v62" // Introduce DeleteBulk internode API.
     storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
     storageRESTPrefix        = minioReservedBucketPath + "/storage"
 )
@@ -43,6 +43,7 @@ const (
     storageRESTMethodStatInfoFile   = "/sfile"
     storageRESTMethodReadMultiple   = "/rmpl"
     storageRESTMethodCleanAbandoned = "/cln"
+    storageRESTMethodDeleteBulk     = "/dblk"
 )

 const (
@@ -21,7 +21,6 @@ import (
     "bufio"
     "context"
     "encoding/binary"
-    "encoding/gob"
     "encoding/hex"
     "errors"
     "fmt"
@@ -629,12 +628,6 @@ func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid.
     return grid.NewNPErr(s.getStorage().Delete(context.Background(), p.Volume, p.FilePath, p.Opts))
 }

-// DeleteVersionsErrsResp - collection of delete errors
-// for bulk version deletes
-type DeleteVersionsErrsResp struct {
-    Errs []error
-}
-
 // DeleteVersionsHandler - delete a set of a versions.
 func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http.Request) {
     if !s.IsValid(w, r) {
@@ -659,21 +652,20 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http
         }
     }

-    dErrsResp := &DeleteVersionsErrsResp{Errs: make([]error, totalVersions)}
-
     setEventStreamHeaders(w)
-    encoder := gob.NewEncoder(w)
     done := keepHTTPResponseAlive(w)

     opts := DeleteOptions{}
     errs := s.getStorage().DeleteVersions(r.Context(), volume, versions, opts)
     done(nil)

+    dErrsResp := &DeleteVersionsErrsResp{Errs: make([]string, totalVersions)}
     for idx := range versions {
         if errs[idx] != nil {
-            dErrsResp.Errs[idx] = StorageErr(errs[idx].Error())
+            dErrsResp.Errs[idx] = errs[idx].Error()
         }
     }
-    encoder.Encode(dErrsResp)
+
+    buf, _ := dErrsResp.MarshalMsg(nil)
+    w.Write(buf)
 }

 // RenameDataHandler - renames a meta object and data dir to destination.
@@ -1107,18 +1099,15 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
         return
     }

     setEventStreamHeaders(w)
-    encoder := gob.NewEncoder(w)
     done := keepHTTPResponseAlive(w)
     resp, err := s.getStorage().VerifyFile(r.Context(), volume, filePath, fi)
-    done(nil)
-
+    done(err)
     if err != nil {
         s.writeErrorResponse(w, err)
         return
     }

-    encoder.Encode(resp)
+    buf, _ := resp.MarshalMsg(nil)
+    w.Write(buf)
 }

 func checkDiskFatalErrs(errs []error) error {
@@ -1243,6 +1232,24 @@ func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request)
     }
 }

+func (s *storageRESTServer) DeleteBulkHandler(w http.ResponseWriter, r *http.Request) {
+    if !s.IsValid(w, r) {
+        return
+    }
+
+    var req DeleteBulkReq
+    mr := msgpNewReader(r.Body)
+    defer readMsgpReaderPoolPut(mr)
+
+    if err := req.DecodeMsg(mr); err != nil {
+        s.writeErrorResponse(w, err)
+        return
+    }
+
+    volume := r.Form.Get(storageRESTVolume)
+    keepHTTPResponseAlive(w)(s.getStorage().DeleteBulk(r.Context(), volume, req.Paths...))
+}
+
 // ReadMultiple returns multiple files
 func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request) {
     if !s.IsValid(w, r) {
@@ -1325,6 +1332,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
     subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile))
     subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple))
     subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler))
+    subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteBulk).HandlerFunc(h(server.DeleteBulkHandler))

     subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler))
     subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(h(server.ReadVersionHandler))
@@ -36,12 +36,13 @@ func _() {
     _ = x[storageMetricReadMultiple-25]
     _ = x[storageMetricDeleteAbandonedParts-26]
     _ = x[storageMetricDiskInfo-27]
-    _ = x[storageMetricLast-28]
+    _ = x[storageMetricDeleteBulk-28]
+    _ = x[storageMetricLast-29]
 }

-const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsDiskInfoLast"
+const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsDiskInfoDeleteBulkLast"

-var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 282, 286}
+var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 282, 292, 296}

 func (i storageMetric) String() string {
     if i >= storageMetric(len(_storageMetric_index)-1) {
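The regenerated stringer above works off a cumulative-offset table: metric `i`'s name is the substring of `_storageMetric_name` between `_storageMetric_index[i]` and `_storageMetric_index[i+1]`, which is why inserting `DeleteBulk` before `Last` appends the offsets 292 and 296. A tiny sketch over the tail of the table:

```go
package main

import "fmt"

func main() {
	// Tail of the real concatenated name string and its offsets:
	// DiskInfo is 8 chars, DeleteBulk 10, Last 4.
	const names = "DiskInfoDeleteBulkLast"
	index := [...]uint16{0, 8, 18, 22}
	for i := 0; i+1 < len(index); i++ {
		fmt.Println(names[index[i]:index[i+1]])
	}
	// Prints: DiskInfo, DeleteBulk, Last
}
```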
@@ -70,6 +70,7 @@ const (
     storageMetricReadMultiple
     storageMetricDeleteAbandonedParts
     storageMetricDiskInfo
+    storageMetricDeleteBulk

     // .... add more

@@ -499,6 +500,16 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
     })
 }

+func (p *xlStorageDiskIDCheck) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
+    ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteBulk, append([]string{volume}, paths...)...)
+    if err != nil {
+        return err
+    }
+    defer done(0, &err)
+
+    return p.storage.DeleteBulk(ctx, volume, paths...)
+}
+
 func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
     ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDelete, volume, path)
     if err != nil {
@@ -1079,32 +1079,37 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
         return err
     }

-    s.RLock()
-    legacy := s.formatLegacy
-    s.RUnlock()
-
     var legacyJSON bool
-    buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
-    if err != nil {
-        if !errors.Is(err, errFileNotFound) {
-            return err
+    buf, err := xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) ([]byte, error) {
+        buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
+        if err != nil && !errors.Is(err, errFileNotFound) {
+            return nil, err
         }

+        s.RLock()
+        legacy := s.formatLegacy
+        s.RUnlock()
-        if legacy {
+        if errors.Is(err, errFileNotFound) && legacy {
             buf, _, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1))
             if err != nil {
-                return err
+                return nil, err
             }
             legacyJSON = true
         }
-    }

-    if len(buf) == 0 {
-        if errors.Is(err, errFileNotFound) && !skipAccessChecks(volume) {
-            if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
-                return errVolumeNotFound
+        if len(buf) == 0 {
+            if errors.Is(err, errFileNotFound) && !skipAccessChecks(volume) {
+                if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
+                    return nil, errVolumeNotFound
+                }
+                return nil, errFileNotFound
+            }
         }
-        return errFileNotFound
+        return buf, nil
+    })
+    if err != nil {
+        return err
     }

     if legacyJSON {
@@ -1178,10 +1183,7 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
             errs[i] = ctx.Err()
             continue
         }
-        w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-        if err := w.Run(func() error { return s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...) }); err != nil {
-            errs[i] = err
-        }
+        errs[i] = s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...)
         diskHealthCheckOK(ctx, errs[i])
     }

@@ -1212,7 +1214,7 @@ func (s *xlStorage) diskAlmostFilled() bool {
     return (float64(info.Free)/float64(info.Used)) < almostFilledPercent || (float64(info.FreeInodes)/float64(info.UsedInodes)) < almostFilledPercent
 }

-func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) {
+func (s *xlStorage) moveToTrashNoDeadline(filePath string, recursive, immediatePurge bool) (err error) {
     pathUUID := mustGetUUID()
     targetPath := pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, pathUUID)

@@ -1265,10 +1267,16 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool)
             }
         }
     }

     return nil
 }

+func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) {
+    w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
+    return w.Run(func() (err error) {
+        return s.moveToTrashNoDeadline(filePath, recursive, immediatePurge)
+    })
+}
+
 // DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. forceDelMarker
 // will force creating a new `xl.meta` to create a new delete marker
 func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) {
@@ -2417,7 +2425,41 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate
     return nil
 }

-// DeleteFile - delete a file at path.
+// DeleteBulk - delete many files in bulk to trash.
+// this delete does not recursively delete empty
+// parents, if you need empty parent delete support
+// please use Delete() instead. This API is meant as
+// an optimization for Multipart operations.
+func (s *xlStorage) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
+    volumeDir, err := s.getVolDir(volume)
+    if err != nil {
+        return err
+    }
+
+    if !skipAccessChecks(volume) {
+        // Stat a volume entry.
+        if err = Access(volumeDir); err != nil {
+            return convertAccessError(err, errVolumeAccessDenied)
+        }
+    }
+
+    for _, fp := range paths {
+        // Following code is needed so that we retain SlashSeparator suffix if any in
+        // path argument.
+        filePath := pathJoin(volumeDir, fp)
+        if err = checkPathLength(filePath); err != nil {
+            return err
+        }
+
+        if err = s.moveToTrash(filePath, false, false); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+// Delete - delete a file at path.
 func (s *xlStorage) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
     volumeDir, err := s.getVolDir(volume)
     if err != nil {
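The `moveToTrash`/`moveToTrashNoDeadline` split above wraps the rename-to-trash in a deadline worker so a hung drive cannot stall the caller indefinitely. A minimal sketch of that pattern, assuming `NewDeadlineWorker` semantics roughly equivalent to a context timeout (this is a stand-in, not the xioutil implementation):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errDiskTimeout = errors.New("drive operation timed out")

// runWithDeadline executes op but abandons the wait once timeout elapses,
// mirroring what wrapping moveToTrashNoDeadline in a deadline worker buys.
func runWithDeadline(timeout time.Duration, op func() error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	done := make(chan error, 1)
	go func() { done <- op() }()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return errDiskTimeout // the goroutine may still finish in the background
	}
}

func main() {
	err := runWithDeadline(50*time.Millisecond, func() error {
		time.Sleep(10 * time.Millisecond) // stand-in for the rename-to-trash
		return nil
	})
	fmt.Println(err) // <nil>
}
```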
@@ -2,7 +2,7 @@ version: '3.7'

 # Settings and configurations that are common for all containers
 x-minio-common: &minio-common
-  image: quay.io/minio/minio:RELEASE.2024-07-26T20-48-21Z
+  image: quay.io/minio/minio:RELEASE.2024-07-29T22-14-52Z
   command: server --console-address ":9001" http://minio{1...4}/data{1...2}
   expose:
     - "9000"