diff --git a/test/volume_server/grpc/ec_multi_disk_lifecycle_test.go b/test/volume_server/grpc/ec_multi_disk_lifecycle_test.go index b163a5d9d..418f87dcd 100644 --- a/test/volume_server/grpc/ec_multi_disk_lifecycle_test.go +++ b/test/volume_server/grpc/ec_multi_disk_lifecycle_test.go @@ -136,8 +136,6 @@ func TestEcLifecycleAcrossMultipleDisks(t *testing.T) { conn2, grpcClient2 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) defer conn2.Close() - // VolumeEcShardsInfo only sees one disk's EcVolume; filesystem layout is - // the ground truth for the whole-store shard count. postReconcileLayout := scanShardLayout(t, dataDirs, collection, volumeID) if got, want := totalShardsInLayout(postReconcileLayout), erasure_coding.TotalShardsCount; got != want { t.Fatalf("post-reconcile: total shards on disk mismatch: got %d, want %d (layout=%v)", got, want, postReconcileLayout) @@ -148,11 +146,28 @@ func TestEcLifecycleAcrossMultipleDisks(t *testing.T) { if got, want := len(postReconcileLayout[1]), splitAt; got != want { t.Fatalf("post-reconcile: disk 1 shard count drift: got %d, want %d (layout=%v)", got, want, postReconcileLayout) } - if _, err := grpcClient2.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{ + // VolumeEcShardsInfo must walk every DiskLocation and report the full + // shard set — the verification step in ec_task.go gates source-volume + // deletion on this RPC returning a complete shard inventory. + infoResp, err := grpcClient2.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{ VolumeId: volumeID, - }); err != nil { + }) + if err != nil { t.Fatalf("VolumeEcShardsInfo after redistribute restart: %v", err) } + if got, want := len(infoResp.GetEcShardInfos()), erasure_coding.TotalShardsCount; got != want { + t.Fatalf("VolumeEcShardsInfo after redistribute restart: got %d shards, want %d (per-disk layout=%v)", + got, want, postReconcileLayout) + } + gotShardIds := make(map[uint32]struct{}, len(infoResp.GetEcShardInfos())) + for _, info := range infoResp.GetEcShardInfos() { + gotShardIds[info.GetShardId()] = struct{}{} + } + for shardId := uint32(0); shardId < uint32(erasure_coding.TotalShardsCount); shardId++ { + if _, ok := gotShardIds[shardId]; !ok { + t.Fatalf("VolumeEcShardsInfo missing shard %d (per-disk layout=%v)", shardId, postReconcileLayout) + } + } for _, n := range needles { verifyHTTPRead(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload, "after-cross-disk-reconcile") } diff --git a/test/volume_server/grpc/ec_verify_multi_disk_test.go b/test/volume_server/grpc/ec_verify_multi_disk_test.go new file mode 100644 index 000000000..79a82efb2 --- /dev/null +++ b/test/volume_server/grpc/ec_verify_multi_disk_test.go @@ -0,0 +1,175 @@ +package volume_server_grpc_test + +import ( + "context" + "net/http" + "sort" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// TestVolumeEcShardsInfoReturnsAllShardsAcrossDisks drives the full path +// behind the ec.encode source-deletion gate. A multi-disk volume server +// ends up with EC shards split across disks (each registers its own +// EcVolume entry in DiskLocation.ecVolumes), and the volume server's +// VolumeEcShardsInfo RPC must walk every DiskLocation rather than +// reporting whichever disk Store.FindEcVolume picks first. +// +// Pre-fix, verifyEcShardsBeforeDelete refused to delete source volumes — +// the shard-bitmap union across destinations fell short of dataShards + +// parityShards because each destination only reported shards on one of +// its disks. With the handler fix, the same VerifyShardsAcrossServers +// call returns a complete bitmap and the gate opens. +func TestVolumeEcShardsInfoReturnsAllShardsAcrossDisks(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + const ( + dataDirCount = 2 + volumeID = uint32(9558) + collection = "ec-multi-disk-verify" + ) + + clusterHarness := framework.StartSingleVolumeClusterWithDataDirs(t, matrix.P1(), dataDirCount) + dataDirs := clusterHarness.VolumeDataDirs() + if len(dataDirs) != dataDirCount { + t.Fatalf("expected %d data dirs, got %d: %v", dataDirCount, len(dataDirs), dataDirs) + } + + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + framework.AllocateVolume(t, grpcClient, volumeID, collection) + + httpClient := framework.NewHTTPClient() + needles := []struct { + fid string + payload []byte + }{ + {framework.NewFileID(volumeID, 9559, 0xC0FFEE01), bytesOfLen(64, 0xB1)}, + {framework.NewFileID(volumeID, 9560, 0xC0FFEE02), bytesOfLen(8192, 0xB2)}, + {framework.NewFileID(volumeID, 9561, 0xC0FFEE03), bytesOfLen(131072, 0xB3)}, + } + for _, n := range needles { + resp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload %s expected 201, got %d", n.fid, resp.StatusCode) + } + } + + if _, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, + Collection: collection, + }); err != nil { + t.Fatalf("VolumeEcShardsGenerate: %v", err) + } + + // Generate places every shard plus the .ecx/.ecj/.vif on the .dat's + // disk (disk 0). Mount all 14 there first so the next step's restart + // has a steady starting state. + allShards := make([]uint32, erasure_coding.TotalShardsCount) + for i := range allShards { + allShards[i] = uint32(i) + } + if _, err := grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, + Collection: collection, + ShardIds: allShards, + }); err != nil { + t.Fatalf("VolumeEcShardsMount all shards: %v", err) + } + + // Drop the .dat so the EC shards are the only data path — mirrors the + // real ec.encode flow before verifyEcShardsBeforeDelete fires. + if _, err := grpcClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ + VolumeId: volumeID, + }); err != nil { + t.Fatalf("VolumeDelete (drop .dat): %v", err) + } + + // Move half the shards onto disk 1, leaving .ecx on disk 0. After + // restart, the cross-disk reconcile path attaches each disk's shards + // against its own EcVolume entry — the exact in-memory shape the bug + // reporter saw on a multi-disk destination. + clusterHarness.StopVolumeServer() + const splitAt = 7 + for shard := 0; shard < splitAt; shard++ { + movedFile(t, dataDirs[0], dataDirs[1], collection, volumeID, erasure_coding.ToExt(shard)) + } + if fileExistsIn(dataDirs[1], collection, volumeID, ".ecx") { + t.Fatalf("setup: .ecx must stay on disk 0 to exercise the multi-disk path") + } + + clusterHarness.RestartVolumeServer() + conn2, grpcClient2 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn2.Close() + + postReconcileLayout := scanShardLayout(t, dataDirs, collection, volumeID) + if got, want := totalShardsInLayout(postReconcileLayout), erasure_coding.TotalShardsCount; got != want { + t.Fatalf("post-reconcile: total shards on disk mismatch: got %d, want %d (layout=%v)", got, want, postReconcileLayout) + } + if len(postReconcileLayout[0]) == 0 || len(postReconcileLayout[1]) == 0 { + t.Fatalf("post-reconcile: expected shards on BOTH disks, got per-disk layout %v", postReconcileLayout) + } + + // Direct RPC assertion: VolumeEcShardsInfo must report every shard + // the server holds, not just the ones registered against the first + // matching DiskLocation. + infoResp, err := grpcClient2.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("VolumeEcShardsInfo: %v", err) + } + gotShardIds := make([]int, 0, len(infoResp.GetEcShardInfos())) + for _, info := range infoResp.GetEcShardInfos() { + if info.GetVolumeId() != volumeID { + t.Errorf("EcShardInfo VolumeId=%d, want %d", info.GetVolumeId(), volumeID) + } + gotShardIds = append(gotShardIds, int(info.GetShardId())) + } + sort.Ints(gotShardIds) + wantShardIds := make([]int, erasure_coding.TotalShardsCount) + for i := range wantShardIds { + wantShardIds[i] = i + } + if len(gotShardIds) != len(wantShardIds) { + t.Fatalf("VolumeEcShardsInfo returned %d shards (ids=%v), want %d (ids=%v) — per-disk layout=%v", + len(gotShardIds), gotShardIds, len(wantShardIds), wantShardIds, postReconcileLayout) + } + for i, sid := range wantShardIds { + if gotShardIds[i] != sid { + t.Fatalf("VolumeEcShardsInfo shard ids=%v, want %v (per-disk layout=%v)", + gotShardIds, wantShardIds, postReconcileLayout) + } + } + + // End-to-end assertion via the same helper the worker uses to gate + // source-volume deletion (weed/worker/tasks/erasure_coding/ec_task.go + // verifyEcShardsBeforeDelete). The union across destinations is what + // RequireFullShardSet measures; with one destination that holds every + // shard, the union must cover dataShards + parityShards. + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + servers := []string{clusterHarness.VolumeServerAddress()} + union, perServer := erasure_coding.VerifyShardsAcrossServers(ctx, volumeID, servers, dialOption) + if err := erasure_coding.RequireFullShardSet(volumeID, union, erasure_coding.TotalShardsCount); err != nil { + t.Fatalf("verifyEcShardsBeforeDelete-equivalent gate failed: %v\nper-server inventory: %s\nper-disk layout: %v", + err, erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout) + } + if got, want := union.Count(), erasure_coding.TotalShardsCount; got != want { + t.Fatalf("VerifyShardsAcrossServers union covered %d/%d shards (per-server=%s, layout=%v)", + got, want, erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout) + } +} diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 3134d9d0d..4882101db 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -710,21 +710,39 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ func (vs *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) { glog.V(0).Infof("VolumeEcShardsInfo: volume %d", req.VolumeId) - glog.V(0).Infof("VolumeEcStatus: %v", req) - vid := needle.VolumeId(req.GetVolumeId()) - ecv, found := vs.store.FindEcVolume(vid) - if !found { - return nil, fmt.Errorf("VolumeEcStatus: EC volume %d not found", vid) - } - shardInfos := make([]*volume_server_pb.EcShardInfo, len(ecv.Shards)) - for i, s := range ecv.Shards { - shardInfos[i] = s.ToEcShardInfo() + // Multi-disk volume servers register one EcVolume per DiskLocation + // that holds shards for the same vid: shards may be spread across + // disks while the .ecx lives on whichever disk owned the original + // .dat. Walk every DiskLocation here so the response reflects the + // full local shard set; the per-disk ecVolumesLock is taken inside + // DiskLocation.FindEcVolume. + var primary *erasure_coding.EcVolume + var seenShards erasure_coding.ShardBits + shardInfos := make([]*volume_server_pb.EcShardInfo, 0, erasure_coding.MaxShardCount) + for _, location := range vs.store.Locations { + ecv, ok := location.FindEcVolume(vid) + if !ok { + continue + } + if primary == nil { + primary = ecv + } + for _, s := range ecv.Shards { + if seenShards.Has(s.ShardId) { + continue + } + seenShards = seenShards.Set(s.ShardId) + shardInfos = append(shardInfos, s.ToEcShardInfo()) + } + } + if primary == nil { + return nil, fmt.Errorf("VolumeEcShardsInfo: EC volume %d not found", vid) } var files, filesDeleted, totalSize uint64 - err := ecv.WalkIndex(func(_ types.NeedleId, _ types.Offset, size types.Size) error { + err := primary.WalkIndex(func(_ types.NeedleId, _ types.Offset, size types.Size) error { // deleted files are counted when computing EC volume sizes. this aligns with VolumeStatus(), // which reports the raw data backend file size, regardless of deleted files. totalSize += uint64(size.Raw()) diff --git a/weed/server/volume_grpc_erasure_coding_test.go b/weed/server/volume_grpc_erasure_coding_test.go index dd82734b8..4c42e8fe1 100644 --- a/weed/server/volume_grpc_erasure_coding_test.go +++ b/weed/server/volume_grpc_erasure_coding_test.go @@ -1,11 +1,19 @@ package weed_server import ( + "context" "os" "path/filepath" + "sort" "testing" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" + "github.com/seaweedfs/seaweedfs/weed/util" ) func TestCheckEcVolumeStatusCountOnlyDataShards(t *testing.T) { @@ -54,3 +62,141 @@ func TestCheckEcVolumeStatusCountOnlyDataShards(t *testing.T) { t.Fatalf("expected shardCount=3, got %d", shardCount) } } + +// TestVolumeEcShardsInfo_AggregatesAcrossDisks pins the multi-disk path: +// when a volume server mounts EC shards for the same volume on more than +// one disk (each disk holds its own EcVolume entry — Store.FindEcVolume +// returns only the first), VolumeEcShardsInfo used to report shards from +// a single disk. The ec.encode verification step (verifyEcShardsBeforeDelete) +// then refused to delete the source volume because the union across +// servers fell short of dataShards + parityShards. The handler must walk +// every DiskLocation so the response covers every shard the server holds. +func TestVolumeEcShardsInfo_AggregatesAcrossDisks(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "disk0") + dir1 := filepath.Join(tempDir, "disk1") + for _, d := range []string{dir0, dir1} { + if err := os.MkdirAll(d, 0o755); err != nil { + t.Fatalf("mkdir %s: %v", d, err) + } + } + + const collection = "ec-multi-disk-info" + vid := needle.VolumeId(42) + const dataShards, parityShards = 10, 4 + const datSize int64 = 10 * 1024 * 1024 + + // Two shards on disk0, two on disk1. The .ecx / .ecj / .vif live on + // disk0 so each disk's EcVolume can open the index files via the + // cross-disk fallback in NewEcVolume. + shardsOnDisk0 := []erasure_coding.ShardId{0, 5} + shardsOnDisk1 := []erasure_coding.ShardId{7, 12} + + store := storage.NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id", + []string{dir0, dir1}, + []int32{100, 100}, + []util.MinFreeSpace{{}, {}}, + "", + storage.NeedleMapInMemory, + []types.DiskType{types.HardDriveType, types.HardDriveType}, + nil, + 3, + ) + done := make(chan struct{}) + go func() { + for { + select { + case <-store.NewEcShardsChan: + case <-store.NewVolumesChan: + case <-store.DeletedVolumesChan: + case <-store.DeletedEcShardsChan: + case <-store.StateUpdateChan: + case <-done: + return + } + } + }() + t.Cleanup(func() { + store.Close() + close(done) + }) + + base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid)) + base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid)) + + // .ecx, .ecj, .vif live on disk0. NewEcVolume on disk1 falls back to + // disk0's idx dir. The .ecx needs >0 bytes so HasEcxFileOnDisk does + // not treat it as the corrupt-stub case; a single zero entry is the + // smallest valid index file (WalkIndex iterates one zero-sized needle). + if err := os.WriteFile(base0+".ecx", make([]byte, 16), 0o644); err != nil { + t.Fatalf("write .ecx: %v", err) + } + if err := os.WriteFile(base0+".ecj", nil, 0o644); err != nil { + t.Fatalf("write .ecj: %v", err) + } + if err := volume_info.SaveVolumeInfo(base0+".vif", &volume_server_pb.VolumeInfo{ + Version: uint32(needle.Version3), + DatFileSize: datSize, + EcShardConfig: &volume_server_pb.EcShardConfig{ + DataShards: dataShards, + ParityShards: parityShards, + }, + }); err != nil { + t.Fatalf("save .vif: %v", err) + } + + plant := func(base string, shardId erasure_coding.ShardId) { + t.Helper() + f, err := os.Create(base + erasure_coding.ToExt(int(shardId))) + if err != nil { + t.Fatalf("create shard %d: %v", shardId, err) + } + // MountEcShards does not validate shard size, so any non-empty + // truncate avoids the zero-byte ignore branch in loadAllEcShards. + if err := f.Truncate(1); err != nil { + f.Close() + t.Fatalf("truncate shard %d: %v", shardId, err) + } + f.Close() + } + for _, sid := range shardsOnDisk0 { + plant(base0, sid) + } + for _, sid := range shardsOnDisk1 { + plant(base1, sid) + } + + for _, sid := range append([]erasure_coding.ShardId{}, append(shardsOnDisk0, shardsOnDisk1...)...) { + if err := store.MountEcShards(collection, vid, sid, ""); err != nil { + t.Fatalf("MountEcShards %d.%d: %v", vid, sid, err) + } + } + + vs := &VolumeServer{store: store} + resp, err := vs.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{ + VolumeId: uint32(vid), + }) + if err != nil { + t.Fatalf("VolumeEcShardsInfo: %v", err) + } + + gotShardIds := make([]int, 0, len(resp.GetEcShardInfos())) + for _, info := range resp.GetEcShardInfos() { + if info.GetVolumeId() != uint32(vid) { + t.Errorf("EcShardInfo VolumeId=%d, want %d", info.GetVolumeId(), vid) + } + gotShardIds = append(gotShardIds, int(info.GetShardId())) + } + sort.Ints(gotShardIds) + + want := []int{0, 5, 7, 12} + if len(gotShardIds) != len(want) { + t.Fatalf("VolumeEcShardsInfo returned %d shards (ids=%v), want %d (ids=%v)", + len(gotShardIds), gotShardIds, len(want), want) + } + for i, sid := range want { + if gotShardIds[i] != sid { + t.Fatalf("VolumeEcShardsInfo shard ids=%v, want %v", gotShardIds, want) + } + } +}