From 80ca120088be9950fa35467975dd9d8dc1bd4176 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 30 Jan 2024 12:43:25 -0800 Subject: [PATCH] remove checkBucketExist check entirely to avoid fan-out calls (#18917) Each Put, List, Multipart operations heavily rely on making GetBucketInfo() call to verify if bucket exists or not on a regular basis. This has a large performance cost when there are tons of servers involved. We did optimize this part by vectorizing the bucket calls, however its not enough, beyond 100 nodes and this becomes fairly visible in terms of performance. --- cmd/bitrot-streaming.go | 4 +- cmd/bitrot.go | 4 +- cmd/bitrot_test.go | 2 +- cmd/bucket-replication.go | 2 +- cmd/erasure-decode_test.go | 6 +- cmd/erasure-encode_test.go | 8 +-- cmd/erasure-heal_test.go | 4 +- cmd/erasure-healing-common_test.go | 10 ++-- cmd/erasure-healing.go | 12 ++-- cmd/erasure-healing_test.go | 56 +++++++++---------- cmd/erasure-metadata-utils.go | 8 +-- cmd/erasure-metadata.go | 4 +- cmd/erasure-multipart.go | 56 +++++++++++-------- cmd/erasure-object.go | 18 +++--- cmd/erasure-object_test.go | 18 +++--- cmd/erasure-server-pool.go | 21 +++++-- cmd/erasure-sets.go | 2 +- cmd/metacache-server-pool.go | 6 +- cmd/metacache-set.go | 17 +++++- cmd/metacache-walk.go | 9 +-- cmd/naughty-disk_test.go | 16 +++--- cmd/object-api-input-checks.go | 50 ++++++----------- cmd/object-api-listobjects_test.go | 43 +++++++------- cmd/object-api-multipart_test.go | 77 +++++++++++++++---------- cmd/object-api-putobject_test.go | 8 +-- cmd/object-api-utils_test.go | 2 +- cmd/peer-s3-client.go | 2 +- cmd/post-policy_test.go | 2 +- cmd/storage-datatypes.go | 1 + cmd/storage-datatypes_gen.go | 35 ++++++++++-- cmd/storage-interface.go | 16 +++--- cmd/storage-rest-client.go | 35 +++++++----- cmd/storage-rest-common.go | 6 +- cmd/storage-rest-server.go | 17 ++++-- cmd/storage-rest_test.go | 2 +- cmd/xl-storage-disk-id-check.go | 16 +++--- cmd/xl-storage.go | 90 +++++++++++++++++++++--------- cmd/xl-storage_test.go | 14 ++--- 38 files changed, 408 insertions(+), 291 deletions(-) diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index 055632810..7adfe109f 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -90,7 +90,7 @@ func newStreamingBitrotWriterBuffer(w io.Writer, algo BitrotAlgorithm, shardSize } // Returns streaming bitrot writer implementation. -func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer { +func newStreamingBitrotWriter(disk StorageAPI, origvolume, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer { r, w := io.Pipe() h := algo.New() @@ -110,7 +110,7 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums. totalFileSize = bitrotSumsTotalSize + length } - r.CloseWithError(disk.CreateFile(context.TODO(), volume, filePath, totalFileSize, r)) + r.CloseWithError(disk.CreateFile(context.TODO(), origvolume, volume, filePath, totalFileSize, r)) }() return bw } diff --git a/cmd/bitrot.go b/cmd/bitrot.go index c20bf6aad..f858f1b48 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -102,9 +102,9 @@ func BitrotAlgorithmFromString(s string) (a BitrotAlgorithm) { return } -func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer { +func newBitrotWriter(disk StorageAPI, origvolume, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer { if algo == HighwayHash256S { - return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize) + return newStreamingBitrotWriter(disk, origvolume, volume, filePath, length, algo, shardSize) } return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize) } diff --git a/cmd/bitrot_test.go b/cmd/bitrot_test.go index 6d5df3ded..9f2ebde3a 100644 --- a/cmd/bitrot_test.go +++ b/cmd/bitrot_test.go @@ -36,7 +36,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) { disk.MakeVol(context.Background(), volume) - writer := newBitrotWriter(disk, volume, filePath, 35, bitrotAlgo, 10) + writer := newBitrotWriter(disk, "", volume, filePath, 35, bitrotAlgo, 10) _, err = writer.Write([]byte("aaaaaaaaaa")) if err != nil { diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 7602f574d..566e42323 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -3393,7 +3393,7 @@ func (p *ReplicationPool) persistToDrive(ctx context.Context, v MRFReplicateEntr for _, localDrive := range localDrives { r := newReader() - err := localDrive.CreateFile(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), -1, r) + err := localDrive.CreateFile(ctx, "", minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), -1, r) r.Close() if err == nil { break diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go index e2c57a7c5..d1b033127 100644 --- a/cmd/erasure-decode_test.go +++ b/cmd/erasure-decode_test.go @@ -106,7 +106,7 @@ func TestErasureDecode(t *testing.T) { buffer := make([]byte, test.blocksize, 2*test.blocksize) writers := make([]io.Writer, len(disks)) for i, disk := range disks { - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "", "testbucket", "object", erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize()) } n, err := erasure.Encode(context.Background(), bytes.NewReader(data), writers, buffer, erasure.dataBlocks+1) closeBitrotWriters(writers) @@ -228,7 +228,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "", "testbucket", "object", erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize()) } // 10000 iterations with random offsets and lengths. @@ -297,7 +297,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64, if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "", "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize()) } content := make([]byte, size) diff --git a/cmd/erasure-encode_test.go b/cmd/erasure-encode_test.go index f799e839b..c301afcce 100644 --- a/cmd/erasure-encode_test.go +++ b/cmd/erasure-encode_test.go @@ -41,7 +41,7 @@ func (a badDisk) ReadFileStream(ctx context.Context, volume, path string, offset return nil, errFaultyDisk } -func (a badDisk) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error { +func (a badDisk) CreateFile(ctx context.Context, origvolume, volume, path string, size int64, reader io.Reader) error { return errFaultyDisk } @@ -103,7 +103,7 @@ func TestErasureEncode(t *testing.T) { if disk == OfflineDisk { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "", "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize()) } n, err := erasure.Encode(context.Background(), bytes.NewReader(data[test.offset:]), writers, buffer, erasure.dataBlocks+1) closeBitrotWriters(writers) @@ -127,7 +127,7 @@ func TestErasureEncode(t *testing.T) { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "", "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize()) } for j := range disks[:test.offDisks] { switch w := writers[j].(type) { @@ -192,7 +192,7 @@ func benchmarkErasureEncode(data, parity, dataDown, parityDown int, size int64, Recursive: false, Immediate: false, }) - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "", "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize()) } _, err := erasure.Encode(context.Background(), bytes.NewReader(content), writers, buffer, erasure.dataBlocks+1) closeBitrotWriters(writers) diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go index baba08978..70f648f13 100644 --- a/cmd/erasure-heal_test.go +++ b/cmd/erasure-heal_test.go @@ -86,7 +86,7 @@ func TestErasureHeal(t *testing.T) { buffer := make([]byte, test.blocksize, 2*test.blocksize) writers := make([]io.Writer, len(disks)) for i, disk := range disks { - writers[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "", "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize()) } _, err = erasure.Encode(context.Background(), bytes.NewReader(data), writers, buffer, erasure.dataBlocks+1) closeBitrotWriters(writers) @@ -128,7 +128,7 @@ func TestErasureHeal(t *testing.T) { continue } os.Remove(pathJoin(disk.String(), "testbucket", "testobject")) - staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize()) + staleWriters[i] = newBitrotWriter(disk, "", "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize()) } // test case setup is complete - now call Heal() diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index d84918f23..abcb9ff3d 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -246,7 +246,7 @@ func TestListOnlineDisks(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) fi, err := getLatestFileInfo(ctx, partsMetadata, z.serverPools[0].sets[0].defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo %v", err) @@ -424,7 +424,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true, true) + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", true, true) fi, err := getLatestFileInfo(ctx, partsMetadata, z.serverPools[0].sets[0].defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo %v", err) @@ -437,7 +437,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { partsMetadata[j].ModTime = test.modTimes[j] } - if erasureDisks, err = writeUniqueFileInfo(ctx, erasureDisks, bucket, object, partsMetadata, diskCount(erasureDisks)); err != nil { + if erasureDisks, err = writeUniqueFileInfo(ctx, erasureDisks, "", bucket, object, partsMetadata, diskCount(erasureDisks)); err != nil { t.Fatal(ctx, err) } @@ -534,7 +534,7 @@ func TestDisksWithAllParts(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + _, errs := readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) readQuorum := len(erasureDisks) / 2 if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { t.Fatalf("Failed to read xl meta data %v", reducedErr) @@ -542,7 +542,7 @@ func TestDisksWithAllParts(t *testing.T) { // Test 1: Test that all disks are returned without any failures with // unmodified meta data - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) if err != nil { t.Fatalf("Failed to read xl meta data %v", err) } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 74446cc61..c1328a5bc 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -230,7 +230,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object } // Re-read when we have lock... - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true, true) + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, "", bucket, object, versionID, true, true) if isAllNotFound(errs) { err := errFileNotFound if versionID != "" { @@ -518,7 +518,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(latestMeta.Size)+32)) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) } else { - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, + writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } } @@ -796,7 +796,7 @@ func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix } index := index g.Go(func() error { - entries, err := storageDisks[index].ListDir(ctx, bucket, prefix, 1) + entries, err := storageDisks[index].ListDir(ctx, "", bucket, prefix, 1) if err != nil { return err } @@ -810,6 +810,10 @@ func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix return g.Wait() } +func isAllVolumeNotFound(errs []error) bool { + return countErrs(errs, errVolumeNotFound) == len(errs) +} + // isAllNotFound will return if any element of the error slice is not // errFileNotFound, errFileVersionNotFound or errVolumeNotFound. // A 0 length slice will always return false. @@ -993,7 +997,7 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version // Perform quick read without lock. // This allows to quickly check if all is ok or all are missing. - _, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false, false) + _, errs := readAllFileInfo(healCtx, storageDisks, "", bucket, object, versionID, false, false) if isAllNotFound(errs) { err := errFileNotFound if versionID != "" { diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index 62393ca43..5bc13b270 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -266,7 +266,7 @@ func TestHealing(t *testing.T) { } disk := er.getDisks()[0] - fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPreHeal, err := disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -289,7 +289,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal, err := disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -308,7 +308,7 @@ func TestHealing(t *testing.T) { // gone down when an object was replaced by a new object. fileInfoOutDated := fileInfoPreHeal fileInfoOutDated.ModTime = time.Now() - err = disk.WriteMetadata(context.Background(), bucket, object, fileInfoOutDated) + err = disk.WriteMetadata(context.Background(), "", bucket, object, fileInfoOutDated) if err != nil { t.Fatal(err) } @@ -318,7 +318,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -426,11 +426,11 @@ func TestHealingVersioned(t *testing.T) { } disk := er.getDisks()[0] - fileInfoPreHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, ReadOptions{ReadData: false, Healing: true}) + fileInfoPreHeal1, err := disk.ReadVersion(context.Background(), "", bucket, object, oi1.VersionID, ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } - fileInfoPreHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, ReadOptions{ReadData: false, Healing: true}) + fileInfoPreHeal2, err := disk.ReadVersion(context.Background(), "", bucket, object, oi2.VersionID, ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -453,11 +453,11 @@ func TestHealingVersioned(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal1, err := disk.ReadVersion(context.Background(), "", bucket, object, oi1.VersionID, ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } - fileInfoPostHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal2, err := disk.ReadVersion(context.Background(), "", bucket, object, oi2.VersionID, ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -479,7 +479,7 @@ func TestHealingVersioned(t *testing.T) { // gone down when an object was replaced by a new object. fileInfoOutDated := fileInfoPreHeal1 fileInfoOutDated.ModTime = time.Now() - err = disk.WriteMetadata(context.Background(), bucket, object, fileInfoOutDated) + err = disk.WriteMetadata(context.Background(), "", bucket, object, fileInfoOutDated) if err != nil { t.Fatal(err) } @@ -489,7 +489,7 @@ func TestHealingVersioned(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal1, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal1, err = disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -499,7 +499,7 @@ func TestHealingVersioned(t *testing.T) { t.Fatal("HealObject failed") } - fileInfoPostHeal2, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal2, err = disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -637,7 +637,7 @@ func TestHealingDanglingObject(t *testing.T) { // Restore... setDisks(orgDisks[:4]...) - fileInfoPreHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPreHeal, err := disks[0].ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -654,7 +654,7 @@ func TestHealingDanglingObject(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal, err := disks[0].ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -684,7 +684,7 @@ func TestHealingDanglingObject(t *testing.T) { setDisks(orgDisks[:4]...) disk := getDisk(0) - fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPreHeal, err = disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -702,7 +702,7 @@ func TestHealingDanglingObject(t *testing.T) { } disk = getDisk(0) - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -733,7 +733,7 @@ func TestHealingDanglingObject(t *testing.T) { setDisks(orgDisks[:4]...) disk = getDisk(0) - fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPreHeal, err = disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -751,7 +751,7 @@ func TestHealingDanglingObject(t *testing.T) { } disk = getDisk(0) - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), "", bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -836,7 +836,7 @@ func TestHealCorrectQuorum(t *testing.T) { er := set.sets[0] erasureDisks := er.getDisks() - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) nfi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if errors.Is(err, errFileNotFound) { continue @@ -858,12 +858,12 @@ func TestHealCorrectQuorum(t *testing.T) { t.Fatal(err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) if countErrs(errs, nil) != len(fileInfos) { t.Fatal("Expected all xl.meta healed, but partial heal detected") } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false, true) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, "", minioMetaBucket, cfgFile, "", false, true) nfi, err = getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if errors.Is(err, errFileNotFound) { continue @@ -885,7 +885,7 @@ func TestHealCorrectQuorum(t *testing.T) { t.Fatal(err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false, true) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, "", minioMetaBucket, cfgFile, "", false, true) if countErrs(errs, nil) != len(fileInfos) { t.Fatal("Expected all xl.meta healed, but partial heal detected") } @@ -970,7 +970,7 @@ func TestHealObjectCorruptedPools(t *testing.T) { t.Fatalf("Failed to heal object - %v", err) } - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -998,7 +998,7 @@ func TestHealObjectCorruptedPools(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) nfi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1029,7 +1029,7 @@ func TestHealObjectCorruptedPools(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) nfi, err = getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1133,7 +1133,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) { firstDisk := erasureDisks[0] // Test 1: Remove the object backend files from the first disk. - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1156,7 +1156,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) { t.Errorf("Expected xl.meta file to be present but stat failed - %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) nfi1, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1179,7 +1179,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) nfi2, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1277,7 +1277,7 @@ func TestHealObjectCorruptedParts(t *testing.T) { firstDisk := erasureDisks[0] secondDisk := erasureDisks[1] - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, "", bucket, object, "", false, true) fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index a0268a65f..2a3021d3c 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -156,8 +156,8 @@ var readFileInfoIgnoredErrs = append(objectOpIgnoredErrs, io.EOF, // some times we would read without locks, ignore these errors ) -func readFileInfo(ctx context.Context, disk StorageAPI, bucket, object, versionID string, opts ReadOptions) (FileInfo, error) { - fi, err := disk.ReadVersion(ctx, bucket, object, versionID, opts) +func readFileInfo(ctx context.Context, disk StorageAPI, origbucket, bucket, object, versionID string, opts ReadOptions) (FileInfo, error) { + fi, err := disk.ReadVersion(ctx, origbucket, bucket, object, versionID, opts) if err != nil && !IsErr(err, readFileInfoIgnoredErrs...) { logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)", @@ -170,7 +170,7 @@ func readFileInfo(ctx context.Context, disk StorageAPI, bucket, object, versionI // Reads all `xl.meta` metadata as a FileInfo slice. // Returns error slice indicating the failed metadata reads. -func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData, healing bool) ([]FileInfo, []error) { +func readAllFileInfo(ctx context.Context, disks []StorageAPI, origbucket string, bucket, object, versionID string, readData, healing bool) ([]FileInfo, []error) { metadataArray := make([]FileInfo, len(disks)) opts := ReadOptions{ @@ -186,7 +186,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve if disks[index] == nil { return errDiskNotFound } - metadataArray[index], err = readFileInfo(ctx, disks[index], bucket, object, versionID, opts) + metadataArray[index], err = readFileInfo(ctx, disks[index], origbucket, bucket, object, versionID, opts) return err }, index) } diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index fb32e414b..4ee7575d3 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -381,7 +381,7 @@ func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Tim } // writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently. -func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) { +func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, origbucket, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) { g := errgroup.WithNErrs(len(disks)) // Start writing `xl.meta` to all disks in parallel. @@ -395,7 +395,7 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix fi := files[index] fi.Erasure.Index = index + 1 if fi.IsValid() { - return disks[index].WriteMetadata(ctx, bucket, prefix, fi) + return disks[index].WriteMetadata(ctx, origbucket, bucket, prefix, fi) } return errCorruptedFormat }, index) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 25cbd359a..af5c035aa 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -61,7 +61,7 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string { // checkUploadIDExists - verify if a given uploadID exists and is valid. func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string, write bool) (fi FileInfo, metArr []FileInfo, err error) { defer func() { - if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) { + if errors.Is(err, errFileNotFound) { err = errUploadIDNotFound } }() @@ -71,7 +71,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, minioMetaMultipartBucket, uploadIDPath, "", false, false) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount) @@ -87,15 +87,14 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object // List all online disks. _, modTime, etag := listOnlineDisks(storageDisks, partsMetadata, errs, quorum) + var reducedErr error if write { - reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) - if errors.Is(reducedErr, errErasureWriteQuorum) { - return fi, nil, reducedErr - } + reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) } else { - if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { - return fi, nil, reducedErr - } + reducedErr = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum) + } + if reducedErr != nil { + return fi, nil, reducedErr } // Pick one from the first valid metadata. @@ -200,7 +199,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto readDirFn(pathJoin(diskPath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error { readDirFn(pathJoin(diskPath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error { uploadIDPath := pathJoin(shaDir, uploadIDDir) - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{}) + fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{}) if err != nil { return nil } @@ -281,15 +280,14 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec if !disk.IsOnline() { continue } - uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1) + uploadIDs, err = disk.ListDir(ctx, bucket, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1) if err != nil { if errors.Is(err, errDiskNotFound) { continue } - if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) { + if errors.Is(err, errFileNotFound) { return result, nil } - logger.LogIf(ctx, err) return result, toObjectErr(err, bucket, object) } break @@ -486,8 +484,8 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID) // Write updated `xl.meta` to all disks. - if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { - return nil, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) + if _, err := writeUniqueFileInfo(ctx, onlineDisks, bucket, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { + return nil, toObjectErr(err, bucket, object) } return &NewMultipartUploadResult{ UploadID: uploadID, @@ -582,6 +580,13 @@ func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { auditObjectErasureSet(ctx, object, &er) + data := r.Reader + // Validate input data size and it can never be less than zero. + if data.Size() < -1 { + logger.LogIf(ctx, errInvalidArgument, logger.Application) + return pi, toObjectErr(errInvalidArgument) + } + // Read lock for upload id. // Only held while reading the upload metadata. uploadIDRLock := er.NewNSLock(bucket, pathJoin(object, uploadID)) @@ -596,16 +601,12 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Validates if upload ID exists. fi, _, err := er.checkUploadIDExists(rctx, bucket, object, uploadID, true) if err != nil { + if errors.Is(err, errVolumeNotFound) { + return pi, toObjectErr(err, bucket) + } return pi, toObjectErr(err, bucket, object, uploadID) } - data := r.Reader - // Validate input data size and it can never be less than zero. - if data.Size() < -1 { - logger.LogIf(rctx, errInvalidArgument, logger.Application) - return pi, toObjectErr(errInvalidArgument) - } - // Write lock for this part ID, only hold it if we are planning to read from the // streamto avoid any concurrent updates. // @@ -681,7 +682,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo if disk == nil { continue } - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) } toEncode := io.Reader(data) @@ -794,6 +795,9 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u fi, _, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, false) if err != nil { + if errors.Is(err, errVolumeNotFound) { + return result, toObjectErr(err, bucket) + } return result, toObjectErr(err, bucket, object, uploadID) } @@ -979,6 +983,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str fi, partsMetadata, err := er.checkUploadIDExists(ctx, bucket, object, uploadID, true) if err != nil { + if errors.Is(err, errVolumeNotFound) { + return oi, toObjectErr(err, bucket) + } return oi, toObjectErr(err, bucket, object, uploadID) } @@ -1337,6 +1344,9 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec // Validates if upload ID exists. if _, _, err = er.checkUploadIDExists(ctx, bucket, object, uploadID, false); err != nil { + if errors.Is(err, errVolumeNotFound) { + return toObjectErr(err, bucket) + } return toObjectErr(err, bucket, object, uploadID) } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 5cb8f52a3..492f58ba0 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -91,7 +91,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // Read metadata associated with the object from all disks. if srcOpts.VersionID != "" { - metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true, false) + metaArr, errs = readAllFileInfo(ctx, storageDisks, "", srcBucket, srcObject, srcOpts.VersionID, true, false) } else { metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true, false, true) } @@ -179,7 +179,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d } // Write unique `xl.meta` for each disk. - if _, err = writeUniqueFileInfo(ctx, onlineDisks, srcBucket, srcObject, metaArr, writeQuorum); err != nil { + if _, err = writeUniqueFileInfo(ctx, onlineDisks, "", srcBucket, srcObject, metaArr, writeQuorum); err != nil { return oi, toObjectErr(err, srcBucket, srcObject) } @@ -791,7 +791,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s if opts.VersionID != "" { // Read a specific version ID - fi, err = readFileInfo(ctx, disk, bucket, object, opts.VersionID, ropts) + fi, err = readFileInfo(ctx, disk, "", bucket, object, opts.VersionID, ropts) } else { // Read the latest version rfi, err = readRawFileInfo(ctx, disk, bucket, object, readData) @@ -1219,7 +1219,7 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r * } } - if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaBucket, key, partsMetadata, writeQuorum); err != nil { + if _, err = writeUniqueFileInfo(ctx, onlineDisks, "", minioMetaBucket, key, partsMetadata, writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key) } @@ -1449,7 +1449,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st continue } - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, tempErasureObj, shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize()) } toEncode := io.Reader(data) @@ -1469,7 +1469,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st n, erasureErr := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum) closeBitrotWriters(writers) if erasureErr != nil { - return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) + return ObjectInfo{}, toObjectErr(erasureErr, bucket, object) } // Should return IncompleteBody{} error when reader has fewer bytes @@ -2057,7 +2057,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s // Read metadata associated with the object from all disks. if opts.VersionID != "" { - metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false, false) + metaArr, errs = readAllFileInfo(ctx, disks, "", bucket, object, opts.VersionID, false, false) } else { metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false, true) } @@ -2130,7 +2130,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin // Read metadata associated with the object from all disks. if opts.VersionID != "" { - metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false, false) + metaArr, errs = readAllFileInfo(ctx, disks, "", bucket, object, opts.VersionID, false, false) } else { metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false, true) } @@ -2458,7 +2458,7 @@ func (er erasureObjects) DecomTieredObject(ctx context.Context, bucket, object s var onlineDisks []StorageAPI onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi) - if _, err := writeUniqueFileInfo(ctx, onlineDisks, bucket, object, partsMetadata, writeQuorum); err != nil { + if _, err := writeUniqueFileInfo(ctx, onlineDisks, "", bucket, object, partsMetadata, writeQuorum); err != nil { return toObjectErr(err, bucket, object) } diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go index 617ae534e..a30803f85 100644 --- a/cmd/erasure-object_test.go +++ b/cmd/erasure-object_test.go @@ -521,7 +521,7 @@ func TestGetObjectNoQuorum(t *testing.T) { } for _, disk := range xl.getDisks() { - files, _ := disk.ListDir(ctx, bucket, object, -1) + files, _ := disk.ListDir(ctx, "", bucket, object, -1) for _, file := range files { if file != "xl.meta" { disk.Delete(ctx, bucket, pathJoin(object, file), DeleteOptions{ @@ -625,7 +625,7 @@ func TestHeadObjectNoQuorum(t *testing.T) { t.Fatal(err) } for _, disk := range xl.getDisks() { - files, _ := disk.ListDir(ctx, bucket, object, -1) + files, _ := disk.ListDir(ctx, "", bucket, object, -1) for _, file := range files { if file != "xl.meta" { disk.Delete(ctx, bucket, pathJoin(object, file), DeleteOptions{ @@ -908,7 +908,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false, false) + parts1, errs1 := readAllFileInfo(ctx, erasureDisks, "", bucket, object1, "", false, false) parts1SC := globalStorageClass // Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class @@ -920,7 +920,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false, false) + parts2, errs2 := readAllFileInfo(ctx, erasureDisks, "", bucket, object2, "", false, false) parts2SC := globalStorageClass // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class @@ -932,7 +932,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false, false) + parts3, errs3 := readAllFileInfo(ctx, erasureDisks, "", bucket, object3, "", false, false) parts3SC := globalStorageClass // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class @@ -950,7 +950,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false, false) + parts4, errs4 := readAllFileInfo(ctx, erasureDisks, "", bucket, object4, "", false, false) parts4SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 6, @@ -973,7 +973,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false, false) + parts5, errs5 := readAllFileInfo(ctx, erasureDisks, "", bucket, object5, "", false, false) parts5SC := globalStorageClass // Object for test case 6 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting Standard Storage Class @@ -994,7 +994,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false, false) + parts6, errs6 := readAllFileInfo(ctx, erasureDisks, "", bucket, object6, "", false, false) parts6SC := storageclass.Config{ RRS: storageclass.StorageClass{ Parity: 2, @@ -1017,7 +1017,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false, false) + parts7, errs7 := readAllFileInfo(ctx, erasureDisks, "", bucket, object7, "", false, false) parts7SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 5, diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 44d0c5ca9..65e2fc67c 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1312,7 +1312,10 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre merged, err := z.listPath(ctx, &opts) if err != nil && err != io.EOF { - return loi, err + if !isErrBucketNotFound(err) { + logger.LogOnceIf(ctx, err, "erasure-list-objects-path-"+bucket) + } + return loi, toObjectErr(err, bucket) } defer merged.truncate(0) // Release when returning @@ -1461,6 +1464,12 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma } return loi, nil } + if isErrBucketNotFound(err) { + return loi, err + } + if contextCanceled(ctx) { + return ListObjectsInfo{}, ctx.Err() + } } if len(prefix) > 0 && maxKeys == 1 && marker == "" { @@ -1486,7 +1495,9 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma loi.Objects = append(loi.Objects, objInfo) return loi, nil } - + if isErrBucketNotFound(err) { + return ListObjectsInfo{}, err + } if contextCanceled(ctx) { return ListObjectsInfo{}, ctx.Err() } @@ -1497,7 +1508,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma if !isErrBucketNotFound(err) { logger.LogOnceIf(ctx, err, "erasure-list-objects-path-"+bucket) } - return loi, err + return loi, toObjectErr(err, bucket) } merged.forwardPast(opts.Marker) @@ -1542,7 +1553,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma } func (z *erasureServerPools) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { - if err := checkListMultipartArgs(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, z); err != nil { + if err := checkListMultipartArgs(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter); err != nil { return ListMultipartsInfo{}, err } @@ -1961,7 +1972,7 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts // error walker returns error. Optionally if context.Done() is received // then Walk() stops the walker. func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error { - if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil { + if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil { // Upon error close the channel. xioutil.SafeClose(results) return err diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 6da3aa512..0409531ec 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -733,7 +733,7 @@ func listDeletedBuckets(ctx context.Context, storageDisks []StorageAPI, delBucke // we ignore disk not found errors return nil } - volsInfo, err := storageDisks[index].ListDir(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix), -1) + volsInfo, err := storageDisks[index].ListDir(ctx, "", minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix), -1) if err != nil { if errors.Is(err, errFileNotFound) { return nil diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 99476293b..186239c9c 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -58,9 +58,10 @@ func renameAllBucketMetacache(epPath string) error { // Other important fields are Limit, Marker. // List ID always derived from the Marker. func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) { - if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, z); err != nil { + if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker); err != nil { return entries, err } + // Marker points to before the prefix, just ignore it. if o.Marker < o.Prefix { o.Marker = "" @@ -313,6 +314,9 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions, } if isAllNotFound(errs) { + if isAllVolumeNotFound(errs) { + return errVolumeNotFound + } return nil } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index a77e9c011..8ef3f8f3f 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -427,7 +427,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt if !disk.IsOnline() { continue } - _, err := disk.ReadVersion(ctx, minioMetaBucket, + _, err := disk.ReadVersion(ctx, "", minioMetaBucket, o.objectPath(0), "", ReadOptions{}) if err != nil { time.Sleep(retryDelay250) @@ -504,7 +504,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt if !disk.IsOnline() { continue } - _, err := disk.ReadVersion(ctx, minioMetaBucket, + _, err := disk.ReadVersion(ctx, "", minioMetaBucket, o.objectPath(partN), "", ReadOptions{}) if err != nil { time.Sleep(retryDelay250) @@ -1057,7 +1057,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { for { // Get the top entry from each var current metaCacheEntry - var atEOF, fnf, hasErr, agree int + var atEOF, fnf, vnf, hasErr, agree int for i := range topEntries { topEntries[i] = metaCacheEntry{} } @@ -1083,6 +1083,11 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { errDiskNotFound.Error(): atEOF++ fnf++ + // This is a special case, to handle bucket does + // not exist situations. + if errors.Is(err, errVolumeNotFound) { + vnf++ + } continue } hasErr++ @@ -1140,6 +1145,10 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { return errors.New(strings.Join(combinedErr, ", ")) } + if vnf == len(readers) { + return errVolumeNotFound + } + // Break if all at EOF or error. if atEOF+hasErr == len(readers) { if hasErr > 0 && opts.finished != nil { @@ -1147,9 +1156,11 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { } break } + if fnf == len(readers) { return errFileNotFound } + if agree == len(readers) { // Everybody agreed for _, r := range readers { diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 174f05e89..a23847f3f 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -73,12 +73,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ if !skipAccessChecks(opts.Bucket) { // Stat a volume entry. if err = Access(volumeDir); err != nil { - if osIsNotExist(err) { - return errVolumeNotFound - } else if isSysErrIO(err) { - return errFaultyDisk - } - return err + return convertAccessError(err, errVolumeAccessDenied) } } @@ -169,7 +164,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ if s.walkMu != nil { s.walkMu.Lock() } - entries, err := s.ListDir(ctx, opts.Bucket, current, -1) + entries, err := s.ListDir(ctx, "", opts.Bucket, current, -1) if s.walkMu != nil { s.walkMu.Unlock() } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index ca791ae3a..e36ae82ce 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -172,11 +172,11 @@ func (d *naughtyDisk) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Wr return d.disk.WalkDir(ctx, opts, wr) } -func (d *naughtyDisk) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) { +func (d *naughtyDisk) ListDir(ctx context.Context, origvolume, volume, dirPath string, count int) (entries []string, err error) { if err := d.calcError(); err != nil { return []string{}, err } - return d.disk.ListDir(ctx, volume, dirPath, count) + return d.disk.ListDir(ctx, origvolume, volume, dirPath, count) } func (d *naughtyDisk) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) { @@ -193,11 +193,11 @@ func (d *naughtyDisk) ReadFileStream(ctx context.Context, volume, path string, o return d.disk.ReadFileStream(ctx, volume, path, offset, length) } -func (d *naughtyDisk) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error { +func (d *naughtyDisk) CreateFile(ctx context.Context, origvolume, volume, path string, size int64, reader io.Reader) error { if err := d.calcError(); err != nil { return err } - return d.disk.CreateFile(ctx, volume, path, size, reader) + return d.disk.CreateFile(ctx, origvolume, volume, path, size, reader) } func (d *naughtyDisk) AppendFile(ctx context.Context, volume string, path string, buf []byte) error { @@ -246,11 +246,11 @@ func (d *naughtyDisk) DeleteVersions(ctx context.Context, volume string, version return d.disk.DeleteVersions(ctx, volume, versions, opts) } -func (d *naughtyDisk) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { +func (d *naughtyDisk) WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) (err error) { if err := d.calcError(); err != nil { return err } - return d.disk.WriteMetadata(ctx, volume, path, fi) + return d.disk.WriteMetadata(ctx, origvolume, volume, path, fi) } func (d *naughtyDisk) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) (err error) { @@ -267,11 +267,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi return d.disk.DeleteVersion(ctx, volume, path, fi, forceDelMarker, opts) } -func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { +func (d *naughtyDisk) ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { if err := d.calcError(); err != nil { return FileInfo{}, err } - return d.disk.ReadVersion(ctx, volume, path, versionID, opts) + return d.disk.ReadVersion(ctx, origvolume, volume, path, versionID, opts) } func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) { diff --git a/cmd/object-api-input-checks.go b/cmd/object-api-input-checks.go index a2029bfd9..f13742cf1 100644 --- a/cmd/object-api-input-checks.go +++ b/cmd/object-api-input-checks.go @@ -40,7 +40,7 @@ func checkDelObjArgs(ctx context.Context, bucket, object string) error { // Checks bucket and object name validity, returns nil if both are valid. func checkBucketAndObjectNames(ctx context.Context, bucket, object string) error { // Verify if bucket is valid. - if !isMinioMetaBucketName(bucket) && s3utils.CheckValidBucketName(bucket) != nil { + if !isMinioMetaBucketName(bucket) && s3utils.CheckValidBucketNameStrict(bucket) != nil { return BucketNameInvalid{Bucket: bucket} } // Verify if object is valid. @@ -58,15 +58,12 @@ func checkBucketAndObjectNames(ctx context.Context, bucket, object string) error } // Checks for all ListObjects arguments validity. -func checkListObjsArgs(ctx context.Context, bucket, prefix, marker string, obj getBucketInfoI) error { - // Verify if bucket exists before validating object name. - // This is done on purpose since the order of errors is - // important here bucket does not exist error should - // happen before we return an error for invalid object name. - // FIXME: should be moved to handler layer. - if err := checkBucketExist(ctx, bucket, obj); err != nil { - return err +func checkListObjsArgs(ctx context.Context, bucket, prefix, marker string) error { + // Verify if bucket is valid. + if !isMinioMetaBucketName(bucket) && s3utils.CheckValidBucketNameStrict(bucket) != nil { + return BucketNameInvalid{Bucket: bucket} } + // Validates object prefix validity after bucket exists. if !IsValidObjectPrefix(prefix) { logger.LogIf(ctx, ObjectNameInvalid{ @@ -82,8 +79,8 @@ func checkListObjsArgs(ctx context.Context, bucket, prefix, marker string, obj g } // Checks for all ListMultipartUploads arguments validity. -func checkListMultipartArgs(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, obj ObjectLayer) error { - if err := checkListObjsArgs(ctx, bucket, prefix, keyMarker, obj); err != nil { +func checkListMultipartArgs(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string) error { + if err := checkListObjsArgs(ctx, bucket, prefix, keyMarker); err != nil { return err } if uploadIDMarker != "" { @@ -143,14 +140,11 @@ func checkAbortMultipartArgs(ctx context.Context, bucket, object, uploadID strin return checkMultipartObjectArgs(ctx, bucket, object, uploadID, obj) } -// Checks Object arguments validity, also validates if bucket exists. +// Checks Object arguments validity. func checkObjectArgs(ctx context.Context, bucket, object string, obj ObjectLayer) error { - // Verify if bucket exists before validating object name. - // This is done on purpose since the order of errors is - // important here bucket does not exist error should - // happen before we return an error for invalid object name. - if err := checkBucketExist(ctx, bucket, obj); err != nil { - return err + // Verify if bucket is valid. + if !isMinioMetaBucketName(bucket) && s3utils.CheckValidBucketNameStrict(bucket) != nil { + return BucketNameInvalid{Bucket: bucket} } if err := checkObjectNameForLengthAndSlash(bucket, object); err != nil { @@ -168,8 +162,13 @@ func checkObjectArgs(ctx context.Context, bucket, object string, obj ObjectLayer return nil } -// Checks for PutObject arguments validity, also validates if bucket exists. +// Checks for PutObject arguments validity. func checkPutObjectArgs(ctx context.Context, bucket, object string) error { + // Verify if bucket is valid. + if !isMinioMetaBucketName(bucket) && s3utils.CheckValidBucketNameStrict(bucket) != nil { + return BucketNameInvalid{Bucket: bucket} + } + if err := checkObjectNameForLengthAndSlash(bucket, object); err != nil { return err } @@ -182,16 +181,3 @@ func checkPutObjectArgs(ctx context.Context, bucket, object string) error { } return nil } - -type getBucketInfoI interface { - GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bucketInfo BucketInfo, err error) -} - -// Checks whether bucket exists and returns appropriate error if not. -func checkBucketExist(ctx context.Context, bucket string, obj getBucketInfoI) error { - _, err := obj.GetBucketInfo(ctx, bucket, BucketOptions{}) - if err != nil { - return err - } - return nil -} diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index de08684d2..28fd824f9 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -155,16 +155,21 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te for i, testCase := range testCases { testCase := testCase t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) { - t.Log("ListObjects, bucket:", testCase.bucketName, "prefix:", - testCase.prefix, "marker:", testCase.marker, "delimiter:", - testCase.delimiter, "maxkeys:", testCase.maxKeys) var err error var resultL ListObjectsInfo var resultV ListObjectVersionsInfo if testCase.versioned { + t.Log("ListObjectVersions, bucket:", testCase.bucketName, "prefix:", + testCase.prefix, "marker:", testCase.marker, "delimiter:", + testCase.delimiter, "maxkeys:", testCase.maxKeys) + resultV, err = obj.ListObjectVersions(context.Background(), testCase.bucketName, testCase.prefix, testCase.marker, "", testCase.delimiter, testCase.maxKeys) } else { + t.Log("ListObjects, bucket:", testCase.bucketName, "prefix:", + testCase.prefix, "marker:", testCase.marker, "delimiter:", + testCase.delimiter, "maxkeys:", testCase.maxKeys) + resultL, err = obj.ListObjects(context.Background(), testCase.bucketName, testCase.prefix, testCase.marker, testCase.delimiter, testCase.maxKeys) } @@ -819,16 +824,14 @@ func _testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler, v shouldPass bool }{ // Test cases with invalid bucket names ( Test number 1-4 ). - {".test", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: ".test"}, false}, - {"Test", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "Test"}, false}, - {"---", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "---"}, false}, - {"ad", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "ad"}, false}, - // Using an existing file for bucket name, but its not a directory (5). - {"simple-file.txt", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "simple-file.txt"}, false}, + {".test", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: ".test"}, false}, + {"Test", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "Test"}, false}, + {"---", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "---"}, false}, + {"ad", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "ad"}, false}, // Valid bucket names, but they do not exist (6-8). - {"volatile-bucket-1", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, - {"volatile-bucket-2", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, - {"volatile-bucket-3", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false}, + {"volatile-bucket-1", "", "", "", 1000, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, + {"volatile-bucket-2", "", "", "", 1000, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, + {"volatile-bucket-3", "", "", "", 1000, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false}, // If marker is *after* the last possible object from the prefix it should return an empty list. {"test-bucket-list-object", "Asia", "europe-object", "", 0, ListObjectsInfo{}, nil, true}, // If the marker is *before* the first possible object from the prefix it should return the first object. @@ -1564,16 +1567,14 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand shouldPass bool }{ // Test cases with invalid bucket names ( Test number 1-4). - {".test", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: ".test"}, false}, - {"Test", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "Test"}, false}, - {"---", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "---"}, false}, - {"ad", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "ad"}, false}, - // Using an existing file for bucket name, but its not a directory (5). - {"simple-file.txt", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "simple-file.txt"}, false}, + {".test", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: ".test"}, false}, + {"Test", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "Test"}, false}, + {"---", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "---"}, false}, + {"ad", "", "", "", 0, ListObjectsInfo{}, BucketNameInvalid{Bucket: "ad"}, false}, // Valid bucket names, but they do not exist (6-8). - {"volatile-bucket-1", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, - {"volatile-bucket-2", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, - {"volatile-bucket-3", "", "", "", 0, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false}, + {"volatile-bucket-1", "", "", "", 1000, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, + {"volatile-bucket-2", "", "", "", 1000, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, + {"volatile-bucket-3", "", "", "", 1000, ListObjectsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false}, // If marker is *after* the last possible object from the prefix it should return an empty list. {"test-bucket-list-object", "Asia", "europe-object", "", 0, ListObjectsInfo{}, nil, true}, // Setting a non-existing directory to be prefix (10-11). diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index 802a0fbd5..e8c104c46 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -113,7 +113,7 @@ func testObjectAbortMultipartUpload(obj ObjectLayer, instanceType string, t Test uploadID string expectedErrType error }{ - {"--", object, uploadID, BucketNotFound{}}, + {"--", object, uploadID, BucketNameInvalid{}}, {"foo", object, uploadID, BucketNotFound{}}, {bucket, object, "foo-foo", InvalidUploadID{}}, {bucket, object, uploadID, nil}, @@ -194,6 +194,19 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t TestErrH // Failed to create NewMultipartUpload, abort. t.Fatalf("%s : %s", instanceType, err.Error()) } + + err = obj.MakeBucket(context.Background(), "abc", MakeBucketOptions{}) + if err != nil { + // Failed to create newbucket, abort. + t.Fatalf("%s : %s", instanceType, err.Error()) + } + + resN, err := obj.NewMultipartUpload(context.Background(), "abc", "def", opts) + if err != nil { + // Failed to create NewMultipartUpload, abort. + t.Fatalf("%s : %s", instanceType, err.Error()) + } + uploadID := res.UploadID // Creating a dummy bucket for tests. err = obj.MakeBucket(context.Background(), "unused-bucket", MakeBucketOptions{}) @@ -202,6 +215,8 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t TestErrH t.Fatalf("%s : %s", instanceType, err.Error()) } + obj.DeleteBucket(context.Background(), "abc", DeleteBucketOptions{}) + // Collection of non-exhaustive PutObjectPart test cases, valid errors // and success responses. testCases := []struct { @@ -221,19 +236,19 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t TestErrH }{ // Test case 1-4. // Cases with invalid bucket name. - {bucketName: ".test", objName: "obj", PartID: 1, expectedError: fmt.Errorf("%s", "Bucket not found: .test")}, - {bucketName: "------", objName: "obj", PartID: 1, expectedError: fmt.Errorf("%s", "Bucket not found: ------")}, + {bucketName: ".test", objName: "obj", PartID: 1, expectedError: fmt.Errorf("%s", "Bucket name invalid: .test")}, + {bucketName: "------", objName: "obj", PartID: 1, expectedError: fmt.Errorf("%s", "Bucket name invalid: ------")}, { bucketName: "$this-is-not-valid-too", objName: "obj", PartID: 1, - expectedError: fmt.Errorf("%s", "Bucket not found: $this-is-not-valid-too"), + expectedError: fmt.Errorf("%s", "Bucket name invalid: $this-is-not-valid-too"), }, - {bucketName: "a", objName: "obj", PartID: 1, expectedError: fmt.Errorf("%s", "Bucket not found: a")}, + {bucketName: "a", objName: "obj", PartID: 1, expectedError: fmt.Errorf("%s", "Bucket name invalid: a")}, // Test case - 5. // Case with invalid object names. {bucketName: bucket, PartID: 1, expectedError: fmt.Errorf("%s", "Object name invalid: minio-bucket/")}, // Test case - 6. // Valid object and bucket names but non-existent bucket. - {bucketName: "abc", objName: "def", PartID: 1, expectedError: fmt.Errorf("%s", "Bucket not found: abc")}, + {bucketName: "abc", objName: "def", uploadID: resN.UploadID, PartID: 1, expectedError: fmt.Errorf("%s", "Bucket not found: abc")}, // Test Case - 7. // Existing bucket, but using a bucket on which NewMultipartUpload is not Initiated. {bucketName: "unused-bucket", objName: "def", uploadID: "xyz", PartID: 1, expectedError: fmt.Errorf("%s", "Invalid upload id xyz")}, @@ -1041,10 +1056,10 @@ func testListMultipartUploads(obj ObjectLayer, instanceType string, t TestErrHan shouldPass bool }{ // Test cases with invalid bucket names ( Test number 1-4 ). - {".test", "", "", "", "", 0, ListMultipartsInfo{}, BucketNotFound{Bucket: ".test"}, false}, - {"Test", "", "", "", "", 0, ListMultipartsInfo{}, BucketNotFound{Bucket: "Test"}, false}, - {"---", "", "", "", "", 0, ListMultipartsInfo{}, BucketNotFound{Bucket: "---"}, false}, - {"ad", "", "", "", "", 0, ListMultipartsInfo{}, BucketNotFound{Bucket: "ad"}, false}, + {".test", "", "", "", "", 0, ListMultipartsInfo{}, BucketNameInvalid{Bucket: ".test"}, false}, + {"Test", "", "", "", "", 0, ListMultipartsInfo{}, BucketNameInvalid{Bucket: "Test"}, false}, + {"---", "", "", "", "", 0, ListMultipartsInfo{}, BucketNameInvalid{Bucket: "---"}, false}, + {"ad", "", "", "", "", 0, ListMultipartsInfo{}, BucketNameInvalid{Bucket: "ad"}, false}, // Valid bucket names, but they do not exist (Test number 5-7). {"volatile-bucket-1", "", "", "", "", 0, ListMultipartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, {"volatile-bucket-2", "", "", "", "", 0, ListMultipartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, @@ -1357,15 +1372,15 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks shouldPass bool }{ // Test cases with invalid bucket names (Test number 1-4). - {".test", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: ".test"}, false}, - {"Test", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "Test"}, false}, - {"---", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "---"}, false}, - {"ad", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "ad"}, false}, + {".test", "", "", 0, 0, ListPartsInfo{}, BucketNameInvalid{Bucket: ".test"}, false}, + {"Test", "", "", 0, 0, ListPartsInfo{}, BucketNameInvalid{Bucket: "Test"}, false}, + {"---", "", "", 0, 0, ListPartsInfo{}, BucketNameInvalid{Bucket: "---"}, false}, + {"ad", "", "", 0, 0, ListPartsInfo{}, BucketNameInvalid{Bucket: "ad"}, false}, // Test cases for listing uploadID with single part. // Valid bucket names, but they do not exist (Test number 5-7). - {"volatile-bucket-1", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, - {"volatile-bucket-2", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, - {"volatile-bucket-3", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false}, + {"volatile-bucket-1", "test1", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, + {"volatile-bucket-2", "test1", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, + {"volatile-bucket-3", "test1", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false}, // Test case for Asserting for invalid objectName (Test number 8). {bucketNames[0], "", "", 0, 0, ListPartsInfo{}, ObjectNameInvalid{Bucket: bucketNames[0]}, false}, // Asserting for Invalid UploadID (Test number 9). @@ -1594,15 +1609,15 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler) shouldPass bool }{ // Test cases with invalid bucket names (Test number 1-4). - {".test", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: ".test"}, false}, - {"Test", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "Test"}, false}, - {"---", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "---"}, false}, - {"ad", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "ad"}, false}, + {".test", "", "", 0, 0, ListPartsInfo{}, BucketNameInvalid{Bucket: ".test"}, false}, + {"Test", "", "", 0, 0, ListPartsInfo{}, BucketNameInvalid{Bucket: "Test"}, false}, + {"---", "", "", 0, 0, ListPartsInfo{}, BucketNameInvalid{Bucket: "---"}, false}, + {"ad", "", "", 0, 0, ListPartsInfo{}, BucketNameInvalid{Bucket: "ad"}, false}, // Test cases for listing uploadID with single part. // Valid bucket names, but they do not exist (Test number 5-7). - {"volatile-bucket-1", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, - {"volatile-bucket-2", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, - {"volatile-bucket-3", "", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false}, + {"volatile-bucket-1", "test1", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-1"}, false}, + {"volatile-bucket-2", "test1", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-2"}, false}, + {"volatile-bucket-3", "test1", "", 0, 0, ListPartsInfo{}, BucketNotFound{Bucket: "volatile-bucket-3"}, false}, // Test case for Asserting for invalid objectName (Test number 8). {bucketNames[0], "", "", 0, 0, ListPartsInfo{}, ObjectNameInvalid{Bucket: bucketNames[0]}, false}, // Asserting for Invalid UploadID (Test number 9). @@ -1809,15 +1824,15 @@ func testObjectCompleteMultipartUpload(obj ObjectLayer, instanceType string, t T shouldPass bool }{ // Test cases with invalid bucket names (Test number 1-4). - {".test", "", "", []CompletePart{}, "", BucketNotFound{Bucket: ".test"}, false}, - {"Test", "", "", []CompletePart{}, "", BucketNotFound{Bucket: "Test"}, false}, - {"---", "", "", []CompletePart{}, "", BucketNotFound{Bucket: "---"}, false}, - {"ad", "", "", []CompletePart{}, "", BucketNotFound{Bucket: "ad"}, false}, + {".test", "", "", []CompletePart{}, "", BucketNameInvalid{Bucket: ".test"}, false}, + {"Test", "", "", []CompletePart{}, "", BucketNameInvalid{Bucket: "Test"}, false}, + {"---", "", "", []CompletePart{}, "", BucketNameInvalid{Bucket: "---"}, false}, + {"ad", "", "", []CompletePart{}, "", BucketNameInvalid{Bucket: "ad"}, false}, // Test cases for listing uploadID with single part. // Valid bucket names, but they do not exist (Test number 5-7). - {"volatile-bucket-1", "", "", []CompletePart{}, "", BucketNotFound{Bucket: "volatile-bucket-1"}, false}, - {"volatile-bucket-2", "", "", []CompletePart{}, "", BucketNotFound{Bucket: "volatile-bucket-2"}, false}, - {"volatile-bucket-3", "", "", []CompletePart{}, "", BucketNotFound{Bucket: "volatile-bucket-3"}, false}, + {"volatile-bucket-1", "test1", "", []CompletePart{}, "", BucketNotFound{Bucket: "volatile-bucket-1"}, false}, + {"volatile-bucket-2", "test1", "", []CompletePart{}, "", BucketNotFound{Bucket: "volatile-bucket-2"}, false}, + {"volatile-bucket-3", "test1", "", []CompletePart{}, "", BucketNotFound{Bucket: "volatile-bucket-3"}, false}, // Test case for Asserting for invalid objectName (Test number 8). {bucketNames[0], "", "", []CompletePart{}, "", ObjectNameInvalid{Bucket: bucketNames[0]}, false}, // Asserting for Invalid UploadID (Test number 9). diff --git a/cmd/object-api-putobject_test.go b/cmd/object-api-putobject_test.go index 8be70477b..127af1530 100644 --- a/cmd/object-api-putobject_test.go +++ b/cmd/object-api-putobject_test.go @@ -81,13 +81,13 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl expectedError error }{ // Cases with invalid bucket name. - 0: {bucketName: ".test", objName: "obj", inputData: []byte(""), expectedError: BucketNotFound{Bucket: ".test"}}, - 1: {bucketName: "------", objName: "obj", inputData: []byte(""), expectedError: BucketNotFound{Bucket: "------"}}, + 0: {bucketName: ".test", objName: "obj", inputData: []byte(""), expectedError: BucketNameInvalid{Bucket: ".test"}}, + 1: {bucketName: "------", objName: "obj", inputData: []byte(""), expectedError: BucketNameInvalid{Bucket: "------"}}, 2: { bucketName: "$this-is-not-valid-too", objName: "obj", inputData: []byte(""), - expectedError: BucketNotFound{Bucket: "$this-is-not-valid-too"}, + expectedError: BucketNameInvalid{Bucket: "$this-is-not-valid-too"}, }, - 3: {bucketName: "a", objName: "obj", inputData: []byte(""), expectedError: BucketNotFound{Bucket: "a"}}, + 3: {bucketName: "a", objName: "obj", inputData: []byte(""), expectedError: BucketNameInvalid{Bucket: "a"}}, // Case with invalid object names. 4: {bucketName: bucket, inputData: []byte(""), expectedError: ObjectNameInvalid{Bucket: bucket, Object: ""}}, diff --git a/cmd/object-api-utils_test.go b/cmd/object-api-utils_test.go index 8ba69356f..53148d359 100644 --- a/cmd/object-api-utils_test.go +++ b/cmd/object-api-utils_test.go @@ -111,7 +111,7 @@ func testPathTraversalExploit(obj ObjectLayer, instanceType, bucketName string, z := obj.(*erasureServerPools) xl := z.serverPools[0].sets[0] erasureDisks := xl.getDisks() - parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName, "", false, false) + parts, errs := readAllFileInfo(ctx, erasureDisks, "", bucketName, objectName, "", false, false) for i := range parts { if errs[i] == nil { if parts[i].Name == objectName { diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go index 6678f489b..58b83c6fe 100644 --- a/cmd/peer-s3-client.go +++ b/cmd/peer-s3-client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2022 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // diff --git a/cmd/post-policy_test.go b/cmd/post-policy_test.go index e3d42368d..a694fef86 100644 --- a/cmd/post-policy_test.go +++ b/cmd/post-policy_test.go @@ -164,7 +164,7 @@ func testPostPolicyReservedBucketExploit(obj ObjectLayer, instanceType string, d z := obj.(*erasureServerPools) xl := z.serverPools[0].sets[0] erasureDisks := xl.getDisks() - parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName+"/upload.txt", "", false, false) + parts, errs := readAllFileInfo(ctx, erasureDisks, "", bucketName, objectName+"/upload.txt", "", false, false) for i := range parts { if errs[i] == nil { if parts[i].Name == objectName+"/upload.txt" { diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 2e61fa421..2bf5bc835 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -393,6 +393,7 @@ type DeleteVersionHandlerParams struct { type MetadataHandlerParams struct { DiskID string `msg:"id"` Volume string `msg:"v"` + OrigVolume string `msg:"ov"` FilePath string `msg:"fp"` UpdateOpts UpdateMetadataOpts `msg:"uo"` FI FileInfo `msg:"fi"` diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index a87e6ecb9..0f36091ee 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -2942,6 +2942,12 @@ func (z *MetadataHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Volume") return } + case "ov": + z.OrigVolume, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "OrigVolume") + return + } case "fp": z.FilePath, err = dc.ReadString() if err != nil { @@ -2996,9 +3002,9 @@ func (z *MetadataHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *MetadataHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 5 + // map header, size 6 // write "id" - err = en.Append(0x85, 0xa2, 0x69, 0x64) + err = en.Append(0x86, 0xa2, 0x69, 0x64) if err != nil { return } @@ -3017,6 +3023,16 @@ func (z *MetadataHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Volume") return } + // write "ov" + err = en.Append(0xa2, 0x6f, 0x76) + if err != nil { + return + } + err = en.WriteString(z.OrigVolume) + if err != nil { + err = msgp.WrapError(err, "OrigVolume") + return + } // write "fp" err = en.Append(0xa2, 0x66, 0x70) if err != nil { @@ -3059,13 +3075,16 @@ func (z *MetadataHandlerParams) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *MetadataHandlerParams) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 5 + // map header, size 6 // string "id" - o = append(o, 0x85, 0xa2, 0x69, 0x64) + o = append(o, 0x86, 0xa2, 0x69, 0x64) o = msgp.AppendString(o, z.DiskID) // string "v" o = append(o, 0xa1, 0x76) o = msgp.AppendString(o, z.Volume) + // string "ov" + o = append(o, 0xa2, 0x6f, 0x76) + o = msgp.AppendString(o, z.OrigVolume) // string "fp" o = append(o, 0xa2, 0x66, 0x70) o = msgp.AppendString(o, z.FilePath) @@ -3115,6 +3134,12 @@ func (z *MetadataHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Volume") return } + case "ov": + z.OrigVolume, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "OrigVolume") + return + } case "fp": z.FilePath, bts, err = msgp.ReadStringBytes(bts) if err != nil { @@ -3170,7 +3195,7 @@ func (z *MetadataHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *MetadataHandlerParams) Msgsize() (s int) { - s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 2 + msgp.StringPrefixSize + len(z.Volume) + 3 + msgp.StringPrefixSize + len(z.FilePath) + 3 + 1 + 3 + msgp.BoolSize + 3 + z.FI.Msgsize() + s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 2 + msgp.StringPrefixSize + len(z.Volume) + 3 + msgp.StringPrefixSize + len(z.OrigVolume) + 3 + msgp.StringPrefixSize + len(z.FilePath) + 3 + 1 + 3 + msgp.BoolSize + 3 + z.FI.Msgsize() return } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 9d165b148..504ef862d 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -82,17 +82,17 @@ type StorageAPI interface { // Metadata operations DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) error DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions, opts DeleteOptions) []error - WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error + WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) error UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error - ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (FileInfo, error) + ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (FileInfo, error) ReadXL(ctx context.Context, volume, path string, readData bool) (RawFileInfo, error) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (uint64, error) // File operations. - ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) + ListDir(ctx context.Context, origvolume, volume, dirPath string, count int) ([]string, error) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) - CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error + CreateFile(ctx context.Context, origvolume, olume, path string, size int64, reader io.Reader) error ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error CheckParts(ctx context.Context, volume string, path string, fi FileInfo) error @@ -198,7 +198,7 @@ func (p *unrecognizedDisk) DeleteVol(ctx context.Context, volume string, forceDe return errDiskNotFound } -func (p *unrecognizedDisk) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) { +func (p *unrecognizedDisk) ListDir(ctx context.Context, origvolume, volume, dirPath string, count int) ([]string, error) { return nil, errDiskNotFound } @@ -210,7 +210,7 @@ func (p *unrecognizedDisk) AppendFile(ctx context.Context, volume string, path s return errDiskNotFound } -func (p *unrecognizedDisk) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error { +func (p *unrecognizedDisk) CreateFile(ctx context.Context, origvolume, volume, path string, size int64, reader io.Reader) error { return errDiskNotFound } @@ -260,11 +260,11 @@ func (p *unrecognizedDisk) UpdateMetadata(ctx context.Context, volume, path stri return errDiskNotFound } -func (p *unrecognizedDisk) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { +func (p *unrecognizedDisk) WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) (err error) { return errDiskNotFound } -func (p *unrecognizedDisk) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { +func (p *unrecognizedDisk) ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { return fi, errDiskNotFound } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 1afce16c1..8990b5ced 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -366,11 +366,13 @@ func (client *storageRESTClient) AppendFile(ctx context.Context, volume string, return err } -func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error { +func (client *storageRESTClient) CreateFile(ctx context.Context, origvolume, volume, path string, size int64, reader io.Reader) error { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) values.Set(storageRESTLength, strconv.Itoa(int(size))) + values.Set(storageRESTOrigVolume, origvolume) + respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, io.NopCloser(reader), size) defer xhttp.DrainBody(respBody) if err != nil { @@ -380,12 +382,13 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st return err } -func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error { +func (client *storageRESTClient) WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) error { _, err := storageWriteMetadataHandler.Call(ctx, client.gridConn, &MetadataHandlerParams{ - DiskID: client.diskID, - Volume: volume, - FilePath: path, - FI: fi, + DiskID: client.diskID, + OrigVolume: origvolume, + Volume: volume, + FilePath: path, + FI: fi, }) return toStorageErr(err) } @@ -478,16 +481,17 @@ func readMsgpReaderPoolPut(r *msgp.Reader) { } } -func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { +func (client *storageRESTClient) ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { // Use websocket when not reading data. if !opts.ReadData { resp, err := storageReadVersionHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{ - storageRESTDiskID: client.diskID, - storageRESTVolume: volume, - storageRESTFilePath: path, - storageRESTVersionID: versionID, - storageRESTReadData: strconv.FormatBool(opts.ReadData), - storageRESTHealing: strconv.FormatBool(opts.Healing), + storageRESTDiskID: client.diskID, + storageRESTOrigVolume: origvolume, + storageRESTVolume: volume, + storageRESTFilePath: path, + storageRESTVersionID: versionID, + storageRESTReadData: strconv.FormatBool(opts.ReadData), + storageRESTHealing: strconv.FormatBool(opts.Healing), })) if err != nil { return fi, toStorageErr(err) @@ -496,6 +500,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, } values := make(url.Values) + values.Set(storageRESTOrigVolume, origvolume) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) values.Set(storageRESTVersionID, versionID) @@ -612,11 +617,13 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa } // ListDir - lists a directory. -func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) { +func (client *storageRESTClient) ListDir(ctx context.Context, origvolume, volume, dirPath string, count int) (entries []string, err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTDirPath, dirPath) values.Set(storageRESTCount, strconv.Itoa(count)) + values.Set(storageRESTOrigVolume, origvolume) + respBody, err := client.call(ctx, storageRESTMethodListDir, values, nil, -1) if err != nil { return nil, err diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 6997e5ff1..509ff7c49 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -20,7 +20,9 @@ package cmd //go:generate msgp -file $GOFILE -unexported const ( - storageRESTVersion = "v55" // ReadAll, RenameFile migrate to websockets + // Added orig-volume support for CreateFile, WriteMetadata, ReadVersion, ListDir + // this is needed for performance optimization on bucket checks. + storageRESTVersion = "v56" storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -43,7 +45,6 @@ const ( storageRESTMethodStatInfoFile = "/statfile" storageRESTMethodReadMultiple = "/readmultiple" storageRESTMethodCleanAbandoned = "/cleanabandoned" - storageRESTMethodLinkXL = "/linkxl" ) const ( @@ -69,6 +70,7 @@ const ( storageRESTGlob = "glob" storageRESTMetrics = "metrics" storageRESTDriveQuorum = "drive-quorum" + storageRESTOrigVolume = "orig-volume" ) type nsScannerOptions struct { diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 03527e05a..08c1e6414 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -330,8 +330,10 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req if !s.IsValid(w, r) { return } + volume := r.Form.Get(storageRESTVolume) filePath := r.Form.Get(storageRESTFilePath) + origvolume := r.Form.Get(storageRESTOrigVolume) fileSizeStr := r.Form.Get(storageRESTLength) fileSize, err := strconv.Atoi(fileSizeStr) @@ -341,7 +343,7 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req } done, body := keepHTTPReqResponseAlive(w, r) - done(s.getStorage().CreateFile(r.Context(), volume, filePath, int64(fileSize), body)) + done(s.getStorage().CreateFile(r.Context(), origvolume, volume, filePath, int64(fileSize), body)) } var storageDeleteVersionHandler = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams { @@ -371,6 +373,7 @@ func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, * if !s.checkID(params.Get(storageRESTDiskID)) { return nil, grid.NewRemoteErr(errDiskNotFound) } + origvolume := params.Get(storageRESTOrigVolume) volume := params.Get(storageRESTVolume) filePath := params.Get(storageRESTFilePath) versionID := params.Get(storageRESTVersionID) @@ -384,7 +387,7 @@ func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, * return nil, grid.NewRemoteErr(err) } - fi, err := s.getStorage().ReadVersion(context.Background(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) + fi, err := s.getStorage().ReadVersion(context.Background(), origvolume, volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) if err != nil { return nil, grid.NewRemoteErr(err) } @@ -396,6 +399,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re if !s.IsValid(w, r) { return } + origvolume := r.Form.Get(storageRESTOrigVolume) volume := r.Form.Get(storageRESTVolume) filePath := r.Form.Get(storageRESTFilePath) versionID := r.Form.Get(storageRESTVersionID) @@ -409,7 +413,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re s.writeErrorResponse(w, err) return } - fi, err := s.getStorage().ReadVersion(r.Context(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) + fi, err := s.getStorage().ReadVersion(r.Context(), origvolume, volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) if err != nil { s.writeErrorResponse(w, err) return @@ -427,10 +431,12 @@ func (s *storageRESTServer) WriteMetadataHandler(p *MetadataHandlerParams) (np g if !s.checkID(p.DiskID) { return grid.NewNPErr(errDiskNotFound) } + volume := p.Volume filePath := p.FilePath + origvolume := p.OrigVolume - err := s.getStorage().WriteMetadata(context.Background(), volume, filePath, p.FI) + err := s.getStorage().WriteMetadata(context.Background(), origvolume, volume, filePath, p.FI) return np, grid.NewRemoteErr(err) } @@ -650,13 +656,14 @@ func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Reques } volume := r.Form.Get(storageRESTVolume) dirPath := r.Form.Get(storageRESTDirPath) + origvolume := r.Form.Get(storageRESTOrigVolume) count, err := strconv.Atoi(r.Form.Get(storageRESTCount)) if err != nil { s.writeErrorResponse(w, err) return } - entries, err := s.getStorage().ListDir(r.Context(), volume, dirPath, count) + entries, err := s.getStorage().ListDir(r.Context(), origvolume, volume, dirPath, count) if err != nil { s.writeErrorResponse(w, err) return diff --git a/cmd/storage-rest_test.go b/cmd/storage-rest_test.go index 71ee81cd8..e1f49dc0b 100644 --- a/cmd/storage-rest_test.go +++ b/cmd/storage-rest_test.go @@ -97,7 +97,7 @@ func testStorageAPIListDir(t *testing.T, storage StorageAPI) { } for i, testCase := range testCases { - result, err := storage.ListDir(context.Background(), testCase.volumeName, testCase.prefix, -1) + result, err := storage.ListDir(context.Background(), "", testCase.volumeName, testCase.prefix, -1) expectErr := (err != nil) if expectErr != testCase.expectErr { diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 8aef3db5d..f85578291 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -439,14 +439,14 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for return w.Run(func() error { return p.storage.DeleteVol(ctx, volume, forceDelete) }) } -func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) (s []string, err error) { +func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, origvolume, volume, dirPath string, count int) (s []string, err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricListDir, volume, dirPath) if err != nil { return nil, err } defer done(&err) - return p.storage.ListDir(ctx, volume, dirPath, count) + return p.storage.ListDir(ctx, origvolume, volume, dirPath, count) } // Legacy API - does not have any deadlines @@ -476,14 +476,14 @@ func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, pa }) } -func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) (err error) { +func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, origvolume, volume, path string, size int64, reader io.Reader) (err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricCreateFile, volume, path) if err != nil { return err } defer done(&err) - return p.storage.CreateFile(ctx, volume, path, size, io.NopCloser(reader)) + return p.storage.CreateFile(ctx, origvolume, volume, path, size, io.NopCloser(reader)) } func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) { @@ -663,7 +663,7 @@ func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path return w.Run(func() error { return p.storage.UpdateMetadata(ctx, volume, path, fi, opts) }) } -func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { +func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) (err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricWriteMetadata, volume, path) if err != nil { return err @@ -671,10 +671,10 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s defer done(&err) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()) - return w.Run(func() error { return p.storage.WriteMetadata(ctx, volume, path, fi) }) + return w.Run(func() error { return p.storage.WriteMetadata(ctx, origvolume, volume, path, fi) }) } -func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { +func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadVersion, volume, path) if err != nil { return fi, err @@ -682,7 +682,7 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve defer done(&err) return xioutil.WithDeadline[FileInfo](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result FileInfo, err error) { - return p.storage.ReadVersion(ctx, volume, path, versionID, opts) + return p.storage.ReadVersion(ctx, origvolume, volume, path, versionID, opts) }) } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 3c3d3d014..69059aa61 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1016,11 +1016,23 @@ func (s *xlStorage) DeleteVol(ctx context.Context, volume string, forceDelete bo // ListDir - return all the entries at the given directory path. // If an entry is a directory it will be returned with a trailing SlashSeparator. -func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) { +func (s *xlStorage) ListDir(ctx context.Context, origvolume, volume, dirPath string, count int) (entries []string, err error) { if contextCanceled(ctx) { return nil, ctx.Err() } + if origvolume != "" { + if !skipAccessChecks(origvolume) { + origvolumeDir, err := s.getVolDir(origvolume) + if err != nil { + return nil, err + } + if err = Access(origvolumeDir); err != nil { + return nil, convertAccessError(err, errVolumeAccessDenied) + } + } + } + // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -1034,15 +1046,9 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i entries, err = readDir(dirPathAbs) } if err != nil { - if err == errFileNotFound { - if !skipAccessChecks(volume) { - if ierr := Access(volumeDir); ierr != nil { - if osIsNotExist(ierr) { - return nil, errVolumeNotFound - } else if isSysErrIO(ierr) { - return nil, errFaultyDisk - } - } + if errors.Is(err, errFileNotFound) && !skipAccessChecks(volume) { + if ierr := Access(volumeDir); ierr != nil { + return nil, convertAccessError(ierr, errVolumeAccessDenied) } } return nil, err @@ -1225,7 +1231,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F metaDataPoolPut(buf) // Never used, return it if fi.Deleted && forceDelMarker { // Create a new xl.meta with a delete marker in it - return s.WriteMetadata(ctx, volume, path, fi) + return s.WriteMetadata(ctx, "", volume, path, fi) } s.RLock() @@ -1344,8 +1350,22 @@ func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi } // WriteMetadata - writes FileInfo metadata for path at `xl.meta` -func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) { +func (s *xlStorage) WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) (err error) { if fi.Fresh { + if origvolume != "" { + origvolumeDir, err := s.getVolDir(origvolume) + if err != nil { + return err + } + + if !skipAccessChecks(origvolume) { + // Stat a volume entry. + if err = Access(origvolumeDir); err != nil { + return convertAccessError(err, errVolumeAccessDenied) + } + } + } + var xlMeta xlMetaV2 if err := xlMeta.AddVersion(fi); err != nil { return err @@ -1521,7 +1541,21 @@ type ReadOptions struct { // ReadVersion - reads metadata and returns FileInfo at path `xl.meta` // for all objects less than `32KiB` this call returns data as well // along with metadata. -func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { +func (s *xlStorage) ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { + if origvolume != "" { + origvolumeDir, err := s.getVolDir(origvolume) + if err != nil { + return fi, err + } + + if !skipAccessChecks(origvolume) { + // Stat a volume entry. + if err = Access(origvolumeDir); err != nil { + return fi, convertAccessError(err, errVolumeAccessDenied) + } + } + } + volumeDir, err := s.getVolDir(volume) if err != nil { return fi, err @@ -1968,7 +2002,21 @@ func (c closeWrapper) Close() error { } // CreateFile - creates the file. -func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSize int64, r io.Reader) (err error) { +func (s *xlStorage) CreateFile(ctx context.Context, origvolume, volume, path string, fileSize int64, r io.Reader) (err error) { + if origvolume != "" { + origvolumeDir, err := s.getVolDir(origvolume) + if err != nil { + return err + } + + if !skipAccessChecks(origvolume) { + // Stat a volume entry. + if err = Access(origvolumeDir); err != nil { + return convertAccessError(err, errVolumeAccessDenied) + } + } + } + volumeDir, err := s.getVolDir(volume) if err != nil { return err @@ -2355,23 +2403,13 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f if !skipAccessChecks(srcVolume) { // Stat a volume entry. if err = Access(srcVolumeDir); err != nil { - if osIsNotExist(err) { - return 0, errVolumeNotFound - } else if isSysErrIO(err) { - return 0, errFaultyDisk - } - return 0, err + return 0, convertAccessError(err, errVolumeAccessDenied) } } if !skipAccessChecks(dstVolume) { if err = Access(dstVolumeDir); err != nil { - if osIsNotExist(err) { - return 0, errVolumeNotFound - } else if isSysErrIO(err) { - return 0, errFaultyDisk - } - return 0, err + return 0, convertAccessError(err, errVolumeAccessDenied) } } diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go index 4caeda557..1556394f3 100644 --- a/cmd/xl-storage_test.go +++ b/cmd/xl-storage_test.go @@ -235,7 +235,7 @@ func TestXLStorageReadVersionLegacy(t *testing.T) { t.Fatalf("Unable to create a file \"as-file\", %s", err) } - fi, err := xlStorage.ReadVersion(context.Background(), "exists-legacy", "as-file", "", ReadOptions{}) + fi, err := xlStorage.ReadVersion(context.Background(), "", "exists-legacy", "as-file", "", ReadOptions{}) if err != nil { t.Fatalf("Unable to read older 'xl.json' content: %s", err) } @@ -325,7 +325,7 @@ func TestXLStorageReadVersion(t *testing.T) { // Run through all the test cases and validate for ReadVersion. for i, testCase := range testCases { - _, err = xlStorage.ReadVersion(context.Background(), testCase.volume, testCase.path, "", ReadOptions{}) + _, err = xlStorage.ReadVersion(context.Background(), "", testCase.volume, testCase.path, "", ReadOptions{}) if err != testCase.err { t.Fatalf("TestXLStorage %d: Expected err \"%s\", got err \"%s\"", i+1, testCase.err, err) } @@ -858,7 +858,7 @@ func TestXLStorageListDir(t *testing.T) { for i, testCase := range testCases { var dirList []string - dirList, err = xlStorage.ListDir(context.Background(), testCase.srcVol, testCase.srcPath, -1) + dirList, err = xlStorage.ListDir(context.Background(), "", testCase.srcVol, testCase.srcPath, -1) if err != testCase.expectedErr { t.Errorf("TestXLStorage case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err) } @@ -1657,7 +1657,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { Checksums: nil, }, } - if err := xl.WriteMetadata(ctx, volume, object, fi); err != nil { + if err := xl.WriteMetadata(ctx, "", volume, object, fi); err != nil { t.Fatalf("Unable to create object, %s", err) } } @@ -1666,7 +1666,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { t.Helper() for i := range versions { shouldExist := !deleted[i] - fi, err := xl.ReadVersion(ctx, volume, object, versions[i], ReadOptions{}) + fi, err := xl.ReadVersion(ctx, "", volume, object, versions[i], ReadOptions{}) if shouldExist { if err != nil { t.Fatalf("Version %s should exist, but got err %v", versions[i], err) @@ -1713,7 +1713,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { checkVerExist(t) // Meta should be deleted now... - fi, err := xl.ReadVersion(ctx, volume, object, "", ReadOptions{}) + fi, err := xl.ReadVersion(ctx, "", volume, object, "", ReadOptions{}) if err != errFileNotFound { t.Fatalf("Object %s should not exist, but returned: %#v", object, fi) } @@ -1871,7 +1871,7 @@ func TestXLStorageVerifyFile(t *testing.T) { algo = HighwayHash256S shardSize := int64(1024 * 1024) shard := make([]byte, shardSize) - w := newStreamingBitrotWriter(storage, volName, fileName, size, algo, shardSize) + w := newStreamingBitrotWriter(storage, "", volName, fileName, size, algo, shardSize) reader := bytes.NewReader(data) for { // Using io.Copy instead of this loop will not work for us as io.Copy