Compare commits

...

6 Commits

Author SHA1 Message Date
Harshavardhana
a9dc061d84 count metrics properly for any failures during drive heal (#20193)
or via `mc admin heal --set 1 --pool 1`
2024-07-30 22:46:26 -07:00
Krishnan Parthasarathi
01a8c09920 Add fmt-gen subcommand (#20192)
fmt-gen subcommand is only available when built with build tag `fmtgen`.
2024-07-30 15:59:48 -07:00
Aditya Manthramurthy
4c8562bcec Fix v2 metrics: Send all ttfb api labels (#20191)
Fix a regression in #19733 where TTFB metrics for all APIs except
GetObject were removed in v2 and v3 metrics. This causes breakage for
existing v2 metrics users. Instead we continue to send TTFB for all APIs
in V2 but only send for GetObject in V3.
2024-07-30 15:28:46 -07:00
Harshavardhana
f13c04629b allow multipart uploads expiration to be dynamic (#20190)
allow multipart uploads expiration to be dyamic

It would seem like the new values will take effect
only after a restart for changes in multipart_expiration.
This PR fixes this by making it dynamic as it should have
been.
2024-07-30 12:01:06 -07:00
Harshavardhana
80ff907d08 add DeleteBulk support, add sufficient deadlines per rename() (#20185)
deadlines per moveToTrash() allows for a more granular timeout
approach for syscalls, instead of an aggregate timeout.

This PR also enhances multipart state cleanup to be optimal by
removing 100's of multipart network rename() calls into single
network call.
2024-07-29 18:56:40 -07:00
Minio Trusted
673df6d517 Update yaml files to latest version RELEASE.2024-07-29T22-14-52Z 2024-07-30 00:00:47 +00:00
22 changed files with 1118 additions and 284 deletions

View File

@@ -761,6 +761,15 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
return nil
}
countOKDrives := func(drives []madmin.HealDriveInfo) (count int) {
for _, drive := range drives {
if drive.State == madmin.DriveStateOk {
count++
}
}
return count
}
// task queued, now wait for the response.
select {
case res := <-task.respCh:
@@ -781,6 +790,11 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
if res.err != nil {
res.result.Detail = res.err.Error()
}
if res.result.ParityBlocks > 0 && res.result.DataBlocks > 0 && res.result.DataBlocks > res.result.ParityBlocks {
if got := countOKDrives(res.result.After.Drives); got < res.result.ParityBlocks {
res.result.Detail = fmt.Sprintf("quorum loss - expected %d minimum, got drive states in OK %d", res.result.ParityBlocks, got)
}
}
return h.pushHealResultItem(res.result)
case <-h.ctx.Done():
return nil

View File

@@ -103,10 +103,9 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
return fi, partsMetadata, err
}
// Removes part.meta given by partName belonging to a multipart upload from minioMetaBucket
func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string, partNumber int) {
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
// cleanMultipartPath removes all extraneous files and parts from the multipart folder, this is used per CompleteMultipart.
// do not use this function outside of completeMultipartUpload()
func (er erasureObjects) cleanupMultipartPath(ctx context.Context, paths ...string) {
storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
@@ -116,42 +115,7 @@ func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string
}
index := index
g.Go(func() error {
_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
Recursive: false,
Immediate: false,
})
return nil
}, index)
}
g.Wait()
}
// Removes part given by partName belonging to a multipart upload from minioMetaBucket
func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) {
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
for index, disk := range storageDisks {
if disk == nil {
continue
}
index := index
g.Go(func() error {
// Ignoring failure to remove parts that weren't present in CompleteMultipartUpload
// requests. xl.meta is the authoritative source of truth on which parts constitute
// the object. The presence of parts that don't belong in the object doesn't affect correctness.
_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath, DeleteOptions{
Recursive: false,
Immediate: false,
})
_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
Recursive: false,
Immediate: false,
})
_ = storageDisks[index].DeleteBulk(ctx, minioMetaMultipartBucket, paths...)
return nil
}, index)
}
@@ -159,7 +123,7 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
}
// Clean-up the old multipart uploads. Should be run in a Go routine.
func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) {
func (er erasureObjects) cleanupStaleUploads(ctx context.Context) {
// run multiple cleanup's local to this server.
var wg sync.WaitGroup
for _, disk := range er.getLocalDisks() {
@@ -167,7 +131,7 @@ func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Du
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
er.cleanupStaleUploadsOnDisk(ctx, disk, expiry)
er.cleanupStaleUploadsOnDisk(ctx, disk)
}(disk)
}
}
@@ -193,7 +157,7 @@ func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
}
// Remove the old multipart uploads on the given disk.
func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) {
func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI) {
drivePath := disk.Endpoint().Path
readDirFn(pathJoin(drivePath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
@@ -219,7 +183,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
modTime = fi.ModTime
wait()
}
if time.Since(modTime) < expiry {
if time.Since(modTime) < globalAPIConfig.getStaleUploadsExpiry() {
return nil
}
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
@@ -238,7 +202,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
return nil
}
// Modtime is returned in the Created field. See (*xlStorage).StatVol
if time.Since(vi.Created) < expiry {
if time.Since(vi.Created) < globalAPIConfig.getStaleUploadsExpiry() {
return nil
}
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
@@ -267,7 +231,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error {
wait := deleteMultipartCleanupSleeper.Timer(ctx)
if time.Since(vi.Created) > expiry {
if time.Since(vi.Created) > globalAPIConfig.getStaleUploadsExpiry() {
pathUUID := mustGetUUID()
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
@@ -1359,10 +1323,10 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
}
paths := make([]string, 0, len(currentFI.Parts))
// Remove parts that weren't present in CompleteMultipartUpload request.
for _, curpart := range currentFI.Parts {
// Remove part.meta which is not needed anymore.
er.removePartMeta(bucket, object, uploadID, currentFI.DataDir, curpart.Number)
paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d.meta", curpart.Number)))
if objectPartIndex(fi.Parts, curpart.Number) == -1 {
// Delete the missing part files. e.g,
@@ -1371,10 +1335,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// Request 3: PutObjectPart 2
// Request 4: CompleteMultipartUpload --part 2
// N.B. 1st part is not present. This part should be removed from the storage.
er.removeObjectPart(bucket, object, uploadID, currentFI.DataDir, curpart.Number)
paths = append(paths, pathJoin(uploadIDPath, currentFI.DataDir, fmt.Sprintf("part.%d", curpart.Number)))
}
}
er.cleanupMultipartPath(ctx, paths...) // cleanup all part.N.meta, and skipped part.N's before final rename().
defer func() {
if err == nil {
er.deleteAll(context.Background(), minioMetaMultipartBucket, uploadIDPath)

View File

@@ -525,7 +525,7 @@ func (s *erasureSets) cleanupStaleUploads(ctx context.Context) {
if set == nil {
return
}
set.cleanupStaleUploads(ctx, globalAPIConfig.getStaleUploadsExpiry())
set.cleanupStaleUploads(ctx)
}(set)
}
wg.Wait()

120
cmd/fmt-gen.go Normal file
View File

@@ -0,0 +1,120 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//go:build fmtgen
package cmd
import (
"encoding/json"
"log"
"os"
"path/filepath"
"github.com/klauspost/compress/zip"
"github.com/minio/cli"
)
var fmtGenFlags = []cli.Flag{
cli.IntFlag{
Name: "parity",
Usage: "specify erasure code parity",
},
cli.StringFlag{
Name: "deployment-id",
Usage: "deployment-id of the MinIO cluster for which format.json is needed",
},
cli.StringFlag{
Name: "address",
Value: ":" + GlobalMinioDefaultPort,
Usage: "bind to a specific ADDRESS:PORT, ADDRESS can be an IP or hostname",
EnvVar: "MINIO_ADDRESS",
},
}
var fmtGenCmd = cli.Command{
Name: "fmt-gen",
Usage: "Generate format.json files for an erasure server pool",
Flags: append(fmtGenFlags, GlobalFlags...),
Action: fmtGenMain,
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR1 [DIR2..]
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64}
{{.HelpName}} {{if .VisibleFlags}}[FLAGS] {{end}}DIR{1...64} DIR{65...128}
DIR:
DIR points to a directory on a filesystem. When you want to combine
multiple drives into a single large system, pass one directory per
filesystem separated by space. You may also use a '...' convention
to abbreviate the directory arguments. Remote directories in a
distributed setup are encoded as HTTP(s) URIs.
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
EXAMPLES:
1. Generate format.json.zip containing format.json files for all drives in a distributed MinIO server pool of 32 nodes with 32 drives each.
{{.Prompt}} {{.HelpName}} http://node{1...32}.example.com/mnt/export{1...32}
`,
}
func fmtGenMain(ctxt *cli.Context) {
deploymentID := ctxt.String("deployment-id")
err := buildServerCtxt(ctxt, &globalServerCtxt)
if err != nil {
log.Fatalln(err)
}
handleCommonArgs(globalServerCtxt)
pools, _, err := createServerEndpoints(globalMinioAddr, globalServerCtxt.Layout.pools, globalServerCtxt.Layout.legacy)
if err != nil {
log.Fatalln(err)
}
zipFile, err := os.Create("format.json.zip")
if err != nil {
log.Fatalf("failed to create format.json.zip: %v", err)
}
defer zipFile.Close()
fmtZipW := zip.NewWriter(zipFile)
defer fmtZipW.Close()
for _, pool := range pools { // for each pool
setCount, setDriveCount := pool.SetCount, pool.DrivesPerSet
format := newFormatErasureV3(setCount, setDriveCount)
format.ID = deploymentID
for i := 0; i < setCount; i++ { // for each erasure set
for j := 0; j < setDriveCount; j++ {
newFormat := format.Clone()
newFormat.Erasure.This = format.Erasure.Sets[i][j]
if deploymentID != "" {
newFormat.ID = deploymentID
}
drive := pool.Endpoints[i*setDriveCount+j]
fmtBytes, err := json.Marshal(newFormat)
if err != nil {
//nolint:gocritic
log.Fatalf("failed to marshal format.json for %s: %v", drive.String(), err)
}
fmtJSON := filepath.Join(drive.Host, drive.Path, minioMetaBucket, "format.json")
embedFileInZip(fmtZipW, fmtJSON, fmtBytes, 0o600)
}
}
}
}

View File

@@ -0,0 +1,24 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// # This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//go:build !fmtgen
package cmd
import "github.com/minio/cli"
var fmtGenCmd cli.Command

View File

@@ -150,6 +150,11 @@ type healEntryResult struct {
// healErasureSet lists and heals all objects in a specific erasure set
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
bgSeq, found := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if !found {
return errors.New("no local healing sequence initialized, unable to heal the drive")
}
scanMode := madmin.HealNormalScan
// Make sure to copy since `buckets slice`
@@ -163,11 +168,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
for _, bucket := range healBuckets {
_, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
Recreate: true,
ScanMode: scanMode,
})
if err != nil {
if err := bgSeq.healBucket(objAPI, bucket, true); err != nil {
// Log bucket healing error if any, we shall retry again.
healingLogIf(ctx, err)
}
@@ -264,10 +265,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
tracker.setBucket(bucket)
// Heal current bucket again in case if it is failed
// in the beginning of erasure set healing
if _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
Recreate: true,
ScanMode: scanMode,
}); err != nil {
if err := bgSeq.healBucket(objAPI, bucket, true); err != nil {
// Set this such that when we return this function
// we let the caller retry this disk again for the
// buckets that failed healing.
@@ -366,6 +364,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
return false
case results <- result:
bgSeq.countScanned(madmin.HealItemObject)
return true
}
}
@@ -416,8 +415,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
return
}
result = healEntryFailure(0)
bgSeq.countFailed(madmin.HealItemObject)
healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, entry.name, err))
} else {
bgSeq.countHealed(madmin.HealItemObject)
result = healEntrySuccess(uint64(res.ObjectSize))
}
@@ -463,8 +464,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
if versionHealed {
bgSeq.countHealed(madmin.HealItemObject)
result = healEntrySuccess(uint64(version.Size))
} else {
bgSeq.countFailed(madmin.HealItemObject)
result = healEntryFailure(uint64(version.Size))
if version.VersionID != "" {
healingLogIf(ctx, fmt.Errorf("unable to heal object %s/%s-v(%s): %w", bucket, version.Name, version.VersionID, err))

View File

@@ -27,10 +27,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
const (
apiGetObject = "GetObject"
)
// connStats - Network statistics
// Count total input/output transferred bytes during
// the server's life.
@@ -132,7 +128,7 @@ func (bh *bucketHTTPStats) updateHTTPStats(bucket, api string, w *xhttp.Response
return
}
if w != nil && api == apiGetObject {
if w != nil {
// Increment the prometheus http request response histogram with API, Bucket
bucketHTTPRequestsDuration.With(prometheus.Labels{
"api": api,
@@ -437,9 +433,7 @@ func (st *HTTPStats) updateStats(api string, w *xhttp.ResponseRecorder) {
st.totalS3Requests.Inc(api)
// Increment the prometheus http request response histogram with appropriate label
if api == apiGetObject {
httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(w.TimeToFirstByte.Seconds())
}
httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(w.TimeToFirstByte.Seconds())
code := w.StatusCode

View File

@@ -107,6 +107,11 @@ func newApp(name string) *cli.App {
// registerCommand registers a cli command.
registerCommand := func(command cli.Command) {
// avoid registering commands which are not being built (via
// go:build tags)
if command.Name == "" {
return
}
commands = append(commands, command)
commandsTree.Insert(command.Name)
}
@@ -134,6 +139,7 @@ func newApp(name string) *cli.App {
// Register all commands.
registerCommand(serverCmd)
registerCommand(fmtGenCmd)
// Set up app.
cli.HelpFlag = cli.BoolFlag{

View File

@@ -19,6 +19,8 @@ package cmd
import (
"context"
"github.com/minio/minio-go/v7/pkg/set"
)
const (
@@ -127,7 +129,8 @@ func loadAPIRequestsHTTPMetrics(ctx context.Context, m MetricValues, _ *metricsC
// This is a `MetricsLoaderFn`.
func loadAPIRequestsTTFBMetrics(ctx context.Context, m MetricValues, _ *metricsCache) error {
renameLabels := map[string]string{"api": "name"}
m.SetHistogram(apiRequestsTTFBSecondsDistribution, httpRequestsDuration, renameLabels, nil,
labelsFilter := map[string]set.StringSet{"api": set.CreateStringSet("GetObject")}
m.SetHistogram(apiRequestsTTFBSecondsDistribution, httpRequestsDuration, labelsFilter, renameLabels, nil,
"type", "s3")
return nil
}
@@ -214,7 +217,8 @@ func loadBucketAPIHTTPMetrics(ctx context.Context, m MetricValues, _ *metricsCac
// This is a `MetricsLoaderFn`.
func loadBucketAPITTFBMetrics(ctx context.Context, m MetricValues, _ *metricsCache, buckets []string) error {
renameLabels := map[string]string{"api": "name"}
m.SetHistogram(apiRequestsTTFBSecondsDistribution, bucketHTTPRequestsDuration, renameLabels,
labelsFilter := map[string]set.StringSet{"api": set.CreateStringSet("GetObject")}
m.SetHistogram(apiRequestsTTFBSecondsDistribution, bucketHTTPRequestsDuration, labelsFilter, renameLabels,
buckets, "type", "s3")
return nil
}

View File

@@ -23,6 +23,7 @@ import (
"strings"
"sync"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/logger"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -247,28 +248,38 @@ func (m *MetricValues) Set(name MetricName, value float64, labels ...string) {
// SetHistogram - sets values for the given MetricName using the provided
// histogram.
//
// `filterByLabels` is a map of label names to list of allowed label values to
// filter by. Note that this filtering happens before any renaming of labels.
//
// `renameLabels` is a map of label names to rename. The keys are the original
// label names and the values are the new label names.
//
// TODO: bucketFilter doc
// `bucketFilter` is a list of bucket values to filter. If this is non-empty,
// only metrics for the given buckets are added.
//
// `extraLabels` are additional labels to add to each metric. They are ordered
// label name and value pairs.
func (m *MetricValues) SetHistogram(name MetricName, hist *prometheus.HistogramVec,
renameLabels map[string]string, bucketFilter []string, extraLabels ...string,
filterByLabels map[string]set.StringSet, renameLabels map[string]string, bucketFilter []string,
extraLabels ...string,
) {
if _, ok := m.descriptors[name]; !ok {
panic(fmt.Sprintf("metric has no description: %s", name))
}
dummyDesc := MetricDescription{}
metricsV2 := getHistogramMetrics(hist, dummyDesc, false)
mainLoop:
for _, metric := range metricsV2 {
for label, allowedValues := range filterByLabels {
if !allowedValues.Contains(metric.VariableLabels[label]) {
continue mainLoop
}
}
// If a bucket filter is provided, only add metrics for the given
// buckets.
if len(bucketFilter) > 0 {
if !slices.Contains(bucketFilter, metric.VariableLabels["bucket"]) {
continue
}
if len(bucketFilter) > 0 && !slices.Contains(bucketFilter, metric.VariableLabels["bucket"]) {
continue
}
labels := make([]string, 0, len(metric.VariableLabels)*2)

View File

@@ -222,6 +222,13 @@ func (d *naughtyDisk) CheckParts(ctx context.Context, volume string, path string
return d.disk.CheckParts(ctx, volume, path, fi)
}
func (d *naughtyDisk) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
if err := d.calcError(); err != nil {
return err
}
return d.disk.DeleteBulk(ctx, volume, paths...)
}
func (d *naughtyDisk) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
if err := d.calcError(); err != nil {
return err

View File

@@ -392,24 +392,24 @@ func newFileInfo(object string, dataBlocks, parityBlocks int) (fi FileInfo) {
// ReadMultipleReq contains information of multiple files to read from disk.
type ReadMultipleReq struct {
Bucket string // Bucket. Can be empty if multiple buckets.
Prefix string // Shared prefix of all files. Can be empty. Will be joined to filename without modification.
Files []string // Individual files to read.
MaxSize int64 // Return error if size is exceed.
MetadataOnly bool // Read as XL meta and truncate data.
AbortOn404 bool // Stop reading after first file not found.
MaxResults int // Stop after this many successful results. <= 0 means all.
Bucket string `msg:"bk"` // Bucket. Can be empty if multiple buckets.
Prefix string `msg:"pr,omitempty"` // Shared prefix of all files. Can be empty. Will be joined to filename without modification.
Files []string `msg:"fl"` // Individual files to read.
MaxSize int64 `msg:"ms"` // Return error if size is exceed.
MetadataOnly bool `msg:"mo"` // Read as XL meta and truncate data.
AbortOn404 bool `msg:"ab"` // Stop reading after first file not found.
MaxResults int `msg:"mr"` // Stop after this many successful results. <= 0 means all.
}
// ReadMultipleResp contains a single response from a ReadMultipleReq.
type ReadMultipleResp struct {
Bucket string // Bucket as given by request.
Prefix string // Prefix as given by request.
File string // File name as given in request.
Exists bool // Returns whether the file existed on disk.
Error string // Returns any error when reading.
Data []byte // Contains all data of file.
Modtime time.Time // Modtime of file on disk.
Bucket string `msg:"bk"` // Bucket as given by request.
Prefix string `msg:"pr,omitempty"` // Prefix as given by request.
File string `msg:"fl"` // File name as given in request.
Exists bool `msg:"ex"` // Returns whether the file existed on disk.
Error string `msg:"er,omitempty"` // Returns any error when reading.
Data []byte `msg:"d"` // Contains all data of file.
Modtime time.Time `msg:"m"` // Modtime of file on disk.
}
// DeleteVersionHandlerParams are parameters for DeleteVersionHandler
@@ -516,8 +516,8 @@ type WriteAllHandlerParams struct {
// only after as a 2-phase call, allowing the older dataDir to
// hang-around in-case we need some form of recovery.
type RenameDataResp struct {
Sign []byte
OldDataDir string // contains '<uuid>', it is designed to be passed as value to Delete(bucket, pathJoin(object, dataDir))
Sign []byte `msg:"s"`
OldDataDir string `msg:"od"` // contains '<uuid>', it is designed to be passed as value to Delete(bucket, pathJoin(object, dataDir))
}
const (
@@ -534,15 +534,26 @@ const (
// CheckPartsResp is a response of the storage CheckParts and VerifyFile APIs
type CheckPartsResp struct {
Results []int
Results []int `msg:"r"`
}
// LocalDiskIDs - GetLocalIDs response.
type LocalDiskIDs struct {
IDs []string
IDs []string `msg:"i"`
}
// ListDirResult - ListDir()'s response.
type ListDirResult struct {
Entries []string `msg:"e"`
}
// DeleteBulkReq - send multiple paths in same delete request.
type DeleteBulkReq struct {
Paths []string `msg:"p"`
}
// DeleteVersionsErrsResp - collection of delete errors
// for bulk version deletes
type DeleteVersionsErrsResp struct {
Errs []string `msg:"e"`
}

File diff suppressed because it is too large Load Diff

View File

@@ -348,6 +348,119 @@ func BenchmarkDecodeCheckPartsResp(b *testing.B) {
}
}
func TestMarshalUnmarshalDeleteBulkReq(t *testing.T) {
v := DeleteBulkReq{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgDeleteBulkReq(b *testing.B) {
v := DeleteBulkReq{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgDeleteBulkReq(b *testing.B) {
v := DeleteBulkReq{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalDeleteBulkReq(b *testing.B) {
v := DeleteBulkReq{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeDeleteBulkReq(t *testing.T) {
v := DeleteBulkReq{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeDeleteBulkReq Msgsize() is inaccurate")
}
vn := DeleteBulkReq{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeDeleteBulkReq(b *testing.B) {
v := DeleteBulkReq{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeDeleteBulkReq(b *testing.B) {
v := DeleteBulkReq{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalDeleteFileHandlerParams(t *testing.T) {
v := DeleteFileHandlerParams{}
bts, err := v.MarshalMsg(nil)
@@ -687,6 +800,119 @@ func BenchmarkDecodeDeleteVersionHandlerParams(b *testing.B) {
}
}
func TestMarshalUnmarshalDeleteVersionsErrsResp(t *testing.T) {
v := DeleteVersionsErrsResp{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgDeleteVersionsErrsResp(b *testing.B) {
v := DeleteVersionsErrsResp{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgDeleteVersionsErrsResp(b *testing.B) {
v := DeleteVersionsErrsResp{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalDeleteVersionsErrsResp(b *testing.B) {
v := DeleteVersionsErrsResp{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeDeleteVersionsErrsResp(t *testing.T) {
v := DeleteVersionsErrsResp{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeDeleteVersionsErrsResp Msgsize() is inaccurate")
}
vn := DeleteVersionsErrsResp{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeDeleteVersionsErrsResp(b *testing.B) {
v := DeleteVersionsErrsResp{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeDeleteVersionsErrsResp(b *testing.B) {
v := DeleteVersionsErrsResp{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalDiskInfo(t *testing.T) {
v := DiskInfo{}
bts, err := v.MarshalMsg(nil)

View File

@@ -81,6 +81,7 @@ type StorageAPI interface {
// Metadata operations
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) error
DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) []error
DeleteBulk(ctx context.Context, volume string, paths ...string) error
WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) error
UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error
ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (FileInfo, error)

View File

@@ -737,7 +737,9 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
}
dErrResp := &DeleteVersionsErrsResp{}
if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil {
decoder := msgpNewReader(reader)
defer readMsgpReaderPoolPut(decoder)
if err = dErrResp.DecodeMsg(decoder); err != nil {
for i := range errs {
errs[i] = toStorageErr(err)
}
@@ -745,7 +747,11 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
}
for i, dErr := range dErrResp.Errs {
errs[i] = toStorageErr(dErr)
if dErr != "" {
errs[i] = toStorageErr(errors.New(dErr))
} else {
errs[i] = nil
}
}
return errs
@@ -795,6 +801,26 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
return verifyResp, nil
}
func (client *storageRESTClient) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
req := &DeleteBulkReq{Paths: paths}
body, err := req.MarshalMsg(nil)
if err != nil {
return err
}
respBody, err := client.call(ctx, storageRESTMethodDeleteBulk, values, bytes.NewReader(body), int64(len(body)))
if err != nil {
return err
}
defer xhttp.DrainBody(respBody)
_, err = waitForHTTPResponse(respBody)
return toStorageErr(err)
}
func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)

View File

@@ -20,7 +20,7 @@ package cmd
//go:generate msgp -file $GOFILE -unexported
const (
storageRESTVersion = "v61" // Move all Read* calls to http.MethodGet, compact handlers and query params fields
storageRESTVersion = "v62" // Introduce DeleteBulk internode API.
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)
@@ -43,6 +43,7 @@ const (
storageRESTMethodStatInfoFile = "/sfile"
storageRESTMethodReadMultiple = "/rmpl"
storageRESTMethodCleanAbandoned = "/cln"
storageRESTMethodDeleteBulk = "/dblk"
)
const (

View File

@@ -21,7 +21,6 @@ import (
"bufio"
"context"
"encoding/binary"
"encoding/gob"
"encoding/hex"
"errors"
"fmt"
@@ -629,12 +628,6 @@ func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid.
return grid.NewNPErr(s.getStorage().Delete(context.Background(), p.Volume, p.FilePath, p.Opts))
}
// DeleteVersionsErrsResp - collection of delete errors
// for bulk version deletes
type DeleteVersionsErrsResp struct {
Errs []error
}
// DeleteVersionsHandler - delete a set of a versions.
func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@@ -659,21 +652,20 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http
}
}
dErrsResp := &DeleteVersionsErrsResp{Errs: make([]error, totalVersions)}
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w)
opts := DeleteOptions{}
errs := s.getStorage().DeleteVersions(r.Context(), volume, versions, opts)
done(nil)
dErrsResp := &DeleteVersionsErrsResp{Errs: make([]string, totalVersions)}
for idx := range versions {
if errs[idx] != nil {
dErrsResp.Errs[idx] = StorageErr(errs[idx].Error())
dErrsResp.Errs[idx] = errs[idx].Error()
}
}
encoder.Encode(dErrsResp)
buf, _ := dErrsResp.MarshalMsg(nil)
w.Write(buf)
}
// RenameDataHandler - renames a meta object and data dir to destination.
@@ -1107,18 +1099,15 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
return
}
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w)
resp, err := s.getStorage().VerifyFile(r.Context(), volume, filePath, fi)
done(nil)
done(err)
if err != nil {
s.writeErrorResponse(w, err)
return
}
encoder.Encode(resp)
buf, _ := resp.MarshalMsg(nil)
w.Write(buf)
}
func checkDiskFatalErrs(errs []error) error {
@@ -1243,6 +1232,24 @@ func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request)
}
}
func (s *storageRESTServer) DeleteBulkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
var req DeleteBulkReq
mr := msgpNewReader(r.Body)
defer readMsgpReaderPoolPut(mr)
if err := req.DecodeMsg(mr); err != nil {
s.writeErrorResponse(w, err)
return
}
volume := r.Form.Get(storageRESTVolume)
keepHTTPResponseAlive(w)(s.getStorage().DeleteBulk(r.Context(), volume, req.Paths...))
}
// ReadMultiple returns multiple files
func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@@ -1325,6 +1332,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteBulk).HandlerFunc(h(server.DeleteBulkHandler))
subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler))
subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(h(server.ReadVersionHandler))

View File

@@ -36,12 +36,13 @@ func _() {
_ = x[storageMetricReadMultiple-25]
_ = x[storageMetricDeleteAbandonedParts-26]
_ = x[storageMetricDiskInfo-27]
_ = x[storageMetricLast-28]
_ = x[storageMetricDeleteBulk-28]
_ = x[storageMetricLast-29]
}
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsDiskInfoLast"
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsDiskInfoDeleteBulkLast"
var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 282, 286}
var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 282, 292, 296}
func (i storageMetric) String() string {
if i >= storageMetric(len(_storageMetric_index)-1) {

View File

@@ -70,6 +70,7 @@ const (
storageMetricReadMultiple
storageMetricDeleteAbandonedParts
storageMetricDiskInfo
storageMetricDeleteBulk
// .... add more
@@ -499,6 +500,16 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
})
}
func (p *xlStorageDiskIDCheck) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteBulk, append([]string{volume}, paths...)...)
if err != nil {
return err
}
defer done(0, &err)
return p.storage.DeleteBulk(ctx, volume, paths...)
}
func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDelete, volume, path)
if err != nil {

View File

@@ -1079,32 +1079,37 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
return err
}
s.RLock()
legacy := s.formatLegacy
s.RUnlock()
var legacyJSON bool
buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
if err != nil {
if !errors.Is(err, errFileNotFound) {
return err
buf, err := xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) ([]byte, error) {
buf, _, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
if err != nil && !errors.Is(err, errFileNotFound) {
return nil, err
}
s.RLock()
legacy := s.formatLegacy
s.RUnlock()
if legacy {
if errors.Is(err, errFileNotFound) && legacy {
buf, _, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1))
if err != nil {
return err
return nil, err
}
legacyJSON = true
}
}
if len(buf) == 0 {
if errors.Is(err, errFileNotFound) && !skipAccessChecks(volume) {
if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
return errVolumeNotFound
if len(buf) == 0 {
if errors.Is(err, errFileNotFound) && !skipAccessChecks(volume) {
if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
return nil, errVolumeNotFound
}
return nil, errFileNotFound
}
}
return errFileNotFound
return buf, nil
})
if err != nil {
return err
}
if legacyJSON {
@@ -1178,10 +1183,7 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
errs[i] = ctx.Err()
continue
}
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
if err := w.Run(func() error { return s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...) }); err != nil {
errs[i] = err
}
errs[i] = s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...)
diskHealthCheckOK(ctx, errs[i])
}
@@ -1212,7 +1214,7 @@ func (s *xlStorage) diskAlmostFilled() bool {
return (float64(info.Free)/float64(info.Used)) < almostFilledPercent || (float64(info.FreeInodes)/float64(info.UsedInodes)) < almostFilledPercent
}
func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) {
func (s *xlStorage) moveToTrashNoDeadline(filePath string, recursive, immediatePurge bool) (err error) {
pathUUID := mustGetUUID()
targetPath := pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, pathUUID)
@@ -1265,10 +1267,16 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool)
}
}
}
return nil
}
func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) {
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() (err error) {
return s.moveToTrashNoDeadline(filePath, recursive, immediatePurge)
})
}
// DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. forceDelMarker
// will force creating a new `xl.meta` to create a new delete marker
func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) {
@@ -2417,7 +2425,41 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate
return nil
}
// DeleteFile - delete a file at path.
// DeleteBulk - delete many files in bulk to trash.
// this delete does not recursively delete empty
// parents, if you need empty parent delete support
// please use Delete() instead. This API is meant as
// an optimization for Multipart operations.
func (s *xlStorage) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
}
if !skipAccessChecks(volume) {
// Stat a volume entry.
if err = Access(volumeDir); err != nil {
return convertAccessError(err, errVolumeAccessDenied)
}
}
for _, fp := range paths {
// Following code is needed so that we retain SlashSeparator suffix if any in
// path argument.
filePath := pathJoin(volumeDir, fp)
if err = checkPathLength(filePath); err != nil {
return err
}
if err = s.moveToTrash(filePath, false, false); err != nil {
return err
}
}
return nil
}
// Delete - delete a file at path.
func (s *xlStorage) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
volumeDir, err := s.getVolDir(volume)
if err != nil {

View File

@@ -2,7 +2,7 @@ version: '3.7'
# Settings and configurations that are common for all containers
x-minio-common: &minio-common
image: quay.io/minio/minio:RELEASE.2024-07-26T20-48-21Z
image: quay.io/minio/minio:RELEASE.2024-07-29T22-14-52Z
command: server --console-address ":9001" http://minio{1...4}/data{1...2}
expose:
- "9000"